### Import libraries and set up connections

In [1]:
#import needed libraries
import os
from datetime import datetime
from urllib.parse import quote

import pandas as pd
import pyodbc
from sqlalchemy import MetaData, create_engine, text

In [2]:
# Credentials are read from environment variables so they never live in the notebook.
# NOTE(review): the same PGUID/PGPASS pair is used for both the SQL Server source
# and the Postgres destination below — confirm that is intended.
pwd = os.environ['PGPASS']
uid = os.environ['PGUID']
driver = "{SQL Server Native Client 11.0}"
server = "localhost"
# Database name only — the ODBC connection string supplies its own ';' separators.
database = "AdventureWorksDW2019"
# Default watermark (1900-01-01 00:00:00). getLastETLRunDate() falls back to it when
# no previous run is found, and upsert() treats it as "run a full historical load".
format_data = "%d/%m/%Y %H:%M:%S.%f"
time_data = "01/01/1900 00:00:00.000"
default_date = datetime.strptime(time_data, format_data)

In [3]:
# Source connection: SQL Server over ODBC.
# UID/PWD are wrapped in {} (with '}' doubled, per ODBC connection-string rules) so
# characters such as ';' in the credentials cannot split the string. rstrip(';')
# keeps this correct whether or not `database` carries a trailing separator.
src_conn = pyodbc.connect(";".join([
    "DRIVER=" + driver,
    "SERVER=" + server,
    "DATABASE=" + database.rstrip(";"),
    "UID={" + uid.replace("}", "}}") + "}",
    "PWD={" + pwd.replace("}", "}}") + "}",
]))

In [4]:
# Destination: Postgres.
# Credentials are percent-encoded so characters such as '@', ':' or '/' in the
# password cannot corrupt the connection URL.
engine = create_engine(
    f'postgresql://{quote(uid, safe="")}:{quote(pwd, safe="")}@{server}:5432/AdventureWorks'
)

### Function to insert ETL log records, used to track the date of the last ETL load

In [5]:
def insertetllog(logid, tblname, rowcount, status, error):
    """Append one record to the ``etl.etlextractlog`` table (best effort).

    Parameters
    ----------
    logid : value stored in ``processlogid``.
    tblname : name of the loaded table, stored in ``tablename``.
    rowcount : number of extracted rows, stored in ``extractrowcount``.
    status : load status flag ("Y"/"N" as passed by ``upsert``).
    error : error text stored in ``errormessage``, truncated to 490 characters;
        ``None`` is stored as "NA".

    ``starttime``, ``endtime`` and ``lastextractdatetime`` all receive the same
    ``datetime.now()`` value, taken when the record is built.

    Any failure is printed and not raised, so a logging problem cannot abort
    the load that called this function.
    """
    try:
        now = datetime.now()
        error_text = "NA" if error is None else str(error)
        # NOTE(review): lastextractdatetime is the time this row is written, i.e. after
        # the load; rows changed in the source between the extract query and this call
        # may be missed by the next incremental run — consider logging the extract start.
        record = {
            "processlogid": logid,
            "tablename": tblname,
            "extractrowcount": rowcount,
            "starttime": now,
            "endtime": now,
            "lastextractdatetime": now,
            "status": status,
            "errormessage": error_text[:490],
        }
        etl_log_row = pd.DataFrame(record, index=[0])
        etl_log_row.to_sql("etlextractlog", engine, if_exists='append', index=False, schema="etl")
    except Exception as e:
        # Concatenate the message text itself; print() returns None and must not be
        # used inside the expression.
        print("Unable to insert record into etl logs: " + str(e))

In [6]:
# Smoke-test the logger: appends one row (tablename "test", status "N") to etl.etlextractlog.
insertetllog(logid=0, tblname="test", rowcount=0, status="N", error="NA")

In [7]:
def getLastETLRunDate(tblName):
    """Return the watermark of the last *successful* ETL run for ``tblName``.

    Reads ``max(lastextractdatetime)`` from ``etl.etlextractlog`` over rows whose
    ``tablename`` equals ``tblName`` and whose ``status`` is 'Y'. Failed runs
    (status 'N') are ignored so that a failed load cannot advance the watermark
    and cause its records to be skipped on the next run. The table name is passed
    as a bound parameter rather than formatted into the SQL.

    Returns ``default_date`` when no successful run is logged, or when the query
    fails (e.g. the log table does not exist yet). ``upsert`` treats that value as
    a request for a full historical load.
    """
    try:
        query = text(
            "select max(lastextractdatetime) as lastetlrundate "
            "from etl.etlextractlog "
            "where tablename = :tblname and status = 'Y'"
        )
        qry_logs = pd.read_sql(query, engine, params={"tblname": tblName})
        etlrundate = qry_logs['lastetlrundate'].iloc[0]
        # max() over zero rows yields NULL, surfaced as None or NaT: "never run".
        if pd.isna(etlrundate):
            return default_date
        return etlrundate
    except Exception as e:
        # Best effort, as before — but say so, because falling back to the default
        # date makes upsert() replace the whole target table.
        print(f"Could not read ETL log for {tblName!r}; using default date: {e}")
        return default_date

In [10]:
# Look up the watermark left by the previous ETL run of the staging table
stg_tbl = "stg_customer"
lastrundate = getLastETLRunDate(tblName=stg_tbl)
lastrundate

datetime.datetime(1900, 1, 1, 0, 0)

### Source change-detection query: read only records created or modified since the last ETL run

In [11]:
# Read only rows created or modified at/after the last ETL watermark. The watermark
# is bound as a parameter (pyodbc uses '?' placeholders) instead of being formatted
# into the SQL text, so its string representation cannot break the query.
source = pd.read_sql_query(
    """select *
       from [dbo].[customers]
       where (Created_at  >= convert(datetime2, ?)
           or modified_at >= convert(datetime2, ?))""",
    src_conn,
    params=[lastrundate, lastrundate],
)
source

Unnamed: 0,customerId,customername,customertype,entrydate,created_at,modified_at
0,1,Michael Scott,Corporate,2022-03-30 13:30:47.730,2022-03-30 13:30:47.730,2022-03-30 13:30:47.730
1,2,Dwight Schrute,Individual,2022-03-30 13:31:29.650,2022-03-30 13:31:29.650,2022-03-30 13:31:29.650
2,3,Jim Halpert,Individual,2022-03-30 13:31:46.090,2022-03-30 13:31:46.090,2022-03-30 17:08:39.050


### Load function: full (historical) load on the first run, incremental upsert afterwards

In [12]:
def upsert(df, tbl, key, last_run_date=None):
    """Load ``df`` into ``public.<tbl>`` and record the outcome in the ETL log.

    Parameters
    ----------
    df : rows extracted from the source.
    tbl : target table name in the ``public`` schema.
    key : key column used by the incremental upsert.
    last_run_date : watermark of the previous run. Defaults to the notebook-level
        ``lastrundate`` variable, which keeps existing calls working.

    If the watermark equals ``default_date`` (no earlier successful run), the target
    table is replaced wholesale and a primary key is added on ``key``. Otherwise the
    rows are merged in with ``update_to_sql``. In both cases one log row with status
    "Y" or "N" and ``len(df)`` rows is written through ``insertetllog``. Errors are
    printed and logged, not raised.
    """
    if last_run_date is None:
        last_run_date = lastrundate

    if last_run_date == default_date:
        print('Historical Load')
        try:
            df.to_sql(tbl, engine, if_exists='replace', index=False, schema="public")
            # to_sql(if_exists='replace') recreates the table without constraints, but
            # the incremental INSERT ... ON CONFLICT ("key") requires a unique index.
            with engine.begin() as cnx:
                cnx.execute(text(f'ALTER TABLE public."{tbl}" ADD PRIMARY KEY ("{key}")'))
            insertetllog(1, tbl, len(df), "Y", "NA")
        except Exception as e:
            print(f"Historical load of {tbl} failed: {e}")
            insertetllog(1, tbl, len(df), "N", str(e))
    else:
        print('Incremental Load')
        try:
            update_to_sql(df, tbl, key)
            # Log the size of the frame actually loaded, not of a global variable.
            insertetllog(1, tbl, len(df), "Y", "NA")
        except Exception as e:
            print(f"Incremental load of {tbl} failed: {e}")
            insertetllog(1, tbl, len(df), "N", str(e))

In [13]:
# Load the extracted rows into the staging table (historical or incremental, per the watermark)
upsert(df=source, tbl=stg_tbl, key="customerId")

Historical Load


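### Upsert helper: merge incremental rows into the target with `INSERT ... ON CONFLICT` (must be defined before the first incremental load)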

In [14]:
def update_to_sql(df, table_name, key_name):
    """Merge ``df`` into ``public.<table_name>`` with PostgreSQL ``INSERT ... ON CONFLICT``.

    The rows are first written to the scratch table
    ``public.<table_name>_temporary_table`` (replaced if it exists) and then copied
    into the target. Rows whose ``key_name`` already exists are updated in place;
    all other rows are inserted. The scratch table is dropped afterwards, even if
    the merge fails.

    Parameters
    ----------
    df : rows to merge; its column names must match the target table's columns.
    table_name : target table in the ``public`` schema.
    key_name : conflict column. The target needs a primary key or unique index on
        it, otherwise PostgreSQL rejects the ``ON CONFLICT`` clause.
    """
    def quote_ident(name):
        # Double-quote an identifier (escaping embedded quotes) so mixed-case names
        # such as "customerId" keep their case in PostgreSQL.
        return '"' + str(name).replace('"', '""') + '"'

    temp_table = f"{table_name}_temporary_table"
    target_ref = f"public.{quote_ident(table_name)}"
    temp_ref = f"public.{quote_ident(temp_table)}"

    column_list = ", ".join(quote_ident(col) for col in df.columns)
    assignments = ", ".join(
        f"{quote_ident(col)}=EXCLUDED.{quote_ident(col)}"
        for col in df.columns
        if col != key_name
    )
    # With only the key column there is nothing to update, and an empty
    # "DO UPDATE SET" list would be a syntax error.
    conflict_action = f"DO UPDATE SET {assignments}" if assignments else "DO NOTHING"
    upsert_stmt = (
        f"INSERT INTO {target_ref} ({column_list}) "
        f"SELECT {column_list} FROM {temp_ref} "
        f"ON CONFLICT ({quote_ident(key_name)}) {conflict_action};"
    )

    df.to_sql(temp_table, engine, if_exists='replace', index=False, schema='public')
    try:
        with engine.begin() as cnx:
            cnx.execute(text(upsert_stmt))
    finally:
        # Do not leave the scratch copy lying around in the target schema.
        with engine.begin() as cnx:
            cnx.execute(text(f"DROP TABLE IF EXISTS {temp_ref}"))

In [15]:
# Inspect the ETL extract log written by insertetllog()
logs = pd.read_sql(sql='Select * from etl."etlextractlog"', con=engine)
logs

Unnamed: 0,extractlogid,processlogid,tablename,extractrowcount,starttime,endtime,lastextractdatetime,success,status,errormessage
0,11,1,stg_customer,3,2022-04-04 20:03:21.759183,2022-04-04 20:03:21.759183,2022-04-04 20:03:21.759183,0,Y,


In [16]:
# Inspect the staging table in the Postgres target
target = pd.read_sql(sql=f'Select * from public."{stg_tbl}"', con=engine)
target

Unnamed: 0,customerId,customername,customertype,entrydate,created_at,modified_at
0,1,Michael Scott,Corporate,2022-03-30 13:30:47.730,2022-03-30 13:30:47.730,2022-03-30 13:30:47.730
1,2,Dwight Schrute,Individual,2022-03-30 13:31:29.650,2022-03-30 13:31:29.650,2022-03-30 13:31:29.650
2,3,Jim Halpert,Individual,2022-03-30 13:31:46.090,2022-03-30 13:31:46.090,2022-03-30 17:08:39.050


### Incremental run: extract only the rows changed since the last load

In [19]:
# Get the last ETL run date (watermark) for the staging table
stg_tbl = "stg_customer"
lastrundate = getLastETLRunDate(stg_tbl)
print(lastrundate)

# Read only rows created or modified at/after the watermark. The watermark is bound
# as a parameter (pyodbc uses '?' placeholders) instead of being formatted into the
# SQL text, so its string representation cannot break the query.
source = pd.read_sql_query(
    """select *
       from [dbo].[customers]
       where (Created_at  >= convert(datetime2, ?)
           or modified_at >= convert(datetime2, ?))""",
    src_conn,
    params=[lastrundate, lastrundate],
)
source

2022-04-04 20:03:21.759183


Unnamed: 0,customerId,customername,customertype,entrydate,created_at,modified_at
0,3,Jim Halpert,Corporate,2022-03-30 13:31:46.090,2022-03-30 13:31:46.090,2022-04-04 20:06:31.630
1,4,Pam Halpert,Individual,2022-04-04 20:06:31.627,2022-04-04 20:06:31.627,2022-04-04 20:06:31.627
2,5,Stanely Hudson,Individual,2022-04-04 20:06:31.627,2022-04-04 20:06:31.627,2022-04-04 20:06:31.627


In [23]:
# Merge the changed rows into the staging table
upsert(df=source, tbl=stg_tbl, key="customerId")

Incremental Load


In [24]:
# Re-read the staging table to confirm the incremental changes landed
target = pd.read_sql(sql=f'Select * from public."{stg_tbl}"', con=engine)
target

Unnamed: 0,customerId,customername,customertype,entrydate,created_at,modified_at
0,1,Michael Scott,Corporate,2022-03-30 13:30:47.730,2022-03-30 13:30:47.730,2022-03-30 13:30:47.730
1,2,Dwight Schrute,Individual,2022-03-30 13:31:29.650,2022-03-30 13:31:29.650,2022-03-30 13:31:29.650
2,3,Jim Halpert,Corporate,2022-03-30 13:31:46.090,2022-03-30 13:31:46.090,2022-04-04 20:06:31.630
3,4,Pam Halpert,Individual,2022-04-04 20:06:31.627,2022-04-04 20:06:31.627,2022-04-04 20:06:31.627
4,5,Stanely Hudson,Individual,2022-04-04 20:06:31.627,2022-04-04 20:06:31.627,2022-04-04 20:06:31.627
