# Part 2: Data Cleaning with MapReduce (Silver Layer)

## Overview
This notebook implements the **Silver Layer** of the Medallion Architecture, transforming raw data from the Bronze layer into cleaned, standardized, and validated data using **MapReduce/RDD operations exclusively**.

## CRITICAL REQUIREMENT
**This part MUST use MapReduce/RDD operations ONLY. NO SQL, NO DataFrames (except final conversion for inspection).**

## Objectives
1. **Data Parsing & Validation**: Parse raw JSON Lines and validate data quality using MapReduce patterns
2. **Data Cleaning**: Standardize formats, normalize values, and filter invalid records
3. **Deduplication**: Remove duplicate records using MapReduce `reduceByKey` operations
4. **Data Quality Validation**: Comprehensive quality checks using MapReduce aggregations
5. **Performance Profiling**: Measure and analyze execution times and throughput
6. **Optimization & Tuning**: Identify bottlenecks and apply optimization strategies

## Medallion Architecture: Silver Layer
- **Input**: Raw JSON Lines from Bronze layer (`Part1_Data_Ingestion/raw_data/`)
- **Processing**: MapReduce-based cleaning, validation, and deduplication
- **Output**: Cleaned data in JSON Lines format (`silver_data/`) and DataFrame for inspection


In [1]:
# Setup and Imports
import time
import json
import re
from typing import Dict, Any, Optional, Tuple
from datetime import datetime  # used by _normalize_dt in Step 2

# ---- Spark/JDK wiring (Notebook-only) ----
import os, sys, subprocess, shutil

# Point Spark to Java 17
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["PATH"] = os.path.join(os.environ["JAVA_HOME"], "bin") + os.pathsep + os.environ.get("PATH", "")

# Make sure no conflicting env is set
os.environ.pop("SPARK_HOME", None)
os.environ.pop("PYSPARK_SUBMIT_ARGS", None)

# Ensure pyspark is available in THIS kernel
try:
    import pyspark  # noqa
except ModuleNotFoundError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "--user", "pyspark==3.5.1"])
    import pyspark  # noqa

from pyspark.sql import SparkSession

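# Initialize Spark Session with memory configuration.
# Notes:
# - Memory settings are optional. In local mode the driver JVM also runs the tasks,
#   so spark.driver.memory is the setting that matters; it only takes effect when the
#   JVM is launched (i.e., on a fresh kernel).
# - spark.sql.adaptive.* (AQE) only affects DataFrame/SQL queries, not RDD jobs.
# - spark.executor.memoryFraction is not read by Spark's unified memory manager;
#   the heap share for execution and storage is controlled by spark.memory.fraction.
spark = (
    SparkSession.builder
        .appName("Part2-DataCleaning-MapReduce")
        .master("local[*]")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.executor.memory", "2g")
        .config("spark.driver.memory", "2g")
        .getOrCreate()
)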
sc = spark.sparkContext
sc.setLogLevel("WARN")

print("java at:", shutil.which("java"))
print("JAVA_HOME:", os.environ["JAVA_HOME"])
print("Spark version:", spark.version, "Default parallelism:", sc.defaultParallelism)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/24 20:59:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


java at: /usr/lib/jvm/java-17-openjdk-amd64/bin/java
JAVA_HOME: /usr/lib/jvm/java-17-openjdk-amd64
Spark version: 4.0.1 Default parallelism: 4


## Step 1: Load Raw Data from Bronze Layer

Load the raw JSON Lines data ingested in Part 1. This data contains energy consumption records with ingestion metadata.


In [2]:
# Load raw data from Bronze layer
bronze_data_path = "../Part1_Data_Ingestion/raw_data/"

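# Load only the JSON Lines files; the *.jsonl glob skips metadata.json.
# repartition(64) spreads the input evenly across tasks (this triggers a shuffle).
raw_rdd = sc.textFile(bronze_data_path.rstrip("/") + "/*.jsonl").repartition(64)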

print("Loaded raw data")
print("Initial partition count:", raw_rdd.getNumPartitions())
print("Sample records (first 3):")
for i, line in enumerate(raw_rdd.take(3), 1):
    print(f"  Record {i}: {line[:150]}...")

Loaded raw data
Initial partition count: 64
Sample records (first 3):



  Record 1: {"datetime": "2023-11-01 08:00:00", "Usage (kW)": "0.33", "city": "Islamabad", "source_file": "islamabad_House41.csv", "_ingestion_metadata": {"ingest...
  Record 2: {"datetime": "2023-11-01 08:01:00", "Usage (kW)": "0.01", "city": "Islamabad", "source_file": "islamabad_House41.csv", "_ingestion_metadata": {"ingest...
  Record 3: {"datetime": "2023-11-01 08:02:00", "Usage (kW)": "0.1", "city": "Islamabad", "source_file": "islamabad_House41.csv", "_ingestion_metadata": {"ingesti...



## Step 2: Data Parsing and Validation (Map Phase)

Parse raw JSON Lines records and validate data quality using MapReduce operations. This step includes:
- **JSON Parsing**: Extract structured data from JSON Lines format
- **Datetime Normalization**: Standardize timestamps to 'YYYY-MM-DD HH:MM:SS' format
- **Type Conversion**: Convert consumption values from string to float
- **Range Validation**: Ensure usage values are within valid range (0-50 kW)
- **Field Standardization**: Trim surrounding whitespace from text fields (city, source_file)
- **Error Handling**: Label each record as `valid`, `invalid`, `parse_error` (malformed JSON), `error` (unexpected exception), or `skip` (metadata-only line)
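The map-then-count pattern used in this step can be tried without a cluster. The sketch below uses a hypothetical, trimmed-down validator `classify` (only the `Usage (kW)` field, no datetime handling) and lets `collections.Counter` play the role of `map(lambda s: (s, 1)).reduceByKey(add)`. It also shows why a range check should be written as a chained comparison: `float("nan")` parses successfully and slips past `usage < 0 or usage > 50`, because every comparison with NaN is false.

```python
import json
from collections import Counter

def classify(line: str) -> str:
    """Hypothetical, trimmed-down validator that returns only a status string."""
    try:
        data = json.loads(line)
    except json.JSONDecodeError:
        return "parse_error"
    try:
        usage = float(data["Usage (kW)"])
    except (KeyError, ValueError, TypeError):
        return "invalid"
    # The chained comparison is False for NaN, so NaN is rejected here.
    return "valid" if 0.0 <= usage <= 50.0 else "invalid"

lines = [
    '{"Usage (kW)": "0.33"}',
    '{"Usage (kW)": "NA"}',
    '{"Usage (kW)": "nan"}',
    '{"Usage (kW)": "75"}',
    'not json',
]

# Counter stands in for map(status -> (status, 1)) followed by reduceByKey(add).
status_counts = Counter(classify(line) for line in lines)
print(dict(status_counts))

nan = float("nan")
print(nan < 0 or nan > 50)  # False: an or-style range check lets NaN through
print(0.0 <= nan <= 50.0)   # False: the chained check rejects it
```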


In [3]:
# Define parsing function for REWDP JSON Lines -> returns {'status', 'data', 'errors'}
def _normalize_dt(dt_val: Any) -> Optional[str]:
    """Return 'YYYY-MM-DD HH:MM:SS' or None."""
    if dt_val is None:
        return None
    s = str(dt_val).strip()
    # try full "YYYY-MM-DD HH:MM:SS"
    try:
        if len(s) > 10:
            return datetime.strptime(s[:19], "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d %H:%M:%S")
        # date-only
        return datetime.strptime(s[:10], "%Y-%m-%d").strftime("%Y-%m-%d 00:00:00")
    except ValueError:
        # Try ISO variants
        try:
            return datetime.fromisoformat(s.replace("Z", "+00:00")[:19]).strftime("%Y-%m-%d %H:%M:%S")
        except Exception:
            return None

def parse_and_validate(record: str) -> Dict[str, Any]:
    """
    Parse and validate raw record from Bronze layer (JSON Lines format).
    
    Returns:
        Dictionary with:
        - 'status': 'valid', 'invalid', 'parse_error', 'error', or 'skip'
        - 'data': For valid records, tuple of (timestamp_str, usage_kw_float, city_str, source_file_str)
                  For invalid/error records, None
        - 'errors': List of error messages (empty for valid records)
    """
    try:
        data = json.loads(record)

        # Skip pure metadata records
        if '_ingestion_metadata' in data and len(data) == 1:
            return {'data': None, 'status': 'skip', 'errors': ['Metadata record']}

        # Required fields
        dt_raw = data.get('datetime') or data.get('timestamp')
        dt_norm = _normalize_dt(dt_raw)
        if not dt_norm:
            return {'data': None, 'status': 'invalid', 'errors': [f"Invalid/missing datetime: {dt_raw}"]}

        # Consumption field (string or number)
        consumption_field = None
        for field in ['Usage (kW)', 'Usage', 'consumption']:
            if field in data:
                consumption_field = field
                break
        if not consumption_field:
            return {'data': None, 'status': 'invalid', 'errors': ['Missing consumption field']}

        try:
            usage = float(data[consumption_field])
        except (ValueError, TypeError):
            return {'data': None, 'status': 'invalid', 'errors': [f"Consumption not numeric: {data.get(consumption_field)}"]}

        if not (0.0 <= usage <= 50.0):  # chained comparison also rejects NaN
            return {'data': None, 'status': 'invalid', 'errors': [f"Invalid consumption: {usage}"]}

        # Light payload (tuple) for valid rows
        city = (data.get('city') or '').strip()
        src  = (data.get('source_file') or '').strip()
        return {'data': (dt_norm, usage, city, src), 'status': 'valid', 'errors': []}

    except json.JSONDecodeError as e:
        return {'data': None, 'status': 'parse_error', 'errors': [str(e)]}
    except Exception as e:
        return {'data': None, 'status': 'error', 'errors': [str(e)]}


from pyspark import StorageLevel
from operator import add

# Map phase
parsed_rdd = raw_rdd.map(parse_and_validate)

# Spill to disk if needed (avoid huge memory pressure)
parsed_rdd = parsed_rdd.persist(StorageLevel.MEMORY_AND_DISK)

# Single pass status aggregation (Map -> ReduceByKey)
status_counts = (parsed_rdd
    .map(lambda x: (x['status'], 1))
    .reduceByKey(add)
    .collectAsMap())

total_count  = sum(status_counts.values())
valid_count  = status_counts.get('valid', 0)
invalid_count = status_counts.get('invalid', 0)
skip_count    = status_counts.get('skip', 0)
error_count   = status_counts.get('parse_error', 0) + status_counts.get('error', 0)

print("Parsing complete")
print(f"Total records: {total_count}")
print(f"Valid records: {valid_count} ({(valid_count/total_count*100 if total_count else 0):.1f}%)")
print(f"Invalid records: {invalid_count} ({(invalid_count/total_count*100 if total_count else 0):.1f}%)")
print(f"Skipped records: {skip_count}")
print(f"Error records: {error_count}")

# Display sample invalid records for validation
if invalid_count > 0:
    print("\nSample invalid records:")
    invalid_samples = parsed_rdd.filter(lambda x: x['status']=='invalid').take(2)
    for i, sample in enumerate(invalid_samples, 1):
        print(f"  Invalid record {i}:")
        print(f"    Errors: {sample['errors']}")


# Extract valid records as lightweight tuples for further processing
clean_rdd = (parsed_rdd
    .filter(lambda x: x['status'] == 'valid')
    .map(lambda x: x['data']))

print("✓ Cleaned RDD created")
print(f"Valid records available: {clean_rdd.count():,}")


Parsing complete
Total records: 35734531
Valid records: 33332692 (93.3%)
Invalid records: 2401839 (6.7%)
Skipped records: 0
Error records: 0

Sample invalid records:
  Invalid record 1:
    Errors: ['Consumption not numeric: NA']
  Invalid record 2:
    Errors: ['Consumption not numeric: NA']
✓ Cleaned RDD created




Valid records available: 33,332,692



In [4]:
# Additional validation: inspect parse errors or unexpected exceptions, if any.
# Error records carry data=None (parse_and_validate does not keep the raw line),
# so only their status and error messages can be shown.
def _is_error(rec):
    return rec['status'] in ('parse_error', 'error')

error_count = parsed_rdd.filter(_is_error).count()
if error_count > 0:
    print("=== Error Record Analysis ===")
    error_samples = parsed_rdd.filter(_is_error).take(3)
    for i, sample in enumerate(error_samples, 1):
        print(f"\nError record {i}:")
        print(f"  Status: {sample['status']}")
        print(f"  Errors: {sample['errors']}")
else:
    print("✓ No parse errors or exceptions found")




✓ No parse errors or exceptions found



## Step 3: Extract Cleaned Data

Valid records were already extracted into `clean_rdd` at the end of Step 2. This step inspects a sample to confirm their structure:
- **Format**: `(datetime_str, usage_kw_float, city_str, source_file_str)`
- **Purpose**: Plain tuples are smaller than per-record dicts, which reduces memory and serialization cost in the MapReduce stages that follow
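The "lightweight tuple" claim can be checked directly in CPython. `sys.getsizeof` reports only the container's own size (the field strings are shared by both representations), so this is a shallow comparison, and the exact byte counts vary by Python version.

```python
import sys

# One cleaned record as the tuple used in clean_rdd, and the same values as a dict.
rec_tuple = ("2023-11-01 08:00:00", 0.33, "Islamabad", "islamabad_House41.csv")
rec_dict = {
    "datetime": rec_tuple[0],
    "usage_kw": rec_tuple[1],
    "city": rec_tuple[2],
    "source_file": rec_tuple[3],
}

# Shallow container sizes in bytes (CPython-specific values).
print("tuple container bytes:", sys.getsizeof(rec_tuple))
print("dict container bytes: ", sys.getsizeof(rec_dict))
```

Multiplied over roughly 33 million valid records, the per-record container difference adds up, which is the motivation for carrying tuples instead of dicts through the later stages.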


In [5]:
# Display sample cleaned data and verify structure

print("=== Sample Cleaned Data ===")
print("Data format: (datetime_str, usage_kw_float, city_str, source_file_str)")
sample_cleaned = clean_rdd.take(5)
for i, rec in enumerate(sample_cleaned, 1):
    print(f"  Record {i}: {rec}")
    
print(f"\nTotal valid records available: {clean_rdd.count():,}")

=== Sample Cleaned Data ===
Data format: (datetime_str, usage_kw_float, city_str, source_file_str)
  Record 1: ('2023-11-01 08:00:00', 0.33, 'Islamabad', 'islamabad_House41.csv')
  Record 2: ('2023-11-01 08:01:00', 0.01, 'Islamabad', 'islamabad_House41.csv')
  Record 3: ('2023-11-01 08:02:00', 0.1, 'Islamabad', 'islamabad_House41.csv')
  Record 4: ('2023-11-01 08:03:00', 0.07, 'Islamabad', 'islamabad_House41.csv')
  Record 5: ('2023-11-01 08:04:00', 0.14, 'Islamabad', 'islamabad_House41.csv')





Total valid records available: 33,332,692



## Step 4: Deduplication (MapReduce Reduce Phase)

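Remove duplicate records using the MapReduce `reduceByKey` pattern. This is a classic Reduce operation:
- **Key Generation**: Extract a deduplication key from each record
  - Primary: household ID (parsed from `source_file`, e.g. `House41` -> `H041`) + datetime
  - Fallback: city + datetime if no household ID can be parsed
  - The household ID carries no city prefix, so this key assumes house numbers are unique across cities
- **Reduce Operation**: `reduceByKey` keeps one record per key; which duplicate survives depends on partition and shuffle order unless the reduce function itself chooses deterministically
- **Result**: Deduplicated RDD with one record per key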
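The key choice matters more than the reduce function. The local sketch below reproduces the household/city key, adds a hypothetical `city_scoped_key` variant, and simulates `map(...).reduceByKey(...)` with a dict. If the same house number appears in two cities (the notebook's own comment uses `lahore_House41.csv` as an example while the data contains `islamabad_House41.csv`), the household-only key merges readings from different cities taken at the same minute. Whether house numbers actually repeat across cities in this dataset is not verified here; the records below are made up for illustration.

```python
import re

def household_key(rec):
    """Key as in the notebook: household ID (or city) + datetime."""
    dt, _usage, city, src = rec
    m = re.search(r"House(\d+)", src or "", flags=re.IGNORECASE)
    base = f"H{m.group(1).zfill(3)}" if m else ((city or "").strip().lower() or "unknown")
    return f"{base}_{dt}"

def city_scoped_key(rec):
    """Hypothetical variant: prefix the city so equal house numbers in different cities stay distinct."""
    city = (rec[2] or "unknown").strip().lower()
    return f"{city}_{household_key(rec)}"

def reduce_by_key(records, key_fn, reducer):
    """Local stand-in for rdd.map(lambda r: (key_fn(r), r)).reduceByKey(reducer).values()."""
    acc = {}
    for rec in records:
        k = key_fn(rec)
        acc[k] = reducer(acc[k], rec) if k in acc else rec
    return list(acc.values())

records = [
    ("2023-11-01 08:00:00", 0.33, "Islamabad", "islamabad_House41.csv"),
    ("2023-11-01 08:00:00", 0.33, "Islamabad", "islamabad_House41.csv"),  # exact duplicate
    ("2023-11-01 08:00:00", 1.20, "Lahore", "lahore_House41.csv"),        # same house number, other city
]

# min() is a deterministic reducer: the kept record does not depend on arrival order.
print(len(reduce_by_key(records, household_key, min)))    # household-only key
print(len(reduce_by_key(records, city_scoped_key, min)))  # city-scoped key
```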


In [6]:
# Deduplication using MapReduce pattern
import re

def extract_key_tuple(rec: Tuple[str, float, str, str]) -> str:
    """
    Extract a unique key for deduplication from a record tuple.
    
    Args:
        rec: Tuple of (datetime_str, usage_kw, city, source_file)
    
    Returns:
        Unique key string combining household ID (if available) or city with datetime
    """
    dt, _usage, city, src = rec

    # Extract household ID from source filename (e.g., "lahore_House41.csv" -> "H041")
    hh = None
    if src:
        m = re.search(r'House(\d+)', src, flags=re.IGNORECASE)
        if m:
            hh = f"H{m.group(1).zfill(3)}"

    # Use household ID if available, otherwise fall back to city name
    base = hh or (city.strip().lower() if city else "unknown")
    return f"{base}_{dt}"

# MapReduce deduplication: map records to (key, record) pairs, then reduceByKey to keep first occurrence
keyed_rdd = clean_rdd.map(lambda rec: (extract_key_tuple(rec), rec))

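deduplicated_rdd = (keyed_rdd
    # Spark does not guarantee which duplicate reaches the reducer first, so choose
    # deterministically: min() keeps the smallest tuple (lowest usage_kw for a key)
    .reduceByKey(min)
    .map(lambda kv: kv[1]))  # Extract record from (key, record) pair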

# Cache deduplicated RDD for subsequent operations
deduplicated_rdd.cache()

# Calculate deduplication statistics
before_count = clean_rdd.count()
after_count = deduplicated_rdd.count()
duplicates_removed = before_count - after_count

print("Deduplication complete")
print(f"Records before deduplication: {before_count:,}")
print(f"Records after deduplication:  {after_count:,}")
print(f"Duplicates removed: {duplicates_removed:,} ({duplicates_removed/before_count*100:.2f}%)")



Deduplication complete
Records before deduplication: 33,332,692
Records after deduplication:  27,986,366
Duplicates removed: 5,346,326 (16.04%)



## Step 5: Data Quality Validation (MapReduce Aggregations)

Perform comprehensive data quality analysis using MapReduce aggregation patterns:
- **Completeness Analysis**: Count null/empty values per field using `map` + `reduce`
- **Range Validation**: Verify usage values are within valid bounds (0-50 kW)
- **Statistical Summary**: Calculate min, max, average using MapReduce operations
- **Distribution Analysis**: City distribution using `map` + `reduceByKey`
- **Quality Score**: Overall data quality metrics
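The statistical summary below runs four separate jobs (`count`, `sum`, `min`, `max`) over the same RDD. A single pass is possible with `RDD.aggregate(zeroValue, seqOp, combOp)`: `seqOp` folds values into a per-partition accumulator and `combOp` merges accumulators across partitions (PySpark's `RDD.stats()` is a built-in one-pass alternative). The sketch defines those two functions and simulates two partitions locally with `functools.reduce`; the partition contents are illustrative values, not notebook output.

```python
import math
from functools import reduce

# Accumulator layout: (count, sum, min, max)
ZERO = (0, 0.0, math.inf, -math.inf)

def seq_op(acc, usage):
    """Fold one usage_kw value into a partition-local accumulator."""
    n, s, lo, hi = acc
    return (n + 1, s + usage, min(lo, usage), max(hi, usage))

def comb_op(a, b):
    """Merge two partition accumulators."""
    return (a[0] + b[0], a[1] + b[1], min(a[2], b[2]), max(a[3], b[3]))

# Two fake partitions of usage_kw values.
partitions = [[0.33, 0.01, 0.1], [0.07, 0.14]]

# Local equivalent of usage_values.aggregate(ZERO, seq_op, comb_op).
partials = [reduce(seq_op, part, ZERO) for part in partitions]
count, total, lo, hi = reduce(comb_op, partials, ZERO)
print(count, round(total, 2), lo, hi, round(total / count, 3))
```

In the notebook this would be a single call, e.g. `deduplicated_rdd.map(lambda rec: rec[1]).aggregate(ZERO, seq_op, comb_op)`, replacing four jobs with one.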


In [7]:
# Data Quality Validation using MapReduce
from operator import add

print("=== Data Quality Validation Report ===\n")

# Get total record count
total_records = deduplicated_rdd.count()
print(f"Total records after deduplication: {total_records:,}")

# 1. Null/Empty Value Checks
print("\n1. Completeness Analysis (Null/Empty Values):")
def check_field_completeness(rdd, field_index: int, field_name: str):
    """Check if field at index is None or empty string"""
    null_or_empty = rdd.map(lambda rec: 1 if (len(rec) <= field_index or 
                                               rec[field_index] is None or 
                                               str(rec[field_index]).strip() == '') else 0) \
                           .reduce(add)
    return null_or_empty

datetime_nulls = check_field_completeness(deduplicated_rdd, 0, "datetime")
usage_nulls = check_field_completeness(deduplicated_rdd, 1, "usage_kw")
city_nulls = check_field_completeness(deduplicated_rdd, 2, "city")
source_nulls = check_field_completeness(deduplicated_rdd, 3, "source_file")

print(f"  datetime: {datetime_nulls:,} null/empty ({datetime_nulls/total_records*100:.2f}%)")
print(f"  usage_kw: {usage_nulls:,} null/empty ({usage_nulls/total_records*100:.2f}%)")
print(f"  city: {city_nulls:,} null/empty ({city_nulls/total_records*100:.2f}%)")
print(f"  source_file: {source_nulls:,} null/empty ({source_nulls/total_records*100:.2f}%)")

# 2. Range Validation
print("\n2. Range Validation:")
def count_out_of_range(rdd, min_val: float, max_val: float):
    """Count records where usage_kw is outside valid range"""
    out_of_range = rdd.map(lambda rec: 1 if (len(rec) > 1 and 
                                             (rec[1] < min_val or rec[1] > max_val)) else 0) \
                         .reduce(add)
    return out_of_range

out_of_range_count = count_out_of_range(deduplicated_rdd, 0.0, 50.0)
print(f"  usage_kw out of range (0-50 kW): {out_of_range_count:,} ({out_of_range_count/total_records*100:.2f}%)")

# 3. Statistical Summary
print("\n3. Statistical Summary (usage_kw):")
usage_values = deduplicated_rdd.map(lambda rec: rec[1] if len(rec) > 1 else 0.0)

# Calculate statistics using MapReduce
usage_count = usage_values.count()
usage_sum = usage_values.reduce(add)
usage_min = usage_values.reduce(lambda a, b: min(a, b))
usage_max = usage_values.reduce(lambda a, b: max(a, b))
usage_avg = usage_sum / usage_count if usage_count > 0 else 0.0

print(f"  Count: {usage_count:,}")
print(f"  Min: {usage_min:.3f} kW")
print(f"  Max: {usage_max:.3f} kW")
print(f"  Average: {usage_avg:.3f} kW")
print(f"  Total: {usage_sum:,.2f} kW")

# 4. City Distribution
print("\n4. City Distribution:")
city_counts = (deduplicated_rdd
    .map(lambda rec: (rec[2] if len(rec) > 2 else "unknown", 1))
    .reduceByKey(add)
    .collect())

city_counts_sorted = sorted(city_counts, key=lambda x: x[1], reverse=True)
print(f"  Total cities: {len(city_counts_sorted)}")
for city, count in city_counts_sorted[:10]:  # Top 10 cities
    print(f"    {city}: {count:,} records ({count/total_records*100:.2f}%)")

# 5. Data Completeness Score
print("\n5. Overall Data Quality Score:")
# Completeness = share of non-null cells across the 4 fields of every record
total_cells = total_records * 4
null_cells = datetime_nulls + usage_nulls + city_nulls + source_nulls
completeness_score = ((total_cells - null_cells) / total_cells * 100) if total_records > 0 else 0
validity_score = ((total_records - out_of_range_count) / total_records * 100) if total_records > 0 else 0
overall_score = (completeness_score + validity_score) / 2

print(f"  Completeness: {completeness_score:.2f}%")
print(f"  Validity: {validity_score:.2f}%")
print(f"  Overall Quality Score: {overall_score:.2f}%")

print("\n✓ Data quality validation complete")


=== Data Quality Validation Report ===

Total records after deduplication: 27,986,366

1. Completeness Analysis (Null/Empty Values):

  datetime: 0 null/empty (0.00%)
  usage_kw: 0 null/empty (0.00%)
  city: 0 null/empty (0.00%)
  source_file: 0 null/empty (0.00%)

2. Range Validation:

  usage_kw out of range (0-50 kW): 0 (0.00%)

3. Statistical Summary (usage_kw):

  Count: 27,986,366
  Min: 0.000 kW
  Max: 18.898 kW
  Average: 0.729 kW
  Total: 20,389,153.58 kW

4. City Distribution:

  Total cities: 6
    Karachi: 5,222,447 records (18.66%)
    Multan: 5,195,056 records (18.56%)
    Peshawar: 5,022,626 records (17.95%)
    Islamabad: 4,938,929 records (17.65%)
    Lahore: 4,389,396 records (15.68%)
    Skardu: 3,217,912 records (11.50%)

5. Overall Data Quality Score:
  Completeness: 100.00%
  Validity: 100.00%
  Overall Quality Score: 100.00%

✓ Data quality validation complete



## Step 6: Silver Layer Output

Save cleaned data to Silver layer storage and create DataFrame for final validation:
- **Output Format**: JSON Lines (one JSON object per line)
- **Storage Location**: `silver_data/` directory
- **Rejected Records**: Save invalid, parse-error, and error records to `rejects/` for analysis (status and error messages only; the raw line is not retained)
- **DataFrame Conversion**: Create DataFrame for inspection (allowed per requirements)
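`saveAsTextFile` writes each RDD element as one line, so a serializer must never emit a raw newline; `json.dumps` without `indent` escapes newlines inside string values. The sketch mirrors the notebook's `tuple_to_json` and adds two hypothetical helpers: `json_to_tuple`, a reader a downstream stage might use, and `reject_to_json`, a reject format that also keeps the original Bronze line so rejects can be replayed. Using it would assume `parse_and_validate` output is paired with its input line (for example `raw_rdd.map(lambda line: (line, parse_and_validate(line)))`), which the notebook does not currently do.

```python
import json
from typing import Any, Dict, List, Tuple

SilverRecord = Tuple[str, float, str, str]  # (datetime, usage_kw, city, source_file)

def tuple_to_json(rec: SilverRecord) -> str:
    """Same shape as the notebook's serializer: one compact JSON object per record."""
    dt_str, usage, city, src = rec
    return json.dumps({"datetime": dt_str, "usage_kw": float(usage), "city": city, "source_file": src})

def json_to_tuple(line: str) -> SilverRecord:
    """Hypothetical reader for a downstream stage: one Silver line back to a tuple."""
    obj = json.loads(line)
    return (obj["datetime"], float(obj["usage_kw"]), obj["city"], obj["source_file"])

def reject_to_json(raw_line: str, status: str, errors: List[str]) -> str:
    """Hypothetical reject format that keeps the original Bronze line for replay."""
    payload: Dict[str, Any] = {"status": status, "errors": errors, "raw": raw_line}
    return json.dumps(payload)

rec = ("2023-11-01 08:00:00", 0.33, "Islamabad", "islamabad_House41.csv")
line = tuple_to_json(rec)
print(line)
print(reject_to_json('{"Usage (kW)": "NA"}', "invalid", ["Consumption not numeric: NA"]))
```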


In [8]:
# Final Data Preparation: Convert to Silver Layer Format

# deduplicated_rdd contains: (datetime_str, usage_kw_float, city_str, source_file_str)
final_clean_rdd = deduplicated_rdd

# JSON line serializer for tuples
def tuple_to_json(rec: Tuple[str, float, str, str]) -> str:
    """Convert tuple to JSON string for Silver layer output"""
    dt_str, usage, city, src = rec
    return json.dumps({
        "datetime": dt_str,
        "usage_kw": float(usage),
        "city": city,
        "source_file": src
    })

# Output directories
import os, shutil
from urllib.parse import urlparse

def _remove_directory_if_exists(path: str):
    """Remove directory if it exists, handling both local paths and file:// URIs."""
    if path.startswith("file:"):
        path = urlparse(path).path
    if not os.path.isabs(path):
        path = os.path.abspath(path)
    shutil.rmtree(path, ignore_errors=True)

silver_out_dir = "../Part2_Data_Cleaning/silver_data"
rejects_out_dir = "../Part2_Data_Cleaning/rejects"

# Prepare output directories by removing existing content (RDD API requires empty directories)
_remove_directory_if_exists(silver_out_dir)
_remove_directory_if_exists(rejects_out_dir)

# Save cleaned data to Silver layer
record_count = final_clean_rdd.count()
print(f"=== Final Data Preparation ===")
print(f"Records to save: {record_count:,}")

if record_count > 0:
    # Save cleaned data
    (final_clean_rdd
        .map(tuple_to_json)
        .coalesce(8)
        .saveAsTextFile(silver_out_dir))
    
    print(f"✓ Saved cleaned data to: {silver_out_dir}")
    print(f"✓ Total records saved: {record_count:,}")
    
    # Save rejected records (for analysis)
    rejects_rdd = (parsed_rdd
        .filter(lambda x: x['status'] in ('invalid', 'parse_error', 'error'))
        .map(lambda x: json.dumps({
            "status": x['status'], 
            "errors": x['errors']
        })))
    
    reject_count = rejects_rdd.count()
    if reject_count > 0:
        (rejects_rdd
            .coalesce(2)
            .saveAsTextFile(rejects_out_dir))
        print(f"✓ Saved rejected records to: {rejects_out_dir}")
        print(f"✓ Total rejected records: {reject_count:,}")
    
    # Create DataFrame for final inspection and validation
    print("\n=== DataFrame View (Final Validation) ===")
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType
    
    schema = StructType([
        StructField("datetime", StringType(), True),
        StructField("usage_kw", DoubleType(), True),
        StructField("city", StringType(), True),
        StructField("source_file", StringType(), True),
    ])
    
    # Convert RDD to DataFrame
    df_ready = final_clean_rdd.map(lambda r: (r[0], float(r[1]), r[2], r[3]))
    final_df = spark.createDataFrame(df_ready, schema=schema)
    
    print("Sample data (first 10 rows):")
    final_df.show(10, truncate=False)
    print("\nSchema:")
    final_df.printSchema()
    print(f"\nTotal rows in DataFrame: {final_df.count():,}")
else:
    print("⚠️  No cleaned records to save!")



=== Final Data Preparation ===
Records to save: 27,986,366



✓ Saved cleaned data to: ../Part2_Data_Cleaning/silver_data
✓ Total records saved: 27,986,366



✓ Saved rejected records to: ../Part2_Data_Cleaning/rejects
✓ Total rejected records: 2,401,839

=== DataFrame View (Final Validation) ===
Sample data (first 10 rows):
+-------------------+--------+---------+---------------------+
|datetime           |usage_kw|city     |source_file          |
+-------------------+--------+---------+---------------------+
|2023-11-05 08:04:00|0.52    |Islamabad|islamabad_House41.csv|
|2023-11-12 10:48:00|0.61    |Islamabad|islamabad_House41.csv|
|2023-11-17 08:06:00|0.0     |Islamabad|islamabad_House41.csv|
|2023-11-17 08:09:00|0.0     |Islamabad|islamabad_House41.csv|
|2023-11-17 18:44:00|0.6     |Islamabad|islamabad_House41.csv|
|2023-11-24 00:09:00|0.94    |Islamabad|islamabad_House41.csv|
|2023-11-27 02:44:00|1.03    |Islamabad|islamabad_House41.csv|
|2023-11-29 08:09:00|1.25    |Islamabad|islamabad_House41.csv|
|2023-12-02 10:41:00|0.79    |Islamabad|islamabad_House41.csv|
|2023-12-02 10:47:00|1.15    |Islamabad|islamabad_House41.csv|
+-------------------+--------+---------+---------------------+




Total rows in DataFrame: 27,986,366



## Step 7: Performance Profiling and Analysis

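Measure and analyze performance of MapReduce operations to identify bottlenecks:
- **Execution Time**: Wall-clock time of a `count()` action on each stage's RDD. By this point `parsed_rdd` is persisted and `deduplicated_rdd` is cached, so those timings mostly reflect reading cached partitions rather than first-pass cost, and their sum is not an end-to-end pipeline time
- **Throughput Analysis**: Records per second for each measured action
- **Partition Analysis**: Partition count of each RDD compared with the default parallelism
- **Data Reduction Metrics**: Records removed between the raw input and the final output
- **Resource Usage**: Memory, storage, and shuffle metrics (via the Spark UI)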
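Because repeat actions can hit the cache, it helps to time the first action on an RDD separately from a repeat. The sketch below uses `time.perf_counter()`, a monotonic clock intended for measuring intervals, instead of `time.time()`. `timed` and `throughput` are hypothetical helpers, not part of the notebook; the commented lines show how they might wrap the notebook's RDDs, and the runnable part times a plain Python computation.

```python
import time
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")

def timed(action: Callable[[], T]) -> Tuple[T, float]:
    """Run action once and return (result, elapsed seconds) using a monotonic clock."""
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start

def throughput(records: int, seconds: float) -> float:
    """Records per second, guarding against a zero-length interval."""
    return records / seconds if seconds > 0 else float("inf")

# Hypothetical notebook usage (first materialization vs. cached re-read):
#   n, first = timed(lambda: deduplicated_rdd.count())
#   _, repeat = timed(lambda: deduplicated_rdd.count())
result, elapsed = timed(lambda: sum(range(1_000_000)))
print(result, f"{elapsed:.4f}s", f"{throughput(1_000_000, elapsed):,.0f} rec/s")
```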


In [9]:
# Performance Profiling and Analysis
import time

print("=== Performance Profiling ===")
print("Measuring execution time for each major operation...\n")

# Measure execution time for each major operation
operations = {
    '1. Data Loading': lambda: raw_rdd.count(),
    '2. Parsing & Validation': lambda: parsed_rdd.count(),
    '3. Filtering Valid Records': lambda: clean_rdd.count(),
    '4. Deduplication': lambda: deduplicated_rdd.count(),
    '5. Final Output': lambda: final_clean_rdd.count()
}

profiling_results = {}
for op_name, op_func in operations.items():
    start_time = time.time()
    result = op_func()
    elapsed = time.time() - start_time
    profiling_results[op_name] = {
        'time': elapsed,
        'records': result
    }
    print(f"{op_name}:")
    print(f"  Time: {elapsed:.3f} seconds")
    print(f"  Records: {result:,}")
    if result > 0:
        throughput = result / elapsed
        print(f"  Throughput: {throughput:,.0f} records/second")
    print()

# Calculate total processing time
total_time = sum(r['time'] for r in profiling_results.values())
print(f"Total Processing Time: {total_time:.3f} seconds ({total_time/60:.2f} minutes)")

# Throughput Summary
print("\n=== Throughput Summary ===")
for op_name, results in profiling_results.items():
    if results['records'] > 0:
        throughput = results['records'] / results['time']
        print(f"{op_name}: {throughput:,.0f} records/second")

# Partition Analysis
print("\n=== Partition Analysis ===")
print(f"Raw data partitions: {raw_rdd.getNumPartitions()}")
print(f"Parsed data partitions: {parsed_rdd.getNumPartitions()}")
print(f"Cleaned data partitions: {clean_rdd.getNumPartitions()}")
print(f"Deduplicated data partitions: {deduplicated_rdd.getNumPartitions()}")
print(f"Default parallelism: {sc.defaultParallelism}")
print(f"Optimal partitions (records/10k): {max(sc.defaultParallelism, final_clean_rdd.count() // 10000)}")

# Data Reduction Analysis
print("\n=== Data Reduction Analysis ===")
raw_count = profiling_results['1. Data Loading']['records']
final_count = profiling_results['5. Final Output']['records']
reduction_pct = ((raw_count - final_count) / raw_count * 100) if raw_count > 0 else 0
print(f"Raw records: {raw_count:,}")
print(f"Final cleaned records: {final_count:,}")
print(f"Records removed: {raw_count - final_count:,} ({reduction_pct:.2f}%)")
print(f"Data retention: {final_count/raw_count*100:.2f}%")

# Resource Usage
print("\n=== Resource Usage ===")
print("For detailed metrics, check Spark UI:")
print("  - Application UI: http://localhost:4040")
print("  - Jobs: http://localhost:4040/jobs/")
print("  - Stages: http://localhost:4040/stages/")
print("  - Storage: http://localhost:4040/storage/")
print("\nKey metrics to monitor:")
print("  - Shuffle read/write sizes")
print("  - Task execution times")
print("  - Memory usage")
print("  - Data skew indicators")

print("\nPerformance profiling complete")


=== Performance Profiling ===
Measuring execution time for each major operation...




1. Data Loading:
  Time: 13.194 seconds
  Records: 35,734,531
  Throughput: 2,708,433 records/second




2. Parsing & Validation:
  Time: 9.051 seconds
  Records: 35,734,531
  Throughput: 3,948,119 records/second




3. Filtering Valid Records:
  Time: 11.448 seconds
  Records: 33,332,692
  Throughput: 2,911,684 records/second




4. Deduplication:
  Time: 4.249 seconds
  Records: 27,986,366
  Throughput: 6,586,241 records/second




5. Final Output:
  Time: 4.189 seconds
  Records: 27,986,366
  Throughput: 6,681,001 records/second

Total Processing Time: 42.131 seconds (0.70 minutes)

=== Throughput Summary ===
1. Data Loading: 2,708,433 records/second
2. Parsing & Validation: 3,948,119 records/second
3. Filtering Valid Records: 2,911,684 records/second
4. Deduplication: 6,586,241 records/second
5. Final Output: 6,681,001 records/second

=== Partition Analysis ===
Raw data partitions: 64
Parsed data partitions: 64
Cleaned data partitions: 64
Deduplicated data partitions: 64
Default parallelism: 4




Optimal partitions (records/10k): 2798

=== Data Reduction Analysis ===
Raw records: 35,734,531
Final cleaned records: 27,986,366
Records removed: 7,748,165 (21.68%)
Data retention: 78.32%

=== Resource Usage ===
For detailed metrics, check Spark UI:
  - Application UI: http://localhost:4040
  - Jobs: http://localhost:4040/jobs/
  - Stages: http://localhost:4040/stages/
  - Storage: http://localhost:4040/storage/

Key metrics to monitor:
  - Shuffle read/write sizes
  - Task execution times
  - Memory usage
  - Data skew indicators

Performance profiling complete



## Step 8: Optimization Strategies and Tuning Recommendations

Document optimization strategies based on profiling results:
- **Partitioning Optimization**: Choosing a partition count from available cores and data volume
- **Caching Strategy**: When and how to cache RDDs for reuse
- **Broadcast Variables**: Using broadcast for small lookup data
- **Data Skew Handling**: Techniques for handling uneven key distribution
- **Serialization**: KryoSerializer configuration and its limited effect on pickled PySpark RDD data
- **Memory Configuration**: Tuning memory settings for large datasets
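The salting technique mentioned under data skew splits a hot key into several sub-keys, aggregates them, then strips the salt and aggregates again. In PySpark this would be two `reduceByKey(add)` passes, the first on `((key, random.randrange(n)), value)` pairs and the second after mapping back to `(key, partial)`. The sketch simulates both passes locally with a hypothetical `salted_count` helper and made-up input. Note that `reduceByKey` already combines values map-side, so salting tends to pay off most for non-combining operations such as `groupByKey`; the city distribution in Step 5 (about 11% to 19% per city) does not look heavily skewed.

```python
import random
from collections import defaultdict

def salted_count(pairs, num_salts, seed=0):
    """Two-stage sum per key with random salts (local simulation of two reduceByKey passes)."""
    rng = random.Random(seed)
    # Stage 1: (key, value) -> ((key, salt), value); partial sums per salted key.
    stage1 = defaultdict(int)
    for key, value in pairs:
        stage1[(key, rng.randrange(num_salts))] += value
    # Stage 2: drop the salt and combine the partial sums per original key.
    stage2 = defaultdict(int)
    for (key, _salt), partial in stage1.items():
        stage2[key] += partial
    return dict(stage2), len(stage1)

# A skewed input: one hot key dominates.
pairs = [("Karachi", 1)] * 1000 + [("Skardu", 1)] * 10
counts, salted_keys = salted_count(pairs, num_salts=8)
print(counts, salted_keys)
```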


In [10]:
# Optimization Strategies and Tuning Recommendations

print("=== Optimization Strategies ===")
print("\n1. Partitioning Optimization:")
current_partitions = final_clean_rdd.getNumPartitions()
optimal_partitions = max(sc.defaultParallelism * 2, final_clean_rdd.count() // 10000)
print(f"  Current partitions: {current_partitions}")
print(f"  Recommended partitions: {optimal_partitions}")
print(f"  Strategy: {'Repartition' if current_partitions < optimal_partitions else 'Coalesce'} to {optimal_partitions} partitions")
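# repartition(n) shuffles and can raise or lower the count; coalesce(n) avoids a
# full shuffle but can only lower it (unless called with shuffle=True).
print("  Code: final_clean_rdd.repartition(optimal_partitions) or .coalesce(optimal_partitions)")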

print("\n2. Caching Strategy:")
print("  - Cache RDDs that are used multiple times (e.g., parsed_rdd, clean_rdd)")
print("  - Use MEMORY_AND_DISK storage level for large datasets")
print("  - Unpersist cached RDDs when no longer needed")
print("  Code: rdd.persist(StorageLevel.MEMORY_AND_DISK)")

print("\n3. Broadcast Variables:")
print("  - Use for small lookup tables or reference data")
print("  - Reduces network overhead in map operations")
print("  Code: broadcast_var = sc.broadcast(lookup_dict)")

print("\n4. Data Skew Handling:")
print("  - Monitor task execution times in Spark UI")
print("  - If skew detected, use salting technique or repartition")
print("  - Consider custom partitioner for reduce operations")

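# spark.serializer only governs JVM-side serialization. PySpark RDD records are
# pickled by Python, so Kryo has limited effect on this RDD pipeline.
# Kryo settings are static: set them on SparkConf / SparkSession.builder before the
# session starts. spark.conf.set(...) on core Spark configs raises an error in Spark 3.x.
print("\n5. Serialization:")
print("  - Current: KryoSerializer (configured)")
print("  - Benefits: Faster serialization, smaller data size")
print("  - Register custom classes if needed: SparkSession.builder.config('spark.kryo.classesToRegister', ...)")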

print("\n6. Memory Configuration:")
print("  - Monitor memory usage in Spark UI")
print("  - Adjust spark.executor.memory if needed")
print("  - Consider spark.memory.fraction and spark.memory.storageFraction")

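# spark.sql.adaptive.* (AQE) only affects DataFrame/SQL query plans; it does not
# change RDD jobs such as the reduceByKey deduplication in this notebook.
print("\n=== Tuning Parameters Applied ===")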
print("✓ spark.sql.adaptive.enabled = true")
print("✓ spark.sql.adaptive.coalescePartitions.enabled = true")
print("✓ spark.serializer = KryoSerializer")
print("✓ Log level set to WARN (reduced verbosity)")

print("\n=== Performance Bottlenecks Identified ===")
# Analyze profiling results
slowest_op = max(profiling_results.items(), key=lambda x: x[1]['time'])
print(f"Slowest operation: {slowest_op[0]} ({slowest_op[1]['time']:.3f}s)")
print(f"Recommendation: Focus optimization efforts on {slowest_op[0]}")

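# Coarse sanity check of partition count vs. available cores. This is not a skew
# test (skew shows up as uneven task durations in the Spark UI), and its thresholds
# can disagree with the records-per-partition recommendation above.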
if final_clean_rdd.getNumPartitions() > sc.defaultParallelism * 4:
    print("\n⚠️  High partition count detected - consider coalescing")
elif final_clean_rdd.getNumPartitions() < sc.defaultParallelism:
    print("\n⚠️  Low partition count detected - consider repartitioning")

print("\n=== Next Steps for Optimization ===")
print("1. Run with optimized partition count and measure improvement")
print("2. Add caching for frequently accessed RDDs")
print("3. Monitor Spark UI during execution")
print("4. Adjust memory settings if out-of-memory errors occur")
print("5. Consider checkpointing for long lineage chains")

print("\n✓ Optimization analysis complete")


=== Optimization Strategies ===

1. Partitioning Optimization:
  Current partitions: 64
  Recommended partitions: 2798
  Strategy: Repartition to 2798 partitions
  Code: final_clean_rdd.repartition(optimal_partitions) or .coalesce(optimal_partitions)

2. Caching Strategy:
  - Cache RDDs that are used multiple times (e.g., parsed_rdd, clean_rdd)
  - Use MEMORY_AND_DISK storage level for large datasets
  - Unpersist cached RDDs when no longer needed
  Code: rdd.persist(StorageLevel.MEMORY_AND_DISK)

3. Broadcast Variables:
  - Use for small lookup tables or reference data
  - Reduces network overhead in map operations
  Code: broadcast_var = sc.broadcast(lookup_dict)

4. Data Skew Handling:
  - Monitor task execution times in Spark UI
  - If skew detected, use salting technique or repartition
  - Consider custom partitioner for reduce operations

5. Serialization:
  - Current: KryoSerializer (configured)
  - Benefits: Faster serialization, smaller data size
  - Register custom classes if needed: spark.conf.set('spark.kryo.classesToRegister', ...)

6. Memory Configuration:
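`spark.serializer` and the Kryo settings are static, so they must be supplied when the session is built. This is a minimal sketch under two assumptions. First, it assumes local mode; a default parallelism of 4 suggests `local[4]`, but that is an inference. In local mode executors run inside the driver JVM, so `spark.driver.memory` is the memory setting that matters. Second, the `tuning_conf` helper and the `4g` value are placeholders, not settings taken from this notebook.

```python
from typing import List, Optional, Tuple


def tuning_conf(driver_memory: str = "4g",
                kryo_classes: Optional[List[str]] = None) -> List[Tuple[str, str]]:
    """Hypothetical helper: static settings to apply before the JVM starts.

    These cannot be changed later with spark.conf.set(...).
    """
    pairs = [
        ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
        # Placeholder value; in local mode this sizes the single JVM heap.
        ("spark.driver.memory", driver_memory),
    ]
    if kryo_classes:
        # Comma-separated, fully qualified JVM class names.
        pairs.append(("spark.kryo.classesToRegister", ",".join(kryo_classes)))
    return pairs
```

Pass the pairs in a fresh kernel before `getOrCreate()`, for example `SparkSession.builder.config(conf=SparkConf().setAll(tuning_conf())).getOrCreate()`. If a session already exists in the kernel, `getOrCreate()` returns it and the static settings are not reapplied.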


In [11]:
# Cleanup
parsed_rdd.unpersist()
clean_rdd.unpersist()
deduplicated_rdd.unpersist()

spark.stop()
print("✓ Spark session stopped")


✓ Spark session stopped
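The cleanup cell only releases the caches if execution reaches it. An exception in an earlier cell leaves the RDDs cached for the rest of the session. One way to make the release unconditional is a context manager. This is a minimal sketch: `persisted` is a hypothetical helper that works with any object exposing `persist()` and `unpersist()`, which PySpark's `RDD` does.

```python
from contextlib import contextmanager
from typing import Any, Iterator, Optional, Tuple


@contextmanager
def persisted(*rdds: Any, level: Optional[Any] = None) -> Iterator[Tuple[Any, ...]]:
    """Persist the given RDDs for the duration of a with-block.

    level: e.g. pyspark.StorageLevel.MEMORY_AND_DISK; None keeps the RDD
    default storage level (MEMORY_ONLY in PySpark).
    """
    for rdd in rdds:
        if level is None:
            rdd.persist()
        else:
            rdd.persist(level)
    try:
        yield rdds
    finally:
        # Runs on normal exit and on exceptions, so the cache is always released.
        for rdd in rdds:
            rdd.unpersist()
```

In this notebook it could wrap the reuse-heavy steps, for example `with persisted(parsed_rdd, clean_rdd, level=StorageLevel.MEMORY_AND_DISK): ...` after `from pyspark import StorageLevel`.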
