In [22]:
#import relevant packages
import pandas as pd
import numpy as np
import requests
import pymssql
from functions import sql_connector,github_connector, mail_sender, error_messages, azure_blob_uploader
import os
import datetime as dt
from dotenv import load_dotenv
load_dotenv()

# Recipient address for the pipeline's failure-notification emails (passed as
# send_to to send_email). Fail fast with an actionable message instead of a
# bare KeyError when it is missing or empty.
dammy = os.environ.get('dammy')
if not dammy:
    raise RuntimeError(
        "Environment variable 'dammy' (failure-notification recipient) is not set; "
        "define it in the environment or in the .env file read by load_dotenv()."
    )


True

# EXTRACT

In [20]:
# etl project to load data from github to data warehouse
# define parameters
username = 'DaliDalmas'      # GitHub account that owns the source repository
repo_name = 'files'          # repository holding the input files
subfolder_path = 'files'     # folder inside the repo passed to the github_connector listing helpers

# Helper used by the transform and load cells to email failure reports.
send_email = mail_sender.send_email

In [3]:
get_csv_files = github_connector.get_csv_files

get_json_files = github_connector.get_json_files

# Raw-download base URL built from the same parameters used for the file
# listing, so changing username/repo_name/subfolder_path changes what is read.
# NOTE(review): the branch is assumed to be 'main'.
raw_base_url = f"https://raw.githubusercontent.com/{username}/{repo_name}/main/{subfolder_path.strip('/')}"

# Spaces are percent-encoded so the names can be embedded in URLs.
# Assumes the listing helpers return bare file names inside subfolder_path.
csv_files = [text.replace(' ','%20') for text in get_csv_files(username,repo_name,subfolder_path)]

json_files = [text.replace(' ','%20') for text in get_json_files(username,repo_name,subfolder_path)]

csv_urls = [f'{raw_base_url}/{filename}' for filename in csv_files]

json_urls = [f'{raw_base_url}/{filename}' for filename in json_files]

In [4]:
# extracting csv files
# Errors from every pipeline step are collected here and emailed on failure.
message_list = []

# file5.json supplies the column layout used as the common schema template.
cols = pd.read_json('https://raw.githubusercontent.com/DaliDalmas/files/main/files/file5.json').columns

conv_dict = {'id':str,'file_id':str}

# 'completed' unless some file fails. A failure is sticky, so a later
# successful file cannot mask it. Initialising here also keeps csv_status
# defined when there are no (non-employee) CSV files to process.
csv_status = 'completed'
csv_frames = []

for file_index, url in enumerate(csv_urls):
    try:
        # Reading is inside the try so a download/parse error is recorded and
        # reported instead of aborting the notebook before any email is sent.
        df = pd.read_csv(url)
        if 'employee' in df.columns:
            continue  # files with an 'employee' column are skipped, as before
        # Prefix ids with the file's position in csv_urls so that ids coming
        # from different files cannot collide.
        df['file_id'] = file_index
        df = df.astype(conv_dict)
        df['id'] = df['file_id'] + '-' + df['id']
        df = df.drop(columns='file_id')
        csv_frames.append(df)
    except Exception as e:
        csv_status = 'Incomplete'
        message_list.append(e)

# Concatenate once (growing a frame inside the loop is quadratic); the empty
# template keeps the schema columns even when no file was loaded.
df_csv = pd.concat([pd.DataFrame(columns=cols), *csv_frames])
            

In [5]:
# extracting json files
# file5.json supplies the column layout used as the common schema template.
cols = pd.read_json('https://raw.githubusercontent.com/DaliDalmas/files/main/files/file5.json').columns

conv_dict = {'id':str,'file_id':str}

# 'completed' unless some file fails. A failure is sticky, so a later
# successful file cannot mask it. Initialising here also keeps json_status
# defined when there are no (non-employee) JSON files to process.
json_status = 'completed'
json_frames = []

for file_index, url in enumerate(json_urls):
    try:
        # Reading is inside the try so a download/parse error is recorded and
        # reported instead of aborting the notebook before any email is sent.
        df = pd.read_json(url)
        if 'employee' in df.columns:
            continue  # files with an 'employee' column are skipped, as before
        # Prefix ids with 'j' plus the file's position in json_urls so they
        # cannot collide with CSV ids or with ids from other JSON files.
        df['file_id'] = file_index
        df = df.astype(conv_dict)
        df['id'] = 'j' + df['file_id'] + '-' + df['id']
        df = df.drop(columns='file_id')
        json_frames.append(df)
    except Exception as e:
        json_status = 'Incomplete'
        message_list.append(e)

# Concatenate once (growing a frame inside the loop is quadratic); the empty
# template keeps the schema columns even when no file was loaded.
df_json = pd.concat([pd.DataFrame(columns=cols), *json_frames])


#  TRANSFORM

In [7]:
# Combine and normalise the data only when both extraction steps succeeded;
# otherwise email the collected extraction errors.
if csv_status == 'completed' and json_status == 'completed':
    try:
        df = pd.concat([df_json,df_csv])
        # Normalise gender: lower-case it and expand the single-letter codes.
        # str() turns missing values into 'nan'/'none', so map those back to NaN.
        # Assigned back explicitly: the previous chained
        # df.gender.replace(..., inplace=True) does not update df under
        # pandas Copy-on-Write.
        df['gender'] = (
            df['gender']
            .apply(lambda x: str(x).lower())
            .replace({'m':'male','f':'female','nan':np.nan,'none':np.nan})
        )
        df['phone'] = df['phone'].replace({None: np.nan})
        # Naive UTC load timestamp, the same value datetime.utcnow() produced.
        # utcnow() is deprecated since Python 3.12.
        df['dateloaded'] = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
    except Exception as e:
        transformation_status = 'Incomplete'
        message_list.append(e)
    else:
        transformation_status = 'completed'

else:
    transformation_status = 'Not started'

    send_email(send_to=dammy,subject='DATA EXTRACTION FAILED',body=error_messages.extraction_error(message_list))
    


# LOAD

In [8]:
# SQL scripts live in ./queries relative to the working directory
# (the notebook is assumed to be run from the project root).
queries_dir = os.path.join(os.getcwd(), 'queries')
create_tables_file_path = os.path.join(queries_dir, 'create_tables.sql')
create_sp_file_path = os.path.join(queries_dir, 'sp_truncate_load_customers.sql')


# read relevant queries from files.
# An explicit encoding avoids platform-dependent decoding (e.g. cp1252 on
# Windows). NOTE(review): assumes the .sql files are UTF-8.
with open(create_tables_file_path, mode='r', encoding='utf-8') as f:
    create_tables_query = f.read()

with open(create_sp_file_path, mode='r', encoding='utf-8') as f:
    create_sp_query = f.read()

In [29]:
# DDL statements (queries/create_tables.sql and
# queries/sp_truncate_load_customers.sql) that create the SQL objects the load
# cell relies on. They only need to run once: set create_sql_objects = False
# afterwards instead of commenting the calls out.
# NOTE(review): re-running presumably fails if the objects already exist,
# unless the scripts use IF NOT EXISTS / CREATE OR ALTER. Confirm.
create_sql_objects = True

# write_sql is also used by the load cell, so bind it unconditionally.
write_sql = sql_connector.write_sql

if create_sql_objects:
    write_sql(query=create_tables_query)
    write_sql(query=create_sp_query)

In [None]:
# Load step: act on the outcome of the transform cell.
#   'Not started'   -> extraction failed and was already emailed; nothing to do.
#   not 'completed' -> transformation failed; email the collected errors.
#   'completed'     -> upload to blob storage, COPY INTO stage.customers, then
#                      run the stored procedure that moves data on from stage.
if transformation_status == 'Not started':
    pass

elif transformation_status != 'completed':
    send_email(send_to=dammy,subject='DATA TRANSFORMATION FAILED',body=error_messages.transformation_error(message_list))

else:
    try:
        save_dataframe_to_blob = azure_blob_uploader.save_dataframe_to_blob
        # The returned value is used below as the COPY INTO source location.
        file_url = save_dataframe_to_blob(df, 'all_customers.csv')

        # FIRSTROW = 2 presumably skips the header row written with the CSV.
        stage_copy_sql = f"""
        COPY INTO stage.customers
        FROM '{file_url}'
        WITH
        (
            FILE_TYPE = 'CSV',
            MAXERRORS = 0,
            FIRSTROW = 2,
            PARSER_VERSION = '2.0'
        );
        """
        write_sql(query=stage_copy_sql)

        write_sql('exec sp_truncate_load_customers')

    except Exception as e:
        message_list.append(e)
        send_email(send_to=dammy,subject='DATA LOADING FAILED',body=error_messages.loading_error(message_list))



