✅ How it fits together

Extract → Reads CSVs.

Transform → Cleans and aggregates data.

Load → Inserts into Postgres safely.

Data Quality Check → Runs Soda checks after loading.

Logging → Tracks every step in etl_pipeline_mini_project.log.

In [1]:
import pandas as pd
import psycopg2 # thie libary is use for connect to postgres
from psycopg2.extras import execute_values # this library is use for insert data
import logging
import subprocess # this library is use for run command
import os


# Directory that holds the sample CSV inputs for this pipeline.
base_dir = r"D:\DE\ETL_Practices\Practice_Coding\8 Weeks Pyhton Practice for DE\Week4_Data Pipelines in Pure Python\Sample Dataset"

# Build both input paths from the shared base directory in one step.
orders_path, customers_path = (
    os.path.join(base_dir, csv_name)
    for csv_name in ("orders_data.csv", "customer_data.csv")
)



In [None]:

# -----------------------
# Logging configuration
# -----------------------
# Send INFO-and-above records to the pipeline log file, each line prefixed
# with a timestamp and the record's level.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    filename="etl_pipeline_mini_project.log",
)

# -----------------------
# Extract function
# -----------------------
def extract(customers_path, orders_path):
    """Read the customers and orders CSV files into DataFrames.

    Args:
        customers_path: path of the customers CSV file.
        orders_path: path of the orders CSV file.

    Returns:
        A ``(customers, orders)`` tuple of DataFrames.

    Raises:
        Exception: any error raised while reading is logged and re-raised
            unchanged.
    """
    try:
        customer_frame = pd.read_csv(customers_path)
        order_frame = pd.read_csv(orders_path)
    except Exception as e:
        logging.error(f"Extract failed: {e}")
        raise  # let the caller see the original error
    else:
        logging.info("Extract successful")
        return customer_frame, order_frame

# -----------------------
# Transform function
# -----------------------
def transform(customers, orders):
    """Clean both datasets and aggregate the order amount per customer.

    The DataFrames passed in are never modified; every step works on a new
    frame.

    Args:
        customers: DataFrame with at least ``customer_id``, ``email`` and
            ``signup_date`` columns.
        orders: DataFrame with at least ``customer_id``, ``order_date`` and
            ``amount`` columns.

    Returns:
        A ``(customers, orders, orders_summary)`` tuple where ``customers``
        keeps only rows whose email is present and contains ``@``,
        ``orders`` keeps only rows that reference a remaining customer, and
        ``orders_summary`` has one row per customer with ``customer_id`` and
        ``total_amount``.

    Raises:
        Exception: any error raised during cleaning is logged and re-raised
            unchanged.
    """
    try:
        # Clean customers: drop missing emails, then emails without an "@".
        customers = customers.dropna(subset=['email'])
        # astype(str) keeps the check working when the column holds
        # non-string values; regex=False matches "@" literally.
        has_at = customers['email'].astype(str).str.contains('@', regex=False)
        valid_customers = customers[has_at]
        # assign() returns a new frame, so nothing is written onto a slice.
        customers = valid_customers.assign(
            signup_date=pd.to_datetime(valid_customers['signup_date'])
        )

        # Clean orders: parse dates on a new frame (the caller's frame is left
        # as-is), then drop orders pointing at removed/unknown customers.
        orders = orders.assign(order_date=pd.to_datetime(orders['order_date']))
        orders = orders[orders['customer_id'].isin(customers['customer_id'])]

        # Aggregate: total amount per customer.
        orders_summary = (
            orders.groupby('customer_id', as_index=False)['amount']
            .sum()
            .rename(columns={'amount': 'total_amount'})
        )

        logging.info("Transform successful")
        return customers, orders, orders_summary
    except Exception as e:
        logging.error(f"Transform failed: {e}")
        raise

# -----------------------
# Data Quality Checks using Soda
# -----------------------
def run_data_quality_checks(scan_command=None):
    """Run the data quality scan and fail when the command reports failure.

    Args:
        scan_command: optional argument list to execute instead of the
            default ``soda scan`` invocation (data source ``postgres_db``,
            configuration ``warehouse.yml``, checks ``checks.yml``).

    Raises:
        ValueError: the command exited with a non-zero return code.
    """
    if scan_command is None:
        scan_command = ["soda", "scan", "-d", "postgres_db", "-c", "warehouse.yml", "checks.yml"]

    logging.info("Running Soda scan...")

    # Run the command and capture its output as text so it can be logged.
    result = subprocess.run(scan_command, capture_output=True, text=True)
    failed = result.returncode != 0

    if result.stdout:
        logging.info(result.stdout)
    if result.stderr:
        # Only treat stderr as an error when the command actually failed;
        # output on a successful run is logged as a warning instead.
        report = logging.error if failed else logging.warning
        report(result.stderr)

    if failed:
        raise ValueError(" Data quality checks failed")

    logging.info(" Data quality checks passed")
    
# -----------------------
# Load function
# -----------------------
def _to_rows(frame):
    """Return the DataFrame's rows as a list of plain tuples.

    Rows are built column by column, so each value keeps its own column's
    type instead of being upcast to a type shared by the whole frame.
    """
    return list(frame.itertuples(index=False, name=None))


def load(customers, orders, orders_summary):
    """Create the target tables if needed and insert the three datasets.

    All inserts run in one transaction: it is committed when every statement
    succeeds and rolled back otherwise. The connection is always closed.

    Args:
        customers: rows for ``customers``; columns must be in the order
            customer_id, first_name, last_name, email, signup_date.
        orders: rows for ``orders``; columns must be in the order
            order_id, customer_id, order_date, amount, status.
        orders_summary: rows for ``orders_summary``; columns must be in the
            order customer_id, total_amount.

    Raises:
        Exception: any connection or SQL error is logged and re-raised
            unchanged.
    """
    conn = None
    try:
        # NOTE(review): credentials should not live in source control. The
        # PG* environment variables take precedence; the literals below are
        # kept only as fallbacks so existing setups keep working.
        conn = psycopg2.connect(
            host=os.environ.get("PGHOST", "localhost"),
            dbname=os.environ.get("PGDATABASE", "db_test"),
            user=os.environ.get("PGUSER", "postgres"),
            password=os.environ.get("PGPASSWORD", "Aung24188"),
        )

        # `with conn` commits on success and rolls back on error;
        # `with conn.cursor()` closes the cursor in both cases.
        with conn, conn.cursor() as cur:
            # Create tables
            cur.execute("""
            CREATE TABLE IF NOT EXISTS customers (
                customer_id INT PRIMARY KEY,
                first_name VARCHAR(50),
                last_name VARCHAR(50),
                email VARCHAR(100),
                signup_date DATE
            );
        """)
            cur.execute("""
            CREATE TABLE IF NOT EXISTS orders (
                order_id INT PRIMARY KEY,
                customer_id INT REFERENCES customers(customer_id),
                order_date DATE,
                amount FLOAT,
                status VARCHAR(20)
            );
        """)
            cur.execute("""
            CREATE TABLE IF NOT EXISTS orders_summary (
                customer_id INT PRIMARY KEY,
                total_amount FLOAT
            );
        """)

            # Insert data. Customers go first because orders reference them.
            execute_values(cur,
                           "INSERT INTO customers (customer_id, first_name, last_name, email, signup_date) VALUES %s ON CONFLICT (customer_id) DO NOTHING;",
                           _to_rows(customers))

            execute_values(cur,
                           "INSERT INTO orders (order_id, customer_id, order_date, amount, status) VALUES %s ON CONFLICT (order_id) DO NOTHING;",
                           _to_rows(orders))

            # NOTE(review): DO NOTHING keeps an existing total for a customer
            # on re-runs; confirm whether the summary should be updated instead.
            execute_values(cur,
                           "INSERT INTO orders_summary (customer_id, total_amount) VALUES %s ON CONFLICT (customer_id) DO NOTHING;",
                           _to_rows(orders_summary))

        logging.info("Load successful")
    except Exception as e:
        logging.error(f"Load failed: {e}")
        raise
    finally:
        # The connection context manager does not close the connection.
        if conn is not None:
            conn.close()

# -----------------------
# Main ETL Pipeline
# -----------------------
# Script entry point: runs the pipeline stages in order. Each stage raises on
# failure, so an error in one stage stops the stages that follow it.
if __name__ == "__main__":
    # Input locations come from the module-level path variables.
    customers_csv = customers_path
    orders_csv = orders_path
    logging.info("Logging ETL pipeline Mini Project started")
    
    
    # 1. Extract: read both CSV files into DataFrames.
    customers_df, orders_df = extract(customers_csv, orders_csv)

    # 2. Transform: clean the data and build the per-customer totals.
    customers_clean, orders_clean, orders_summary = transform(customers_df, orders_df)

    # 3. Load: write the three cleaned datasets to Postgres.
    load(customers_clean, orders_clean, orders_summary)
    
    # 4. Data Quality Check (Soda after load into Postgres)
    run_data_quality_checks()

    # Only reached when every stage above finished without raising.
    logging.info("ETL pipeline with data quality checks completed successfully")
