In [19]:
from dotenv import load_dotenv
import hashlib
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
from sqlalchemy.types import Text, DateTime, Numeric
import pandas as pd
from pathlib import Path
import os

Loading environment variables (.env)

In [None]:
# -------- 1) Load environment variables (.env) --------

# This allows to change credentials without changing code.
load_dotenv()

DB_USER = os.getenv("DB_USER", "postgres")
DB_PASS = os.getenv("DB_PASS", "")
DB_NAME = os.getenv("DB_NAME", "ecom")
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = int(os.getenv("DB_PORT", "5432"))

Building the SQLAlchemy engine URL

In [None]:
# -------- 2) Build the SQLAlchemy engine URL --------

# Connecting to "ecom" DB
url = URL.create(
    "postgresql+psycopg2",
    username=DB_USER,
    password=DB_PASS,
    host=DB_HOST,
    port=DB_PORT,
    database=DB_NAME
)

# Creating engine
engine = create_engine(url, echo=False, pool_pre_ping=True, future=True)

In [None]:
# -------- 3) Load the CSV with pandas --------

# Creating a relative path to sample_orders.csv
csv_path = Path.cwd() / "data" / "sample_orders.csv"

# Mapping date columns' names
date_cols = [
    "order_purchase_timestamp",
    "order_approved_at",
    "order_delivered_customer_date",
    "order_estimated_delivery_date"
]

# Mapping expected columns
expected_cols = [
    "order_id",
    "customer_id",
    "order_status",
    # "order_purchase_timestamp",   # Let's evaluate if we end up incorporating this column.
    "order_approved_at",
    "order_delivered_customer_date",
    "order_estimated_delivery_date"
]

In [17]:
# Creating a Pandas df

sample_orders = pd.read_csv(
    csv_path, 
    parse_dates=date_cols, 
    keep_default_na=True,    # empty strings -> NaN
    usecols=expected_cols
    )

sample_orders.head()

Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_customer_date,order_estimated_delivery_date
0,6961b52186094adeb7a6a1b4c6e45e72,6a46bdaf797f47a8bd992c27dda6e467,delivered,2018-05-12 15:46:46,2018-05-12 17:33:46,2018-05-27 14:33:46,2018-05-29
1,5fdd8858fb3f4aa098c1a06da5138384,eee1fe9e53424b25b03acb15183a0a39,delivered,2017-03-28 11:21:41,2017-03-28 13:27:41,2017-04-11 21:27:41,2017-04-09
2,491734820cce40348c2a6ea7612ff131,b9c1d556ad274173b092bb63afaef30c,shipped,2017-01-20 10:14:03,2017-01-20 12:02:03,NaT,2017-02-01
3,c947f49479f940bd959ce12f75585351,bd84e2305cef4a2c907c577b2c950199,delivered,2018-07-30 23:30:11,2018-07-31 00:12:11,2018-08-03 07:12:11,2018-08-13
4,f0acbf1378324a4b8188adef6b72c143,6cba08cfe2a247daaa37257083bd8ebb,delivered,2017-08-02 14:55:05,2017-08-02 16:28:05,2017-08-15 16:28:05,2017-08-21


Adding operational columns

In [None]:
# -------- 4) Add operational columns --------

In [None]:
# _ingested_at = when we loaded the row
sample_orders["_ingested_at"] = pd.Timestamp.utcnow()


In [None]:
# _source_file = Which file fed this row (helps trace issues later)
sample_orders["_source_file"] = os.path.basename(csv_path)


In [None]:
# _row_md5 = simple row-level checksum for idempotency/dedup in staging
# We hash a stable concatenation of key fields.

def row_hash(row) -> str:
    # We use natural keys + important fields that define the row's identity
    parts = [
        str(row["order_id"]),
        str(row["customer_id"]),
        str(row["order_status"]),
        # We use ISD format for datetimes to make deterministic strings
        row["order_approved_at"].isoformat() if pd.notna(row["order_approved_at"]) else "",
        row["order_delivered_customer_date"].isoformat() if pd.notna(row["order_delivered_customer_date"]) else "",
        row["order_estimated_delivery_date"].isoformat() if pd.notna(row["order_estimated_delivery_date"]) else "",
    ]
    txt = "|".join(parts)
    return hashlib.md5(txt.encode("utf-8")).hexdigest()

sample_orders["_row_md5"] = sample_orders.apply(row_hash, axis=1)

Map pandas dtypes to SQL column types (optional)

In [None]:
# -------- 5) Map pandas dtypes to SQL column types (optional) --------
# This helps Postgres store correct types. Matches staging.orders_raw DDL.

dtype_map = {
    "order_id": Text(),
    "customer_id" : Text(),
    "order_status"  : Text(),
    # "order_purchase_timestamp" : DateTime(),
    "order_approved_at" : DateTime(),
    "order_delivered_customer_date" : DateTime(),
    "order_estimated_delivery_date" : DateTime(),
    "_ingested_at" : DateTime(),
    "_source_file" : Text(),
    "_row_md5" : Text()
}

 to Postgres (staging.orders_raw)

In [None]:
# -------- 6) Write to Postgres (staging.orders_raw) --------
# if_exists='append' so we can run it multiple times; let's use chunksize for large files

table_name = "orders_raw"
schema_name = "staging"

with engine.begin() as conn: # begin() = transaction; commits or rolls back automatically
    # Optional: simple dedup strategy per file — delete rows with same _source_file first
    # This makes reruns idempotent for the same CSV. Comment out if you prefer pure append.

    conn.exec_driver_sql(
        f"DELETE FROM {schema_name}.{table_name} WHERE _source_file = %(src)s",
        {"src": os.path.basename(csv_path)},
    )

    sample_orders.to_sql(
        name=table_name,
        con=conn,
        schema=schema_name,
        if_exists="append"
        index=False,
        dtype=dtype_map,
        method="multi",     # batched inserts
        chunksize=1_000,    # can be adjusted
    )

Quick verification query

In [None]:
# -------- 7) Quick verification query --------
with engine.connect() as conn:
    res = conn.exec_driver_sql("SELECT COUNT(*) FROM staging.orders_raw;")
    count = res.scalar_one()
    print(f"Loaded rows in staging.orders_raw: {count}")

print("Done.")