In [1]:
# Import modules
import os

import pandas as pd
import psycopg2

In [3]:
import logging
import sys

# Grab the root logger, then let basicConfig attach a stdout handler to it.
logger = logging.getLogger()
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
# setLevel runs last, so INFO is the level the root logger ends up with
# (basicConfig is a no-op when the root logger already has handlers).
logger.setLevel(logging.INFO)

In [34]:
# Connect to database

def create_connection(host=None, database=None, user=None, password=None):
    """Open an autocommit psycopg2 connection to the PostgreSQL database.

    Each setting is resolved in this order: explicit argument, then the
    matching libpq-style environment variable (PGHOST, PGDATABASE, PGUSER,
    PGPASSWORD), then the notebook's default. The password has no default on
    purpose so that the secret is never stored in the notebook; export
    PGPASSWORD (or configure ~/.pgpass) before running.

    Args:
        host: Database host name; defaults to $PGHOST or the dev RDS endpoint.
        database: Database name; defaults to $PGDATABASE or "postgres".
        user: Role name; defaults to $PGUSER or "postgres".
        password: Role password; defaults to $PGPASSWORD. When it resolves to
            None, psycopg2 omits it and libpq applies its own lookup.

    Returns:
        An open psycopg2 connection with autocommit enabled.

    Raises:
        psycopg2.OperationalError: if the server rejects the connection.
    """
    connection = psycopg2.connect(
        host=host or os.environ.get(
            "PGHOST",
            "introduction-01-intro-ap-southeast-1-dev-introduction-db.cpfm8ml2cxp2.ap-southeast-1.rds.amazonaws.com",
        ),
        database=database or os.environ.get("PGDATABASE", "postgres"),
        user=user or os.environ.get("PGUSER", "postgres"),
        password=password or os.environ.get("PGPASSWORD"),
    )
    # Autocommit: every statement issued by later cells is committed immediately.
    connection.set_session(autocommit=True)
    return connection

# Open the connection that the cells below use through the global `connection`.
connection = create_connection()
# Log the connection repr as a quick check that the session was established.
logger.info(f"connection log: {connection}")

INFO:root:connection log: <connection object at 0x7993676d9940; dsn: 'user=postgres password=xxx dbname=postgres host=introduction-01-intro-ap-southeast-1-dev-introduction-db.cpfm8ml2cxp2.ap-southeast-1.rds.amazonaws.com', closed: 0>


In [5]:
logger.info("Start create schema")

# Create my schema.
# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE SCHEMA raises
# DuplicateSchema on every run after the first.
# The `with` block closes the cursor even when execute() raises.
with connection.cursor() as cursor:
    cursor.execute("""
        CREATE SCHEMA IF NOT EXISTS s4_giakhanh;
    """)

INFO:root:Start create schema


### **Prepare the necessary information**

In [22]:
# Customer table information:
#   "tables"      - the SCD target tables, which all share one column layout
#   "information" - one {"column", "datatype"} entry per column, in table order
customer_table = {
    "tables": [
        "customer_scd_type_1",
        "customer_scd_type_2",
        "customer_scd_type_4",
    ],
    "information": [
        {"column": column_name, "datatype": sql_type}
        for column_name, sql_type in (
            ("cust_id", "varchar"),
            ("cust_nm", "varchar"),
            ("birth_date", "date"),
            ("add_id", "varchar"),
            ("opn_dt", "date"),
            ("end_dt", "date"),
        )
    ],
}


### **CREATE TABLES**

In [28]:
logger.info("Create table customer_scd_type_1, customer_scd_type_2, customer_scd_type_4")

# Create a cursor (it is not closed in this cell)
cursor = connection.cursor()

# Render each column spec as "<column> <datatype>" and join the specs into
# the column list that goes between the parentheses of CREATE TABLE
list_of_col_and_datatype = [
    "{column} {datatype}".format(**spec) for spec in customer_table["information"]
]
column_datatype = ",".join(list_of_col_and_datatype)

# Create customer_scd_type_1, customer_scd_type_2 and customer_scd_type_4,
# all based on the same customer column layout
for table in customer_table["tables"]:
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS s4_giakhanh.{table}(
            {column_datatype}
        );
    """)

INFO:root:Create table customer_scd_type_1, customer_scd_type_2, customer_scd_type_4


### **Copy data from customer table**

In [29]:
logger.info("Copy data from customer table to customer_scd_type_1, customer_scd_type_2, customer_scd_type_4")

# Use a cursor owned by this cell instead of one left open by an earlier cell:
# the cell then runs on its own, and the `with` block closes the cursor even
# when an INSERT raises.
with connection.cursor() as cursor:
    for table in customer_table["tables"]:
        # Seed a target only while it is still empty. Without the NOT EXISTS
        # guard, every re-run of this cell appends another full copy of
        # ai4e_test.customer to each table.
        cursor.execute(f"""
            INSERT INTO s4_giakhanh.{table}
            SELECT *
            FROM ai4e_test.customer
            WHERE NOT EXISTS (SELECT 1 FROM s4_giakhanh.{table});
        """)

INFO:root:Copy data from customer table to customer_scd_type_1, customer_scd_type_2, customer_scd_type_4


In [31]:
# Show the freshly loaded SCD Type 1 table (pandas truncates the display)
pd.read_sql(
    sql="SELECT * FROM s4_giakhanh.customer_scd_type_1",
    con=connection,
)

Unnamed: 0,cust_id,cust_nm,birth_date,add_id,opn_dt,end_dt
0,6bb48a36c8bac63889a65ae53db9ea892102688200352,Marta Boyer,1986-05-22,b5e62f16d1613f1f8f0dfb39329ce9fb2102687963312,2010-01-03,2020-01-03
1,df3bf1e654728b0d0a3c2a31e2d5fdb02102688200736,Dewayne Reichel IV,1970-04-20,bb22428d19a6015c3b89fdfaa58acee62102688118480,2021-02-05,2031-02-05
2,bc299d4c81a67119d118ebadd96852ca2102688201792,Gaetano Hilll Sr.,1989-10-11,454def2e771b51f262186b1d5ff201972102665768240,2018-09-22,2028-09-22
3,8600c19ae766bb2d8e85c258a5d81f632102688200448,Blanca Lindgren,1976-12-05,4e054b459c09c5b5d0688e6559a6f30e2102666585088,2011-06-01,2021-06-01
4,c3731a21a656bb79141ef332557b257f2102688200976,Alfredo Cartwright,1971-02-06,67357e9a4287bdbc72e06f71b28dad582102687971216,2019-05-09,2029-05-09
...,...,...,...,...,...,...
9995,08df470853a0bdfb80d7396ba528ca302102714486304,Sonny Oda Sr.,1981-12-20,8c6f15ebaa89cf255aabb2f1503583a32102712356048,2015-04-05,2025-04-05
9996,08df470853a0bdfb80d7396ba528ca302102714486352,Christop Heaney Leffler,1984-04-13,4e054b459c09c5b5d0688e6559a6f30e2102665544320,2019-08-13,2029-08-13
9997,08df470853a0bdfb80d7396ba528ca302102714486400,Oda Ebert DDS,1995-03-18,55a4fb321ea98b782ad23d21a164f2f92102666552512,2017-06-16,2027-06-16
9998,08df470853a0bdfb80d7396ba528ca302102714486448,Ashley Grant Effertz,1990-08-26,af558e06e8cc590576e69558c8bc889c2102712400720,2014-01-21,2024-01-21


In [35]:
# Show the change-data-capture rows that the SCD cells apply
pd.read_sql(
    sql="SELECT * FROM ai4e_test.customer_cdc_18",
    con=connection,
)

Unnamed: 0,cust_id,cust_nm,birth_date,add_id,opn_dt,end_dt
0,6bb48a36c8bac63889a65ae53db9ea892102688200352,Marta Boyer update 18/10/2023,1986-05-22,b5e62f16d1613f1f8f0dfb39329ce9fb2102687963312,2010-01-03,2020-01-03
1,df3bf1e654728b0d0a3c2a31e2d5fdb02102688200736,Dewayne Reichel IV update 18/10/2023,1970-04-20,bb22428d19a6015c3b89fdfaa58acee62102688118480,2021-02-05,2031-02-05
2,bc299d4c81a67119d118ebadd96852ca2102688201792,Gaetano Hilll Sr. update 18/10/2023,1989-10-11,454def2e771b51f262186b1d5ff201972102665768240,2018-09-22,2028-09-22
3,8600c19ae766bb2d8e85c258a5d81f632102688200448,Blanca Lindgren update 18/10/2023,1976-12-05,4e054b459c09c5b5d0688e6559a6f30e2102666585088,2011-06-01,2021-06-01
4,c3731a21a656bb79141ef332557b257f2102688200976,Alfredo Cartwright update 18/10/2023,1971-02-06,67357e9a4287bdbc72e06f71b28dad582102687971216,2019-05-09,2029-05-09
...,...,...,...,...,...,...
4995,6a4928a9e41ebe44737025fe40bbf3662102690994784,Niko Pfeffer Jr. update 18/10/2023,1970-08-12,9374327ceabcabc493ab1f6ec72782ad2102665572160,2014-02-12,2024-02-12
4996,6a4928a9e41ebe44737025fe40bbf3662102690994832,Herminio Armstrong II update 18/10/2023,1995-12-26,1dcaf0b84d2b8058451e1f322e2929252102688108944,2017-04-26,2027-04-26
4997,6a4928a9e41ebe44737025fe40bbf3662102690994880,Marietta Makayla V update 18/10/2023,1996-09-20,454def2e771b51f262186b1d5ff201972102665796768,2011-04-17,2021-04-17
4998,6a4928a9e41ebe44737025fe40bbf3662102690994928,Alessandro Krajcik O'Hara update 18/10/2023,1972-04-09,1dcaf0b84d2b8058451e1f322e2929252102688104304,2014-01-11,2024-01-11


### **SCD Type 1**

In [32]:
# Create a cursor
# NOTE(review): the cursor is left open, as in the original cell, in case a
# cell outside this view reuses it - confirm, then close it when done.
cursor = connection.cursor()

logger.info("Update/insert customer_cdc_18 and customer_cdc_19 to customer_scd_type_1")

# From customer_cdc_18
# NOTE(review): only customer_cdc_18 is applied in this cell, although the log
# message above also names customer_cdc_19.
#
# SCD Type 1 = overwrite in place: a matching cust_id has its attributes
# replaced by the CDC values, and an unknown cust_id is inserted as a new row.
# Fixes relative to the previous statement:
#   * the target is s4_giakhanh.customer_scd_type_1 (s4_giakhanh.customer is
#     never created by this notebook);
#   * PostgreSQL syntax is MERGE INTO ... WHEN MATCHED THEN UPDATE SET ...;
#   * the column is add_id, not add_in.
# MERGE requires PostgreSQL 15 or newer.
cursor.execute("""
    MERGE INTO s4_giakhanh.customer_scd_type_1 AS target
    USING ai4e_test.customer_cdc_18 AS source
    ON target.cust_id = source.cust_id
    WHEN MATCHED THEN
        UPDATE SET
            cust_nm = source.cust_nm,
            birth_date = source.birth_date,
            add_id = source.add_id,
            opn_dt = source.opn_dt,
            end_dt = source.end_dt
    WHEN NOT MATCHED THEN
        INSERT (cust_id, cust_nm, birth_date, add_id, opn_dt, end_dt)
        VALUES (source.cust_id, source.cust_nm, source.birth_date, source.add_id, source.opn_dt, source.end_dt);
""")

INFO:root:Update/insert customer_cdc_18 and customer_cdc_19 to customer_scd_type_1


KeyboardInterrupt: 