In [1]:
# Import Required Libraries
import os
import time
import json
from datetime import datetime
from typing import Dict, Any, List, Tuple
import findspark

# Initialize findspark to locate Spark installation
findspark.init()

# PySpark imports
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# Confirm the environment: import status, wall-clock time and working directory.
print("✓ Libraries imported successfully!")
print("Current timestamp: {}".format(datetime.now()))
print("Working directory: {}".format(os.getcwd()))

✓ Libraries imported successfully!
Current timestamp: 2025-11-25 10:08:59.875872
Working directory: /home/ubuntu/project2/cluster3


In [2]:
# Initialize SparkSession - CLUSTER 3: RESOURCE-CONSTRAINED TEST
# The session requests only part of the cluster so that the slowdown caused
# by limited resources can be measured.
# Configuration: 2 cores + 3GB RAM per executor (50% of available resources)

SPARK_MASTER = "spark://192.168.10.115:7077"  # Master node from cluster

# Parenthesised builder chain (no backslash continuations); the settings are
# applied in the same order as before.
spark = (
    SparkSession.builder
    .appName("Silver-Layer-Cluster3-Resource-Constrained")
    .master(SPARK_MASTER)
    .config("spark.executor.memory", "3g")
    .config("spark.executor.cores", "2")
    .config("spark.cores.max", "8")
    .config("spark.default.parallelism", "16")
    .config("spark.sql.shuffle.partitions", "16")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

# SparkContext handle used for the RDD operations
sc = spark.sparkContext

# Keep Spark's own logging down to warnings and errors
sc.setLogLevel("WARN")

print("=" * 80)
print("SPARK CLUSTER 3 - RESOURCE-CONSTRAINED TEST")
print("=" * 80)
print(f"Application Name: {sc.appName}")
print(f"Spark Version: {spark.version}")

# Effective settings are read back from the session, one per line
for label, conf_key in (
    ("Max Cores", "spark.cores.max"),
    ("Executor Memory", "spark.executor.memory"),
    ("Executor Cores", "spark.executor.cores"),
    ("Shuffle Partitions", "spark.sql.shuffle.partitions"),
):
    print(f"{label}: {spark.conf.get(conf_key)}")

# Static description of the physical cluster and of the constrained allocation
print(
    "",
    "Physical Cluster (Available):",
    "  • Workers: 4 nodes",
    "  • Cores per worker: 4 cores (16 total available)",
    "  • Memory per worker: 6.8 GiB (~27.2 GiB total available)",
    "",
    "Allocated Resources (Constrained Test):",
    "  • Executors: 4 (one per worker)",
    "  • Cores per executor: 2 (8 total = 50% of available)",
    "  • Memory per executor: 3g (12 GB total = 44% of available)",
    "  • Partitions: 16 (2x allocated cores)",
    "",
    "Test Objective: Measure performance degradation with limited resources",
    "=" * 80,
    sep="\n",
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/25 10:09:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/25 10:09:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


SPARK CLUSTER 3 - RESOURCE-CONSTRAINED TEST
Application Name: Silver-Layer-Cluster3-Resource-Constrained
Spark Version: 3.5.0
Max Cores: 8
Executor Memory: 3g
Executor Cores: 2
Shuffle Partitions: 16

Physical Cluster (Available):
  • Workers: 4 nodes
  • Cores per worker: 4 cores (16 total available)
  • Memory per worker: 6.8 GiB (~27.2 GiB total available)

Allocated Resources (Constrained Test):
  • Executors: 4 (one per worker)
  • Cores per executor: 2 (8 total = 50% of available)
  • Memory per executor: 3g (12 GB total = 44% of available)
  • Partitions: 16 (2x allocated cores)

Test Objective: Measure performance degradation with limited resources
Max Cores: 8
Executor Memory: 3g
Executor Cores: 2
Shuffle Partitions: 16

Physical Cluster (Available):
  • Workers: 4 nodes
  • Cores per worker: 4 cores (16 total available)
  • Memory per worker: 6.8 GiB (~27.2 GiB total available)

Allocated Resources (Constrained Test):
  • Executors: 4 (one per worker)
  • Cores per executor: 

In [3]:
# Define Unified Silver Layer Schema and Normalization Functions
print("=" * 80, "DEFINING UNIFIED SILVER LAYER SCHEMA", "=" * 80, "", sep="\n")

# Target schema - all field names lowercase, consistent types.
# It is assembled from two groups so the counts printed below are taken from
# the definitions themselves.

# Original taxi trip fields (19 fields)
_taxi_trip_fields = {
    'vendorid': 'long',
    'tpep_pickup_datetime': 'timestamp',
    'tpep_dropoff_datetime': 'timestamp',
    'passenger_count': 'double',  # Normalize to double
    'trip_distance': 'double',
    'ratecodeid': 'double',  # Normalize to double
    'store_and_fwd_flag': 'string',
    'pulocationid': 'long',
    'dolocationid': 'long',
    'payment_type': 'long',
    'fare_amount': 'double',
    'extra': 'double',
    'mta_tax': 'double',
    'tip_amount': 'double',
    'tolls_amount': 'double',
    'improvement_surcharge': 'double',
    'total_amount': 'double',
    'congestion_surcharge': 'double',
    'airport_fee': 'double',  # Standardize to lowercase
}

# Bronze metadata fields (5 fields) - keep as-is
_bronze_metadata_fields = {
    '_bronze_ingestion_timestamp': 'string',
    '_bronze_source_file': 'string',
    '_bronze_record_id': 'string',
    '_bronze_status': 'string',
    '_bronze_quality_flags': 'string',
}

# Taxi fields first, Bronze metadata last (dict order is insertion order)
SILVER_SCHEMA = {**_taxi_trip_fields, **_bronze_metadata_fields}

print("Target Schema Defined:")
print(f"  Total fields: {len(SILVER_SCHEMA)}")
print(f"  Original taxi fields: {len(_taxi_trip_fields)}")
print(f"  Bronze metadata fields: {len(_bronze_metadata_fields)}")
print()

# Explicit renames for column-name variants seen in the source data
FIELD_NAME_MAPPING = {
    'Airport_fee': 'airport_fee',  # Capital A -> lowercase
    'RatecodeID': 'ratecodeid',    # Just in case
    'PULocationID': 'pulocationid',  # Just in case
    'DOLocationID': 'dolocationid',  # Just in case
    'VendorID': 'vendorid'  # Just in case
}

print("Field Name Normalization Rules:")
for source_name, target_name in FIELD_NAME_MAPPING.items():
    # Source name left-aligned in a 20-character column
    print("  {:20s} -> {}".format(source_name, target_name))
print()

# RDD-based normalization function
def _to_double_or_none(value):
    """Coerce a value to float; None, NaN and anything float() rejects become None."""
    if value is None:
        return None
    try:
        number = float(value)
    except (ValueError, TypeError, OverflowError):
        return None
    # NaN is the only float that is not equal to itself. Left as NaN it would
    # make every later range comparison evaluate to False, so treat it as missing.
    return None if number != number else number


def normalize_record(record: Dict, field_name_mapping: Dict = None) -> Dict:
    """
    MAP function: Normalize a single record to match Silver schema.

    Handles:
    1. Field name standardization (explicit mapping first, otherwise lowercase)
    2. Data type conversion (passenger_count, ratecodeid to double)
    3. Null value handling (unconvertible values and NaN become None)

    If two source fields collapse to the same normalized name (for example
    'Airport_fee' and 'airport_fee'), the first non-null value is kept instead
    of letting a later null overwrite it.

    Args:
        record: Dictionary representing a Bronze layer record
        field_name_mapping: Optional explicit {source_name: target_name}
            overrides; defaults to the module-level FIELD_NAME_MAPPING

    Returns:
        New normalized dictionary; the input record is not modified
    """
    mapping = FIELD_NAME_MAPPING if field_name_mapping is None else field_name_mapping
    normalized = {}

    # First pass: standardize field names
    for key, value in record.items():
        normalized_key = mapping.get(key, key.lower())
        # Only fill the slot while it is still empty/null, so a real value
        # already stored under this name is never clobbered
        if normalized.get(normalized_key) is None:
            normalized[normalized_key] = value

    # Second pass: ensure data type consistency for problematic fields
    for numeric_field in ('passenger_count', 'ratecodeid'):
        if numeric_field in normalized:
            normalized[numeric_field] = _to_double_or_none(normalized[numeric_field])

    return normalized

# Summary banner for the normalization step, written with a single print call.
print(
    "=" * 80,
    "NORMALIZATION FUNCTION DEFINED",
    "=" * 80,
    "Function: normalize_record()",
    "  • Standardizes field names to lowercase",
    "  • Converts passenger_count to double",
    "  • Converts ratecodeid to double",
    "  • Handles null values gracefully",
    "",
    "This is a MAP operation for RDD-based cleaning!",
    "=" * 80,
    sep="\n",
)

DEFINING UNIFIED SILVER LAYER SCHEMA

Target Schema Defined:
  Total fields: 24
  Original taxi fields: 19
  Bronze metadata fields: 5

Field Name Normalization Rules:
  Airport_fee          -> airport_fee
  RatecodeID           -> ratecodeid
  PULocationID         -> pulocationid
  DOLocationID         -> dolocationid
  VendorID             -> vendorid

NORMALIZATION FUNCTION DEFINED
Function: normalize_record()
  • Standardizes field names to lowercase
  • Converts passenger_count to double
  • Converts ratecodeid to double
  • Handles null values gracefully

This is a MAP operation for RDD-based cleaning!


In [4]:
# Define Data Quality Validation Functions (RDD MapReduce)

def validate_record_quality(record: Dict) -> Tuple[Dict, List[str]]:
    """
    MAP function: Validate a single record against business rules.

    Issues are appended in a fixed order: null critical fields, trip duration,
    trip distance, passenger count, negative monetary fields, fare structure,
    total amount, location IDs.

    The "very expensive ride" check (total_amount > 1000) is evaluated on its
    own: it no longer depends on fare_amount being present, and it is no
    longer skipped when the fare-structure flag has already been raised.

    Args:
        record: Normalized record dictionary (lowercase field names)

    Returns:
        Tuple of (record, list_of_quality_issues); the record object itself is
        returned unchanged, the list is empty when nothing was found
    """
    issues = []

    # 1. NULL VALUE CHECKS
    critical_fields = ('tpep_pickup_datetime', 'tpep_dropoff_datetime',
                       'trip_distance', 'total_amount')
    issues.extend(
        f'null_{field}' for field in critical_fields
        if record.get(field) is None
    )

    # 2. BUSINESS LOGIC VALIDATIONS

    # Check: pickup strictly before dropoff (zero-length trips are invalid too)
    pickup = record.get('tpep_pickup_datetime')
    dropoff = record.get('tpep_dropoff_datetime')
    if pickup and dropoff and dropoff <= pickup:
        issues.append('invalid_trip_duration')

    # Check: trip distance positive
    distance = record.get('trip_distance')
    if distance is not None:
        if distance <= 0:
            issues.append('invalid_trip_distance')
        elif distance > 100:  # Suspiciously long trip (>100 miles in NYC)
            issues.append('suspicious_trip_distance')

    # Check: passenger count reasonable
    passengers = record.get('passenger_count')
    if passengers is not None:
        if passengers <= 0:
            issues.append('invalid_passenger_count')
        elif passengers > 6:  # NYC taxis typically max 4-5, but allow up to 6
            issues.append('suspicious_passenger_count')

    # Check: fare amounts non-negative
    fare_fields = ('fare_amount', 'extra', 'mta_tax', 'tip_amount',
                   'tolls_amount', 'improvement_surcharge', 'total_amount',
                   'congestion_surcharge', 'airport_fee')
    for field in fare_fields:
        value = record.get(field)
        if value is not None and value < 0:
            issues.append(f'negative_{field}')

    # Check: total amount consistency (basic check)
    total = record.get('total_amount')
    fare = record.get('fare_amount')
    if total is not None:
        # A positive total on top of a zero fare looks wrong
        if fare is not None and total > 0 and fare == 0:
            issues.append('suspicious_fare_structure')
        # Very expensive ride - independent of the fare check above
        if total > 1000:
            issues.append('suspicious_total_amount')

    # Check: location IDs present
    if record.get('pulocationid') is None or record.get('dolocationid') is None:
        issues.append('missing_location_info')

    return (record, issues)


def extract_quality_issues(validation_result: Tuple[Dict, List[str]]) -> List[Tuple[str, int]]:
    """
    FLATMAP function: turn one validation result into countable pairs.

    Args:
        validation_result: (record, issues) tuple as produced by
            validate_record_quality(); only the issues part is used

    Returns:
        One (issue_name, 1) pair per issue, or the single pair
        ('no_issues', 1) when the issue list is empty
    """
    _record, found = validation_result

    # Guard clause: a record without issues still contributes one countable pair
    if not found:
        return [('no_issues', 1)]

    return [(name, 1) for name in found]


# Summary of the validation helpers, emitted line by line from one tuple.
for _summary_line in (
    "=" * 80,
    "DATA QUALITY VALIDATION FUNCTIONS DEFINED",
    "=" * 80,
    "",
    "Functions for RDD MapReduce pipeline:",
    "-" * 80,
    "1. validate_record_quality()",
    "   • MAP operation",
    "   • Returns: (record, list_of_issues)",
    "   • Checks:",
    "     - Null values in critical fields",
    "     - Pickup before dropoff datetime",
    "     - Positive trip distances",
    "     - Reasonable passenger counts",
    "     - Non-negative fare amounts",
    "     - Total amount consistency",
    "     - Location information present",
    "",
    "2. extract_quality_issues()",
    "   • FLATMAP operation",
    "   • Extracts individual issues for counting",
    "   • Returns: [(issue_name, 1), ...]",
    "",
    "=" * 80,
):
    print(_summary_line)

DATA QUALITY VALIDATION FUNCTIONS DEFINED

Functions for RDD MapReduce pipeline:
--------------------------------------------------------------------------------
1. validate_record_quality()
   • MAP operation
   • Returns: (record, list_of_issues)
   • Checks:
     - Null values in critical fields
     - Pickup before dropoff datetime
     - Positive trip distances
     - Reasonable passenger counts
     - Non-negative fare amounts
     - Total amount consistency
     - Location information present

2. extract_quality_issues()
   • FLATMAP operation
   • Extracts individual issues for counting
   • Returns: [(issue_name, 1), ...]



In [5]:
# Define Silver Layer Cleaning Functions

def should_remove_record(issues: List[str]) -> bool:
    """
    FILTER function: decide whether a record has to be dropped.

    Args:
        issues: Issue names reported for one record

    Returns:
        True when at least one issue is critical (the record is unusable and
        should be removed), False when the record can be kept
    """
    blocking = frozenset((
        'null_tpep_pickup_datetime',
        'null_tpep_dropoff_datetime',
        'null_trip_distance',
        'null_total_amount',
        'invalid_trip_duration',
        'invalid_trip_distance',
        'invalid_passenger_count',
    ))

    # "Not disjoint" means the record carries at least one blocking issue
    return not blocking.isdisjoint(issues)


def create_silver_record(validation_result: Tuple[Dict, List[str]]) -> Dict:
    """
    MAP function: build the Silver layer record for one validated record.

    Args:
        validation_result: (record, issues) tuple from validate_record_quality()

    Returns:
        Copy of the record with four extra fields:
        _silver_cleaning_timestamp (ISO string of the current local time),
        _silver_quality_issues (comma-joined issue names, None if there are none),
        _silver_status ('flagged' or 'clean') and
        _silver_issue_count (number of issues other than 'no_issues')
    """
    source, found = validation_result

    # A record counts as flagged unless its list is empty or only the 'no_issues' marker
    has_real_issues = bool(found) and found != ['no_issues']

    silver_record = source.copy()
    silver_record.update({
        '_silver_cleaning_timestamp': datetime.now().isoformat(),
        '_silver_quality_issues': ','.join(found) if found else None,
        '_silver_status': 'flagged' if has_real_issues else 'clean',
        # 'no_issues' is a marker, not an issue, so it is not counted
        '_silver_issue_count': sum(1 for name in found if name != 'no_issues'),
    })
    return silver_record


# Summary of the cleaning helpers and of the cleaning strategy.
# Two messages were corrected to match the code in this notebook:
#   * the duration check rejects dropoff <= pickup, i.e. "at or before";
#   * records are enriched before they are written, so the Silver layer has
#     37 fields (24 Bronze + 9 enrichment + 4 Silver metadata), not 28.
print("=" * 80)
print("SILVER LAYER CLEANING FUNCTIONS DEFINED")
print("=" * 80)
print()
print("Functions for RDD cleaning pipeline:")
print("-" * 80)
print()
print("1. should_remove_record(issues)")
print("   • FILTER predicate function")
print("   • Returns: True if record should be removed")
print("   • Critical issues that cause removal:")
print("     - Null critical timestamps or amounts")
print("     - Invalid trip duration (dropoff at or before pickup)")
print("     - Invalid trip distance (≤ 0)")
print("     - Invalid passenger count (≤ 0)")
print()
print("2. create_silver_record(validation_result)")
print("   • MAP transformation")
print("   • Adds Silver metadata:")
print("     - _silver_cleaning_timestamp")
print("     - _silver_quality_issues")
print("     - _silver_status (clean/flagged)")
print("     - _silver_issue_count")
print()
print("=" * 80)
print("Cleaning Strategy:")
print("  • Remove ~2-3% of records with critical issues")
print("  • Keep ~97-98% including flagged records for transparency")
print("  • Silver layer will have 37 fields (24 Bronze + 9 enrichment + 4 Silver metadata)")
print("=" * 80)

SILVER LAYER CLEANING FUNCTIONS DEFINED

Functions for RDD cleaning pipeline:
--------------------------------------------------------------------------------

1. should_remove_record(issues)
   • FILTER predicate function
   • Returns: True if record should be removed
   • Critical issues that cause removal:
     - Null critical timestamps or amounts
     - Invalid trip duration (dropoff before pickup)
     - Invalid trip distance (≤ 0)
     - Invalid passenger count (≤ 0)

2. create_silver_record(validation_result)
   • MAP transformation
   • Adds Silver metadata:
     - _silver_cleaning_timestamp
     - _silver_quality_issues
     - _silver_status (clean/flagged)
     - _silver_issue_count

Cleaning Strategy:
  • Remove ~2-3% of records with critical issues
  • Keep ~97-98% including flagged records for transparency
  • Silver layer will have 28 fields (24 Bronze + 4 Silver metadata)


In [None]:
# Load Enrichment Lookup Data
import csv

# 1. Load Taxi Zone Lookup Data
taxi_zone_lookup_path = "/home/ubuntu/dat535-2025-group10/Metadata/taxi_zone_lookup.csv"

# newline='' is how the csv module expects its input file to be opened (it does
# its own newline handling, which matters for quoted fields). The encoding is
# explicit instead of locale-dependent; 'utf-8-sig' reads plain UTF-8 and also
# drops a leading byte-order mark, which would otherwise end up inside the
# first header name.
with open(taxi_zone_lookup_path, 'r', newline='', encoding='utf-8-sig') as f:
    # LocationID (as int) -> descriptive fields for that zone
    taxi_zones = {
        int(row['LocationID']): {
            'borough': row['Borough'],
            'zone': row['Zone'],
            'service_zone': row['service_zone']
        }
        for row in csv.DictReader(f)
    }

print("=" * 80)
print("ENRICHMENT LOOKUP DATA LOADED")
print("=" * 80)
print()
print(f"1. Taxi Zone Lookup: {len(taxi_zones)} zones loaded")
print("   Sample zones:")
for zone_id in [1, 132, 138, 264, 265]:
    if zone_id in taxi_zones:
        info = taxi_zones[zone_id]
        print(f"   • ID {zone_id}: {info['zone']}, {info['borough']} ({info['service_zone']})")
print()

# 2. Vendor ID Mapping (from TLC data dictionary)
vendor_mapping = {
    1: "Creative Mobile Technologies",
    2: "Curb Mobility",
    6: "Myle Technologies",
    7: "Helix"
}
print(f"2. Vendor Mapping: {len(vendor_mapping)} vendors")
for vid, name in vendor_mapping.items():
    print(f"   • {vid}: {name}")
print()

# 3. Rate Code Mapping (from TLC data dictionary)
ratecode_mapping = {
    1: "Standard rate",
    2: "JFK",
    3: "Newark",
    4: "Nassau or Westchester",
    5: "Negotiated fare",
    6: "Group ride",
    99: "Unknown"
}
print(f"3. Rate Code Mapping: {len(ratecode_mapping)} rate codes")
for rid, desc in ratecode_mapping.items():
    print(f"   • {rid}: {desc}")
print()

# 4. Payment Type Mapping (from TLC data dictionary)
payment_mapping = {
    0: "Flex Fare",
    1: "Credit card",
    2: "Cash",
    3: "No charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip"
}
print(f"4. Payment Type Mapping: {len(payment_mapping)} payment types")
for pid, method in payment_mapping.items():
    print(f"   • {pid}: {method}")
print()

print("=" * 80)
print("✓ All lookup data ready for enrichment")
print("=" * 80)

ENRICHMENT LOOKUP DATA LOADED

1. Taxi Zone Lookup: 265 zones loaded
   Sample zones:
   • ID 1: Newark Airport, EWR (EWR)
   • ID 132: JFK Airport, Queens (Airports)
   • ID 138: LaGuardia Airport, Queens (Airports)
   • ID 264: N/A, Unknown (N/A)
   • ID 265: Outside of NYC, N/A (N/A)

2. Vendor Mapping: 4 vendors
   • 1: Creative Mobile Technologies
   • 2: Curb Mobility
   • 6: Myle Technologies
   • 7: Helix

3. Rate Code Mapping: 7 rate codes
   • 1: Standard rate
   • 2: JFK
   • 3: Newark
   • 4: Nassau or Westchester
   • 5: Negotiated fare
   • 6: Group ride
   • 99: Unknown

4. Payment Type Mapping: 7 payment types
   • 0: Flex Fare
   • 1: Credit card
   • 2: Cash
   • 3: No charge
   • 4: Dispute
   • 5: Unknown
   • 6: Voided trip

✓ All lookup data ready for enrichment


In [7]:
# Define Data Enrichment Function

def _zone_fields(location_id, zones, prefix):
    """Return the three '<prefix>_*' zone fields for a location ID ('Unknown' when not found)."""
    info = zones.get(location_id) if location_id else None
    if info is None:
        return {
            f'{prefix}_borough': 'Unknown',
            f'{prefix}_zone': 'Unknown',
            f'{prefix}_service_zone': 'Unknown',
        }
    return {
        f'{prefix}_borough': info['borough'],
        f'{prefix}_zone': info['zone'],
        f'{prefix}_service_zone': info['service_zone'],
    }


def _describe_code(code, mapping):
    """Look up an integer-coded value; missing or non-integer-convertible codes give 'Unknown'."""
    if code is None:
        return 'Unknown'
    try:
        return mapping.get(int(code), 'Unknown')
    except (ValueError, TypeError, OverflowError):
        # int() raises ValueError for NaN and non-numeric strings and
        # OverflowError for +/-infinity; one bad value must not fail the task
        return 'Unknown'


def enrich_record(record, *, zones=None, vendors=None, rate_codes=None, payment_types=None):
    """
    MAP operation: Enrich record with human-readable descriptions for ID fields.

    Adds 9 new fields:
    - pickup_borough, pickup_zone, pickup_service_zone
    - dropoff_borough, dropoff_zone, dropoff_service_zone
    - vendor_name
    - rate_description
    - payment_method

    Keeps original ID fields for joins in Gold layer. IDs that are missing,
    unknown, or (for rate code / payment type) not convertible to an integer
    are described as 'Unknown' instead of raising.

    Args:
        record: Normalized record dictionary
        zones: Optional {location_id: {'borough', 'zone', 'service_zone'}}
            lookup; defaults to the module-level taxi_zones
        vendors: Optional {vendor_id: name}; defaults to vendor_mapping
        rate_codes: Optional {rate_code: description}; defaults to ratecode_mapping
        payment_types: Optional {payment_type: method}; defaults to payment_mapping

    Returns:
        Copy of the record with the 9 enrichment fields added
    """
    # Fall back to the notebook-level lookup tables when no override is given
    zones = taxi_zones if zones is None else zones
    vendors = vendor_mapping if vendors is None else vendors
    rate_codes = ratecode_mapping if rate_codes is None else rate_codes
    payment_types = payment_mapping if payment_types is None else payment_types

    enriched = record.copy()

    # Location enrichment (pickup first, then dropoff)
    enriched.update(_zone_fields(record.get('pulocationid'), zones, 'pickup'))
    enriched.update(_zone_fields(record.get('dolocationid'), zones, 'dropoff'))

    # Vendor ID is looked up as-is
    enriched['vendor_name'] = vendors.get(record.get('vendorid'), 'Unknown')

    # Rate code and payment type are converted to int before the lookup
    enriched['rate_description'] = _describe_code(record.get('ratecodeid'), rate_codes)
    enriched['payment_method'] = _describe_code(record.get('payment_type'), payment_types)

    return enriched


# Summary of the enrichment step, written with a single print call.
print(
    "=" * 80,
    "DATA ENRICHMENT FUNCTION DEFINED",
    "=" * 80,
    "",
    "Function: enrich_record(record)",
    "  • Type: MAP operation",
    "  • Input: Normalized record dictionary",
    "  • Output: Record with 9 additional enrichment fields",
    "",
    "Enrichment Fields Added:",
    "-" * 80,
    "  Location Enrichment (6 fields):",
    "    • pickup_borough, pickup_zone, pickup_service_zone",
    "    • dropoff_borough, dropoff_zone, dropoff_service_zone",
    "",
    "  ID Enrichment (3 fields):",
    "    • vendor_name (from vendorid)",
    "    • rate_description (from ratecodeid)",
    "    • payment_method (from payment_type)",
    "",
    "Strategy:",
    "  • Keep original ID fields (needed for joins)",
    "  • Add human-readable descriptions alongside IDs",
    "  • Handle missing/unknown IDs gracefully with 'Unknown'",
    "",
    "=" * 80,
    "Total Silver Schema: 19 original + 9 enrichment + 5 Bronze + 4 Silver = 37 fields",
    "=" * 80,
    sep="\n",
)

DATA ENRICHMENT FUNCTION DEFINED

Function: enrich_record(record)
  • Type: MAP operation
  • Input: Normalized record dictionary
  • Output: Record with 9 additional enrichment fields

Enrichment Fields Added:
--------------------------------------------------------------------------------
  Location Enrichment (6 fields):
    • pickup_borough, pickup_zone, pickup_service_zone
    • dropoff_borough, dropoff_zone, dropoff_service_zone

  ID Enrichment (3 fields):
    • vendor_name (from vendorid)
    • rate_description (from ratecodeid)
    • payment_method (from payment_type)

Strategy:
  • Keep original ID fields (needed for joins)
  • Add human-readable descriptions alongside IDs
  • Handle missing/unknown IDs gracefully with 'Unknown'

Total Silver Schema: 19 original + 9 enrichment + 5 Bronze + 4 Silver = 37 fields


In [None]:
# Batch Processing: Clean All 24 Files
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType

bronze_path = "/home/ubuntu/dat535-2025-group10/bronze_layer"
silver_output_path = "/home/ubuntu/dat535-2025-group10/silver_layer"

# Make sure the output directory is there and report which case applied
if os.path.exists(silver_output_path):
    print(f"✓ Using existing Silver layer directory: {silver_output_path}")
else:
    os.makedirs(silver_output_path)
    print(f"✓ Created Silver layer output directory: {silver_output_path}")

print(
    "",
    "=" * 80,
    "BATCH PROCESSING - ALL 24 FILES",
    "=" * 80,
    f"Source: {bronze_path}",
    f"Target: {silver_output_path}",
    "",
    sep="\n",
)

# Every sub-directory of the Bronze layer is one input "file", taken in name order
bronze_dirs = sorted(
    entry_name
    for entry_name in os.listdir(bronze_path)
    if os.path.isdir(os.path.join(bronze_path, entry_name))
)
print(f"Files to process: {len(bronze_dirs)}")
print()

# Explicit schema for the Silver DataFrame
# (37 fields: 19 original + 9 enrichment + 5 Bronze + 4 Silver).
# Columns are listed as (name, type) pairs in output order; every column is
# declared nullable when the StructType is built below.
_silver_columns = [
    # Original taxi fields (19)
    ("vendorid", LongType()),
    ("tpep_pickup_datetime", TimestampType()),
    ("tpep_dropoff_datetime", TimestampType()),
    ("passenger_count", DoubleType()),
    ("trip_distance", DoubleType()),
    ("ratecodeid", DoubleType()),
    ("store_and_fwd_flag", StringType()),
    ("pulocationid", LongType()),
    ("dolocationid", LongType()),
    ("payment_type", LongType()),
    ("fare_amount", DoubleType()),
    ("extra", DoubleType()),
    ("mta_tax", DoubleType()),
    ("tip_amount", DoubleType()),
    ("tolls_amount", DoubleType()),
    ("improvement_surcharge", DoubleType()),
    ("total_amount", DoubleType()),
    ("congestion_surcharge", DoubleType()),
    ("airport_fee", DoubleType()),
    # Enrichment fields (9)
    ("pickup_borough", StringType()),
    ("pickup_zone", StringType()),
    ("pickup_service_zone", StringType()),
    ("dropoff_borough", StringType()),
    ("dropoff_zone", StringType()),
    ("dropoff_service_zone", StringType()),
    ("vendor_name", StringType()),
    ("rate_description", StringType()),
    ("payment_method", StringType()),
    # Bronze metadata (5)
    ("_bronze_ingestion_timestamp", StringType()),
    ("_bronze_source_file", StringType()),
    ("_bronze_record_id", StringType()),
    ("_bronze_status", StringType()),
    ("_bronze_quality_flags", StringType()),
    # Silver metadata (4)
    ("_silver_cleaning_timestamp", StringType()),
    ("_silver_quality_issues", StringType()),
    ("_silver_status", StringType()),
    ("_silver_issue_count", LongType()),
]

silver_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in _silver_columns
])

# Track overall statistics (all counters start at zero, both lists start empty)
overall_stats = dict(
    files_processed=0,
    total_input_records=0,
    total_removed_records=0,
    total_output_records=0,
    total_clean_records=0,
    total_flagged_records=0,
    processing_times=[],
    file_stats=[],
)

print("Processing files (RDD MapReduce pipeline):", "-" * 80, sep="\n")

# The timer starts before loading; the elapsed time computed after the write
# is measured from this point
overall_start_time = time.time()

# Each Bronze directory is read on its own and turned into an RDD of dicts
# right away, so differing Parquet schemas never have to be merged. The loop
# only builds lazy RDDs; the actual reading happens on the cluster when an
# action runs.
bronze_file_paths = [os.path.join(bronze_path, dir_name) for dir_name in bronze_dirs]
print(f"Loading all {len(bronze_file_paths)} files in parallel across cluster...")
print("  Strategy: Union RDDs to avoid schema merge conflicts")

rdds = []
for bronze_path_file in bronze_file_paths:
    df = spark.read.parquet(bronze_path_file)
    rdds.append(df.rdd.map(lambda row: row.asDict()))

# One RDD over all files, redistributed into 16 partitions
# (2x the 8 allocated cores); repartition() introduces a shuffle
rdd_all_bronze = sc.union(rdds).repartition(16)

print("✓ All files loaded into single RDD")
print(f"  Partitions: {rdd_all_bronze.getNumPartitions()}")
print()

# count() is an action, so this is the first point where Spark does real work
input_count = rdd_all_bronze.count()
print(f"Total input records: {input_count:,}")
print()

# RDD PROCESSING PIPELINE - All 24 files processed in parallel
# Each transformation distributed across 8 ALLOCATED cores (50% of physical 16 cores)
# The map()/filter() calls below only describe the work (they are lazy); the
# progress messages are therefore printed immediately, before any record is
# processed. Every intermediate RDD keeps its own name so it can be inspected
# or reused separately.
print("Executing RDD pipeline with constrained resources...")
print("  Step 1/5: Normalizing field names and types...")
# dict -> dict with standardized field names and numeric types
rdd_normalized = rdd_all_bronze.map(normalize_record)

print("  Step 2/5: Enriching with lookup data...")
# dict -> dict with the descriptive lookup fields added
rdd_enriched = rdd_normalized.map(enrich_record)

print("  Step 3/5: Validating data quality...")
# dict -> (record, issues) tuple
rdd_validated = rdd_enriched.map(validate_record_quality)

print("  Step 4/5: Filtering critical issues...")
# x is a (record, issues) tuple; keep it only when no critical issue is present
rdd_filtered = rdd_validated.filter(lambda x: not should_remove_record(x[1]))

print("  Step 5/5: Creating Silver records...")
# (record, issues) -> Silver dict with the _silver_* metadata fields
rdd_silver = rdd_filtered.map(create_silver_record)

# No cache() and no further repartitioning at this point (the input RDD was
# already repartitioned when it was loaded) - avoid extra shuffle overhead
# with a large dataset.
# The steps above run together when the write operation below triggers execution.

print()
print("Pipeline ready - writing to Silver layer (this triggers execution)...")
print()

# Field order of one Silver row (37 fields). The order mirrors silver_schema,
# which is the schema the resulting tuples are loaded with.
_SILVER_FIELD_ORDER = (
    # Original 19 fields
    'vendorid', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
    'passenger_count', 'trip_distance', 'ratecodeid',
    'store_and_fwd_flag', 'pulocationid', 'dolocationid',
    'payment_type', 'fare_amount', 'extra',
    'mta_tax', 'tip_amount', 'tolls_amount',
    'improvement_surcharge', 'total_amount', 'congestion_surcharge',
    'airport_fee',
    # Enrichment 9 fields
    'pickup_borough', 'pickup_zone', 'pickup_service_zone',
    'dropoff_borough', 'dropoff_zone', 'dropoff_service_zone',
    'vendor_name', 'rate_description', 'payment_method',
    # Bronze 5 fields
    '_bronze_ingestion_timestamp', '_bronze_source_file',
    '_bronze_record_id', '_bronze_status', '_bronze_quality_flags',
    # Silver 4 fields
    '_silver_cleaning_timestamp', '_silver_quality_issues',
    '_silver_status', '_silver_issue_count',
)


def dict_to_tuple(d):
    """Convert a Silver record dict into a 37-tuple in schema order.

    Fields missing from the dict become None; keys that are not part of the
    field order are ignored.
    """
    return tuple(map(d.get, _SILVER_FIELD_ORDER))

# Turn the Silver dicts into schema-ordered tuples and load them into a
# DataFrame with the explicit Silver schema
print("Writing Silver layer data...")
rdd_tuples = rdd_silver.map(dict_to_tuple)
silver_df = spark.createDataFrame(rdd_tuples, schema=silver_schema)

# One unified Parquet dataset, coalesced to 8 partitions; an existing output
# at the target path is overwritten. This write is the action that runs the pipeline.
silver_writer = silver_df.coalesce(8).write.mode('overwrite')
silver_writer.parquet(silver_output_path)

# Elapsed wall-clock time since overall_start_time
write_time = time.time() - overall_start_time

print(f"✓ Silver layer written to: {silver_output_path}")
print()
print("=" * 80)
print("BATCH PROCESSING COMPLETE!")
print("=" * 80)
print(f"Total processing time: {write_time:.1f}s")
print(f"All {len(bronze_dirs)} files processed in parallel")
print(f"Input records: {input_count:,}")
print("Cluster 3 (Constrained): 4 workers, 8 allocated cores (50%), 12 GB allocated (44%)")

# Input records per second of elapsed time, overall and per allocated core (8)
records_per_second = input_count / write_time
print(f"Throughput: {records_per_second:,.0f} records/second")
print(f"Resource efficiency: {records_per_second / 8:,.0f} records/sec/core")
print("=" * 80)

✓ Using existing Silver layer directory: /home/ubuntu/project2/silver_layer

BATCH PROCESSING - ALL 24 FILES
Source: /home/ubuntu/project2/bronze_layer
Target: /home/ubuntu/project2/silver_layer

Files to process: 24

Processing files (RDD MapReduce pipeline):
--------------------------------------------------------------------------------
Loading all 24 files in parallel across cluster...
  Strategy: Union RDDs to avoid schema merge conflicts


                                                                                

✓ All files loaded into single RDD
  Partitions: 16



                                                                                

Total input records: 79,479,946

Executing RDD pipeline with constrained resources...
  Step 1/5: Normalizing field names and types...
  Step 2/5: Enriching with lookup data...
  Step 3/5: Validating data quality...
  Step 4/5: Filtering critical issues...
  Step 5/5: Creating Silver records...

Pipeline ready - writing to Silver layer (this triggers execution)...

Writing Silver layer data...


25/11/25 10:20:03 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

✓ Silver layer written to: /home/ubuntu/project2/silver_layer

BATCH PROCESSING COMPLETE!
Total processing time: 1975.5s
All 24 files processed in parallel
Input records: 79,479,946
Cluster 3 (Constrained): 4 workers, 8 allocated cores (50%), 12 GB allocated (44%)
Throughput: 40,233 records/second
Resource efficiency: 5,029 records/sec/core


                                                                                