In [1]:
import pandas as pd 
import os
from dotenv import load_dotenv
from sqlalchemy import inspect, create_engine
from pipeline.utils.read_sql import read_sql_file
from pipeline.utils.load_data import load_to_wh
import warnings
warnings.filterwarnings('ignore')

In [2]:
def db_conn():
    """Build SQLAlchemy engines for the MySQL source DB and the PostgreSQL warehouse.

    Settings are read from environment variables after calling ``load_dotenv()``
    so a local ``.env`` file can supply them:

    * source (MySQL via pymysql): ``SRC_DB``, ``SRC_HOST``, ``SRC_USER``,
      ``SRC_PASSWORD``, plus optional ``SRC_PORT``
    * warehouse (PostgreSQL): ``DWH_POSTGRES_DB``, ``DWH_POSTGRES_HOST``,
      ``DWH_POSTGRES_USER``, ``DWH_POSTGRES_PASSWORD``, plus optional
      ``DWH_POSTGRES_PORT``

    URLs are built with ``URL.create`` rather than string formatting so that
    credentials containing ``@``, ``:``, ``/`` or ``#`` are escaped correctly.

    Returns:
        tuple: ``(src_engine, dwh_engine)``.

    Raises:
        RuntimeError: if a required environment variable is not set.
        Exception: any error from building the engines; it is printed and then
            re-raised (previously it was swallowed and ``None`` was returned,
            which made callers fail later with an unpacking ``TypeError``).
    """
    # Local import: sqlalchemy is already a dependency of this notebook.
    from sqlalchemy.engine import URL

    try:
        load_dotenv()

        required = [
            "SRC_DB", "SRC_HOST", "SRC_USER", "SRC_PASSWORD",
            "DWH_POSTGRES_DB", "DWH_POSTGRES_HOST",
            "DWH_POSTGRES_USER", "DWH_POSTGRES_PASSWORD",
        ]
        missing = [name for name in required if os.getenv(name) is None]
        if missing:
            # Without this check the URL would contain the literal text "None".
            raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")

        src_port = os.getenv("SRC_PORT")
        dwh_port = os.getenv("DWH_POSTGRES_PORT")

        src_url = URL.create(
            "mysql+pymysql",
            username=os.getenv("SRC_USER"),
            password=os.getenv("SRC_PASSWORD"),
            host=os.getenv("SRC_HOST"),
            port=int(src_port) if src_port else None,
            database=os.getenv("SRC_DB"),
        )

        dwh_url = URL.create(
            "postgresql",
            username=os.getenv("DWH_POSTGRES_USER"),
            password=os.getenv("DWH_POSTGRES_PASSWORD"),
            host=os.getenv("DWH_POSTGRES_HOST"),
            port=int(dwh_port) if dwh_port else None,
            database=os.getenv("DWH_POSTGRES_DB"),
        )

        src_conn = create_engine(src_url)
        dwh_conn = create_engine(dwh_url)

        return src_conn, dwh_conn

    except Exception as e:
        print(f"Error when connecting to database: {e}")
        raise

In [3]:
# src_connection,_ = db_conn()

# try:
#     with src_connection.connect() as conn:
#         print('berhasil konek')

# except Exception as e:
#     print(f'gagal konek: {e}')

In [3]:
def extract(tablename: str, rename_col: dict = None, engine=None):
    """Read an entire source table into a DataFrame.

    Args:
        tablename: Source table name. It is interpolated directly into the
            SQL text, so it must come from a trusted list (e.g. ``tb_source``)
            and never from user input.
        rename_col: Optional ``{old: new}`` column mapping applied with
            ``DataFrame.rename``.
        engine: Optional connectable (SQLAlchemy engine/connection or a
            DBAPI connection accepted by ``pd.read_sql``) to reuse across
            calls. When omitted, engines are built with ``db_conn()`` and
            disposed before returning, so repeated calls no longer leak
            connection pools.

    Returns:
        pandas.DataFrame with the table's rows, renamed if ``rename_col`` is given.
    """
    owned_engines = ()
    if engine is None:
        # db_conn() builds both the source and warehouse engines; only the
        # source one is used here, but both are disposed below.
        owned_engines = db_conn()
        engine = owned_engines[0]

    try:
        query = f"SELECT * FROM {tablename}"
        read_query = pd.read_sql(query, con=engine)
    finally:
        for owned in owned_engines:
            owned.dispose()

    if rename_col:
        read_query = read_query.rename(columns=rename_col)

    print(f"Extract table {tablename} Success")

    return read_query

In [4]:
# Source tables to replicate into the warehouse; the load loop processes
# them in this order.
tb_source = [
    'delivery_man',
    'packinglist',
    'transferstatus',
    'deliveryproof',
    'customers',
    'expedition',
    'license_plate',
    'office_address',
]

In [8]:
# One rename mapping shared by every table. It is loop-invariant, so it is
# built once here rather than on every iteration. Keys a table does not have
# are simply not applied.
# NOTE(review): created_at/update_at are presumably renamed so they do not
# clash with warehouse-managed audit columns; confirm against the DWH schema.
RENAME_COL = {'no_invoice': 'sj_numbers',
              'concat_inv': 'no_sj',
              'pod': 'pod_image',
              'timestamp': 'pod_date',
              'created_at': 'src_created_at',
              'update_at': 'src_update_at'}

for tb_name in tb_source:
    # Full extract of the source table, then load into public.<tb_name>.
    df = extract(tb_name, rename_col=RENAME_COL)
    load_to_wh(df, tb_name, schema='public', track_deleted=True)

# NOTE: `df` remains bound to the LAST table in tb_source after this loop;
# any later cell that reads `df` sees that table, not a specific one.

Extract table delivery_man Success
Load data to public.delivery_man success.

Marked deleted records in public.delivery_man
Extract table packinglist Success
Load data to public.packinglist success.

Marked deleted records in public.packinglist
Extract table transferstatus Success
Load data to public.transferstatus success.

Marked deleted records in public.transferstatus
Extract table deliveryproof Success
Load data to public.deliveryproof success.

Marked deleted records in public.deliveryproof
Extract table customers Success
Load data to public.customers success.

Marked deleted records in public.customers
Extract table expedition Success
Load data to public.expedition success.

Marked deleted records in public.expedition
Extract table license_plate Success
Load data to public.license_plate success.

Marked deleted records in public.license_plate
Extract table office_address Success
Load data to public.office_address success.

Marked deleted records in public.office_address


In [None]:
# def load_to_wh(df, tablename, key_col: list = None, schema: str = None):

#     try:

#         _, dwh_engine = db_conn()
#         # dwh_engine = wh_engine()

#         table_init = ""

#         if schema:
#             table_init += schema + "."

#         table_init += tablename

#         df_columns = list(df.columns)

#         if not key_col:

#             insp = inspect(dwh_engine)
#             get_constraint_col = insp.get_unique_constraints(tablename, schema=schema)

#             key_col = []
#             for col in get_constraint_col:
#                 key_col.extend(col['column_names'])

#         match_col = ",".join([f"{col}" for col in key_col])
#         list_col_to_insert = ", ".join([f"{col}" for col in df_columns])
#         list_col_to_update = [col for col in df_columns if col not in key_col]
#         col_to_update = ", ".join([f'"{col_name}" = EXCLUDED."{col_name}"' for col_name in list_col_to_update])
        

#         stmt = f"""
#                 INSERT INTO {table_init} ({list_col_to_insert})
#                 SELECT {list_col_to_insert} FROM temp_table
#                 ON CONFLICT ({match_col}) DO
#                 UPDATE SET
#                 {col_to_update},
#                 update_at = CURRENT_TIMESTAMP
#                 """
        

#         with dwh_engine.begin() as conn:

#             conn.exec_driver_sql("DROP TABLE IF EXISTS temp_table")
#             conn.exec_driver_sql(
#                 f"CREATE TEMPORARY TABLE temp_table AS SELECT * FROM {table_init} WHERE false"
#             )

#             df.to_sql("temp_table", conn, if_exists='append', index=False)

#             conn.exec_driver_sql(stmt)

#             print(f"Load data to table {table_init} success.")

#     except Exception as e:
#         print(f"Error when loading data to table {tablename}: {e}")


In [None]:
# Build an INSERT ... ON CONFLICT upsert from a temp table into the target.
# NOTE(review): `df` here is whatever the load loop left behind (the last
# table in tb_source), not necessarily packinglist; re-extract the intended
# table before running this cell.
df_columns = list(df.columns)
key_col = ['packinglist_id']
table_init = 'stg_demo.packinglist'

missing_key_col = [col for col in key_col if col not in df_columns]
if missing_key_col:
    # Fail loudly instead of generating SQL that references absent columns.
    raise ValueError(
        f"Key column(s) {missing_key_col} not found in df columns {df_columns}"
    )


def _quote_ident(name) -> str:
    """Double-quote a PostgreSQL identifier, escaping embedded double quotes."""
    return '"' + str(name).replace('"', '""') + '"'


# Quote identifiers consistently in every clause; the original quoted them
# only in the SET list.
match_col = ", ".join(_quote_ident(col) for col in key_col)
list_col_to_insert = ", ".join(_quote_ident(col) for col in df_columns)
list_col_to_update = [col for col in df_columns if col not in key_col]
col_to_update = ", ".join(
    f"{_quote_ident(col_name)} = EXCLUDED.{_quote_ident(col_name)}"
    for col_name in list_col_to_update
)

# If every column is part of the key there is nothing to update, and
# "UPDATE SET" with an empty list is invalid SQL.
conflict_action = f"UPDATE SET\n        {col_to_update}" if col_to_update else "NOTHING"

# Select the columns explicitly by name, so the insert does not depend on the
# temp table's column order matching df's column order.
# NOTE(review): assumes temp_table already exists in the same session
# (created elsewhere, e.g. by load_to_wh).
stmt = f"""
        INSERT INTO {table_init} ({list_col_to_insert})
        SELECT {list_col_to_insert} FROM temp_table
        ON CONFLICT ({match_col}) DO
        {conflict_action}
        """