# Project 1a - ETL Data Processor
Ashley Kim 
3/28/22

Your data processor should be able to ingest a pre-defined data source and perform at least three of these operations: 
1.	Fetch / download / retrieve a remote data file by URL, or ingest a locally mounted file. Suggestions for remote data sources are listed at the end of this document. 
2.	Convert the general format and data structure of the data source (from JSON to CSV, from CSV to JSON, from JSON into a SQL database table, etc.). I want the option to convert any source to any target: if the input is a CSV, the user should be able to choose the output format.
    - EXTRA – Use an API (like Twitter) to pull information in real time.
3.	Modify the number of columns from the source to the destination, reducing or adding columns. 
4.	The converted (new) file should be written to disk (local file) or written to a SQL database. 
5.	Generate a brief summary of the data file ingestion including: 
    - Number of records 
    - Number of columns 
    
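This project implements the CSV-to-JSON path (then MongoDB and MySQL). As a hypothetical illustration of requirement 2's "any source to any target" option, the sketch below reads a CSV or JSON file and writes whichever target format the user picks. It uses only the standard library; the function names (`read_records`, `write_records`, `convert`), the supported formats (`csv`, `json`, `sqlite`), and the use of a SQLite file as a stand-in for a SQL database are all assumptions, not part of the original project.

```python
import csv
import json
import sqlite3
from pathlib import Path


def read_records(src_path):
    """Load a CSV file or a JSON array of objects into a list of dicts."""
    suffix = Path(src_path).suffix.lower()
    with open(src_path, encoding="utf-8", newline="") as f:
        if suffix == ".csv":
            return list(csv.DictReader(f))
        if suffix == ".json":
            return json.load(f)
    raise ValueError(f"Unsupported source format: {suffix}")


def write_records(records, dst_path, fmt, table_name="data"):
    """Write a list of dicts as CSV, JSON, or a table in a SQLite database file."""
    columns = list(records[0].keys()) if records else []
    if fmt == "csv":
        with open(dst_path, "w", encoding="utf-8", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=columns)
            writer.writeheader()
            writer.writerows(records)
    elif fmt == "json":
        with open(dst_path, "w", encoding="utf-8") as f:
            json.dump(records, f, indent=4)
    elif fmt == "sqlite":
        # Stand-in for a SQL target (assumption): every column is stored as TEXT
        col_defs = ", ".join(f'"{c}" TEXT' for c in columns)
        placeholders = ", ".join("?" for _ in columns)
        conn = sqlite3.connect(dst_path)
        try:
            with conn:  # commits the inserted rows on success
                conn.execute(f'DROP TABLE IF EXISTS "{table_name}"')
                conn.execute(f'CREATE TABLE "{table_name}" ({col_defs})')
                conn.executemany(
                    f'INSERT INTO "{table_name}" VALUES ({placeholders})',
                    [tuple(r.get(c) for c in columns) for r in records],
                )
        finally:
            conn.close()
    else:
        raise ValueError(f"Unsupported target format: {fmt}")


def convert(src_path, dst_path, fmt):
    """Convert a supported source file to the target format chosen by the user."""
    records = read_records(src_path)
    write_records(records, dst_path, fmt)
    return len(records)
```

For example, `convert('athletes.csv', 'olympic_athletes.json', 'json')` mirrors this project's CSV-to-JSON step, and passing `'csv'` or `'sqlite'` instead lets the user choose a different target for the same input.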
    
**For this project, my data processor ingests a locally mounted file, a CSV of athlete information from the 2022 Beijing Winter Games, and converts the general format and data structure of the data source from CSV to JSON. From there, I loaded the data into MongoDB and then into a MySQL database. I modified the number of columns from the source to the destination by dropping columns I didn't consider relevant to the dataset, and I added an 'athlete_id' column to serve as the primary key when writing the data to the SQL database.**

#### Importing Necessary Libraries

```python
import os
import csv
import json
import numpy
import datetime
import pandas as pd

import pymongo
from sqlalchemy import create_engine
```

#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working 

```python
host_name = "localhost"
ports = {"mongo" : 27017, "mysql" : 3306}

user_id = "root"
pwd = "Password0!"

src_dbname = "olympics"
dst_dbname = "olympics_dw"
```

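The credentials above are interpolated directly into connection URIs. If a password ever contains a reserved character such as `@`, `:` or `/`, the URI would be parsed incorrectly; both the PyMongo and SQLAlchemy documentation recommend percent-escaping usernames and passwords with `urllib.parse.quote_plus`. A minimal sketch, where `build_uris` is a hypothetical helper and the example password is made up:

```python
from urllib.parse import quote_plus


def build_uris(user_id, pwd, host_name, ports, db_name):
    """Return (mysql_uri, mongo_uri) with the credentials percent-escaped."""
    user = quote_plus(user_id)
    password = quote_plus(pwd)
    mysql_uri = f"mysql+pymysql://{user}:{password}@{host_name}:{ports['mysql']}/{db_name}"
    mongo_uri = f"mongodb://{user}:{password}@{host_name}:{ports['mongo']}/{db_name}"
    return mysql_uri, mongo_uri


# Hypothetical password containing reserved URI characters
mysql_uri, mongo_uri = build_uris("root", "p@ss:word/1", "localhost",
                                  {"mongo": 27017, "mysql": 3306}, "olympics")
print(mysql_uri)  # mysql+pymysql://root:p%40ss%3Aword%2F1@localhost:3306/olympics
```

Without escaping, the `@` in such a password would be read as the end of the credentials and the host would be parsed wrongly.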
#### Define Functions for Getting Data From and Setting Data Into Databases

```python
from sqlalchemy import text

def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    # Create a connection to the MySQL database
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    # Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame
    with sqlEngine.connect() as conn:
        dframe = pd.read_sql(sql_query, conn)
    
    return dframe


def get_mongo_dataframe(user_id, pwd, host_name, port, db_name, collection, query):
    # Create a connection to MongoDB, with or without authentication credentials
    if user_id and pwd:
        mongo_uri = f"mongodb://{user_id}:{pwd}@{host_name}:{port}/{db_name}"
    else:
        mongo_uri = f"mongodb://{host_name}:{port}/"
    client = pymongo.MongoClient(mongo_uri)
    
    # Query MongoDB, and fill a python list with documents to create a DataFrame
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    # Drop MongoDB's internal '_id' field (errors='ignore' avoids a KeyError on an empty result)
    dframe.drop(['_id'], axis=1, inplace=True, errors='ignore')
    client.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    # Create a connection to the MySQL database
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    # Invoke the Pandas DataFrame .to_sql() function to either create (replace), or append to, a table.
    # engine.begin() commits on success; Engine.execute() was removed in SQLAlchemy 2.0.
    with sqlEngine.begin() as connection:
        if db_operation == "insert":
            df.to_sql(table_name, con=connection, index=False, if_exists='replace')
            connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
        elif db_operation == "update":
            df.to_sql(table_name, con=connection, index=False, if_exists='append')
```

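`set_dataframe()` maps `db_operation="insert"` to `to_sql(..., if_exists='replace')` followed by `ALTER TABLE ... ADD PRIMARY KEY`, and `"update"` to `if_exists='append'`. The sketch below mimics those two modes with Python's built-in `sqlite3` module as a stand-in for MySQL, an assumption made only so it runs without a server. SQLite cannot add a primary key through `ALTER TABLE`, so the key is declared in `CREATE TABLE` instead. The helper `set_rows` is hypothetical; it shows that "insert" discards earlier rows, "update" keeps them, and the primary key rejects an appended row whose `athlete_id` already exists.

```python
import sqlite3


def set_rows(conn, rows, table_name, pk_column, db_operation):
    """Hypothetical sqlite3 analogue of set_dataframe(): 'insert' replaces, 'update' appends."""
    columns = list(rows[0].keys())
    col_list = ", ".join(f'"{c}"' for c in columns)
    col_defs = ", ".join(
        f'"{c}" INTEGER PRIMARY KEY' if c == pk_column else f'"{c}" TEXT' for c in columns
    )
    placeholders = ", ".join("?" for _ in columns)
    with conn:  # commit the inserted rows on success, roll them back on error
        if db_operation == "insert":
            # Like if_exists='replace' plus ADD PRIMARY KEY: recreate the table with the key declared
            conn.execute(f'DROP TABLE IF EXISTS "{table_name}"')
            conn.execute(f'CREATE TABLE "{table_name}" ({col_defs})')
        elif db_operation != "update":
            raise ValueError(f"Unknown db_operation: {db_operation}")
        # 'insert' fills the fresh table; 'update' appends, like if_exists='append'
        conn.executemany(
            f'INSERT INTO "{table_name}" ({col_list}) VALUES ({placeholders})',
            [tuple(r[c] for c in columns) for r in rows],
        )


conn = sqlite3.connect(":memory:")
set_rows(conn, [{"athlete_id": 1, "name": "AAGAARD Mikkel"}], "dim_athletes", "athlete_id", "insert")
set_rows(conn, [{"athlete_id": 2, "name": "AALTO Antti"}], "dim_athletes", "athlete_id", "update")
print(conn.execute("SELECT COUNT(*) FROM dim_athletes").fetchone()[0])  # 2

try:
    # Appending an athlete_id that already exists violates the primary key
    set_rows(conn, [{"athlete_id": 2, "name": "AALTO Antti"}], "dim_athletes", "athlete_id", "update")
except sqlite3.IntegrityError as exc:
    print("Rejected duplicate key:", exc)
```

This is one reason the primary key is worth adding: a repeated "update" fails loudly instead of silently duplicating athletes.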
#### Loading Data from a CSV File
#### Converting the General Format and Data Structure of the Data Source (from CSV to JSON)

```python
# Was having trouble with the original code from class, which was writing the entire JSON file and
# every element into one complete list, so I used the following code instead
# (from https://pythonexamples.org/python-csv-to-json/)

def csv_to_json(csvFilePath, jsonFilePath):
    jsonArray = []

    # Read each CSV row as a dict keyed by the header row
    with open(csvFilePath, encoding='utf-8', newline='') as csvf:
        csvReader = csv.DictReader(csvf)
        for row in csvReader:
            jsonArray.append(row)

    # Write all rows to disk as a JSON array
    with open(jsonFilePath, 'w', encoding='utf-8') as jsonf:
        jsonString = json.dumps(jsonArray, indent=4)
        jsonf.write(jsonString)

csvFilePath = r'/Users/ashleykim/Documents/Y3 2021-2022/Spring 2022/DS 3002/athletes.csv'
jsonFilePath = r'/Users/ashleykim/Documents/Y3 2021-2022/Spring 2022/DS 3002/data/olympic_athletes.json'
csv_to_json(csvFilePath, jsonFilePath)
```

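`csv.DictReader` returns every field as a string, so a blank cell (such as the missing `height_m/ft` for AALTO Antti) is written to the JSON as `""` and carried into MongoDB as an empty string rather than a null. If nulls are preferred, one option is to convert empty strings to `None` (JSON `null`) during conversion. A minimal sketch; the function name `rows_to_json` and the choice to null out blanks are assumptions, not part of the original pipeline:

```python
import csv
import io
import json


def rows_to_json(csv_text):
    """Convert CSV text to a JSON array, turning blank cells into null."""
    reader = csv.DictReader(io.StringIO(csv_text))
    records = [
        {key: (value if value != "" else None) for key, value in row.items()}
        for row in reader
    ]
    return json.dumps(records, indent=4)


sample = (
    "name,birth_date,height_m/ft\n"
    "AAGAARD Mikkel,1995-10-18,1.84/6'0''\n"
    "AALTO Antti,1995-04-02,\n"
)
print(rows_to_json(sample))
```

In `csv_to_json()`, the same effect would come from appending the dict comprehension's result instead of `row`.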
#### Loading Data From a CSV File (Using Pandas)

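```python
# Loading data from a local CSV file.
# os.path.join() discards earlier components when a later one is an absolute path,
# so joining with a 'data' directory had no effect; the absolute path is used directly.
data_file = r'/Users/ashleykim/Documents/Y3 2021-2022/Spring 2022/DS 3002/athletes.csv'

# index_col=0 makes the first column ('name') the DataFrame index
df_athletes = pd.read_csv(data_file, header=0, index_col=0)

df_athletes.head()
```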

| name | short_name | gender | birth_date | birth_place | birth_country | country | country_code | discipline | discipline_code | residence_place | residence_country | height_m/ft | url |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| AAGAARD Mikkel | AAGAARD M | Male | 1995-10-18 | FREDERIKSHAVN | Denmark | Denmark | DEN | Ice Hockey | IHO | ORNSKOLDSVIK | Sweden | 1.84/6'0'' | ../../../en/results/ice-hockey/athlete-profile... |
| AALTO Antti | AALTO A | Male | 1995-04-02 | KITEE | Finland | Finland | FIN | Ski Jumping | SJP | KUOPIO | Finland |  | ../../../en/results/ski-jumping/athlete-profil... |
| AALTONEN Miro | AALTONEN M | Male | 1993-06-07 | JOENSUU | Finland | Finland | FIN | Ice Hockey | IHO | PODOLSK | Russian Federation | 1.80/5'10'' | ../../../en/results/ice-hockey/athlete-profile... |
| ABDELKADER Justin | ABDELKADER J | Male | 1987-02-25 | MUSKEGON, MI | United States of America | United States of America | USA | Ice Hockey | IHO | GRAND RAPIDS, MI | United States of America | 1.87/6'1'' | ../../../en/results/ice-hockey/athlete-profile... |
| ABDI Fayik | ABDI F | Male | 1997-10-07 | SAN DIEGO, CA | United States of America | Saudi Arabia | KSA | Alpine Skiing | ALP | DHAHRAN | Saudi Arabia |  | ../../../en/results/alpine-skiing/athlete-prof... |


### Data in MongoDB (NoSQL)

#### Populate MongoDB with Source Data
Be certain you run this cell **ONLY ONCE!**  Otherwise, you will fill your MongoDB database with duplicate records which will cause duplicate key errors when you attempt to create and populate the MySQL data warehouse dimension and fact tables.

```python
port = ports["mongo"]
conn_str = f"mongodb://{host_name}:{port}/"
client = pymongo.MongoClient(conn_str)
db = client[src_dbname]

# data_dir must point at the folder where csv_to_json() wrote olympic_athletes.json
data_dir = os.path.join(os.getcwd(), 'data')

# Map each MongoDB collection name to the JSON file that populates it
json_files = {"athletes" : 'olympic_athletes.json'}

for collection_name, file_name in json_files.items():
    json_file = os.path.join(data_dir, file_name)
    with open(json_file, 'r', encoding='utf-8') as openfile:
        json_object = json.load(openfile)
    result = db[collection_name].insert_many(json_object)
    print(f"{collection_name}: {len(result.inserted_ids)} documents loaded.")

client.close()
```

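The warning above exists because `insert_many()` adds documents every time the cell runs. One way to make the load safe to re-run is to drop the collection before inserting, using PyMongo's `Collection.drop()` and `Collection.insert_many()`. A minimal sketch, assuming `db` is a PyMongo `Database`; `reload_collection` is a hypothetical helper name. Note that it deletes whatever the collection held before.

```python
def reload_collection(db, collection_name, documents):
    """Replace a MongoDB collection's contents so re-running the load cannot duplicate records.

    Assumes `db` behaves like a pymongo Database: db[name] returns a Collection
    with drop() and insert_many().
    """
    collection = db[collection_name]
    collection.drop()  # remove documents left over from any earlier run
    # insert_many() adds an '_id' to each dict it inserts, so pass copies
    result = collection.insert_many([dict(doc) for doc in documents])
    return len(result.inserted_ids)


# Usage with the objects from the cell above (requires a running MongoDB server):
# count = reload_collection(db, "athletes", json_object)
```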
#### Extracting Data from the Source MongoDB Collections Into DataFrames

```python
query = {}
port = ports["mongo"]
collection = "athletes"

df_athletes = get_mongo_dataframe(None, None, host_name, port, src_dbname, collection, query)
df_athletes.head(2)
```

|  | name | short_name | gender | birth_date | birth_place | birth_country | country | country_code | discipline | discipline_code | residence_place | residence_country | height_m/ft | url |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | AAGAARD Mikkel | AAGAARD M | Male | 1995-10-18 | FREDERIKSHAVN | Denmark | Denmark | DEN | Ice Hockey | IHO | ORNSKOLDSVIK | Sweden | 1.84/6'0'' | ../../../en/results/ice-hockey/athlete-profile... |
| 1 | AALTO Antti | AALTO A | Male | 1995-04-02 | KITEE | Finland | Finland | FIN | Ski Jumping | SJP | KUOPIO | Finland |  | ../../../en/results/ski-jumping/athlete-profil... |


#### Performing Any Necessary Transformations to the DataFrames

```python
# Reducing columns using pandas drop command
drop_cols = ['short_name','residence_place','url'] # dropping columns that don't seem that important to overall data
df_athletes.drop(drop_cols, axis=1, inplace=True)

df_athletes.head(4)
```

|  | name | gender | birth_date | birth_place | birth_country | country | country_code | discipline | discipline_code | residence_country | height_m/ft |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | AAGAARD Mikkel | Male | 1995-10-18 | FREDERIKSHAVN | Denmark | Denmark | DEN | Ice Hockey | IHO | Sweden | 1.84/6'0'' |
| 1 | AALTO Antti | Male | 1995-04-02 | KITEE | Finland | Finland | FIN | Ski Jumping | SJP | Finland |  |
| 2 | AALTONEN Miro | Male | 1993-06-07 | JOENSUU | Finland | Finland | FIN | Ice Hockey | IHO | Russian Federation | 1.80/5'10'' |
| 3 | ABDELKADER Justin | Male | 1987-02-25 | MUSKEGON, MI | United States of America | United States of America | USA | Ice Hockey | IHO | United States of America | 1.87/6'1'' |


```python
# Adding primary key column 'athlete_id' (numbered from 1)
df_athletes['athlete_id'] = range(1, 1+len(df_athletes))
# Reorder so that 'athlete_id' is the first column
df_athletes = df_athletes[ ['athlete_id'] + [ col for col in df_athletes.columns if col != 'athlete_id' ] ]

df_athletes.head(4)
```

|  | athlete_id | name | gender | birth_date | birth_place | birth_country | country | country_code | discipline | discipline_code | residence_country | height_m/ft |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | AAGAARD Mikkel | Male | 1995-10-18 | FREDERIKSHAVN | Denmark | Denmark | DEN | Ice Hockey | IHO | Sweden | 1.84/6'0'' |
| 1 | 2 | AALTO Antti | Male | 1995-04-02 | KITEE | Finland | Finland | FIN | Ski Jumping | SJP | Finland |  |
| 2 | 3 | AALTONEN Miro | Male | 1993-06-07 | JOENSUU | Finland | Finland | FIN | Ice Hockey | IHO | Russian Federation | 1.80/5'10'' |
| 3 | 4 | ABDELKADER Justin | Male | 1987-02-25 | MUSKEGON, MI | United States of America | United States of America | USA | Ice Hockey | IHO | United States of America | 1.87/6'1'' |

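The two pandas lines above number the rows from 1 and move `athlete_id` to the front. pandas' `DataFrame.insert(loc, column, value)` can do both in one call, e.g. `df_athletes.insert(0, 'athlete_id', range(1, len(df_athletes) + 1))`. The same surrogate-key idea applied to a plain list of records (such as the JSON loaded into MongoDB) is sketched below with the standard library; the helper name `add_surrogate_key` is hypothetical.

```python
def add_surrogate_key(records, key="athlete_id", start=1):
    """Return copies of the records with a sequential key placed first in each dict."""
    return [
        {key: i, **{k: v for k, v in row.items() if k != key}}
        for i, row in enumerate(records, start=start)
    ]


athletes = [
    {"name": "AAGAARD Mikkel", "country_code": "DEN"},
    {"name": "AALTO Antti", "country_code": "FIN"},
]
for row in add_surrogate_key(athletes):
    print(row)
# {'athlete_id': 1, 'name': 'AAGAARD Mikkel', 'country_code': 'DEN'}
# {'athlete_id': 2, 'name': 'AALTO Antti', 'country_code': 'FIN'}
```

A sequential key like this is only stable as long as the input order and row set stay the same between runs.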

### Data in SQL

#### Writing Data to SQL Database

```python
from sqlalchemy import text

exec_sql = f"CREATE DATABASE IF NOT EXISTS `{dst_dbname}`;"

conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

# Create the destination database. A separate "USE olympics_dw;" is not needed:
# set_dataframe() and get_sql_dataframe() connect to dst_dbname directly.
with sqlEngine.begin() as connection:
    connection.execute(text(exec_sql))
```

```python
dataframe = df_athletes
table_name = 'dim_athletes'
primary_key ='athlete_id'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)
```

#### Generating Brief Summary of Data File Ingestion 

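```python
# Validating that the new dimension table was created
sql_athletes = "SELECT * FROM olympics_dw.dim_athletes;"
df_dim_athletes = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_athletes)

# Brief summary of the ingestion: number of records and number of columns
num_records, num_columns = df_dim_athletes.shape
print(f"Number of records: {num_records}")
print(f"Number of columns: {num_columns}")

df_dim_athletes.head(2)
```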

|  | athlete_id | name | gender | birth_date | birth_place | birth_country | country | country_code | discipline | discipline_code | residence_country | height_m/ft |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | AAGAARD Mikkel | Male | 1995-10-18 | FREDERIKSHAVN | Denmark | Denmark | DEN | Ice Hockey | IHO | Sweden | 1.84/6'0'' |
| 1 | 2 | AALTO Antti | Male | 1995-04-02 | KITEE | Finland | Finland | FIN | Ski Jumping | SJP | Finland |  |

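The source CSV has 14 columns, and the pipeline drops 3 (`short_name`, `residence_place`, `url`) and adds 1 (`athlete_id`). So `dim_athletes` should have 14 - 3 + 1 = 12 columns and, assuming the MongoDB load ran only once, the same number of records as the CSV. A small standard-library sketch of that reconciliation check; the function name `reconcile` and the idea of comparing against the source file are assumptions. In the notebook, `dest_columns` and `dest_count` would correspond to `df_dim_athletes.columns` and `len(df_dim_athletes)`.

```python
import csv
import io


def reconcile(source_csv_text, dest_columns, dest_count, dropped, added):
    """Compare a source CSV with a destination table's columns and record count."""
    reader = csv.DictReader(io.StringIO(source_csv_text))
    source_columns = list(reader.fieldnames)  # reads the header row
    source_count = sum(1 for _ in reader)     # counts the remaining data rows
    expected_columns = [c for c in source_columns if c not in dropped] + list(added)
    return {
        "source_records": source_count,
        "source_columns": len(source_columns),
        "dest_records": dest_count,
        "dest_columns": len(dest_columns),
        "records_match": source_count == dest_count,
        "columns_match": sorted(expected_columns) == sorted(dest_columns),
    }


sample = "name,short_name,gender,url\nAALTO Antti,AALTO A,Male,x\nABDI Fayik,ABDI F,Male,y\n"
print(reconcile(sample, ["athlete_id", "name", "gender"], 2,
                dropped=["short_name", "url"], added=["athlete_id"]))
```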