<a href="https://colab.research.google.com/github/Naman2206/Retail-Inventory-Intelligence-/blob/main/Simplified_ETL_Process_with_Python_(Pandas).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import pandas as pd
import os
from datetime import datetime, timedelta

# --- Configuration ---
# Everything the pipeline writes lives under one project folder, split into a
# raw layer (files as ingested) and a curated layer (transformed output).
BASE_DIR = "data_engineering_project"
RAW_DATA_DIR, CURATED_DATA_DIR = (
    os.path.join(BASE_DIR, layer) for layer in ("raw_data", "curated_data")
)

# Create both layer folders up front; exist_ok=True makes re-runs harmless.
for _layer_dir in (RAW_DATA_DIR, CURATED_DATA_DIR):
    os.makedirs(_layer_dir, exist_ok=True)

# --- 1. Simulate Large Dataset Generation (Extract - Simplified) ---
# In a real scenario, this would be reading from S3, Kafka, DB, etc.
# We'll simulate daily transaction data for a few days.

print("--- Simulating Raw Data Generation ---")
num_days = 3
start_date = datetime(2025, 7, 23)

# Rows written to each daily file. Transaction ids are allocated in strides of
# 100 per day below, so ids stay unique across days while this is <= 100.
transactions_per_day = 50

# Spread a day's transactions evenly across its 24 hours. Spacing them one
# hour apart would make 50 rows cover 50 hours, so a "daily" file would hold
# events dated on the following two days as well.
seconds_between_transactions = (24 * 60 * 60) // max(transactions_per_day, 1)

for i in range(num_days):
    current_date = start_date + timedelta(days=i)
    date_str = current_date.strftime("%Y-%m-%d")
    file_path = os.path.join(RAW_DATA_DIR, f"transactions_{date_str}.csv")

    # Simulate transaction data for the day. Every column is derived from the
    # row number j, so the generated files are fully deterministic.
    data = {
        'transaction_id': range(100 * i, 100 * i + transactions_per_day),
        'customer_id': [f'CUST_{j % 10}' for j in range(transactions_per_day)],
        'product_id': [f'PROD_{j % 5}' for j in range(transactions_per_day)],
        'quantity': [j % 5 + 1 for j in range(transactions_per_day)],
        'price': [round((j % 10 + 1) * 10.5, 2) for j in range(transactions_per_day)],
        # ISO-8601 strings; all of them fall on current_date.
        'timestamp': [
            (current_date + timedelta(seconds=j * seconds_between_transactions)).isoformat()
            for j in range(transactions_per_day)
        ],
        # Every 7th row (j = 0, 7, 14, ...) is marked as a failed transaction.
        'status': ['completed' if j % 7 != 0 else 'failed' for j in range(transactions_per_day)]
    }
    df_raw = pd.DataFrame(data)

    # Save raw data (simulating ingestion into a raw layer)
    df_raw.to_csv(file_path, index=False)
    print(f"Generated raw data for {date_str} at: {file_path}")

# --- 2. ETL Process (Per-Day Partitioning & Transformation) ---

print("\n--- Starting ETL Process ---")

# Small in-memory product dimension used to enrich transactions
# (PROD_0..PROD_4, each with a display name and a category).
products_lookup = pd.DataFrame({
    'product_id': [f'PROD_{j}' for j in range(5)],
    'product_name': [f'Product {n}' for n in range(1, 6)],
    'product_category': ['Electronics', 'Home Goods', 'Apparel', 'Books', 'Food']
})

# Columns, in output order, that make up the curated dataset.
curated_columns = [
    'transaction_id', 'customer_id', 'product_id', 'product_name',
    'product_category', 'quantity', 'price', 'total_amount', 'timestamp'
]

for day_offset in range(num_days):
    current_date = start_date + timedelta(days=day_offset)
    date_str = current_date.strftime("%Y-%m-%d")
    raw_file_path = os.path.join(RAW_DATA_DIR, f"transactions_{date_str}.csv")

    # A day without a raw file is skipped rather than treated as an error.
    if not os.path.exists(raw_file_path):
        print(f"Raw file not found for {date_str}. Skipping ETL for this day.")
        continue

    print(f"\nProcessing data for {date_str}...")

    # --- Extract ---
    # Load the day's raw CSV; a file that cannot be read is reported and skipped.
    try:
        df_transactions = pd.read_csv(raw_file_path)
        print(f"Extracted {len(df_transactions)} records from {raw_file_path}")
    except Exception as exc:
        print(f"Error reading {raw_file_path}: {exc}")
        continue

    # --- Transform ---
    # One chain covering cleaning, enrichment and the derived amount:
    #   1. missing quantities default to 1 and timestamps become datetimes
    #   2. product attributes are left-joined from the lookup table
    #   3. products absent from the lookup get placeholder labels
    #   4. total_amount = quantity * price
    df_transactions = (
        df_transactions
        .assign(
            quantity=lambda frame: frame['quantity'].fillna(1),
            timestamp=lambda frame: pd.to_datetime(frame['timestamp']),
        )
        .merge(products_lookup, on='product_id', how='left')
        .assign(
            product_name=lambda frame: frame['product_name'].fillna('Unknown Product'),
            product_category=lambda frame: frame['product_category'].fillna('Unknown Category'),
            total_amount=lambda frame: frame['quantity'] * frame['price'],
        )
    )

    # Only 'completed' transactions reach the curated layer, restricted to the
    # curated column set.
    is_completed = df_transactions['status'] == 'completed'
    df_curated = df_transactions.loc[is_completed, curated_columns].copy()

    print(f"Transformed data: {len(df_curated)} completed transactions.")

    # --- Load ---
    # Partitioned layout, e.g. curated_data/year=2025/month=07/day=23/
    partition_parts = (
        f"year={current_date.year}",
        f"month={current_date.month:02d}",
        f"day={current_date.day:02d}",
    )
    curated_partition_dir = os.path.join(CURATED_DATA_DIR, *partition_parts)
    os.makedirs(curated_partition_dir, exist_ok=True)

    curated_file_path = os.path.join(curated_partition_dir, f"transactions_curated_{date_str}.parquet")

    # Parquet stands in for a warehouse load (BigQuery, Snowflake, etc.).
    df_curated.to_parquet(curated_file_path, index=False)
    print(f"Loaded curated data to: {curated_file_path}")

print("\n--- ETL Process Completed ---")

# --- 3. Verification (Optional) ---
print("\n--- Verifying Curated Data ---")
# Spot-check the pipeline by reading back the partition of the last simulated day.
example_date = start_date + timedelta(days=num_days - 1)
example_date_str = example_date.strftime("%Y-%m-%d")

# Rebuild the same year=/month=/day= path that the load step writes to.
example_partition_dir = os.path.join(
    CURATED_DATA_DIR,
    f"year={example_date.year}",
    f"month={example_date.month:02d}",
    f"day={example_date.day:02d}",
)
example_curated_path = os.path.join(
    example_partition_dir,
    f"transactions_curated_{example_date_str}.parquet",
)

if not os.path.exists(example_curated_path):
    print(f"Could not find example curated file at: {example_curated_path}")
else:
    df_verified = pd.read_parquet(example_curated_path)
    print(f"\nSuccessfully read {len(df_verified)} records from curated data for {example_date_str}:")
    print(df_verified.head())

--- Simulating Raw Data Generation ---
Generated raw data for 2025-07-23 at: data_engineering_project/raw_data/transactions_2025-07-23.csv
Generated raw data for 2025-07-24 at: data_engineering_project/raw_data/transactions_2025-07-24.csv
Generated raw data for 2025-07-25 at: data_engineering_project/raw_data/transactions_2025-07-25.csv

--- Starting ETL Process ---

Processing data for 2025-07-23...
Extracted 50 records from data_engineering_project/raw_data/transactions_2025-07-23.csv
Transformed data: 42 completed transactions.
Loaded curated data to: data_engineering_project/curated_data/year=2025/month=07/day=23/transactions_curated_2025-07-23.parquet

Processing data for 2025-07-24...
Extracted 50 records from data_engineering_project/raw_data/transactions_2025-07-24.csv
Transformed data: 42 completed transactions.
Loaded curated data to: data_engineering_project/curated_data/year=2025/month=07/day=24/transactions_curated_2025-07-24.parquet

Processing data for 2025-07-25...
Extr

# New section