# RC Pakistan Cargo & Logistics - Star Schema and ETL Pipeline

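This notebook covers:
1. Star schema design for the logistics data warehouse
2. Dimension table creation
3. Fact table creation
4. ETL pipeline implementation: loading the cleaned CSVs, validating referential integrity between fact and dimension tables, and saving the schema to SQLite and CSV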

In [None]:
import os
import sqlite3
import warnings
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
warnings.filterwarnings('ignore')

print("RC Pakistan Cargo & Logistics - Star Schema ETL Pipeline")
print("=" * 60)

## 1. Load Cleaned Data

In [None]:
# Load the cleaned datasets produced by the upstream cleaning step.
customers_df = pd.read_csv('../processed_data/customers_clean.csv')
bookings_df = pd.read_csv('../processed_data/bookings_clean.csv')
shipments_df = pd.read_csv('../processed_data/shipments_clean.csv')
payments_df = pd.read_csv('../processed_data/payments_clean.csv')

# Parse the date columns of each frame (columns are replaced on the frames
# loaded above, in the same order as they were read).
for _frame, _date_columns in (
    (customers_df, ['CreatedDate']),
    (bookings_df, ['BookingDate']),
    (shipments_df, ['ShipmentDate', 'ExpectedDelivery']),
    (payments_df, ['Date']),
):
    for _column in _date_columns:
        _frame[_column] = pd.to_datetime(_frame[_column])

print("Data loaded successfully")

## 2. Create Dimension Tables

In [None]:
# DimDate - Time dimension
def create_dim_date(start_date='2022-01-01', end_date='2022-12-31'):
    """Build a calendar dimension with one row per day from start_date to end_date (inclusive).

    Columns:
        DateKey      integer YYYYMMDD, the key used by the fact tables
        FullDate     the calendar day as a timestamp
        Year, Quarter, Month, MonthName, Day
        WeekDay      1 = Monday ... 7 = Sunday
        WeekDayName  full English day name
        IsWeekend    1 for Saturday/Sunday, else 0
    """
    days = pd.date_range(start=pd.to_datetime(start_date),
                         end=pd.to_datetime(end_date),
                         freq='D')
    day_of_week = days.dayofweek  # pandas convention: Monday = 0 ... Sunday = 6

    return pd.DataFrame({
        'DateKey': list(map(int, days.strftime('%Y%m%d'))),
        'FullDate': days,
        'Year': days.year,
        'Quarter': days.quarter,
        'Month': days.month,
        'MonthName': days.strftime('%B'),
        'Day': days.day,
        'WeekDay': day_of_week + 1,
        'WeekDayName': days.strftime('%A'),
        'IsWeekend': (day_of_week >= 5).astype(int),
    })

# Size the calendar from the data instead of a hard-coded year. Any booking,
# shipment, expected-delivery or payment date outside DimDate becomes an
# orphaned date key in the fact tables, and expected deliveries can fall after
# the last shipment date. Whole calendar years are kept so the dimension
# stays stable as new days arrive.
_referenced_dates = pd.concat([
    bookings_df['BookingDate'],
    shipments_df['ShipmentDate'],
    shipments_df['ExpectedDelivery'],
    payments_df['Date'],
]).dropna()

if _referenced_dates.empty:
    # Nothing to size from: fall back to the function's default range.
    dim_date = create_dim_date()
else:
    dim_date = create_dim_date(
        start_date=f"{_referenced_dates.min().year}-01-01",
        end_date=f"{_referenced_dates.max().year}-12-31",
    )

print(f"DimDate created with {len(dim_date)} records")
print(dim_date.head())

In [None]:
# DimCustomer
# CustomerKey is a surrogate key assigned 1..n in file order. CustomerID must be
# unique: the fact-table builders translate IDs to keys with
# dict(zip(CustomerID, CustomerKey)), which would silently keep only the last
# key for a duplicated ID.
dim_customer = (
    customers_df
    .assign(CustomerKey=range(1, len(customers_df) + 1))
    [['CustomerKey', 'CustomerID', 'Name', 'Phone', 'City', 'CreatedDate']]
    .rename(columns={'Name': 'CustomerName'})
)
assert dim_customer['CustomerID'].is_unique, "duplicate CustomerID values in customers data"

print(f"DimCustomer created with {len(dim_customer)} records")
print(dim_customer.head())

In [None]:
# DimCity - static list of origin (UAE) and destination (Pakistan / Azad Kashmir)
# cities. CityKey is assigned sequentially in the order listed below.
_cities_by_region = {
    ('UAE', 'Origin'): ['Dubai', 'Sharjah', 'Ajman'],
    ('Pakistan', 'Destination'): ['Karachi', 'Lahore', 'Islamabad', 'Rawalpindi', 'Peshawar'],
    ('Azad Kashmir', 'Destination'): ['Mirpur', 'Muzaffarabad'],
}

cities_data = [
    (city_key, city_name, country, city_type)
    for city_key, (city_name, country, city_type) in enumerate(
        (
            (name, region_country, region_type)
            for (region_country, region_type), names in _cities_by_region.items()
            for name in names
        ),
        start=1,
    )
]

dim_city = pd.DataFrame(cities_data, columns=['CityKey', 'CityName', 'Country', 'CityType'])
print(f"DimCity created with {len(dim_city)} records")
print(dim_city)

In [None]:
# DimTransportMode - the available shipping modes; ModeKey follows listing order.
_mode_descriptions = {
    'Air': 'Fast delivery, higher cost',
    'Sea': 'Economical, longer transit time',
}
transport_modes = [
    (mode_key, mode_name, description)
    for mode_key, (mode_name, description) in enumerate(_mode_descriptions.items(), start=1)
]

dim_transport = pd.DataFrame(transport_modes, columns=['ModeKey', 'ModeName', 'Description'])
print(f"DimTransportMode created with {len(dim_transport)} records")
print(dim_transport)

In [None]:
# DimStatus - shipment status values; StatusKey follows the order listed here.
_status_descriptions = {
    'Booked': 'Initial booking created',
    'In Transit': 'Shipment in progress',
    'Arrived': 'Arrived at destination country',
    'Customs Cleared': 'Cleared customs procedures',
    'Delivered': 'Successfully delivered to customer',
}
status_data = [
    (status_key, status_name, description)
    for status_key, (status_name, description) in enumerate(_status_descriptions.items(), start=1)
]

dim_status = pd.DataFrame(status_data, columns=['StatusKey', 'StatusName', 'Description'])
print(f"DimStatus created with {len(dim_status)} records")
print(dim_status)

## 3. Create Fact Tables

In [None]:
# FactShipment - Main fact table
def create_fact_shipment(bookings=None, shipments=None, customer_dim=None,
                         city_dim=None, transport_dim=None, status_dim=None):
    """Build the FactShipment table: one row per booking inner-joined to its shipment(s).

    Every argument defaults to the matching notebook-level frame
    (``bookings_df``, ``shipments_df``, ``dim_customer``, ``dim_city``,
    ``dim_transport``, ``dim_status``); pass frames explicitly to build from
    other data.

    Natural values (CustomerID, city names, mode, status) are translated to
    surrogate keys using the dimension tables themselves, so the fact table
    cannot drift from hand-copied lookup dicts. Key, date-key and TransitDays
    columns use the nullable ``Int64`` dtype. An unmatched value or a missing
    date becomes <NA> instead of turning the column into floats or raising.
    """
    bookings = bookings_df if bookings is None else bookings
    shipments = shipments_df if shipments is None else shipments
    customer_dim = dim_customer if customer_dim is None else customer_dim
    city_dim = dim_city if city_dim is None else city_dim
    transport_dim = dim_transport if transport_dim is None else transport_dim
    status_dim = dim_status if status_dim is None else status_dim

    # Both inputs carry a Status column; name the suffixes explicitly instead of
    # relying on pandas' default '_x' / '_y'.
    fact_base = bookings.merge(shipments, on='BookingID', how='inner',
                               suffixes=('_booking', '_shipment'))

    def lookup(dim, natural_col, key_col):
        """Map natural values to surrogate keys using a dimension table."""
        return dict(zip(dim[natural_col], dim[key_col]))

    def to_key(values, mapping):
        """Translate values through mapping; unmatched values become <NA>."""
        return values.map(mapping).astype('Int64')

    def date_key(values):
        """Integer YYYYMMDD key; missing dates become <NA> instead of raising."""
        dates = pd.to_datetime(values)
        return (dates.dt.year * 10000 + dates.dt.month * 100 + dates.dt.day).astype('Int64')

    customer_lookup = lookup(customer_dim, 'CustomerID', 'CustomerKey')
    city_lookup = lookup(city_dim, 'CityName', 'CityKey')
    transport_lookup = lookup(transport_dim, 'ModeName', 'ModeKey')
    status_lookup = lookup(status_dim, 'StatusName', 'StatusKey')

    shipment_dates = pd.to_datetime(fact_base['ShipmentDate'])
    expected_dates = pd.to_datetime(fact_base['ExpectedDelivery'])

    return pd.DataFrame({
        'ShipmentKey': range(1, len(fact_base) + 1),
        'ShipmentID': fact_base['ShipmentID'],
        'BookingID': fact_base['BookingID'],
        'CustomerKey': to_key(fact_base['CustomerID'], customer_lookup),
        'OriginCityKey': to_key(fact_base['Origin'], city_lookup),
        'DestinationCityKey': to_key(fact_base['Destination'], city_lookup),
        'TransportModeKey': to_key(fact_base['Mode'], transport_lookup),
        # Status is taken from the bookings side of the merge.
        # NOTE(review): confirm booking status (not shipment status) is intended.
        'StatusKey': to_key(fact_base['Status_booking'], status_lookup),
        'BookingDateKey': date_key(fact_base['BookingDate']),
        'ShipmentDateKey': date_key(shipment_dates),
        'ExpectedDeliveryDateKey': date_key(expected_dates),
        'WeightKG': fact_base['WeightKG'],
        'TransitDays': (expected_dates - shipment_dates).dt.days.astype('Int64'),
    })

# Build FactShipment from the notebook-level frames and preview it.
fact_shipment = create_fact_shipment()
print(f"FactShipment created with {len(fact_shipment)} records", fact_shipment.head(), sep="\n")

In [None]:
# FactRevenue - Revenue analysis
def create_fact_revenue(payments=None, bookings=None, customer_dim=None):
    """Build the FactRevenue table: one row per payment that matches a booking.

    Arguments default to the notebook-level ``payments_df``, ``bookings_df`` and
    ``dim_customer`` frames; pass frames explicitly to build from other data.

    - Payments whose BookingID has no booking are dropped (inner join).
    - RevenuePerKG is NaN rather than inf when the booking weight is 0.
    - CustomerKey and PaymentDateKey use the nullable ``Int64`` dtype, so an
      unmatched customer or a missing payment date becomes <NA> instead of
      raising or turning the column into floats.
    """
    payments = payments_df if payments is None else payments
    bookings = bookings_df if bookings is None else bookings
    customer_dim = dim_customer if customer_dim is None else customer_dim

    # Bookings supply CustomerID and WeightKG for each payment.
    # NOTE(review): if a booking can have several payments, every payment row
    # repeats the booking's full WeightKG, so don't sum WeightKG across this table.
    revenue_base = payments.merge(bookings, on='BookingID', how='inner')

    customer_lookup = dict(zip(customer_dim['CustomerID'], customer_dim['CustomerKey']))

    payment_dates = pd.to_datetime(revenue_base['Date'])
    weight = revenue_base['WeightKG']

    return pd.DataFrame({
        'RevenueKey': range(1, len(revenue_base) + 1),
        'PaymentID': revenue_base['PaymentID'],
        'BookingID': revenue_base['BookingID'],
        'CustomerKey': revenue_base['CustomerID'].map(customer_lookup).astype('Int64'),
        'PaymentDateKey': (payment_dates.dt.year * 10000
                           + payment_dates.dt.month * 100
                           + payment_dates.dt.day).astype('Int64'),
        'Amount': revenue_base['Amount'],
        'PaymentMethod': revenue_base['Method'],
        'WeightKG': weight,
        # Zero weight -> NaN instead of inf from division by zero.
        'RevenuePerKG': revenue_base['Amount'] / weight.where(weight != 0),
    })

# Build FactRevenue from the notebook-level frames and preview it.
fact_revenue = create_fact_revenue()
print(f"FactRevenue created with {len(fact_revenue)} records", fact_revenue.head(), sep="\n")

## 4. Data Warehouse Validation

In [None]:
# Validate star schema integrity: every foreign key in a fact table should
# resolve to a row of its dimension. Missing (NA/NaN) keys count as orphans.
print("Star Schema Validation:")
print("-" * 30)

dimension_tables = {
    'DimDate': dim_date,
    'DimCustomer': dim_customer,
    'DimCity': dim_city,
    'DimTransportMode': dim_transport,
    'DimStatus': dim_status,
}
fact_tables = {
    'FactShipment': fact_shipment,
    'FactRevenue': fact_revenue,
}

# (fact table, foreign-key column, dimension table, dimension key column)
foreign_keys = [
    ('FactShipment', 'CustomerKey', 'DimCustomer', 'CustomerKey'),
    ('FactShipment', 'OriginCityKey', 'DimCity', 'CityKey'),
    ('FactShipment', 'DestinationCityKey', 'DimCity', 'CityKey'),
    ('FactShipment', 'TransportModeKey', 'DimTransportMode', 'ModeKey'),
    ('FactShipment', 'StatusKey', 'DimStatus', 'StatusKey'),
    ('FactShipment', 'BookingDateKey', 'DimDate', 'DateKey'),
    ('FactShipment', 'ShipmentDateKey', 'DimDate', 'DateKey'),
    ('FactShipment', 'ExpectedDeliveryDateKey', 'DimDate', 'DateKey'),
    ('FactRevenue', 'CustomerKey', 'DimCustomer', 'CustomerKey'),
    ('FactRevenue', 'PaymentDateKey', 'DimDate', 'DateKey'),
]

for fact_name, fk_col, dim_name, dim_key_col in foreign_keys:
    fk_values = fact_tables[fact_name][fk_col]
    n_orphans = int((~fk_values.isin(dimension_tables[dim_name][dim_key_col])).sum())
    print(f"Orphaned {fact_name}.{fk_col} -> {dim_name}: {n_orphans}")

# Summary statistics
print("\nStar Schema Summary:")
print(f"Dimension Tables: {len(dimension_tables)}")
print(f"Fact Tables: {len(fact_tables)}")
print(f"Total Fact Records: {sum(len(table) for table in fact_tables.values())}")
print(f"Date Range: {dim_date['FullDate'].min()} to {dim_date['FullDate'].max()}")

## 5. Save Star Schema to Database

In [None]:
# Create SQLite database for the star schema. Every table is written with
# if_exists='replace', so re-running this cell overwrites the previous load.
warehouse_tables = {
    # Dimension tables
    'DimDate': dim_date,
    'DimCustomer': dim_customer,
    'DimCity': dim_city,
    'DimTransportMode': dim_transport,
    'DimStatus': dim_status,
    # Fact tables
    'FactShipment': fact_shipment,
    'FactRevenue': fact_revenue,
}

conn = sqlite3.connect('../processed_data/rc_logistics_dw.db')
try:
    for table_name, table in warehouse_tables.items():
        table.to_sql(table_name, conn, if_exists='replace', index=False)
finally:
    # Always release the connection, even if a write fails part-way. Using the
    # connection as a context manager would not close it.
    conn.close()

print("Star schema saved to SQLite database: rc_logistics_dw.db")
print("ETL pipeline completed successfully!")

## 6. Export for Analytics

In [None]:
# Save star schema tables as CSV for further analysis: one file per table,
# named after the table, in ../star_schema/ (created if missing).
os.makedirs('../star_schema', exist_ok=True)

star_schema_tables = {
    # Dimension tables
    'DimDate': dim_date,
    'DimCustomer': dim_customer,
    'DimCity': dim_city,
    'DimTransportMode': dim_transport,
    'DimStatus': dim_status,
    # Fact tables
    'FactShipment': fact_shipment,
    'FactRevenue': fact_revenue,
}

for table_name, table in star_schema_tables.items():
    table.to_csv(f'../star_schema/{table_name}.csv', index=False)

print("Star schema exported to CSV files in star_schema/ directory")