### Library

In [None]:
#!pip install pandas
#!pip install sqlalchemy
#!pip install psycopg2-binary

In [None]:
import logging
import pandas as pd

from urllib.parse import quote_plus
from sqlalchemy import create_engine

from config import oltp_conn_string
from config import warehouse_conn_string
from config import etl_config

import warnings
warnings.filterwarnings('ignore')

### Logging setup

In [None]:
# Configure the root logger so INFO-and-above messages from the ETL steps are shown.
logging.basicConfig(level="INFO")

### Set up connections to the source (OLTP) and destination (warehouse) databases

In [None]:
def db_connection(conn_params):
    """Open a new SQLAlchemy connection to a PostgreSQL database.

    Args:
        conn_params: Mapping with ``user``, ``password``, ``host``, ``port``
            and ``database`` entries used to build the connection URL.

    Returns:
        An open SQLAlchemy Connection. The caller owns it and must close it,
        e.g. by using it as a context manager (``with db_connection(...) as conn``).
    """
    # Local import keeps this cell self-contained.
    from sqlalchemy.pool import NullPool

    # Percent-encode both credentials: '@', ':' or '/' in either the user name
    # or the password would otherwise break the URL structure.
    user = quote_plus(conn_params['user'])
    password = quote_plus(conn_params['password'])
    conn_str = (
        f"postgresql://{user}:{password}"
        f"@{conn_params['host']}:{conn_params['port']}/{conn_params['database']}"
    )
    # A fresh engine is created on every call, so connection pooling buys
    # nothing. NullPool makes closing the Connection actually close the
    # database connection instead of parking it in an orphaned pool.
    engine = create_engine(conn_str, poolclass=NullPool)
    return engine.connect()

### Validate the ETL configuration (customizable in config.py)

In [None]:
def validate_config(etl_config):
    """Ensure every table entry in *etl_config* declares all mandatory keys.

    Args:
        etl_config: Mapping of table name -> per-table config that is checked
            for the keys ``source_table``, ``query``, ``destination_table``
            and ``column_mapping``.

    Raises:
        ValueError: For the first table (in iteration order) missing one of
            those keys; the message names the first missing key and the table.
    """
    mandatory = ('source_table', 'query', 'destination_table', 'column_mapping')
    for name, settings in etl_config.items():
        absent = next((field for field in mandatory if field not in settings), None)
        if absent is not None:
            raise ValueError(f"Missing {absent} in config for table {name}")
    logging.info("Config validation passed")

### Extract data from the source

In [None]:
def extract(table_config):
    """Run the configured query against the OLTP source and return the rows.

    Args:
        table_config: Per-table config; ``source_table`` is used for logging
            and ``query`` is executed with ``pandas.read_sql``.

    Returns:
        The query result as a DataFrame.

    Raises:
        Any exception from connecting or querying, re-raised after being logged.
    """
    try:
        source_name = table_config['source_table']
        logging.info(f"Extracting data from {source_name}...")
        # The connection is closed when the with-block exits.
        with db_connection(oltp_conn_string) as source_conn:
            extracted = pd.read_sql(table_config["query"], source_conn)
    except Exception as err:
        logging.error(f"Error extracting data from {table_config['source_table']}: {err}")
        raise
    return extracted

### Transform the extracted data

In [None]:
def transform(df, table_config):
    """Rename extracted columns to their destination names.

    Args:
        df: DataFrame returned by ``extract``. It is not modified.
        table_config: Per-table config; ``destination_table`` is used for
            logging and ``column_mapping`` (old column -> new column) is
            passed to ``DataFrame.rename``. Mapping keys that are not columns
            of ``df`` are ignored (pandas' default ``errors="ignore"``).

    Returns:
        A new DataFrame with the renamed columns.

    Raises:
        Any exception raised while renaming, re-raised after being logged.
    """
    try:
        logging.info(f"Transforming data for {table_config['destination_table']}...")
        # Return a renamed copy rather than using inplace=True, so the caller's
        # DataFrame is not mutated as a side effect.
        return df.rename(columns=table_config["column_mapping"])
    except Exception as e:
        logging.error(f"Error transforming data for {table_config['destination_table']}: {e}")
        raise

### Load the transformed data into the destination table, replacing the data without dropping the table

In [None]:
from sqlalchemy import text

def load(df, table_config):
    """Replace the contents of the destination table with *df*.

    The table is emptied with ``TRUNCATE ... RESTART IDENTITY CASCADE`` and
    then refilled with ``DataFrame.to_sql(if_exists="append")``, so the table
    definition itself is kept. Both steps run in a single transaction. If the
    insert fails, the TRUNCATE is rolled back and the table keeps its previous
    rows instead of being left empty.

    Args:
        df: Rows to insert. Columns are written by name via ``to_sql``.
            Presumably they match the destination table's columns; confirm
            against the warehouse schema.
        table_config: Per-table config; ``destination_table`` names the
            target table.

    Raises:
        Any exception from the TRUNCATE or the insert, re-raised after being
        logged. In that case the transaction is rolled back.
    """
    try:
        logging.info(f"Replacing data in {table_config['destination_table']}...")

        # Connect to the warehouse database (destination)
        with db_connection(warehouse_conn_string) as conn:
            # One transaction for both steps: committed when this block exits
            # normally, rolled back if anything inside raises. Note that
            # PostgreSQL's TRUNCATE holds an ACCESS EXCLUSIVE lock until the
            # commit, so readers of the table wait while the load runs.
            with conn.begin():
                # NOTE(review): the table name is interpolated verbatim from
                # config (not escaped), so config must be trusted. CASCADE also
                # truncates any tables whose foreign keys reference this one.
                conn.execute(text(f"TRUNCATE TABLE {table_config['destination_table']} RESTART IDENTITY CASCADE;"))

                # to_sql runs on the same connection, so its INSERTs are part
                # of the transaction opened above.
                df.to_sql(
                    table_config["destination_table"],
                    conn,
                    if_exists="append",  # table already exists; only insert rows
                    index=False,
                )

        logging.info(f"Data successfully loaded into {table_config['destination_table']}")

    except Exception as e:
        logging.error(f"Error replacing data in {table_config['destination_table']}: {e}")
        raise


### Run the full ETL process

In [None]:
def run_etl():
    """Validate the config, then extract, transform and load every configured table.

    Tables are processed one at a time in ``etl_config`` iteration order. The
    first exception stops the run.

    Returns:
        True if every table was loaded, False if any step raised. Failures are
        logged with their full traceback and are not propagated, so callers
        can use the return value to decide on an exit status.
    """
    try:
        logging.info("Starting ETL Process...")
        validate_config(etl_config)
        for table_config in etl_config.values():
            df = extract(table_config)
            df = transform(df, table_config)
            load(df, table_config)
    except Exception as e:
        # Top-level boundary: record the traceback, not just str(e), so the
        # failing step can actually be diagnosed.
        logging.exception(f"ETL process failed: {e}")
        return False
    logging.info("ETL Process Completed Successfully!")
    return True

### Run Process

In [None]:
if __name__ == "__main__":
    # Entry point: run the whole pipeline. Inside a Jupyter/IPython kernel
    # __name__ is also "__main__", so executing this cell runs it as well.
    run_etl()