# Day 2: Data Quality & Cleaning Pipeline
# Smart City IoT Analytics Pipeline

"""
üéØ LEARNING OBJECTIVES:
- Implement comprehensive data quality assessment
- Design cleaning procedures for IoT sensor data
- Handle missing values and outliers appropriately
- Create reusable data quality functions

üìÖ SCHEDULE:
Morning (4 hours):
1. Data Quality Assessment (2 hours)
2. Missing Data Strategy (2 hours)

Afternoon (4 hours):
3. Outlier Detection & Treatment (2 hours)
4. Data Standardization (2 hours)

‚úÖ DELIVERABLES:
- Data quality assessment report
- Comprehensive cleaning pipeline
- Outlier detection and treatment functions
- Standardized datasets ready for analysis
"""

# =============================================================================
# IMPORTS AND SETUP
# =============================================================================

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Machine learning imports (for outlier detection)
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.stat import Correlation

In [2]:
# Initialize Spark Session (should already exist from Day 1)
try:
    spark.sparkContext.setLogLevel("WARN")
    print("‚úÖ Using existing Spark session")
except:
    spark = (SparkSession.builder
             .appName("SmartCityIoTPipeline-Day2")
             .master("local[*]")
             .config("spark.driver.memory", "4g")
             .config("spark.sql.adaptive.enabled", "true")
             .getOrCreate())
    print("‚úÖ Created new Spark session")

print("üîß Day 2: Data Quality & Cleaning Pipeline")
print("=" * 60)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/04 21:12:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/04 21:12:51 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


‚úÖ Created new Spark session
üîß Day 2: Data Quality & Cleaning Pipeline


# =============================================================================
# SECTION 1: COMPREHENSIVE DATA PROFILING (Morning - 2 hours)
# =============================================================================

In [3]:
print("\nüìä SECTION 1: COMPREHENSIVE DATA PROFILING")
print("=" * 60)


üìä SECTION 1: COMPREHENSIVE DATA PROFILING


In [4]:
# Load cleaned data from Day 1 (or reload if needed)
data_dir = "../data/raw"

# TODO 1.1: Load all datasets with error handling
def load_all_datasets():
    """Load all sensor datasets with consistent error handling"""
    datasets = {}
    
    try:
        # Load each dataset
        datasets['zones'] = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{data_dir}/city_zones.csv")
        datasets['traffic'] = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{data_dir}/traffic_sensors.csv")
        datasets['air_quality'] = spark.read.json(f"{data_dir}/air_quality.json")
        datasets['weather'] = spark.read.parquet(f"{data_dir}/weather_data.parquet")
        datasets['energy'] = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{data_dir}/energy_meters.csv")
        
        # Convert timestamp columns to proper format
        for name, df in datasets.items():
            if name != 'zones' and 'timestamp' in df.columns:
                datasets[name] = df.withColumn("timestamp", F.to_timestamp(F.col("timestamp")))
        
        print("‚úÖ All datasets loaded successfully")
        return datasets
        
    except Exception as e:
        print(f"‚ùå Error loading datasets: {str(e)}")
        return {}

datasets = load_all_datasets()

‚úÖ All datasets loaded successfully


# =============================================================================
# TODO 1.2: Advanced Data Quality Metrics (45 minutes)
# =============================================================================
"""
üéØ TASK: Create comprehensive data quality profiling functions
üí° HINT: Look beyond basic missing values - consider temporal patterns, distributions
üìö CONCEPTS: Data profiling, quality metrics, statistical validation
"""

In [9]:
def comprehensive_data_profile(df, dataset_name, time_col="timestamp"):
    """
    Generate comprehensive data quality profile
    
    Args:
        df: Spark DataFrame to profile
        dataset_name: Name for reporting
        time_col: Timestamp column name (can be None for non-time series data)
    
    Returns:
        Dictionary with quality metrics
    """
    print(f"\nüîç Comprehensive Profile: {dataset_name}")
    print("-" * 50)
    
    # Basic statistics
    total_rows = df.count()
    total_cols = len(df.columns)
    
    profile = {
        'dataset_name': dataset_name,
        'total_rows': total_rows,
        'total_columns': total_cols,
        'memory_usage_mb': 0,  # Estimate
        'quality_issues': []
    }
    
    # TODO: Calculate missing value patterns
    print("üìã Missing Value Analysis:")
    missing_analysis = {}
    for col in df.columns:
        missing_count = df.filter(F.col(col).isNull()).count()
        missing_pct = (missing_count / total_rows) * 100 if total_rows > 0 else 0
        missing_analysis[col] = {'count': missing_count, 'percentage': missing_pct}
        
        if missing_pct > 5:  # Flag columns with >5% missing
            profile['quality_issues'].append(f"High missing values in {col}: {missing_pct:.2f}%")
        
        if missing_count > 0:
            print(f"   {col}: {missing_count:,} ({missing_pct:.2f}%)")
    
    profile['missing_analysis'] = missing_analysis
    
    # TODO: Temporal data gaps (if timestamp column exists and is not None)
    if time_col and time_col in df.columns:
        print("‚è∞ Temporal Analysis:")
        
        # Get time range
        time_stats = df.agg(
            F.min(time_col).alias('min_time'),
            F.max(time_col).alias('max_time'),
            F.count(time_col).alias('time_count')
        ).collect()[0]
        
        print(f"   Time Range: {time_stats['min_time']} to {time_stats['max_time']}")
        print(f"   Records with timestamps: {time_stats['time_count']:,}")
        
        # TODO: Check for temporal gaps
        # Calculate expected vs actual record counts
        if dataset_name == 'traffic':
            expected_interval_minutes = 5
        elif dataset_name == 'air_quality':
            expected_interval_minutes = 15
        elif dataset_name == 'weather':
            expected_interval_minutes = 30
        elif dataset_name == 'energy':
            expected_interval_minutes = 10
        else:
            expected_interval_minutes = 15
        
        print(f"   Expected interval: {expected_interval_minutes} minutes")
        
        # TODO: Data freshness (for time series data)
        latest_record = df.agg(F.max(time_col).alias('latest')).collect()[0]['latest']
        if latest_record:
            from datetime import datetime
            hours_old = (datetime.now() - latest_record).total_seconds() / 3600
            print(f"üìÖ Data Freshness: Latest record is {hours_old:.1f} hours old")
    
    # TODO: Numeric column distributions
    numeric_cols = [field.name for field in df.schema.fields 
                   if field.dataType in [IntegerType(), DoubleType(), FloatType(), LongType()]]
    
    if numeric_cols:
        print("üìà Numeric Column Analysis:")
        # Get basic statistics for numeric columns
        stats_df = df.select(numeric_cols).describe()
        stats_df.show()
        
        # TODO: Check for suspicious patterns in numeric data
        for col in numeric_cols:
            if col not in ['location_lat', 'location_lon']:
                # Check for columns with very low variance (potentially stuck sensors)
                variance_check = df.agg(F.variance(col).alias('variance')).collect()[0]['variance']
                if variance_check is not None and variance_check < 0.001:
                    profile['quality_issues'].append(f"Very low variance in {col}: {variance_check}")
    
    # TODO: Categorical column analysis
    categorical_cols = [field.name for field in df.schema.fields 
                       if field.dataType == StringType() and field.name not in [time_col] if time_col]
    
    if categorical_cols:
        print("üìÇ Categorical Column Analysis:")
        for col in categorical_cols:
            distinct_count = df.select(col).distinct().count()
            print(f"   {col}: {distinct_count} distinct values")
            
            # Show top values
            if distinct_count < 20:
                top_values = df.groupBy(col).count().orderBy(F.desc("count")).limit(5)
                print(f"      Top values:")
                top_values.show(5, truncate=False)
    
    # TODO: Check for duplicate records
    duplicate_count = total_rows - df.dropDuplicates().count()
    if duplicate_count > 0:
        profile['quality_issues'].append(f"Duplicate records found: {duplicate_count}")
        print(f"üîÑ Duplicate Records: {duplicate_count:,}")
    
    return profile

      

# TODO: Profile all datasets
print("üîç Starting comprehensive data profiling...")

profiles = {}
for name, df in datasets.items():
    if df is not None:
        try:
            # Check if the dataset has a timestamp column
            if name != 'zones' and 'timestamp' in df.columns:
                profiles[name] = comprehensive_data_profile(df, name, time_col="timestamp")
            else:
                # For datasets without timestamp column (like zones), don't pass time_col
                profiles[name] = comprehensive_data_profile(df, name, time_col=None)
        except Exception as e:
            print(f"‚ùå Error profiling {name}: {str(e)}")


üîç Starting comprehensive data profiling...

üîç Comprehensive Profile: zones
--------------------------------------------------
üìã Missing Value Analysis:
üìà Numeric Column Analysis:
+-------+--------------------+--------------------+-------------------+-------------------+------------------+
|summary|             lat_min|             lat_max|            lon_min|            lon_max|        population|
+-------+--------------------+--------------------+-------------------+-------------------+------------------+
|  count|                   8|                   8|                  8|                  8|                 8|
|   mean|  40.730000000000004|             40.7525| -73.99125000000001|          -73.97125|           21250.0|
| stddev|0.023904572186687328|0.028157719063465373|0.02474873734153055|0.02474873734153458|14260.334598358582|
|    min|                40.7|               40.72|             -74.02|              -74.0|              5000|
|    max|               40.76|  

                                                                                

‚è∞ Temporal Analysis:
   Time Range: 2025-08-28 19:13:45.641653 to 2025-09-04 19:13:45.641653
   Records with timestamps: 100,850
   Expected interval: 5 minutes
üìÖ Data Freshness: Latest record is 13.3 hours old
üìà Numeric Column Analysis:
+-------+--------------------+-------------------+------------------+------------------+
|summary|        location_lat|       location_lon|     vehicle_count|         avg_speed|
+-------+--------------------+-------------------+------------------+------------------+
|  count|              100850|             100850|            100850|            100850|
|   mean|   40.75011790265806| -73.95663481997272|22.197045116509667|45.601101580647274|
| stddev|0.027328503495008007|0.03287764405886609|13.561857125784131| 17.04167685328942|
|    min|  40.700157401736284| -74.01290766686316|                 0|               5.0|
|    max|  40.798402010459405| -73.90082117344016|                91| 116.4255545170856|
+-------+--------------------+------------

                                                                                


üîç Comprehensive Profile: air_quality
--------------------------------------------------
üìã Missing Value Analysis:
‚ùå Error profiling air_quality: [UNSUPPORTED_FEATURE.QUERY_ONLY_CORRUPT_RECORD_COLUMN] The feature is not supported: Queries from raw JSON/CSV/XML files are disallowed when the
referenced columns only include the internal corrupt record column
(named `_corrupt_record` by default). For example:
`spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()`
and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`.
Instead, you can cache or save the parsed results and then send the same query.
For example, `val df = spark.read.schema(schema).json(file).cache()` and then
`df.filter($"_corrupt_record".isNotNull).count()`. SQLSTATE: 0A000

üîç Comprehensive Profile: weather
--------------------------------------------------
üìã Missing Value Analysis:
‚è∞ Temporal Analysis:
   Time Range: 2025-08-28 19:13:45.641653 to 2025-09-04 1

                                                                                

‚è∞ Temporal Analysis:


                                                                                

   Time Range: 2025-08-28 19:13:45.641653 to 2025-09-04 19:13:45.641653
   Records with timestamps: 201,800
   Expected interval: 10 minutes
üìÖ Data Freshness: Latest record is 13.3 hours old
üìà Numeric Column Analysis:


                                                                                

+-------+--------------------+-------------------+------------------+------------------+------------------+--------------------+
|summary|        location_lat|       location_lon| power_consumption|           voltage|           current|        power_factor|
+-------+--------------------+-------------------+------------------+------------------+------------------+--------------------+
|  count|              201800|             201800|            201800|            201800|            201800|              201800|
|   mean|   40.75114696746992| -73.95570226085518|17.661501426137587|239.99110505236544| 73.62718756676854|  0.9000784069428297|
| stddev|0.029149474776993135|0.03696918385133131| 18.75357389149128| 5.010000846318629| 78.21712082409343|0.028898400050630813|
|    min|   40.70003316266749| -74.01817156010618|1.6800237743084105| 218.3401346513567| 6.660334472337805|   0.850000401033654|
|    max|   40.79986752880212| -73.90007563085238| 64.99993909041741|  264.016592785443|288.69565

                                                                                

   meter_id: 200 distinct values
   building_type: 5 distinct values
      Top values:
+-------------+-----+
|building_type|count|
+-------------+-----+
|residential  |48432|
|commercial   |40360|
|retail       |40360|
|office       |36324|
|industrial   |36324|
+-------------+-----+



                                                                                

# =============================================================================
# TODO 1.3: Sensor Health Analysis (45 minutes)
# =============================================================================

"""
üéØ TASK: Identify sensors with potential operational issues
üí° HINT: Look for patterns that indicate sensor malfunctions
üìö CONCEPTS: Sensor diagnostics, operational monitoring, health scoring
"""

In [6]:
def analyze_sensor_health(df, sensor_id_col, value_cols, time_col="timestamp"):
    """
    Analyze individual sensor health and identify problematic sensors
    
    Args:
        df: DataFrame with sensor data
        sensor_id_col: Column name for sensor ID
        value_cols: List of measurement columns to analyze
        time_col: Timestamp column
    
    Returns:
        DataFrame with sensor health metrics
    """
    print(f"\nüè• Sensor Health Analysis")
    print("-" * 30)
    
    # TODO: Calculate health metrics per sensor
    window_spec = Window.partitionBy(sensor_id_col)
    
    health_metrics = df.groupBy(sensor_id_col).agg(
        F.count("*").alias("total_readings"),
        F.min(time_col).alias("first_reading"),
        F.max(time_col).alias("last_reading")
    )
    
    # TODO: Add missing data percentage per sensor
    for col in value_cols:
        missing_col_name = f"{col}_missing_pct"
        health_metrics = health_metrics.withColumn(
            missing_col_name,
            # TODO: Calculate percentage of missing values for this sensor and column
            # HINT: Use window functions to calculate missing percentage per sensor
        )
    
    # TODO: Add variance analysis (detect stuck sensors)
    for col in value_cols:
        if col in df.columns:
            variance_col_name = f"{col}_variance"
            sensor_variance = df.groupBy(sensor_id_col).agg(
                F.variance(col).alias(variance_col_name)
            )
            health_metrics = health_metrics.join(sensor_variance, sensor_id_col, "left")
    
    # TODO: Calculate data gaps (irregular reporting)
    # This is more complex - calculate time differences between consecutive readings
    
    # TODO: Create overall health score
    # Combine multiple factors into a single health score (0-100)
    health_metrics = health_metrics.withColumn(
        "health_score",
        # TODO: Create a formula that combines:
        # - Missing data percentage (lower is better)
        # - Data variance (too low = stuck sensor, too high = noisy sensor)
        # - Reporting regularity
        # - Recent data availability
        F.lit(100.0)  # Placeholder - implement your scoring logic
    )
    
    # TODO: Flag problematic sensors
    health_metrics = health_metrics.withColumn(
        "status",
        F.when(F.col("health_score") > 80, "healthy")
         .when(F.col("health_score") > 60, "warning")
         .otherwise("critical")
    )
    
    return health_metrics

# TODO: Analyze health for each sensor type
print("üè• Analyzing sensor health across all datasets...")

sensor_health_results = {}

# Traffic sensors
if 'traffic' in datasets:
    traffic_health = analyze_sensor_health(
        datasets['traffic'], 
        'sensor_id', 
        ['vehicle_count', 'avg_speed']
    )
    sensor_health_results['traffic'] = traffic_health
    
    print("üöó Traffic Sensor Health Summary:")
    traffic_health.groupBy("status").count().show()

# TODO: Analyze other sensor types
# Air quality sensors
if 'air_quality' in datasets:
    # TODO: Implement air quality sensor health analysis
    pass

# Weather sensors  
if 'weather' in datasets:
    # TODO: Implement weather sensor health analysis
    pass

# Energy sensors
if 'energy' in datasets:
    # TODO: Implement energy sensor health analysis
    pass


üè• Analyzing sensor health across all datasets...

üè• Sensor Health Analysis
------------------------------


TypeError: DataFrame.withColumn() missing 1 required positional argument: 'col'