## Importing Required Libraries

In [46]:
import mysql.connector
import pandas as pd

## Defining Pipeline Functions

I'm assuming that data resides onto SQL Server. Thus, Data Extraction will take place from SQL. Another case defines here if data is present in "csv" file, then, we'll directly extract and validate data. Therefore, SQL have predefined data format. In transform, we're trying to handle noise and error data if error exists in Mandatory field.

In [47]:
class PipelineSQL:
    
    def create_db_connection(host_name, user_name, user_password, db_name):
        connection = None
        try:
            connection = mysql.connector.connect(
                host=host_name,
                user=user_name,
                passwd=user_password,
                database=db_name
            )
            print("MySQL Database connection successful")
        except Error as err:
            print(f"Error: '{err}'")

        return connection


    def execute_query(connection, query):
        cursor = connection.cursor()
        try:
            cursor.execute(query)
            connection.commit()
            #print("Query successful")
        except Error as err:
            print(f"Error: '{err}'")



    def read_query(connection, query):
        cursor = connection.cursor()
        result = None
        try:
            cursor.execute(query)
            result = cursor.fetchall()
            return result
        except Error as err:
            print(f"Error: '{err}'")


    def transform(tup2):
        lst=list(tup2)
        error_collector=[]
        try:  
            lst[0]=int(tup2[0]) 
            lst[1]=str(tup2[1])
            lst[2]=pd.to_datetime(str(tup2[2]))

        except Exception as e:
            print("Mandatory Fields have Invalid Data. Sending it to Error Collector.")
            lst.append(tup2)


        try:  
            lst[3]=pd.to_datetime(str(tup2[3]))
        except:
            lst[3]=None

        try:  
            lst[4]=str(tup2[4])
        except:
            lst[4]=None

        try:  
            lst[5]=str(tup2[5])
        except:
            lst[5]=None

        try:  
            lst[6]=str(tup2[6])
        except:
            lst[6]=None

        try:  
            lst[7]=str(tup2[7])
        except:
            lst[7]=None


        try:  
            lst[8]=pd.to_datetime(str(tup2[8]))
        except:
            lst[8]=None

        try:  
            lst[9]=str(tup2[9])
        except:
            lst[9]=None



        tupl=tuple(lst)

        return tupl,error_collector




    def load_to_staging_table(country):
        import pandas as pd
        error_data=[]
        for tup in country:
            #print(country_tables.format(tup[0]))
            execute_query(mydb,drop_table.format(tup[0]))
            execute_query(mydb,create_table.format(tup[0]))

            stage_data_list=read_query(mydb,country_tables.format(tup[0]))
            for tup2 in stage_data_list:

                tupl,error_collector=transform(tup2)

                error_data+=error_collector

                execute_query(mydb,insert_table.format(tup[0],tupl))
        if error_data:
            return error_data
        else:
            print("Successfully Created Staging Tables")

        
        
        


## Required SQL Queries

In [68]:
## queries      
drop_table="DROP TABLE IF EXISTS table_{};"
create_table='''CREATE TABLE IF NOT EXISTS table_{}(
                ï»¿Customer_Id INT NOT NULL,
                Customer_Name VARCHAR(225) NOT NULL,
                Customer_Open_Date DATE NOT NULL,
                Last_Consulted_Date DATE,
                Vaccination_Type VARCHAR(5),
                Doctor_Consulted VARCHAR(255),
                State VARCHAR(5),
                Country VARCHAR(5),
                date_of_birth DATE,
                active_customer CHAR,
                PRIMARY KEY(ï»¿Customer_Id)
                );'''

insert_table= ''' INSERT INTO table_{} (ï»¿Customer_Id, Customer_Name, Customer_Open_Date, 
                    Last_Consulted_Date,Vaccination_Type, Doctor_Consulted,  State, Country, date_of_birth, active_customer) \
                          VALUES{}'''

group_query="SELECT Country FROM {intermediate_table} GROUP BY Country;"

query="SELECT * FROM hospitaldata"
country_query="SELECT Country FROM hospitaldata GROUP BY Country;"
country_tables="SELECT * FROM hospitaldata WHERE Country='{}';"



## Step 2: Perform ETL on Data

In [51]:
#connection to DB
mydb=PipelineSQL.create_db_connection("127.0.0.1","root","2401","hospitaldata")

#finding distinct country if exists
country=PipelineSQL.read_query(mydb,country_query)

#loading data to staging tables
PipelineSQL.load_to_staging_table(country) 

MySQL Database connection successful
Successfully Created Staging Tables


## Viewing Tables

In [52]:
col=PipelineSQL.read_query(mydb,"DESCRIBE table_AU")
column=[]
for i in col:
    column.append(i[0])
    

## View Tables

In [56]:
df_AU=pd.DataFrame(PipelineSQL.read_query(mydb,"SELECT * FROM table_AU"), columns=column)
df_IND=pd.DataFrame(PipelineSQL.read_query(mydb,"SELECT * FROM table_IND"), columns=column)
df_NYC=pd.DataFrame(PipelineSQL.read_query(mydb,"SELECT * FROM table_NYC"), columns=column)
df_PHIL=pd.DataFrame(PipelineSQL.read_query(mydb,"SELECT * FROM table_PHIL"), columns=column)
df_USA=pd.DataFrame(PipelineSQL.read_query(mydb,"SELECT * FROM table_USA"), columns=column)

In [57]:
df_AU

Unnamed: 0,ï»¿Customer_Id,Customer_Name,Customer_Open_Date,Last_Consulted_Date,Vaccination_Type,Doctor_Consulted,State,Country,date_of_birth,active_customer
0,1256,Jacob,2010-10-12,2012-10-13,MVD,Paul,VIC,AU,1987-06-07,A


In [58]:
df_IND

Unnamed: 0,ï»¿Customer_Id,Customer_Name,Customer_Open_Date,Last_Consulted_Date,Vaccination_Type,Doctor_Consulted,State,Country,date_of_birth,active_customer
0,8186,Shivam,2001-03-15,2008-12-12,AVD,Narayan,UP,IND,1991-01-24,A
1,8187,Arohi,2001-12-11,2008-12-12,AVD,Narayan,PB,IND,2000-12-01,A
2,8188,Anant,2001-02-15,2008-12-12,AVD,Narayan,HR,IND,2001-02-01,A
3,8190,Anjali,2001-03-05,2008-12-12,AVD,Narayan,MH,IND,2003-04-07,A
4,8191,Mehul,2001-07-05,2008-12-12,AVD,Narayan,DL,IND,2004-01-31,A
5,123458,John,2010-10-12,2012-10-13,MVD,Paul,TN,IND,1987-06-04,A


In [59]:
df_NYC

Unnamed: 0,ï»¿Customer_Id,Customer_Name,Customer_Open_Date,Last_Consulted_Date,Vaccination_Type,Doctor_Consulted,State,Country,date_of_birth,active_customer
0,12345,Matt,2010-10-12,2012-10-13,MVD,Paul,BOS,NYC,1987-06-06,A


In [60]:
df_PHIL

Unnamed: 0,ï»¿Customer_Id,Customer_Name,Customer_Open_Date,Last_Consulted_Date,Vaccination_Type,Doctor_Consulted,State,Country,date_of_birth,active_customer
0,123459,Mathew,2010-10-12,2012-10-13,MVD,Paul,WAS,PHIL,1987-06-05,A


In [61]:
df_USA

Unnamed: 0,ï»¿Customer_Id,Customer_Name,Customer_Open_Date,Last_Consulted_Date,Vaccination_Type,Doctor_Consulted,State,Country,date_of_birth,active_customer
0,8189,Shweta,2001-03-21,2008-12-12,AVD,Narayan,WAS,USA,2002-04-01,A
1,123457,Alex,2010-10-12,2012-10-13,MVD,Paul,SA,USA,1987-06-03,A
