In [2]:
import pandas as pd
from configparser import ConfigParser
from sqlalchemy import create_engine
from pyspark.sql import SparkSession
from datetime import datetime

In [2]:
config=ConfigParser()
config.read('config.ini')

['config.ini']

In [3]:
db_host=config['DB']['hostname']
db_user=config['DB']['user']
db_pwd=config['DB']['password']
db_driver=config['DB']['driver']
db_database=config['DB']['database']

db_url=f'{db_driver}://{db_user}:{db_pwd}@{db_host}/{db_database}?driver=ODBC+Driver+17+for+SQL+Server'
db_engine=create_engine(db_url)

In [4]:
dwh_host=config['DWH']['hostname']
dwh_port=config['DWH']['port']
dwh_user=config['DWH']['user']
dwh_pwd=config['DWH']['password']
dwh_driver=config['DWH']['driver']
dwh_database=config['DWH']['database']

dwh_url=f'{dwh_driver}://{dwh_user}:{dwh_pwd}@{dwh_host}:{dwh_port}/{dwh_database}'
dwh_engine=create_engine(dwh_url)

In [5]:
runTime=datetime.now().strftime("%Y-%m-%d 23:59:59")
runTime

'2023-05-05 23:59:59'

In [16]:
# Get the last_update record in datawarehouse
query1=f"""SELECT last_update FROM public.orders"""
dwh_data=pd.read_sql_query(query1,con=dwh_engine)
dwh_data

Unnamed: 0,last_update
0,2023-05-05
1,2023-05-07
2,2023-05-05
3,2023-05-05


In [21]:
last_update_dwh=dwh_data['last_update'].max().strftime("%Y-%m-%d 23:59:59")
last_update_dwh

'2023-05-07 23:59:59'

In [5]:
dfx=pd.read_csv('2022-T3-4.txt',on_bad_lines='skip')

In [8]:
dfx

Unnamed: 0,Danh mục thuốc Tân dược trúng thầu T3-T4 năm 2022 theo hình thức đấu thầu rộng rãi tại các địa phương đề nghị đăng tải
0,STT
1,Tên hoạt chất
2,Đường
3,dạng bào
4,chế
...,...
199347,BV đa khoa Tỉnh
199348,Khánh Hòa
199349,709/QĐ-BVĐKT
199350,27/04/2022


In [44]:
# Get all records which have updatedAt > last_update
query2=f"""SELECT * FROM cdc.sales_orders_CT WHERE updatedAt <'{last_update_dwh}'"""
df_source=pd.read_sql_query(query2,con=db_engine)
df_source

Unnamed: 0,__$start_lsn,__$end_lsn,__$seqval,__$operation,__$update_mask,id,orderId,storeId,empId,cusId,subtotal,tax,discount,totalPayment,paymentMethod,shippingMethod,orderStatus,createdAt,updatedAt,__$command_id


In [28]:
df_source=df_source.drop(columns=['__$start_lsn','__$end_lsn','__$seqval','__$update_mask','__$command_id','createdAt'])
df_source=df_source.rename(columns={
    '__$operation':'operation',
    'orderId':'order_id',
    'storeId':'store_id',
    'empId':'emp_id',
    'cusId':'cus_id',
    'totalPayment':'total_payment',
    'paymentMethod':'payment_method',
    'shippingMethod':'shipping_method',
    'orderStatus':'order_status',
    'updatedAt':'last_update',
    }
)


In [29]:
df_source

Unnamed: 0,operation,id,order_id,store_id,emp_id,cus_id,subtotal,tax,discount,total_payment,payment_method,shipping_method,order_status,last_update
0,2,1,OD0001,WH0010,MR2558,,0,0,0,0,1,1,0,2023-05-05 01:59:18.323
1,2,2,OD0002,WH0012,MR2559,,0,0,0,0,1,1,0,2023-05-05 02:00:22.097
2,2,3,OD0003,WH0012,MR2559,,0,0,0,0,1,1,0,2023-05-05 02:00:25.907
3,3,1,OD0001,WH0010,MR2558,,0,0,0,0,1,1,0,2023-05-05 01:59:18.323
4,4,1,OD0001,WH0010,MR2558,,0,0,0,100,1,1,0,2023-05-05 02:04:25.580


In [30]:
df_new=df_source.loc[df_source['operation']==2]

In [None]:
df_new=df_new.drop(columns=['operation'])
df_new

In [15]:
df_new.to_sql('orders',con=dwh_engine,if_exists='append',index=False)

3

In [32]:
df_update=df_source.loc[df_source['operation']==4]
df_update=df_update.drop(columns=['operation'])

In [33]:
df_update

Unnamed: 0,operation,id,order_id,store_id,emp_id,cus_id,subtotal,tax,discount,total_payment,payment_method,shipping_method,order_status,last_update
4,4,1,OD0001,WH0010,MR2558,,0,0,0,100,1,1,0,2023-05-05 02:04:25.580


In [34]:

for row in df_update.iterrows():
    query3=f"""UPDATE orders SET store_id='{row[1]['store_id']}',emp_id='{row[1]['emp_id']}',
               cus_id='{row[1]['cus_id']}',subtotal={row[1]['subtotal']},tax={row[1]['tax']},
               discount={row[1]['discount']},total_payment={row[1]['total_payment']},payment_method={row[1]['payment_method']},
               shipping_method={row[1]['shipping_method']},order_status={row[1]['order_status']},last_update='{row[1]['last_update']}'
    WHERE order_id='{row[1]['order_id']}'"""
    dwh_engine.execute(query3)
