In [1]:
import pandas as pd
import sqlite3
from datetime import datetime
import logging

In [2]:
# Emit INFO-and-above records, each prefixed with a timestamp and severity.
log_format = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=log_format, level=logging.INFO)

### Database Connection

In [3]:
# Open the SQLite warehouse file and keep one cursor for the DDL statements
# issued later in the notebook.
warehouse_path = "warehouse.db"
conn = sqlite3.connect(warehouse_path)
cursor = conn.cursor()
logging.info("Connected to warehouse")

### Create Warehouse Table

In [4]:
# DDL for the orders fact table. IF NOT EXISTS makes this statement a no-op
# when the table is already present, so the cell can be re-run.
create_fact_orders_sql = """
CREATE TABLE IF NOT EXISTS fact_orders (
    order_id TEXT PRIMARY KEY,
    customer_id TEXT,
    order_status TEXT,
    purchase_timestamp TIMESTAMP,
    approved_at TIMESTAMP,
    delivered_customer_date TIMESTAMP
)
"""

cursor.execute(create_fact_orders_sql)
conn.commit()
logging.info("Warehouse table ready")

### Load Source Data

In [5]:
# Read the raw orders extract from the CSV file.
orders = pd.read_csv("olist_orders_dataset.csv")

# Convert the three timestamp columns used downstream into datetimes.
for timestamp_column in (
    "order_purchase_timestamp",
    "order_approved_at",
    "order_delivered_customer_date",
):
    orders[timestamp_column] = pd.to_datetime(orders[timestamp_column])

logging.info("Source data loaded")
print(orders.shape)

(99441, 8)


### Get Last Loaded Timestamp

In [6]:
# Read the high-water mark: the newest purchase timestamp already stored in
# fact_orders. Only source rows newer than this are loaded further down.
query = "SELECT MAX(purchase_timestamp) FROM fact_orders"
result = pd.read_sql(query, conn)

last_loaded = result.iloc[0, 0]

# MAX() over an empty table yields SQL NULL. pd.isna() treats None, NaN and
# NaT alike, so the fallback applies however pandas represents the missing
# value. With a bare `is None` check a NaN/NaT would slip through, turn into
# NaT below, and make every `> last_loaded` comparison False.
if pd.isna(last_loaded):
    # First run: start from a date used as the "load everything" floor.
    last_loaded = "2010-01-01"

last_loaded = pd.to_datetime(last_loaded)

print("Last loaded timestamp:", last_loaded)

Last loaded timestamp: 2018-10-17 17:30:18


### Incremental Filter

In [7]:
# Keep only source rows purchased strictly after the warehouse high-water mark.
is_after_watermark = orders["order_purchase_timestamp"].gt(last_loaded)
new_orders = orders[is_after_watermark]

logging.info(f"New records found: {len(new_orders)}")

### Select Required Columns

In [8]:
# Map source column names to the fact_orders column names.
column_renames = {
    "order_purchase_timestamp": "purchase_timestamp",
    "order_approved_at": "approved_at",
    "order_delivered_customer_date": "delivered_customer_date",
}

# Columns written to fact_orders, in table order.
warehouse_columns = [
    "order_id",
    "customer_id",
    "order_status",
    "purchase_timestamp",
    "approved_at",
    "delivered_customer_date",
]

# Rename first, then select by the target names. DataFrame.rename ignores
# labels that are absent by default, so re-running this cell on an already
# renamed frame gives the same result instead of raising KeyError on the old
# source names. No in-place mutation: new_orders is rebound to a fresh frame.
new_orders = new_orders.rename(columns=column_renames)[warehouse_columns]

In [9]:
# Show the column labels of the frame that will be appended to fact_orders.
print(new_orders.columns)

Index(['order_id', 'customer_id', 'order_status', 'purchase_timestamp',
       'approved_at', 'delivered_customer_date'],
      dtype='object')


In [10]:
# Preview the first rows of the pending batch (renders empty when there is nothing new).
new_orders.head()

Unnamed: 0,order_id,customer_id,order_status,purchase_timestamp,approved_at,delivered_customer_date


### Load to Warehouse

In [11]:
# Append the pending batch to fact_orders; the DataFrame index is not written.
rows_to_insert = len(new_orders)
new_orders.to_sql(name="fact_orders", con=conn, if_exists="append", index=False)

logging.info("Incremental load completed")
print("Rows inserted:", rows_to_insert)

Rows inserted: 0


### Validate Load

In [12]:
# Count the rows now present in fact_orders and display the one-row result.
row_count_query = "SELECT COUNT(*) as total_rows FROM fact_orders"
df = pd.read_sql(row_count_query, conn)
df

Unnamed: 0,total_rows
0,99441


### Data Quality Check

In [13]:
# Flag every repeat of an order_id after its first occurrence in the batch,
# then count the flagged rows.
is_repeat = new_orders.duplicated(subset="order_id")
duplicates = is_repeat.sum()
print("Duplicate orders:", duplicates)

Duplicate orders: 0


### Close Connection

In [15]:
# Close the SQLite connection; any cell that uses `conn` after this point will fail.
conn.close()
logging.info("Pipeline finished")