## Imports

In [14]:
import json
import os
import numpy as np
import pandas as pd
from datetime import datetime


# Staging folders the pipeline writes into: logs, raw CSV copies of the
# sources, columns removed during the transform, and the transformed tables.
path_arr = ['stage','stage/log','stage/raw_converted','stage/extra_data','stage/transformed_db']
for stag_path in path_arr:
    # exist_ok avoids the check-then-create race and keeps the cell re-runnable.
    os.makedirs(stag_path, exist_ok=True)

# One log file per run, named after the moment this cell was executed.
log_path = 'stage/log/log_{}.txt'.format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S"))


def log_managment(msg):
    """Append ``msg`` to the log file of the current run.

    Args:
        msg: Text to append. It is written as-is, so the caller is
            responsible for the trailing newline.
    """
    with open(log_path, 'a') as f:
        f.write(msg)


# A space separates the message from the path so the first log line is readable.
log_managment('Pipeline intiated {}\n'.format(log_path))



## Export

Here the data is extracted from the JSON files that act as the source into a staging area and saved as CSV files to make it easier to handle.

Libraries used: 

os - used to scan through local folder for source files

pandas - used to create a backup of the source files in CSV format

In [15]:
# Extract step: every regular file in the `source` folder is parsed as JSON,
# turned into a DataFrame and copied to the staging area as CSV.
dbdict = {}     # source file name -> path of that file
sourcedf = {}   # source file name -> DataFrame built from that file
try:
    # Collect the regular files found in the source folder.
    for entry in os.scandir('source'):
        if entry.is_file():
            dbdict[entry.name] = entry.path

    # Convert each file into a DataFrame and back it up as CSV.
    for source_key, source_path in dbdict.items():
        # The context manager closes the handle even when json.load raises.
        with open(source_path) as source_file:
            data = json.load(source_file)
        newfile_loc = 'stage/raw_converted/{}.csv'.format(source_key)
        sourcedf[source_key] = pd.DataFrame.from_dict(data)
        sourcedf[source_key].to_csv(newfile_loc, index=False, header=True, sep=',')
        # len() is the row count; DataFrame.size would be rows * columns.
        write_msg = ('At {} {} was added to dataframe with {} rows\n').format(
            datetime.now().strftime("%m_%d_%Y_%H-%M-%S"), source_key, len(sourcedf[source_key]))
        log_managment(write_msg)
except Exception as exc:
    # Still best effort (the notebook keeps running), but the cause is recorded.
    write_msg = ('At {} Export has failed: {!r}\n').format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S"), exc)
    log_managment(write_msg)




## Transform

This transformation is done to ensure the data is loaded into the database in the manner described in the task.

Column check - Going over the columns to check for extra or missing columns

Column adjustment - Adding missing columns or removing unnecessary ones

Formatting check - Checking the data types of the columns

Filling in missing values - Adding values to the columns with missing entries to allow conversion to the right data type.

Converting columns to correct type - Converting the columns to the correct type.

Backup - Backing up data to a csv in case of a system crash.


Libraries used: 

pandas - used for dataframe management and to create a backup of the source files in CSV format

numpy - used to create empty array to fill new column and to set data types

### Column check

In [16]:
# Column names each table is expected to end up with.
outlet_coulmns = ['id_outlet', 'name', 'address', 'country', 'phone', 'reviews_nr']
# The comma after 'profile' is required: without it Python joins the two
# adjacent literals into the single name 'profilereviews_nr'.
user_coulmns = ['user', 'profile', 'reviews_nr', 'likes']
review_coulmns = ['user', 'id_outlet', 'review', 'rating']
menu_columns = ['id_outlet', 'brand', 'price', 'volume', 'name']

# Show the columns currently present in every loaded table for comparison.
for df_keys in sourcedf:
    print(sourcedf[df_keys].columns)


Index(['address', 'city', 'country', 'cuisines', 'features', 'id_outlet',
       'lat', 'lon', 'menu', 'name', 'opening_hours', 'phone', 'postal_code',
       'price_level', 'price_range', 'rating', 'region', 'reviews_nr',
       'special_diets', 'street', 'tags', 'url', 'website'],
      dtype='object')
Index(['body', 'date', 'url', 'user', 'rating', 'id_outlet', 'traveler_type'], dtype='object')
Index(['user', 'address', 'reviews', 'likes'], dtype='object')
Index(['id_outlet', 'name', 'brand', 'price', 'volume'], dtype='object')
Index(['id_outlet', 'country', 'name', 'address', 'reviews_nr'], dtype='object')


### Column adjustment

In [17]:
def coulmn_drop(df_name, column_names):
    """Move the unwanted columns of ``sourcedf[df_name]`` to a CSV file and drop them.

    Args:
        df_name: Key of the table inside ``sourcedf``.
        column_names: Column names that must be kept.

    Side effects:
        Writes ``stage/extra_data/<df_name>.csv`` with the removed columns,
        replaces ``sourcedf[df_name]`` with the reduced frame and appends two
        lines to the log.
    """
    frame = sourcedf[df_name]
    # Build a list rather than a set: recent pandas versions reject set
    # indexers, and this also keeps the columns in their original order.
    extra_coulmns = [col for col in frame.columns if col not in column_names]
    # Path of the CSV that keeps the removed data in the staging area.
    csv_filename = 'stage/extra_data/{}.csv'.format(df_name)
    frame[extra_coulmns].to_csv(csv_filename, index=False, header=True, sep=',')
    write_msg = ('At {} {} extra columns were extracted to {} \n').format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S"), df_name, csv_filename)
    log_managment(write_msg)
    # Drop the extra columns from the table kept in memory.
    sourcedf[df_name] = frame.drop(columns=extra_coulmns)
    write_msg = ('At {} {} has dropped extra columns \n').format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S"), df_name)
    log_managment(write_msg)


try:
    # Tripadvisor: align column names with the target schema.
    sourcedf['tripadvisor_reviews.json'] = sourcedf['tripadvisor_reviews.json'].rename(columns={'body': 'review'})
    sourcedf['tripadvisor_user.json'] = sourcedf['tripadvisor_user.json'].rename(columns={'reviews': 'reviews_nr'})
    # Tripadvisor outlet: reduce columns and save the rest in a separate file.
    coulmn_drop('tripadvisor_outlet.json', outlet_coulmns)
    # Tripadvisor reviews: reduce columns and save the rest in a separate file.
    coulmn_drop('tripadvisor_reviews.json', review_coulmns)
    # Ubereats outlet: add the expected columns it does not have yet, empty.
    ubereats_outlet = sourcedf['ubereats_outlet.json']
    missing_coulmns = [col for col in outlet_coulmns if col not in ubereats_outlet.columns]
    for col in missing_coulmns:
        ubereats_outlet[col] = np.nan
    # Log after the columns exist, naming the ones that were actually added.
    if missing_coulmns:
        write_msg = ('At {} ubereats_outlet.json column {} added \n').format(
            datetime.now().strftime("%m_%d_%Y_%H-%M-%S"), ', '.join(missing_coulmns))
        log_managment(write_msg)
except Exception as exc:
    # Still best effort (the notebook keeps running), but the cause is recorded.
    write_msg = ('At {} Transform Column adjusment has failed: {!r}\n').format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S"), exc)
    log_managment(write_msg)




 


### Formatting check

In [18]:
# Print each table's key followed by the dtype pandas holds for every column.
for table_key, frame in sourcedf.items():
    print(table_key, "\n", frame.dtypes)


tripadvisor_outlet.json 
 address        object
country        object
id_outlet      object
name           object
phone          object
reviews_nr    float64
dtype: object
tripadvisor_reviews.json 
 review        object
user          object
rating       float64
id_outlet     object
dtype: object
tripadvisor_user.json 
 user           object
address        object
reviews_nr      int64
likes         float64
dtype: object
ubereats_menu.json 
 id_outlet     object
name          object
brand         object
price        float64
volume        object
dtype: object
ubereats_outlet.json 
 id_outlet      object
country        object
name           object
address        object
reviews_nr      int64
phone         float64
dtype: object


### Filling in missing values

In [19]:
# (table key, column) pairs that must become numeric, in reporting order.
numeric_targets = [
    ('tripadvisor_outlet.json', 'reviews_nr'),
    ('tripadvisor_user.json', 'likes'),
    ('ubereats_menu.json', 'volume'),
]
# Order in which the empty values are filled afterwards.
fill_order = [
    ('tripadvisor_user.json', 'likes'),
    ('tripadvisor_outlet.json', 'reviews_nr'),
    ('ubereats_menu.json', 'volume'),
]

# Report, per column, whether it holds any missing entry.
for table_key, column in numeric_targets:
    has_missing = sourcedf[table_key][column].isnull().values.any()
    print("{}\t {}\n".format(table_key[:-len('.json')], column), "Missing values:\t", has_missing)

try:
    # Entries that cannot be parsed as numbers become NaN.
    for table_key, column in numeric_targets:
        sourcedf[table_key][column] = pd.to_numeric(sourcedf[table_key][column], errors='coerce')

    # Every NaN (original or produced by the coercion above) becomes 0.
    for table_key, column in fill_order:
        sourcedf[table_key][column] = sourcedf[table_key][column].fillna(value=0)

    timestamp = datetime.now().strftime("%m_%d_%Y_%H-%M-%S")
    log_managment(
        'At {} tripadvisor_outlet.json column reviews_nr edited \n tripadvisor_user.json  column likes edited \n ubereats_menu.json column volume edited\n'.format(timestamp)
    )
except:
    # Any failure is only recorded in the log; the notebook keeps running.
    log_managment('At {} Transform Filling in missing values has failed \n'.format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S")))




tripadvisor_outlet	 reviews_nr
 Missing values:	 False
tripadvisor_user	 likes
 Missing values:	 True
ubereats_menu	 volume
 Missing values:	 False


### Converting columns to correct type

In [20]:
# Convert the cleaned columns to their final dtype.
# (table key, column, target dtype)
type_conversions = [
    ('tripadvisor_outlet.json', 'reviews_nr', np.int64),
    ('tripadvisor_user.json', 'likes', np.int64),
    # astype(np.int64) discards any fractional part of the volume.
    ('ubereats_menu.json', 'volume', np.int64),
    ('ubereats_outlet.json', 'phone', object),
]
try:
    for table_key, column, target_dtype in type_conversions:
        sourcedf[table_key][column] = sourcedf[table_key][column].astype(target_dtype)
    write_msg = ("At {} tripadvisor_outlet.json column reviews_nr type change to int64 \n tripadvisor_user.json column likes type change to int64 \n ubereats_menu.json column volume type change to int64\n ubereats_outlet.json column phone type change to object\n").format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S"))
    log_managment(write_msg)
except Exception as exc:
    # The message names this step (it used to repeat the previous cell's
    # "Filling in missing values" text) and records the cause.
    write_msg = ('At {} Transform Converting columns to correct type has failed: {!r}\n').format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S"), exc)
    log_managment(write_msg)




### Backup

In [21]:
# Write every transformed table to the staging area so the work done so far
# can be recovered if a later step fails.
try:
    for table_key, frame in sourcedf.items():
        csv_filename = 'stage/transformed_db/{}.csv'.format(table_key)
        frame.to_csv(csv_filename, sep=',', header=True, index=False)
        timestamp = datetime.now().strftime("%m_%d_%Y_%H-%M-%S")
        log_managment('At {} {} saved to {}  \n'.format(timestamp, table_key, csv_filename))
except:
    # Any failure is only recorded in the log; the notebook keeps running.
    log_managment('At {} Transform Backup has failed \n'.format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S")))



## Load

Loading the tables into a newly created database on MySql 

Libraries used: 

mysql.connector - connects to the MySQL server and runs queries

sqlalchemy - used to convert dataframe to SQL table 

### Creating database

In [22]:
import mysql.connector
from sqlalchemy import create_engine, types

from urllib.parse import quote_plus

# Connection settings. Environment variables override the defaults, so real
# credentials do not have to be stored in the notebook.
db_host = os.environ.get('MYSQL_HOST', 'localhost')
# NOTE(review): the connector used port 6603 while the engine used 3306, so the
# tables could be written to a different server than the one the cursor alters
# in the following cells. Both now share one port; 6603 is assumed to be the
# intended one - confirm against the MySQL setup.
db_port = os.environ.get('MYSQL_PORT', '6603')
db_user = os.environ.get('MYSQL_USER', 'root')
db_password = os.environ.get('MYSQL_PASSWORD', 'root')
db_name = 'dashmotecstudy'

# DataFrame key -> name of the table it is loaded into.
table_names = {
    'tripadvisor_outlet.json': 'tripad_outlet',
    'tripadvisor_user.json': 'tripad_users',
    'tripadvisor_reviews.json': 'tripad_reviews',
    'ubereats_menu.json': 'ubeareats_menu',
    'ubereats_outlet.json': 'ubeareats_outlet',
}

try:
    # Connect to the MySQL server; this cursor is reused by the following cells.
    db_connector = mysql.connector.connect(host=db_host, port=db_port, user=db_user, passwd=db_password)
    db_cursor = db_connector.cursor(buffered=True)
    # Create the target database when it does not exist yet.
    db_cursor.execute("CREATE DATABASE IF NOT EXISTS dashmotecstudy")
    # Engine used by pandas to write the tables; user and password are
    # URL-quoted so special characters cannot break the connection string.
    engine = create_engine('mysql://{}:{}@{}:{}/{}'.format(
        quote_plus(db_user), quote_plus(db_password), db_host, db_port, db_name))
    # Create the tables, replacing the ones left by an earlier run.
    for source_key, table_name in table_names.items():
        sourcedf[source_key].to_sql(table_name, con=engine, index=False, if_exists='replace')
    write_msg = ('At {} All dataframes have been loaded to database \n').format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S"))
    log_managment(write_msg)
except Exception as exc:
    # Still best effort (the notebook keeps running), but the cause is recorded.
    write_msg = ('At {} Load creating database has failed: {!r}\n').format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S"), exc)
    log_managment(write_msg)






## Transform

Using SQL queries to modify table entries and to create primary and foreign keys

### Tripadvisor table setting relations 

In [23]:
# Statements run in the order listed: each key column is changed to
# varchar(255) before a key is declared on it (presumably because the loaded
# text columns cannot be indexed as they are - confirm).
primary_key_statements = (
    "USE dashmotecstudy",
    "ALTER TABLE tripad_outlet MODIFY id_outlet varchar(255)",
    "ALTER TABLE tripad_outlet ADD PRIMARY KEY (id_outlet)",
    "ALTER TABLE tripad_users MODIFY user varchar(255)",
    "ALTER TABLE tripad_users ADD PRIMARY KEY (user)",
    "ALTER TABLE tripad_reviews MODIFY user varchar(255)",
    "ALTER TABLE tripad_reviews MODIFY id_outlet varchar(255)",
    "ALTER TABLE tripad_reviews ADD CONSTRAINT PK_reviwes PRIMARY KEY (user,id_outlet)",
)
# Reviews reference both the outlet and the user they belong to.
foreign_key_statements = (
    "ALTER TABLE tripad_reviews ADD FOREIGN KEY (id_outlet) REFERENCES tripad_outlet(id_outlet)",
    "ALTER TABLE tripad_reviews ADD FOREIGN KEY (user) REFERENCES tripad_users(user)",
)

try:
    for statement in primary_key_statements:
        db_cursor.execute(statement)
    log_managment('At {} Tripadvisor Primary keys set  \n'.format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S")))

    for statement in foreign_key_statements:
        db_cursor.execute(statement)
    log_managment('At {} Tripadvisor foregin keys set  \n'.format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S")))
except:
    # Any failure is only recorded in the log; the notebook keeps running.
    log_managment('At {} Load Tripadvisor table setting relations  has failed \n'.format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S")))




### Ubereats database relation setting

In [24]:
# Set primary keys
try:
    # Select the database here as well, so this cell does not depend on the
    # tripadvisor cell having run first.
    db_cursor.execute("USE dashmotecstudy")
    # Key columns are changed to varchar(255) before a key is declared on them.
    db_cursor.execute("ALTER TABLE ubeareats_outlet MODIFY id_outlet varchar(255)")
    db_cursor.execute("ALTER TABLE ubeareats_outlet ADD PRIMARY KEY (id_outlet)")
    db_cursor.execute("ALTER TABLE ubeareats_menu MODIFY name varchar(255)")
    db_cursor.execute("ALTER TABLE ubeareats_menu MODIFY id_outlet varchar(255)")
    db_cursor.execute("ALTER TABLE ubeareats_menu MODIFY brand varchar(255)")
    # The constraint is named after the menu table; the name was previously
    # copied from the reviews table.
    db_cursor.execute("ALTER TABLE ubeareats_menu ADD CONSTRAINT PK_menu PRIMARY KEY (name,id_outlet,brand)")
    write_msg = ('At {} Ubereats Primary keys set  \n').format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S"))
    log_managment(write_msg)

    # Set foreign keys: every menu entry references its outlet.
    db_cursor.execute("ALTER TABLE ubeareats_menu ADD FOREIGN KEY (id_outlet) REFERENCES ubeareats_outlet(id_outlet)")
    write_msg = ('At {} Ubereats foregin keys set  \n').format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S"))
    log_managment(write_msg)
except Exception as exc:
    # Still best effort (the notebook keeps running), but the cause is recorded.
    write_msg = ('At {} Load Ubereats database relation setting has failed: {!r}\n').format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S"), exc)
    log_managment(write_msg)

### Small column adjusting

In [25]:
# Fixing address formatting and spelling out the country name
try:
    db_cursor.execute("USE dashmotecstudy")
    # Nested REPLACE turns '|' into ',' and then collapses ',,' in a single
    # expression, instead of assigning `address` twice and relying on the
    # order in which the assignments are evaluated.
    db_cursor.execute(
        "UPDATE dashmotecstudy.tripad_outlet "
        "SET address = REPLACE(REPLACE(address,'|',','),',,',',') "
        "WHERE id_outlet IS NOT NULL;"
    )
    # Only rows whose country is the code 'NL' are rewritten; REPLACE would
    # also alter any other value that merely contains the letters 'NL'.
    db_cursor.execute(
        "UPDATE dashmotecstudy.ubeareats_outlet "
        "SET country = 'Netherlands' "
        "WHERE id_outlet IS NOT NULL AND country = 'NL';"
    )
    # The updates are made permanent only by this commit.
    db_connector.commit()
    write_msg = ('At {}  table adjusments set \n').format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S"))
    log_managment(write_msg)
except Exception as exc:
    # Still best effort (the notebook keeps running), but the cause is recorded.
    write_msg = ('At {} Load Small column adjusting has failed: {!r}\n').format(datetime.now().strftime("%m_%d_%Y_%H-%M-%S"), exc)
    log_managment(write_msg)
