# ETL: Bronze to Silver Layer Transformation

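This notebook connects to a SQL Server database, extracts raw data from the `bronze` schema, performs cleaning and transformations, and loads the clean, structured data into the `silver` schema. To keep large tables manageable, the location-imputation models are trained on a subsample of the data, and cleaning and imputation are applied in fixed-size chunks. Note that the full raw table is still read into memory before chunking begins, and that the cleaned transactions table is currently exported to a CSV file (see Step 4.4).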

## 1. Imports and Setup

In [1]:
import gc
import os
from getpass import getpass
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

# Display settings: never truncate the column list or individual cell contents
# when a DataFrame is rendered.
for _display_option in ("display.max_columns", "display.max_colwidth"):
    pd.set_option(_display_option, None)

print("Libraries imported successfully. ✅")

Libraries imported successfully. ✅


## 2. Database Connection

In [2]:
# Connection settings. Non-secret values can be overridden through environment
# variables. The password is read from MSSQL_PASSWORD or prompted for, so it is
# never stored in (or rendered by) the notebook.
server = os.environ.get("MSSQL_SERVER", "127.0.0.1,1433")  # SQL Server "host,port" form
database = os.environ.get("MSSQL_DATABASE", "Financial Transaction WHD")
username = os.environ.get("MSSQL_USERNAME", "sa")
password = os.environ.get("MSSQL_PASSWORD") or getpass("SQL Server password: ")

# Build the URL from its parts, so neither special characters in the password nor
# the spaces in the database name need manual URL-escaping.
conn_url = URL.create(
    "mssql+pyodbc",
    username=username,
    password=password,
    host=server,
    database=database,
    query={"driver": "ODBC Driver 17 for SQL Server"},
)
engine = create_engine(conn_url)

# create_engine() does not open a connection by itself, so check one out explicitly.
# A bad host or credential then fails here, loudly, instead of later cells running
# against an unusable engine.
try:
    with engine.connect():
        pass
except Exception as e:
    print(f"Failed to connect to database: {e}")
    raise
print("Connection to SQL Server established. ✅")

Connection to SQL Server established. ✅


## 3. Process Ancillary Tables (Bronze → Silver)

These tables are smaller and can be processed in a single pass.

## 4. Process Main Transactions Table

This is the largest table and requires an optimized workflow.

### Step 4.1: Load and Perform Initial Cleaning

In [3]:
print("--- Processing Transactions Data ---")
# Step 1: Load raw transaction data from Bronze
print("Step 1: Loading raw transactions data...")
bronze_txn = pd.read_sql("SELECT * FROM bronze.transactions_data", engine)
print(f"Loaded {len(bronze_txn)} transaction records.")

# Step 2: Perform initial, low-memory cleaning
print("Step 2: Performing initial data cleaning...")


def strip_text(values):
    """Strip surrounding whitespace from *values* while keeping missing values missing.

    A bare ``astype(str)`` turns None/NaN into the literal strings "None"/"nan".
    Such a "city" is neither numeric nor <= 2 characters, so it would survive the
    city check below and be treated as a real city name. It would then never be
    imputed, and it would even be used as a training label.
    Entries that are empty after stripping are also marked missing.

    Returns an object Series with NaN wherever the input was missing or blank.
    """
    stripped = values.astype(str).str.strip()
    return stripped.where(values.notna() & stripped.ne(""))


for col in ["merchant_state", "merchant_city", "zip"]:
    bronze_txn[col] = strip_text(bronze_txn[col])

# Fix cases where zip code was mistakenly entered in the state column
mask_zip_from_state = (
    ~bronze_txn["zip"].str.match(r"^\d+$", na=False)
    & bronze_txn["merchant_state"].str.match(r"^\d+$", na=False)
)
bronze_txn.loc[mask_zip_from_state, "zip"] = bronze_txn.loc[mask_zip_from_state, "merchant_state"]
bronze_txn.loc[mask_zip_from_state, "merchant_state"] = np.nan

# Invalidate incorrect states (anything longer than a 2-letter code) and cities
# (purely numeric or at most 2 characters long).
bronze_txn.loc[bronze_txn["merchant_state"].str.len() > 2, "merchant_state"] = np.nan
invalid_city_mask = (
    bronze_txn["merchant_city"].str.match(r"^\d+$", na=False)
    | (bronze_txn["merchant_city"].str.len() <= 2)
)
bronze_txn.loc[invalid_city_mask, "merchant_city"] = np.nan

--- Processing Transactions Data ---
Step 1: Loading raw transactions data...
Loaded 1048575 transaction records.
Step 2: Performing initial data cleaning...


### Step 4.2: Train Models to Impute Missing Locations

In [4]:
print("Step 3: Training location imputation models on a data sample...")
# Reference set: rows whose city and state both survived cleaning and whose zip
# parses as a number. The zip becomes the single integer feature.
has_location = bronze_txn["merchant_city"].notna() & bronze_txn["merchant_state"].notna()
known_locations = (
    bronze_txn.loc[has_location, ["merchant_state", "merchant_city", "zip"]]
    .assign(zip=lambda frame: pd.to_numeric(frame["zip"], errors="coerce"))
    .dropna(subset=["zip"])
    .astype({"zip": int})
)

# Map the string labels to integer classes. The encoders are fitted on the full
# reference set, so inverse_transform can decode any class the models predict.
le_state = LabelEncoder()
le_city = LabelEncoder()
known_locations["state_encoded"] = le_state.fit_transform(known_locations["merchant_state"])
known_locations["city_encoded"] = le_city.fit_transform(known_locations["merchant_city"])

# Cap training at 50k rows to keep fitting fast; seeded for reproducibility.
n_train = min(50000, len(known_locations))
train_sample = known_locations.sample(n=n_train, random_state=42)
zip_features = train_sample[["zip"]]


def _fit_forest(features, labels):
    """Fit the seeded 50-tree random forest shared by the state and city models."""
    forest = RandomForestClassifier(n_estimators=50, random_state=42, n_jobs=-1)
    forest.fit(features, labels)
    return forest


print(f"Training state model on {len(zip_features)} samples...")
state_model = _fit_forest(zip_features, train_sample["state_encoded"])

print(f"Training city model on {len(zip_features)} samples...")
city_model = _fit_forest(zip_features, train_sample["city_encoded"])

print("Models trained successfully. ✅")
del known_locations, train_sample, zip_features  # release the training frames

Step 3: Training location imputation models on a data sample...
Training state model on 50000 samples...
Training city model on 50000 samples...
Models trained successfully. ✅


### Step 4.3: Process Full Dataset in Chunks

In [5]:
print("Step 4: Cleaning and imputing full dataset in memory-safe chunks...")
def clean_amount(val):
    """Parse a currency-formatted amount into a float.

    Accepts plain numbers and strings such as ``"$1,234.50"`` or ``"-$5.00"``.
    It also accepts accounting-style negatives in parentheses (``"(12.00)"`` -> ``-12.0``).
    Dollar signs, thousands separators and surrounding whitespace are ignored.

    Returns ``None`` for missing values (as judged by ``pd.isna``) and for anything
    that cannot be parsed. Malformed parenthesized values such as ``"(abc)"`` also
    return ``None`` rather than raising and aborting the whole run.
    """
    if pd.isna(val):
        return None
    text = str(val).replace("$", "").replace(",", "").strip()
    # Accounting notation: "(12.00)" means -12.00.
    negative = text.startswith("(") and text.endswith(")")
    if negative:
        text = text.strip("() ")
    try:
        amount = float(text)
    except (ValueError, TypeError):
        return None
    return -amount if negative else amount

# Clean and impute the raw transactions chunk by chunk, then reassemble them.
# NOTE(review): the full `bronze_txn` frame and the list of cleaned chunk copies
# coexist until the concat below. Peak memory is therefore roughly twice the raw
# table, not bounded by the chunk size.
cleaned_chunks = []
chunk_size = 100_000  # rows per chunk
num_chunks = int(np.ceil(len(bronze_txn) / chunk_size))

for i in range(num_chunks):
    start_index = i * chunk_size
    chunk = bronze_txn.iloc[start_index:start_index + chunk_size].copy()

    print(f"Processing chunk {i+1}/{num_chunks}...")

    # Apply cleaning to the chunk
    chunk["date"] = pd.to_datetime(chunk["date"], errors="coerce")
    chunk["amount"] = chunk["amount"].apply(clean_amount)
    chunk["errors"] = chunk["errors"].fillna("None")
    for col in ["id", "client_id", "card_id", "merchant_id"]:
        chunk[col] = chunk[col].astype(str).str.strip()

    # Impute missing locations from the zip code. Each field is filled only where it
    # is itself missing, so a known state (or city) is never overwritten by a prediction.
    chunk["zip"] = pd.to_numeric(chunk["zip"], errors="coerce").astype("Int64")
    has_zip = chunk["zip"].notna()
    state_missing = chunk["merchant_state"].isna() & has_zip
    city_missing = chunk["merchant_city"].isna() & has_zip

    if state_missing.any():
        chunk.loc[state_missing, "merchant_state"] = le_state.inverse_transform(
            state_model.predict(chunk.loc[state_missing, ["zip"]])
        )
    if city_missing.any():
        chunk.loc[city_missing, "merchant_city"] = le_city.inverse_transform(
            city_model.predict(chunk.loc[city_missing, ["zip"]])
        )

    cleaned_chunks.append(chunk)

# Combine chunks into the final DataFrame. An empty source table yields no chunks,
# and pd.concat([]) would raise, so fall back to an empty frame with the same columns.
print("Combining all cleaned chunks...")
if cleaned_chunks:
    txn_final = pd.concat(cleaned_chunks, ignore_index=True)
else:
    txn_final = bronze_txn.iloc[0:0].copy()
print("Transactions DataFrame fully cleaned and imputed. ✅")

# Clean up memory. The trailing ';' stops Jupyter from echoing gc.collect()'s return value.
del cleaned_chunks, bronze_txn
gc.collect();

Step 4: Cleaning and imputing full dataset in memory-safe chunks...
Processing chunk 1/11...
Processing chunk 2/11...
Processing chunk 3/11...
Processing chunk 4/11...
Processing chunk 5/11...
Processing chunk 6/11...
Processing chunk 7/11...
Processing chunk 8/11...
Processing chunk 9/11...
Processing chunk 10/11...
Processing chunk 11/11...
Combining all cleaned chunks...
Transactions DataFrame fully cleaned and imputed. ✅


0

### Step 4.4: Load Final Data and Close Connection

In [8]:
# Output location: set the TXN_CSV_PATH environment variable to override it,
# instead of editing a hard-coded, user-specific absolute path.
csv_path = Path(os.environ.get("TXN_CSV_PATH", Path.home() / "transactions_data.csv"))
csv_path.parent.mkdir(parents=True, exist_ok=True)

txn_final.to_csv(csv_path, index=False, encoding='utf-8')
print(f"Transactions data saved successfully to {csv_path}")

# Close the connection, as this section promises: release the engine's pooled
# database connections now that no further queries are issued.
engine.dispose()


Transactions data saved successfully to /home/m.farrag/transactions_data.csv
