### Import libraries and set up connections

In [None]:
#import needed libraries
import os
import urllib
import urllib.parse
from datetime import datetime

import pandas as pd
import pyodbc
from dotenv import load_dotenv
from sqlalchemy import create_engine, MetaData

In [None]:
# Connect to the MS SQL Server source database
load_dotenv()

# SQL Server connection details, read from environment variables
driver = os.getenv('mssql_driver')
server = os.getenv('mssql_server')
database = os.getenv("mssql_database")
# Credentials are loaded as well, although the ODBC string below never
# references them (it sets Trusted_Connection=yes instead)
uid = os.getenv('mssql_uid')
pwd = os.getenv('mssql_pwd')

# Build the raw ODBC connection string, then URL-encode it so it can be
# carried in the odbc_connect query parameter of the SQLAlchemy URL
odbc_connection_string = ";".join([
    f"DRIVER={driver}",
    f"SERVER={server}",
    f"DATABASE={database}",
    "Trusted_Connection=yes",
    "TrustServerCertificate=yes",
])
params = urllib.parse.quote_plus(odbc_connection_string)

mssql_engine = create_engine("mssql+pyodbc:///?odbc_connect=" + params)

In [None]:
# Establish the connection to the PostgreSQL target database.
# All settings are read from environment variables.
pg_driver = os.getenv('postgre_driver')
pg_database = os.getenv('postgre_database')
pg_server = os.getenv('postgre_server')
pg_port = os.getenv('postgre_port')
pg_uid = os.getenv('postgre_uid')
pg_pwd = os.getenv('postgre_pwd')

# Percent-encode the credentials: characters such as "@", ":" or "/" in a
# user name or password would otherwise be parsed as URL delimiters.
# quote(..., safe='') is used rather than quote_plus so that spaces become
# "%20" and never "+". str() keeps the previous behaviour when a variable is unset.
pg_engine = create_engine(
    "postgresql://"
    f"{urllib.parse.quote(str(pg_uid), safe='')}:{urllib.parse.quote(str(pg_pwd), safe='')}"
    f"@{pg_server}:{pg_port}/{pg_database}"
)

In [None]:
# Default date: midnight on 1 January 1900. getLastETLRunDate() returns it when
# no previous run is found, and upsert() treats it as "do a historical load".
time_data = "01/01/1900 00:00:0.000"
format_data = "%d/%m/%Y %H:%M:%S.%f"
default_date = datetime.strptime(time_data, format_data)

In [None]:
# Echo the parsed default date as this cell's output
default_date

### Create a function to insert ETL logs, to keep track of the last ETL load

In [None]:
def insertetllog(tblname, rowcount, status, error):
    """ Function to append one row describing an ETL run to the public.etlextractlog
        table in PostgreSQL.

        Logging is best-effort: any failure while building or writing the row is
        printed and swallowed, so a logging problem never aborts the ETL run itself.

        tblname: Name of the table for which ETL process was run
        rowcount: Number of rows extracted in the current process
        status: Can be Y or N which denotes Success or Failure respectively
        error: Details of the error, if any, for the current run. It is converted
               to text and truncated to 490 characters; None is stored as "".
    """
    try:
        # Read the clock once so starttime, endtime and lastextractdatetime
        # hold exactly the same instant
        run_time = datetime.now()
        error_text = "" if error is None else str(error)
        record = {
            "tablename": tblname,
            "extractrowcount": rowcount,
            "starttime": run_time,
            "endtime": run_time,
            "lastextractdatetime": run_time,
            "status": status,
            "errormessage": error_text[0:490],
        }
        # A single-row frame lets pandas handle the INSERT
        etl_log = pd.DataFrame(record, index=[0])
        etl_log.to_sql("etlextractlog", pg_engine, if_exists='append', index=False, schema="public")
    except Exception as e:
        # print() returns None, so it must not be concatenated into the message
        print("Unable to insert record into etl logs: " + str(e))

In [None]:
# Test the log entry: appends a row for table "test" with row count 0,
# status "N" (failure) and error text "NA"
insertetllog("test", 0, "N", "NA")

In [None]:
def getLastETLRunDate(tblName):
    """ Function to return the timestamp of the last ETL process run date. If it doesn't exist
        or the table has never been loaded before it will return a default date and enable
        historical load of data.

        The default date is also returned when the log table cannot be queried;
        in that case the reason is printed instead of being silently discarded.

        tblName: Name of the table for which we want to check its last run date.
    """
    # Imported locally so the function does not depend on the import cell
    from sqlalchemy import text

    try:
        # Bind the table name as a parameter instead of interpolating it into the SQL text.
        # NOTE(review): rows with status 'N' are included, so a failed run also
        # advances the returned date - confirm that this is intended.
        qry = text(
            "Select max(lastextractdatetime) as lastETLRunDate "
            "from public.etlextractlog where tablename = :tblname"
        )
        qry_logs = pd.read_sql_query(qry, pg_engine, params={"tblname": tblName})
        # The unquoted alias is folded to lower case by PostgreSQL
        etlrundate = qry_logs['lastetlrundate'][0]
    except Exception as e:
        print(f"Unable to read last ETL run date for {tblName}, using default date: {e}")
        return default_date

    # max() over zero rows is NULL, which pandas may surface as None or NaT
    if pd.isna(etlrundate):
        return default_date
    return etlrundate

In [None]:
# Get the last ETL run date for the staging table. Both names are module-level:
# upsert() reads lastrundate, and the source query interpolates it.
stg_tbl = "stg_customer"
lastrundate = getLastETLRunDate(stg_tbl)
lastrundate

### Upsert Function

In [None]:
def update_to_sql(df, table_name, primary_key):
    """ This function creates a PostgreSQL dynamic SQL statement to upsert records and executes it.
        The data is first written to a temporary staging table public."<table_name>_temp",
        then merged into the target with INSERT ... ON CONFLICT, and the staging table
        is dropped afterwards, whether or not the upsert succeeded.

        PostgreSQL only accepts ON CONFLICT ("<primary_key>") when the target table has
        a unique index or constraint on that column; otherwise the statement raises.

        df: This is the data/changed_data from the source
        table_name: This is the target table (in schema public) to where we want to load the data
        primary_key: primary key column of the target table, used as the conflict target
    """
    # Imported locally so the function does not depend on the import cell
    from sqlalchemy import text

    columns = list(df.columns)
    temp_table = f"{table_name}_temp"
    # Quote identifiers so mixed-case names resolve to the tables pandas creates
    target_ref = f'public."{table_name}"'
    temp_ref = f'public."{temp_table}"'

    column_list = ", ".join(f'"{c}"' for c in columns)
    # Every column except the key is overwritten with the incoming value
    assignments = ", ".join(
        f'"{c}"=EXCLUDED."{c}"' for c in columns if c != primary_key
    )
    # "DO UPDATE SET" with nothing after it is invalid SQL, so fall back to
    # DO NOTHING when the frame holds only the key column
    conflict_action = f"DO UPDATE SET {assignments}" if assignments else "DO NOTHING"

    # Name the columns on both sides rather than relying on SELECT * ordering
    upsert_stmt = (
        f"INSERT INTO {target_ref} ( {column_list} ) "
        f"SELECT {column_list} FROM {temp_ref} "
        f'ON CONFLICT ("{primary_key}") {conflict_action};'
    )
    print(upsert_stmt)

    try:
        # Persist data to temp table
        df.to_sql(temp_table, pg_engine, if_exists='replace', index=False, schema='public')
        # begin() commits on success and rolls back if the statement raises
        with pg_engine.begin() as cnx:
            cnx.execute(text(upsert_stmt))
    finally:
        # Drop the temporary table even when the upsert failed, without
        # reflecting the whole database just to find one table
        with pg_engine.begin() as cnx:
            cnx.execute(text(f"DROP TABLE IF EXISTS {temp_ref}"))

### Function to Insert and update records

In [None]:
def upsert(df, tbl, key):
    """ Load df into the target table and record the outcome in the ETL log table.

        The load mode is chosen by comparing the module-level lastrundate with
        default_date. When they are equal the target table is replaced with df
        (historical load); otherwise update_to_sql() upserts df (incremental load).
        Either way one log row is written through insertetllog(): status "Y" with
        error "NA" on success, or status "N" with the exception text on failure.
        Exceptions raised by the load are not re-raised.

        df: This is the data/changed_data from the source
        tbl: This is the target table to where we want to load the data
        key: primary key of the target table
    """
    # NOTE: lastrundate is whatever the notebook last assigned; it is not
    # looked up for tbl here
    first_load = lastrundate == default_date
    try:
        if first_load:
            print('Historical Load')
            df.to_sql(tbl, pg_engine, if_exists='replace', index=False, schema="public")
        else:
            print('Incremental Load')
            update_to_sql(df, tbl, key)
        insertetllog(tbl, len(df), "Y", "NA")
    except Exception as load_error:
        insertetllog(tbl, len(df), "N", str(load_error))

### Source Change Detection Query. Only read new and modified records.


In [None]:
# Change detection: read only the source rows whose Created_at or modified_at
# is at or after lastrundate. The value is interpolated into the query text as
# a string literal and converted to datetime2 on the SQL Server side.
source = pd.read_sql_query(f"""  select *
                            from [dbo].[customers]
                            where  (Created_at   >= convert(datetime2,'{lastrundate}')
                                or modified_at  >= convert(datetime2,'{lastrundate}')) """, mssql_engine)
source

In [None]:
# Load the extracted rows into the staging table, using "customerId" as the key
upsert(source, stg_tbl, "customerId")

In [None]:
# Read the ETL log table into a dataframe to check the entry written by the load
logs = pd.read_sql('Select * from PUBLIC."etlextractlog"', pg_engine)
logs

In [None]:
# Read the target (staging) table into a dataframe to inspect what was loaded
target = pd.read_sql(f'Select * from public."{stg_tbl}"', pg_engine)
target

### Let's get the incremental data

In [None]:
# Get the last ETL run date again, now that the previous load has written a log row.
# Reassigning the module-level lastrundate is what upsert() and the next query read.
stg_tbl = "stg_customer"
lastrundate = getLastETLRunDate(stg_tbl)
print(lastrundate)

In [None]:
# Incremental extract: the same change-detection query as before, now filtered
# by the refreshed lastrundate so only rows created or modified at or after
# that timestamp are read.
source = pd.read_sql_query(f"""  select *
                            from [dbo].[customers]
                            where  (Created_at   >= convert(datetime2,'{lastrundate}')
                                or modified_at  >= convert(datetime2,'{lastrundate}')) """, mssql_engine)
source

In [None]:
# Apply the changed rows to the staging table, using "customerId" as the key
upsert(source, stg_tbl, "customerId")

In [None]:
# Re-read the target table to check that all updates and inserts are reflected
target = pd.read_sql(f'Select * from public."{stg_tbl}"', pg_engine)
target