<h3>ETL delta load</h3>
<p>A simple example script for an incremental (delta) load using petl, pandas and SQLAlchemy.</p>

In [1]:
import pandas as pd
import petl as etl

import psycopg2 as pg
from sqlalchemy import create_engine, text

from datetime import datetime
import pytz
from petl import datetimeparser
# Timestamp format used by the source CSV and by the "now" cutoff below;
# isodatetime parses strings in this format into naive datetime objects.
ISO_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
isodatetime = datetimeparser(ISO_DATETIME_FORMAT)

In [2]:
import os

# psycopg2 connection used by the petl reads/writes in this notebook.
# The standard libpq environment variables override the original development
# defaults, so credentials need not be edited into the notebook.
# NOTE(review): don't rely on the fallback password outside local development.
conn = pg.connect(
    host=os.environ.get('PGHOST', '172.18.0.3'),
    dbname=os.environ.get('PGDATABASE', 'olist_customers_dataset'),
    user=os.environ.get('PGUSER', 'airflow'),
    password=os.environ.get('PGPASSWORD', 'airflow'),
)

In [3]:
# Stage src.csv into delta.source, parsing modi_date into datetimes and id into int.
# NOTE(review): petl's todb only applies drop=True together with create=True, so this
# call appears to truncate and reload the existing table rather than recreate it — confirm intent.
sourcer = etl.fromcsv('src.csv').convert({"modi_date": isodatetime, "id": int})
etl.todb(table=sourcer, dbo=conn, tablename='source', schema='delta', drop=True)

In [4]:
# Read the staged rows back from the database (ids normalized to int) and preview them.
# fromdb returns a lazy petl view: the query is executed whenever the table is iterated.
source = etl.fromdb(conn, 'select * from delta.source').convert("id", int)
source.display(20)

id,name,modi_date
1,dd,2021-08-09 22:03:00
2,ff,2021-08-09 22:03:00
3,dd,2021-08-09 22:03:00
4,l2,2021-08-10 10:50:59
5,rr,2021-08-09 22:03:00
6,dd,2021-08-11 11:33:00
7,fd,2021-08-10 18:02:00
8,rr,2021-08-10 18:05:00
9,tt,2021-08-11 10:47:10
10,gg,2021-08-11 11:33:30


In [5]:
# Bookkeeping from previous runs: the id for this run (row count + 1) and the end
# of the last processed window (max(end_ts) is NULL when no run has been recorded).
# NOTE(review): count(1)+1 collides with an existing id if delta rows were ever deleted;
# max(id)+1 or a sequence would be more robust.
delta_info_sql = 'select count(1)+1 count_rec, max(end_ts) last_e_ts from delta.delta'
delta_info = etl.fromdb(conn, delta_info_sql)
delta_info

count_rec,last_e_ts
12,2021-08-11 15:26:13


In [6]:
# Cutoff for this run: current Moscow wall-clock time truncated to whole seconds and
# made naive, so it compares directly with the naive modi_date values from the source.
now = datetime.now(tz=pytz.timezone('Europe/Moscow')).replace(microsecond=0, tzinfo=None)
now

datetime.datetime(2021, 8, 11, 15, 26, 59)

In [7]:
# Start of this run's window and the id for its bookkeeping row.
# max(end_ts) is NULL before the first run, so fall back to an early sentinel that
# lets the first window pick up every source row.
# Read the value once: delta_info is a lazy petl view and each access re-runs its query.
last_end_ts = delta_info['last_e_ts'][0]
lv_last_e_ts = datetime(2000, 1, 1, 0, 0, 0) if last_end_ts is None else last_end_ts
lv_delta_id = delta_info['count_rec'][0]
lv_last_e_ts

datetime.datetime(2021, 8, 11, 15, 26, 13)

In [8]:
# Select the half-open window [lv_last_e_ts, now): rows modified since the previous
# run's end_ts and before this run's cutoff, then stamp each one with the load time.
in_window = etl.select(source, lambda row: lv_last_e_ts <= row.modi_date < now)
rec = etl.addfield(in_window, 'ts', now)
values = etl.todataframe(rec)
values

Unnamed: 0,id,name,modi_date,ts
0,16,yy,2021-08-11 15:26:30,2021-08-11 15:26:59
1,17,bc,2021-08-11 15:26:30,2021-08-11 15:26:59


In [9]:
# Current contents of the receiver table, used to split the delta into updates
# and inserts. This is a lazy view; it must be consumed before the upsert runs.
reciever_sql = 'select * from delta.reciever'
delta_reciever = etl.fromdb(conn, reciever_sql)
delta_reciever.display(20)

id,name,modi_date,ts
1,dd,2021-08-09 22:03:00,2021-08-10 21:56:50
2,ff,2021-08-09 22:03:00,2021-08-10 21:56:50
3,dd,2021-08-09 22:03:00,2021-08-10 21:56:50
4,l2,2021-08-10 10:50:59,2021-08-10 21:56:50
5,rr,2021-08-09 22:03:00,2021-08-10 21:56:50
7,fd,2021-08-10 18:02:00,2021-08-10 21:56:50
8,rr,2021-08-10 18:05:00,2021-08-10 21:56:50
11,gt,2021-08-10 22:43:30,2021-08-10 22:46:19
12,vv,2021-08-11 10:00:00,2021-08-11 10:03:28
9,tt,2021-08-11 10:47:10,2021-08-11 10:48:37


In [10]:
# Rows whose id already exists in the receiver count as updates; the rest are inserts.
existing_ids = list(delta_reciever['id'])
is_update = values['id'].isin(existing_ids)
cnt_upd = int(is_update.sum())
cnt_ins = len(values) - cnt_upd
print(cnt_upd, cnt_ins)

1 1


In [11]:
import os
from urllib.parse import quote_plus

# Connection settings: the standard libpq environment variables override the original
# development defaults. NOTE(review): don't rely on the fallback password outside local dev.
pg_user = os.environ.get('PGUSER', 'airflow')
pg_password = os.environ.get('PGPASSWORD', 'airflow')
pg_host = os.environ.get('PGHOST', '172.18.0.3')
pg_dbname = os.environ.get('PGDATABASE', 'olist_customers_dataset')
engine = create_engine(
    f'postgresql://{quote_plus(pg_user)}:{quote_plus(pg_password)}@{pg_host}/{pg_dbname}'
)

# Upsert without relying on a unique constraint on id: update the row if it exists,
# then insert it only if it is still missing. Values are passed as bound parameters
# (escaped by the driver), so quotes, commas or parentheses in `name` can neither
# break the statement nor inject SQL.
update_stmt = text(
    'UPDATE delta.reciever '
    'SET name = :name, modi_date = :modi_date, ts = :ts '
    'WHERE id = :id'
)
insert_stmt = text(
    'INSERT INTO delta.reciever (id, name, modi_date, ts) '
    'SELECT :id, :name, :modi_date, :ts '
    'WHERE NOT EXISTS (SELECT 1 FROM delta.reciever WHERE id = :id)'
)

# engine.begin() runs all rows in one transaction and commits on success
# (Engine.execute is deprecated in SQLAlchemy 1.4 and removed in 2.0).
delta_rows = values[['id', 'name', 'modi_date', 'ts']].itertuples(index=False, name=None)
with engine.begin() as connection:
    for rec_id, name, modi_date, ts in delta_rows:
        params = {
            'id': int(rec_id),  # numpy int64 -> plain int for the DB driver
            'name': name,
            # pandas Timestamps -> plain datetimes the driver can adapt
            'modi_date': pd.Timestamp(modi_date).to_pydatetime(),
            'ts': pd.Timestamp(ts).to_pydatetime(),
        }
        connection.execute(update_stmt, params)
        connection.execute(insert_stmt, params)

In [12]:
# Bookkeeping row for this run: the processed window [start_ts, end_ts), the object
# name, the load timestamp and the update/insert counts. The header is written
# directly with all seven columns instead of overwriting a mismatched placeholder.
delta_table_new = etl.wrap([
    ['id', 'start_ts', 'end_ts', 'delta_obj', 'ts', 'cn_u', 'cn_i'],
    [lv_delta_id, lv_last_e_ts, now, 'delta_1', now, cnt_upd, cnt_ins],
])
delta_table_new

id,start_ts,end_ts,delta_obj,ts,cn_u,cn_i
12,2021-08-11 15:26:13,2021-08-11 15:26:59,delta_1,2021-08-11 15:26:59,1,1


In [13]:
# Record this run in delta.delta so the next run starts its window at this end_ts.
etl.appenddb(delta_table_new, conn, 'delta', schema='delta')