# BoostCredit ETL Pipeline Demo

This notebook demonstrates the ETL pipeline for processing CSV and JSON data.

## Pipeline Flow:
1. **Extract** → Read data from CSV/JSON files
2. **Transform** → Clean, convert types, mask PII
3. **Store** → Save to object store (Parquet)
4. **Load** → Load from object store to PostgreSQL database

In [None]:
import os
import sys
import importlib
import pandas as pd
from pathlib import Path

# Import the project modules, then reload them so edits made on disk are
# picked up without restarting the kernel (a plain `import` is a no-op for a
# module that is already in sys.modules).
import src.loaders
import src.pipeline
import src.extractors
import src.transformers

# Reload the helper modules before src.pipeline so that a reloaded pipeline
# re-binds any names it imports from them to the fresh versions.
# NOTE(review): assumes src.pipeline depends on the other three modules (and
# not the reverse) — confirm the dependency order if imports change.
for _module in (src.extractors, src.transformers, src.loaders, src.pipeline):
    importlib.reload(_module)

# Bind the classes only after reloading so these names refer to the new code.
from src.pipeline import Pipeline
from src.extractors import CSVExtractor, JSONExtractor
from src.transformers import CSVTransformer, JSONTransformer

# Set environment variables
os.environ['STORE_KEY'] = 'demo_data'
os.environ['DB_TYPE'] = 'postgresql'
os.environ['DB_HOST'] = 'localhost'
os.environ['DB_PORT'] = '5432'
os.environ['DB_USER'] = 'etl_user'
os.environ['DB_PASSWORD'] = 'etl_password'
os.environ['DB_NAME'] = 'etl_database'
os.environ['DATA_PATH'] = './data'
os.environ['OBJECT_STORE_PATH'] = './output'

print("✓ Environment variables set")
print("✓ Modules reloaded and imports successful")

## Step 1: Initialize Pipeline

The pipeline handles the complete ETL process automatically.

In [None]:
# Construct the pipeline used by the "Run Complete Pipeline" steps below.
# NOTE(review): presumably reads the DB_* / path environment variables set in
# the setup cell at construction time — confirm in src.pipeline.
pipeline = Pipeline()
print("✓ Pipeline initialized")

## Step 2: Test Individual Components

Let's test each component separately to understand what they do.

In [None]:
# Test CSV Extractor: read the sample CSV and preview its size, columns and
# first row.
csv_extractor = CSVExtractor()
# Resolve the sample file under the configured DATA_PATH (defaults to the same
# ./data directory the setup cell configures).
csv_file = Path(os.environ.get('DATA_PATH', './data')) / 'test.csv'
if csv_file.exists():
    sample_data = csv_extractor.extract(str(csv_file))
    print(f"✓ CSV Extracted: {len(sample_data)} rows")
    print(f"  Columns: {list(sample_data.columns)}")
    print("\n  First row sample:")
    print(sample_data.head(1))
else:
    # Bind an empty frame so later cells that reference sample_data do not
    # fail with NameError when the sample file is absent.
    sample_data = pd.DataFrame()
    print("⚠ CSV file not found")

In [None]:
# Preview the extracted rows. sample_data is only bound by the extractor cell
# when the CSV exists, so skip the preview (display nothing) otherwise.
sample_data.head() if csv_file.exists() else None

In [None]:
# Test CSV Transformer: transform the first five extracted rows and show a few
# key columns of the result.
csv_transformer = CSVTransformer()
if csv_file.exists():
    transformed = csv_transformer.transform(sample_data.head(5))
    print("✓ CSV Transformed")
    print("  Data types converted")
    print("  PII masked (name, address)")
    print("\n  Transformed sample:")
    print(transformed[['id', 'name', 'created_at', 'is_claimed', 'paid_amount']].head(2))
else:
    # Keep `transformed` bound so the following preview cell does not raise
    # NameError, and tell the user why nothing happened.
    transformed = pd.DataFrame()
    print("⚠ CSV file not found - skipping CSV transform")

In [None]:
# Preview the transformed rows. `transformed` is only bound by the transformer
# cell when the CSV exists, so skip the preview (display nothing) otherwise.
transformed.head() if csv_file.exists() else None

In [None]:
# Test JSON Extractor: read the sample JSON file and show the keys of the
# first record.
json_extractor = JSONExtractor()
# Resolve the sample file under the configured DATA_PATH (defaults to the same
# ./data directory the setup cell configures).
json_file = Path(os.environ.get('DATA_PATH', './data')) / 'test.json'
if json_file.exists():
    json_data = json_extractor.extract(str(json_file))
    print(f"✓ JSON Extracted: {len(json_data)} records")
    # Guard the first-record preview: an empty file would otherwise raise
    # IndexError. len() is used rather than truthiness so this also works if
    # the extractor ever returns a DataFrame.
    if len(json_data) > 0:
        first_record = json_data[0]
        print(f"\n  First record keys: {list(first_record.keys())}")
        print(f"  Sample user_id: {first_record.get('user_id', 'N/A')}")
    else:
        print("  (no records to preview)")
else:
    # Bind an empty list so later cells referencing json_data do not NameError.
    json_data = []
    print("⚠ JSON file not found")

In [None]:
# Test JSON Transformer: transform two records and report the row count of
# each output table, then preview the users table.
json_transformer = JSONTransformer()
if json_file.exists():
    json_transformed = json_transformer.transform(json_data[:2])  # Transform 2 records
    # Output tables reported below (keys of the dict returned by transform()).
    table_names = ('users', 'telephone_numbers', 'jobs_history')
    print(f"✓ JSON Transformed into {len(table_names)} tables:")
    for table_name in table_names:
        print(f"  - {table_name}: {len(json_transformed[table_name])} rows")
    print("\n  Users sample:")
    print(json_transformed['users'][['user_id', 'name', 'username']].head(2))
else:
    print("⚠ JSON file not found - skipping JSON transform")

In [None]:
# Display the per-table transform results (users, telephone_numbers,
# jobs_history). json_transformed is only bound by the transformer cell when
# the JSON file exists, so display nothing otherwise.
json_transformed if json_file.exists() else None

## Step 3: Run Complete Pipeline

Now let's run the full pipeline for CSV processing.

In [None]:
# Run the CSV file end-to-end: extract, transform, store and load.
# NOTE(review): STORE_KEY is changed after Pipeline() was constructed; this
# only affects the output key if Pipeline reads it at process time — confirm.
if not csv_file.exists():
    print("⚠ CSV file not found - skipping CSV processing")
else:
    os.environ['STORE_KEY'] = 'csv_demo'
    pipeline.process_csv('test.csv')
    print("✓ CSV processing completed!")
    print("  → Data extracted, transformed, saved to object store, and loaded to database")

## Step 4: Process JSON File

Process the JSON data, which produces multiple linked tables (users, telephone_numbers, jobs_history).

In [None]:
# Run the JSON file end-to-end; this produces several linked tables.
# NOTE(review): as with the CSV run, STORE_KEY is set after Pipeline() was
# constructed — confirm Pipeline reads it at process time.
if not json_file.exists():
    print("⚠ JSON file not found - skipping JSON processing")
else:
    os.environ['STORE_KEY'] = 'json_demo'
    pipeline.process_json('test.json')
    print("✓ JSON processing completed!")
    print("  → Created 3 tables: users, telephone_numbers, jobs_history")
    print("  → All PII masked (emails, phones, national IDs, passwords)")

## Step 5: Query Database Tables

Verify that data was loaded into the database by querying all tables.

In [None]:
from sqlalchemy import create_engine, text, inspect
from sqlalchemy.engine import URL

# Get database credentials from environment or use defaults
db_host = os.getenv('DB_HOST', 'localhost')
db_port = os.getenv('DB_PORT', '5432')
db_user = os.getenv('DB_USER', 'etl_user')
db_password = os.getenv('DB_PASSWORD', 'etl_password')
db_name = os.getenv('DB_NAME', 'etl_database')

# Build the URL with URL.create so credentials containing characters such as
# '@', ':' or '/' are escaped correctly (string interpolation would break them).
db_url = URL.create(
    "postgresql",
    username=db_user,
    password=db_password,
    host=db_host,
    port=int(db_port),
    database=db_name,
)
engine = create_engine(db_url)

try:
    # Get all tables
    inspector = inspect(engine)
    tables = inspector.get_table_names()

    print(f"✓ Connected to database: {db_name}")
    print(f"✓ Found {len(tables)} table(s):\n")

    # Quote identifiers so reserved words or mixed-case table names still
    # produce valid SQL (identifiers cannot be passed as bound parameters).
    quote = engine.dialect.identifier_preparer.quote

    # One connection for all tables instead of one per table.
    with engine.connect() as conn:
        for table_name in tables:
            row_count = conn.execute(
                text(f"SELECT COUNT(*) FROM {quote(table_name)}")
            ).scalar()
            columns = [col['name'] for col in inspector.get_columns(table_name)]

            print(f"Table: {table_name}")
            print(f"  Rows: {row_count:,}")
            print(f"  Columns: {', '.join(columns)}")
            print()
finally:
    # Release pooled connections even if a query above fails.
    engine.dispose()

## Step 6: Verify Data in Object Store

Check what was saved to the object store (intermediate step).

In [None]:
from src.storage import ObjectStore

# Inspect the same location configured via OBJECT_STORE_PATH in the setup
# cell, rather than a separately hard-coded path.
store = ObjectStore(os.environ.get('OBJECT_STORE_PATH', './output'))

# Check CSV data in store (key matches the STORE_KEY used for the CSV run).
csv_data = store.load('csv_demo', 'parquet')
if csv_data is not None:
    print(f"✓ CSV data in object store: {len(csv_data)} rows")
    print(f"  Columns: {list(csv_data.columns)}")
else:
    print("⚠ No CSV data found in object store under 'csv_demo'")

# Check JSON data in store; the loaded value is iterated as a mapping of
# table name -> table data.
json_data_store = store.load('json_demo', 'parquet')
if json_data_store is not None:
    print("\n✓ JSON data in object store:")
    for table_name, df in json_data_store.items():
        print(f"  - {table_name}: {len(df)} rows")
else:
    print("\n⚠ No JSON data found in object store under 'json_demo'")

## Step 7: Cleanup

Close the pipeline to release database connections.

In [None]:
# Release resources held by the pipeline. The message below states these are
# database connections — see Pipeline.close() in src.pipeline for the details.
pipeline.close()
print("✓ Pipeline closed - database connections released")