# Weather ML Feature Engineering

## Objective
Create comprehensive ML features from raw weather data for temperature prediction.

This notebook:
1. Loads weather data from the `weather_current` table
2. Creates time-based features (hour, day of week, month, etc.)
3. Creates lag features (previous observations)
4. Creates rolling window statistics
5. Creates interaction features
6. Saves engineered features to `weather_features` table

- **Output Table:** `weather_features`
- **Target Variable:** `temperature`
- **Total Features:** 32 engineered features (5 time-based, 12 lag, 12 rolling, 3 interaction)

In [None]:
# Import Required Libraries
import logging

from pyspark.sql import Window
from pyspark.sql.functions import (
    avg,
    col,
    count,
    dayofweek,
    dayofyear,
    hour,
    lag,
    month,
    stddev,
    unix_timestamp,
    when,
)
from pyspark.sql.types import DoubleType, IntegerType

# Logging: grab this module's logger, then configure the root logger at INFO level
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

print("✓ Libraries imported successfully")

## Section 1: Load and Explore Weather Data

Load raw weather data from the `weather_current` table and examine its structure.

In [None]:
# Read the raw observations; a load failure is logged and then re-raised.
try:
    df = spark.table("weather_current")
except Exception as e:
    logger.error(f"Failed to load table: {e}")
    raise
else:
    logger.info("✓ Loaded weather_current table")

# Schema overview
print("Data Schema:")
df.printSchema()

# Row and column totals (count() triggers a Spark job)
column_names = df.columns
print(f"\nTotal Records: {df.count()}")
print(f"Columns: {len(column_names)}")

# First few rows, shown without truncation
print("\nSample Data:")
df.show(5, truncate=False)

# Per-column null tally: when() produces a non-null value only for null cells,
# and count() counts those non-null results.
print("\nMissing Values:")
null_tallies = [
    count(when(col(name).isNull(), name)).alias(name) for name in column_names
]
missing_counts = df.select(null_tallies)
missing_counts.show(truncate=False)

## Section 2: Create Time-Based Features

Extract temporal features from the `timestamp` column: Unix time, hour, day of week, day of year, and month.

In [None]:
# Create time-based features from the `timestamp` column.
# NOTE(review): the explicit pattern passed to unix_timestamp only matters if
# `timestamp` is stored as a string; values that do not match it would presumably
# become NULL and then be removed by the later dropna() step. Confirm the column's
# type and format in weather_current.
timestamp_col = col("timestamp")

df_features = (
    df.withColumn(
        "timestamp_unix",
        unix_timestamp(timestamp_col, "yyyy-MM-dd'T'HH:mm:ss"),
    )
    .withColumn("hour", hour(timestamp_col).cast(IntegerType()))
    .withColumn("day_of_week", dayofweek(timestamp_col).cast(IntegerType()))
    .withColumn("day_of_year", dayofyear(timestamp_col).cast(IntegerType()))
    .withColumn("month", month(timestamp_col).cast(IntegerType()))
)

print("✓ Created time-based features:")
print("  - timestamp_unix")
print("  - hour (0-23)")
print("  - day_of_week (1-7)")
# Upper bound is 366, not 365: 31 December of a leap year is day 366.
print("  - day_of_year (1-366)")
print("  - month (1-12)")

## Section 3: Create Lag Features

Create lag features from earlier observations in each city: the values 1, 3, 6, and 12 observations before the current one.

In [None]:
# Lag features look back a fixed number of rows within each city's
# timestamp-ordered sequence of observations.
window_spec = Window.partitionBy("city").orderBy("timestamp")

# Row offsets to look back, and the metrics that receive a lag column for each offset
lag_values = [1, 3, 6, 12]
lagged_metrics = ("temperature", "humidity", "pressure")

for lag_val in lag_values:
    for metric in lagged_metrics:
        # Rows with fewer than lag_val predecessors in their city get NULL here
        lagged_value = lag(metric, lag_val).over(window_spec)
        df_features = df_features.withColumn(
            f"{metric}_lag_{lag_val}",
            lagged_value.cast(DoubleType()),
        )

print("✓ Created lag features:")
for lag_val in lag_values:
    print(f"  - Lag {lag_val}: temperature_lag_{lag_val}, humidity_lag_{lag_val}, pressure_lag_{lag_val}")

## Section 4: Create Rolling Window Features

Compute rolling statistics per city over trailing 3-, 6-, and 12-hour time windows.

In [None]:
# Rolling statistics over trailing time windows, with window sizes given in hours
window_sizes = [3, 6, 12]

for window_size in window_sizes:
    # Range frame over timestamp_unix (seconds): from (window_size hours - 1 second)
    # before the current row up to and including the current row.
    # NOTE(review): the upper bound is 0, so each row's own values are part of its
    # statistics — confirm this is intended given temperature is the stated target.
    lookback_seconds = window_size * 3600 - 1
    window_spec_rolling = (
        Window.partitionBy("city")
        .orderBy("timestamp_unix")
        .rangeBetween(-lookback_seconds, 0)
    )

    # (output column, aggregate) pairs: mean and std for temperature,
    # mean only for humidity and pressure
    rolling_columns = (
        (f"temperature_rolling_mean_{window_size}", avg("temperature")),
        (f"temperature_rolling_std_{window_size}", stddev("temperature")),
        (f"humidity_rolling_mean_{window_size}", avg("humidity")),
        (f"pressure_rolling_mean_{window_size}", avg("pressure")),
    )

    for column_name, aggregate in rolling_columns:
        df_features = df_features.withColumn(
            column_name,
            aggregate.over(window_spec_rolling).cast(DoubleType()),
        )

print("✓ Created rolling window features:")
for window_size in window_sizes:
    print(f"  - Window {window_size}h: mean & std temperature, mean humidity & pressure")

## Section 5: Create Interaction Features

Create derived features from combinations of base features

In [None]:
# Interaction features: each expression combines two base measurements.
temp_humidity_expr = col("temperature") * col("humidity") / 100
# The 0.1 offset presumably keeps the denominator non-zero when visibility is 0
cloud_visibility_expr = col("cloudiness") / (col("visibility") + 0.1)
pressure_humidity_expr = col("pressure") * col("humidity") / 1000

interaction_exprs = {
    "temp_humidity_interaction": temp_humidity_expr,
    "cloud_visibility_ratio": cloud_visibility_expr,
    "pressure_humidity_interaction": pressure_humidity_expr,
}

# Add the columns in the order listed above, each stored as a double
for feature_name, feature_expr in interaction_exprs.items():
    df_features = df_features.withColumn(feature_name, feature_expr.cast(DoubleType()))

print("✓ Created interaction features:")
print("  - temp_humidity_interaction: temperature × (humidity / 100)")
print("  - cloud_visibility_ratio: cloudiness / (visibility + 0.1)")
print("  - pressure_humidity_interaction: pressure × (humidity / 1000)")

## Section 6: Handle Missing Values

Drop every row that contains a null in any column. Nulls are expected after the previous steps: for example, the first 12 observations of each city have no 12-step lag value.

In [None]:
# Handle missing values: dropna() with no arguments removes every row that has
# a null in any column, including the lag/rolling columns created earlier.
records_before = df_features.count()
df_features = df_features.dropna()
records_after = df_features.count()

records_removed = records_before - records_after
# Guard the percentage against an empty input, which would otherwise raise
# ZeroDivisionError.
percent_removed = 100 * records_removed / records_before if records_before else 0.0

print("✓ Dropped null values:")
print(f"  Records before: {records_before}")
print(f"  Records after:  {records_after}")
print(f"  Records removed: {records_removed} ({percent_removed:.1f}%)")

# An empty result would otherwise pass silently into the summary and save steps.
if records_after == 0:
    print("⚠ No records remain after dropping nulls; check the input data before saving.")

## Section 7: Feature Summary

Display a summary of the engineered features before saving them.

In [None]:
# Print an overview of the engineered dataset: size, feature groups,
# per-city row counts, and a few sample rows.
rule = "=" * 70
for header_line in (rule, "FEATURE ENGINEERING SUMMARY", rule):
    print(header_line)

print("\nFinal Dataset:")
print(f"  Total Records: {df_features.count():,}")
print(f"  Total Features: {len(df_features.columns)}")
print("  Target Variable: temperature")

# Static description of the feature groups created by the earlier cells
for category_line in (
    "\nFeature Categories:",
    "  Time Features: hour, day_of_week, day_of_year, month, timestamp_unix",
    "  Base Features: humidity, pressure, wind_speed, visibility, cloudiness",
    "  Lag Features: 12 features (temperature, humidity, pressure × 4 lags)",
    "  Rolling Features: 12 features (temperature mean/std, humidity/pressure mean × 3 windows)",
    "  Interaction Features: 3 features (interactions)",
):
    print(category_line)

print("\nData Distribution by City:")
city_counts = df_features.groupBy("city").count()
city_counts.show(truncate=False)

print("\nSample Features:")
# Show only those preferred columns that actually exist in the DataFrame
preferred_cols = (
    "city",
    "timestamp",
    "temperature",
    "humidity",
    "pressure",
    "hour",
    "day_of_week",
)
present_cols = df_features.columns
display_cols = [name for name in preferred_cols if name in present_cols]
df_features.select(*display_cols).show(5, truncate=False)

## Section 8: Save to Delta Lake

Save the engineered features to the `weather_features` table

In [None]:
# Write the engineered features to the weather_features table, then read the
# table back to report what was stored. Any failure is logged and re-raised.
print("Saving engineered features to weather_features table...")

try:
    # Overwrite mode replaces the table's existing contents
    feature_writer = df_features.write.mode("overwrite").option("mergeSchema", "true")
    feature_writer.saveAsTable("weather_features")

    print("✓ Successfully saved to weather_features table")

    # Read the table back and report its size
    verify_df = spark.table("weather_features")
    print("\nVerification:")
    saved_rows = verify_df.count()
    print(f"  Rows saved: {saved_rows:,}")
    print(f"  Columns saved: {len(verify_df.columns)}")

except Exception as e:
    logger.error(f"Failed to save table: {e}")
    raise

# Closing banner with pointers to the follow-up scripts
banner = "=" * 70
print("\n" + banner)
print("FEATURE ENGINEERING COMPLETED SUCCESSFULLY")
print(banner)
print("\nNext Steps:")
for next_step in (
    "1. Train temperature prediction model (train_model.py)",
    "2. Analyze predictions (analyze_predictions.py)",
):
    print(next_step)
print(banner)