<a href="https://colab.research.google.com/github/edvandervegt/baselWeather/blob/main/IWBInterviewPreparation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [41]:
# Install necessary packages
# NOTE: lines starting with `!` are IPython/Colab shell escapes, so this cell only
# runs inside a notebook, not as a plain Python script. Versions are not pinned,
# so re-running the notebook later may install newer releases.
!pip install pyspark
!pip install requests
!pip install pandas
!pip install matplotlib seaborn
!pip install pyarrow  # For Parquet support
!pip install fastavro  # For Avro support
!pip install plotly   # For interactive visualizations

# Import required libraries
import requests
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import time
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from google.colab import drive
from google.colab import userdata


# Mount Google Drive so cached API responses and Parquet outputs persist
# across Colab sessions.
drive.mount('/content/drive')

# Create (or reuse) the project directory on Drive
project_dir = "/content/drive/MyDrive/basel_weather_analysis"
if os.path.exists(project_dir):
    print(f"Using existing directory: {project_dir}")
else:
    # Plain Python instead of a `!mkdir` shell escape: works outside IPython
    # and raises on failure instead of printing a shell error and continuing.
    os.makedirs(project_dir, exist_ok=True)
    print(f"Created new directory: {project_dir}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Using existing directory: /content/drive/MyDrive/basel_weather_analysis


In [42]:
# Initialize PySpark session
# NOTE(review): no .master() is set, so this presumably runs in local mode, where
# tasks execute inside the driver JVM and spark.executor.memory likely has no
# effect. Also, getOrCreate() returns an already-running session on re-execution,
# so changed memory settings may not apply until the runtime restarts. Confirm.
spark = SparkSession.builder \
    .appName("Basel Weather Data Engineering") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

print(f"Spark version: {spark.version}")

# API configuration using the Timeline API endpoint.
# The API key is read from Colab's secrets store (userdata) under the name 'API_Key'.
API_KEY = userdata.get('API_Key')
LOCATION = "Basel,Switzerland"

# Date range of hourly data to fetch: 2025-01-01 through 2025-04-14 (~3.5 months).
# Both ends are passed to the API as YYYY-MM-DD strings by the fetch helpers.
end_date = datetime(2025, 4, 14)  # Fixed, hard-coded end date (not computed from today)
start_date = datetime(2025, 1, 1)  # January 1st
print(f"Fetching weather data for Basel from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")

# Function to fetch weather data for a specific date range
def fetch_weather_data_chunk(location, start_date, end_date, api_key, max_retries=3, timeout=30):
    """
    Fetch hourly weather for one date range from the Visual Crossing Timeline API.

    Args:
        location: Location string inserted into the URL path (e.g. "Basel,Switzerland").
        start_date: Range start, inserted into the URL path as-is
            (callers in this notebook pass 'YYYY-MM-DD' strings).
        end_date: Range end, same format as ``start_date``.
        api_key: Visual Crossing API key, sent as the ``key`` query parameter.
        max_retries: Total number of attempts before giving up.
        timeout: Seconds to wait for the server on each attempt; without it a
            stalled connection would block the notebook indefinitely.

    Returns:
        The decoded JSON payload on HTTP 200, or ``None`` if every attempt fails
        or the server rejects the request as a client error.
    """
    # Construct the URL for the Timeline API
    url = f"https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/{location}/{start_date}/{end_date}"

    params = {
        "key": api_key,
        "unitGroup": "metric",  # Use metric units
        "include": "hours",     # Include hourly data
        "contentType": "json"   # Request JSON format
    }

    # Client errors that retrying cannot fix (bad request, bad key, forbidden, not found)
    non_retryable = (400, 401, 403, 404)

    for attempt in range(max_retries):
        try:
            print(f"Sending API request to: {url}")
            response = requests.get(url, params=params, timeout=timeout)
            if response.status_code == 200:
                print("API request successful!")
                return response.json()
        except (requests.RequestException, ValueError) as e:
            # Network errors, timeouts and malformed JSON bodies are treated as transient
            print(f"Exception occurred: {str(e)}")
            wait_time = (2 ** attempt) * 5
        else:
            if response.status_code == 429:  # Rate limit exceeded: back off harder
                print("Rate limit exceeded.")
                wait_time = (2 ** attempt) * 30
            else:
                print(f"Error fetching data: {response.status_code}, {response.text}")
                if response.status_code in non_retryable:
                    print("Request rejected by the API; not retrying.")
                    return None
                wait_time = (2 ** attempt) * 5

        # Do not sleep after the final attempt; there is nothing left to wait for
        if attempt == max_retries - 1:
            print("Maximum retry attempts reached.")
            return None
        print(f"Retrying in {wait_time} seconds...")
        time.sleep(wait_time)

    return None

# Function to process all data chunks and combine them
def fetch_all_weather_data(location, start_date, end_date, api_key,
                           cache_dir="/content/drive/MyDrive/basel_weather_analysis",
                           chunk_days=30, pause_seconds=5):
    """
    Fetch weather data for the inclusive range [start_date, end_date] in cached chunks.

    The range is split into consecutive, non-overlapping chunks. Each chunk runs
    from ``current_date`` to ``min(current_date + chunk_days, end_date)``, both days
    included, so a full chunk covers ``chunk_days + 1`` calendar days. This keeps
    each request below the API's 1000-records-per-request limit. Each chunk is
    cached as a JSON file in ``cache_dir`` and re-used on later runs.

    Args:
        location: Location string passed through to the API.
        start_date: First day to fetch (``datetime``).
        end_date: Last day to fetch, inclusive (``datetime``).
        api_key: Visual Crossing API key.
        cache_dir: Directory holding the per-chunk JSON cache files.
        chunk_days: Days added to a chunk's start to get its (inclusive) end.
        pause_seconds: Delay inserted between consecutive API calls (not cache
            hits) to stay clear of rate limits.

    Returns:
        List of decoded JSON payloads, one per chunk that was loaded or fetched
        successfully, in chronological order. Failed chunks are reported and skipped.
    """
    chunk_size = timedelta(days=chunk_days)
    current_date = start_date
    all_data = []
    called_api = False  # True once any chunk has hit the API in this run

    # '<=' (not '<'): the last day must still be fetched when the previous chunk
    # ended exactly one day before end_date, or when start_date == end_date.
    while current_date <= end_date:
        chunk_end = min(current_date + chunk_size, end_date)

        # Format dates for API call and cache file name
        chunk_start_str = current_date.strftime('%Y-%m-%d')
        chunk_end_str = chunk_end.strftime('%Y-%m-%d')
        chunk_cache_file = os.path.join(
            cache_dir, f"basel_weather_cache_{chunk_start_str}_to_{chunk_end_str}.json"
        )

        chunk_data = None
        cache_hit = False
        if os.path.exists(chunk_cache_file):
            print(f"Loading cached weather data for {chunk_start_str} to {chunk_end_str}")
            try:
                with open(chunk_cache_file, 'r', encoding='utf-8') as f:
                    chunk_data = json.load(f)
                cache_hit = True
            except (OSError, ValueError) as e:
                # A truncated/corrupt cache file must not block the run forever: refetch it
                print(f"Cache file unreadable ({e}); refetching {chunk_start_str} to {chunk_end_str}")

        if not cache_hit:
            # Only pause between real API calls; cache hits need no rate limiting
            if called_api:
                print(f"Waiting {pause_seconds} seconds before fetching next chunk...")
                time.sleep(pause_seconds)
            print(f"Fetching chunk from {chunk_start_str} to {chunk_end_str}")
            chunk_data = fetch_weather_data_chunk(location, chunk_start_str, chunk_end_str, api_key)
            called_api = True

            if chunk_data:
                print(f"Saving chunk data to cache: {chunk_cache_file}")
                tmp_file = chunk_cache_file + ".tmp"
                with open(tmp_file, 'w', encoding='utf-8') as f:
                    json.dump(chunk_data, f)
                # Rename into place so an interrupted write never leaves a partial cache file
                os.replace(tmp_file, chunk_cache_file)

        if chunk_data:
            all_data.append(chunk_data)
        else:
            print(f"Failed to fetch data for {chunk_start_str} to {chunk_end_str}")

        # Next chunk starts the day after this one ends (chunk ends are inclusive)
        current_date = chunk_end + timedelta(days=1)

    return all_data

# Fetch every chunk for the configured range (served from the Drive cache when available)
all_weather_chunks = fetch_all_weather_data(LOCATION, start_date, end_date, API_KEY)

# Flatten the chunk payloads into one record per hour, save them, and preview them
if not all_weather_chunks:
    print("Failed to fetch any weather data. Please check your API key and parameters.")
else:
    print("Processing and combining all data chunks...")
    hourly_data = []
    for chunk in all_weather_chunks:
        # Chunk-level location metadata (expected to be identical across chunks),
        # copied onto every hourly record
        location_info = {
            field: chunk.get(field, fallback)
            for field, fallback in (
                ("address", ""),
                ("latitude", 0),
                ("longitude", 0),
                ("timezone", ""),
                ("tzoffset", 0),
            )
        }
        # Record layout: the hour's own fields, then the day's date, then location fields
        hourly_data.extend(
            {**hour, "date": day.get("datetime", ""), **location_info}
            for day in chunk.get("days", [])
            for hour in day.get("hours", [])
        )

    range_label = f"{start_date.strftime('%Y-%m-%d')}_to_{end_date.strftime('%Y-%m-%d')}"
    combined_cache_file = (
        "/content/drive/MyDrive/basel_weather_analysis/"
        f"basel_weather_cache_combined_{range_label}.json"
    )

    print(f"Saving combined data to: {combined_cache_file}")
    with open(combined_cache_file, 'w') as f:
        json.dump({"hourly_data": hourly_data}, f)

    weather_df = pd.DataFrame(hourly_data)
    print(f"Successfully collected {len(hourly_data)} hourly weather records")
    print(weather_df.head())

Spark version: 3.5.5
Fetching weather data for Basel from 2025-01-01 to 2025-04-14
Loading cached weather data for 2025-01-01 to 2025-01-31
Waiting 5 seconds before fetching next chunk...
Loading cached weather data for 2025-02-01 to 2025-03-03
Waiting 5 seconds before fetching next chunk...
Loading cached weather data for 2025-03-04 to 2025-04-03
Waiting 5 seconds before fetching next chunk...
Loading cached weather data for 2025-04-04 to 2025-04-14
Processing and combining all data chunks...
Saving combined data to: /content/drive/MyDrive/basel_weather_analysis/basel_weather_cache_combined_2025-01-01_to_2025-04-14.json
Successfully collected 2495 hourly weather records
   datetime  datetimeEpoch  temp  feelslike  humidity  dew  precip  \
0  00:00:00     1735686000  -2.1       -2.1     92.86 -3.1     0.0   
1  01:00:00     1735689600  -2.4       -4.4     92.85 -3.4     0.0   
2  02:00:00     1735693200  -2.4       -5.9     94.24 -3.2     0.0   
3  03:00:00     1735696800  -2.8       -

In [43]:
# Flatten the chunks again, convert to Spark, add a timestamp and persist as Parquet
if not all_weather_chunks:
    print("Failed to fetch or load weather data. Please check your API key and parameters.")
else:
    print("Processing and combining all data chunks...")
    hourly_data = []
    for chunk in all_weather_chunks:
        # Location metadata from the chunk header, repeated on every hourly record
        location_info = {
            field: chunk.get(field, fallback)
            for field, fallback in (
                ("address", ""),
                ("latitude", 0),
                ("longitude", 0),
                ("timezone", ""),
                ("tzoffset", 0),
            )
        }
        for day in chunk.get("days", []):
            day_str = day.get("datetime", "")
            for hour in day.get("hours", []):
                record = dict(hour)
                record["date"] = day_str
                record.update(location_info)
                hourly_data.append(record)

    weather_df = pd.DataFrame(hourly_data)
    print(f"Successfully collected {len(hourly_data)} hourly weather records")
    print("Sample data:")
    print(weather_df.head())

    # Combine the "date" (yyyy-MM-dd) and hourly "datetime" (HH:mm:ss) strings
    # into a single Spark timestamp column
    date_time_str = F.concat(F.col("date"), F.lit(" "), F.col("datetime"))
    spark_df = spark.createDataFrame(weather_df).withColumn(
        "timestamp",
        F.to_timestamp(date_time_str, "yyyy-MM-dd HH:mm:ss"),
    )

    # Parquet: columnar storage suited to analytical scans
    parquet_path = "/content/drive/MyDrive/basel_weather_analysis/basel_weather_data.parquet"
    spark_df.write.mode("overwrite").parquet(parquet_path)
    print(f"Data successfully stored in Parquet format at: {parquet_path}")

    # Round-trip check: read the files back and show a few key columns
    stored_df = spark.read.parquet(parquet_path)
    print("Sample of stored data:")
    stored_df.select("timestamp", "temp", "humidity", "windspeed", "conditions").show(5)
    parquet_df = stored_df

Processing and combining all data chunks...
Successfully collected 2495 hourly weather records
Sample data:
   datetime  datetimeEpoch  temp  feelslike  humidity  dew  precip  \
0  00:00:00     1735686000  -2.1       -2.1     92.86 -3.1     0.0   
1  01:00:00     1735689600  -2.4       -4.4     92.85 -3.4     0.0   
2  02:00:00     1735693200  -2.4       -5.9     94.24 -3.2     0.0   
3  03:00:00     1735696800  -2.8       -6.5     92.82 -3.8     0.0   
4  04:00:00     1735700400  -3.1       -6.3     92.81 -4.1     0.0   

   precipprob  snow  snowdepth  ...         icon  \
0         0.0   0.0       14.4  ...  clear-night   
1         0.0   0.0       14.5  ...  clear-night   
2         0.0   0.0       14.4  ...  clear-night   
3         0.0   0.0       14.6  ...  clear-night   
4         0.0   0.0       14.0  ...  clear-night   

                                            stations  source        date  \
0  [06601099999, 06641099999, 06645099999, 072990...     obs  2025-01-01   
1  [06

In [44]:
# Function to assess data quality and generate statistics
def assess_data_quality(df, stats_columns=("temp", "solarenergy", "windspeed")):
    """
    Print a data-quality report for a weather DataFrame.

    Sections:
      1. missing values per column (nulls, plus NaN in float/double columns);
      2. number of exact duplicate rows;
      3. hourly completeness of the ``timestamp`` column;
      4. summary statistics for the numeric columns listed in ``stats_columns``.

    Args:
        df: Spark DataFrame with a ``timestamp`` column.
        stats_columns: Names of columns to summarize in section 4. Only names that
            exist in ``df`` with a numeric type are reported, in schema order.

    Returns:
        The string "Data quality assessment complete".
    """
    # Local import: the numeric Spark types are only needed here
    from pyspark.sql.types import (
        ByteType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType,
    )

    print("\n" + "="*50)
    print("DATA QUALITY ASSESSMENT REPORT")
    print("="*50)

    # 1. Missing values. NaN is not null in Spark, and NaN values may survive the
    # pandas -> Spark conversion, so float/double columns also count NaN.
    print("\n1. MISSING VALUES ANALYSIS:")
    float_cols = {f.name for f in df.schema.fields
                  if isinstance(f.dataType, (FloatType, DoubleType))}
    missing_exprs = []
    for c in df.columns:
        is_missing = F.col(c).isNull()
        if c in float_cols:
            is_missing = is_missing | F.isnan(F.col(c))
        missing_exprs.append(F.count(F.when(is_missing, True)).alias(c))
    df.select(missing_exprs).show(truncate=False)

    # 2. Check for duplicates
    duplicate_count = df.count() - df.dropDuplicates().count()
    print(f"\n2. DUPLICATE RECORDS: {duplicate_count}")

    # 3. Check data completeness (time series continuity)
    print("\n3. TIME SERIES COMPLETENESS:")
    bounds = df.agg(F.min("timestamp").alias("start_ts"),
                    F.max("timestamp").alias("end_ts")).collect()[0]
    start_ts, end_ts = bounds["start_ts"], bounds["end_ts"]
    if start_ts is None:
        # Empty frame or all-null timestamps: nothing to measure against
        print("No non-null timestamps found; completeness cannot be assessed.")
    else:
        expected_hours = int((end_ts.timestamp() - start_ts.timestamp()) / 3600) + 1
        # Exclude nulls: distinct() would otherwise count null as one extra "hour"
        actual_hours = (df.select("timestamp")
                          .where(F.col("timestamp").isNotNull())
                          .distinct()
                          .count())

        print(f"Date range: {start_ts} to {end_ts}")
        print(f"Expected hourly records: {expected_hours}")
        print(f"Actual hourly records: {actual_hours}")
        print(f"Missing hours: {expected_hours - actual_hours}")

    # 4. Distribution of selected numeric columns. LongType is included because
    # pandas int64 columns arrive as LongType.
    print("\n4. NUMERICAL DATA DISTRIBUTION:")
    numeric_types = (ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
    wanted = set(stats_columns)
    numeric_cols = [f.name for f in df.schema.fields
                    if isinstance(f.dataType, numeric_types) and f.name in wanted]

    for col in numeric_cols:
        print(f"\nStatistics for {col}:")
        quoted = f"`{col}`"  # backticks keep unusual column names valid inside SQL expressions
        df.select(
            F.min(col).alias("min"),
            F.max(col).alias("max"),
            F.mean(col).alias("mean"),
            F.stddev(col).alias("stddev"),
            F.expr(f"percentile({quoted}, 0.25)").alias("25th_percentile"),
            F.expr(f"percentile({quoted}, 0.5)").alias("median"),
            F.expr(f"percentile({quoted}, 0.75)").alias("75th_percentile")
        ).show()

    return "Data quality assessment complete"

In [45]:
# Create synthetic "dirty" data for testing quality checks
def create_dirty_data(df, seed=None):
    """
    Return a copy of ``df`` with synthetic data-quality problems injected.

    Used to exercise the quality checks in this notebook. Corrupts the ``temp``,
    ``windspeed`` and ``solarenergy`` columns (ordered by ``timestamp`` where
    neighbouring rows matter). It also adds duplicates, removes 24 timestamps, and
    sets ``conditions`` to "Snow" for some high temperatures when that column
    exists (and updates ``weather_group`` when that column also exists). The
    result is written to Parquet on Google Drive.

    Args:
        df: Spark DataFrame of hourly weather records.
        seed: Optional integer base seed. When given, each random draw gets its
            own seed derived from it, so the corruption is reproducible. When
            ``None`` (default), Spark chooses random seeds.

    Returns:
        The dirty Spark DataFrame.
    """
    print("\n" + "="*50)
    print("CREATING SYNTHETIC DIRTY DATA FOR TESTING")
    print("="*50)

    draw_index = 0

    def rand():
        # Each draw needs its own seed: F.rand with one shared seed yields the same
        # values in every column, so every corruption would hit the same rows.
        nonlocal draw_index
        draw_index += 1
        return F.rand() if seed is None else F.rand(seed + draw_index)

    # 1. Introduce missing values for multiple columns (~5% each)
    dirty_df = df.withColumn(
        "temp", F.when(rand() < 0.05, None).otherwise(F.col("temp"))
    ).withColumn(
        "windspeed", F.when(rand() < 0.05, None).otherwise(F.col("windspeed"))
    ).withColumn(
        "solarenergy", F.when(rand() < 0.05, None).otherwise(F.col("solarenergy"))
    )

    # 2. Temperature outliers: value multiplied by 100
    dirty_df = dirty_df.withColumn(
        "temp",
        F.when(rand() < 0.02, F.col("temp") * 100).otherwise(F.col("temp"))
    )

    # 3. Windspeed outliers: unrealistic 200-500 km/h
    dirty_df = dirty_df.withColumn(
        "windspeed",
        F.when(rand() < 0.02, 200 + rand() * 300).otherwise(F.col("windspeed"))
    )

    # 4. Solar energy outliers: negative values, then unrealistically high values
    dirty_df = dirty_df.withColumn(
        "solarenergy",
        F.when(rand() < 0.01, rand() * -10)
         .when(rand() < 0.01, 50 + rand() * 50)
         .otherwise(F.col("solarenergy"))
    )

    # 5. Day/night inconsistencies: significant solar energy between 22:00 and 04:59
    dirty_df = dirty_df.withColumn(
        "solarenergy",
        F.when(
            ((F.hour("timestamp") >= 22) | (F.hour("timestamp") <= 4)) & (rand() < 0.2),
            10 + rand() * 15
        ).otherwise(F.col("solarenergy"))
    )

    # 6. Isolated windspeed spikes: 10x the average of the surrounding +/-3 rows
    window_spec = Window.orderBy("timestamp").rowsBetween(-3, 3)
    dirty_df = dirty_df.withColumn(
        "avg_windspeed_window", F.avg("windspeed").over(window_spec)
    )
    dirty_df = dirty_df.withColumn(
        "windspeed",
        F.when(
            (rand() < 0.02) & (F.col("avg_windspeed_window").isNotNull()),
            F.col("avg_windspeed_window") * 10
        ).otherwise(F.col("windspeed"))
    ).drop("avg_windspeed_window")

    # 7. Unusual hourly temperature changes: previous row's temp +/- 15
    window_spec_lag = Window.orderBy("timestamp")
    dirty_df = dirty_df.withColumn("prev_temp", F.lag("temp", 1).over(window_spec_lag))
    dirty_df = dirty_df.withColumn(
        "temp",
        F.when(
            (rand() < 0.03) & (F.col("prev_temp").isNotNull()),
            F.col("prev_temp") + F.when(rand() < 0.5, F.lit(15)).otherwise(F.lit(-15))
        ).otherwise(F.col("temp"))
    ).drop("prev_temp")

    # 8. Duplicates: append the first 10 rows again
    sample_df = dirty_df.limit(10)
    dirty_df = dirty_df.union(sample_df)

    # 9. Data gaps: drop every row for 24 randomly chosen (non-null) timestamps.
    # A single isin() filter replaces 24 chained != filters; nulls are excluded
    # because a null in the comparison list would make the filter drop every row.
    removed = (dirty_df.select("timestamp")
                       .where(F.col("timestamp").isNotNull())
                       .distinct()
                       .orderBy(rand())
                       .limit(24)
                       .collect())
    timestamps_list = [row.timestamp for row in removed]
    if timestamps_list:
        dirty_df = dirty_df.filter(~F.col("timestamp").isin(timestamps_list))

    # 10. Inconsistent relationships: "Snow" reported at temperatures above 25°C.
    # Applied whenever "conditions" exists; "weather_group" is updated only if present.
    inconsistency_applied = "conditions" in dirty_df.columns
    if inconsistency_applied:
        dirty_df = dirty_df.withColumn(
            "conditions",
            F.when(
                (F.col("temp") > 25) & (rand() < 0.1),
                F.lit("Snow")
            ).otherwise(F.col("conditions"))
        )
        if "weather_group" in dirty_df.columns:
            dirty_df = dirty_df.withColumn(
                "weather_group",
                F.when(F.col("conditions") == "Snow", F.lit("snowy"))
                 .otherwise(F.col("weather_group"))
            )

    # Save the dirty data
    dirty_data_path = "/content/drive/MyDrive/basel_weather_analysis/dirty_data.parquet"
    dirty_df.write.mode("overwrite").parquet(dirty_data_path)

    print(f"Dirty data created and saved to: {dirty_data_path}")
    print(f"Original record count: {df.count()}")
    print(f"Dirty data record count: {dirty_df.count()}")

    # Summary of introduced errors
    print("\nSummary of introduced errors:")
    print("1. Missing values in temperature, windspeed, and solar energy columns (~5% each)")
    print("2. Temperature outliers (values multiplied by 100 in ~2% of records)")
    print("3. Windspeed outliers (200-500 km/h in ~2% of records)")
    print("4. Solar energy outliers (negative values and values >50 MJ/m² in ~2% of records)")
    print("5. Day/night inconsistencies (significant solar energy during night hours)")
    print("6. Isolated windspeed spikes (sudden increases of 10x local average)")
    print("7. Unusual hourly temperature changes (±15°C changes in ~3% of records)")
    print("8. Duplicated records (10 duplicate records added)")
    print("9. Data gaps (24 hourly records removed)")
    if inconsistency_applied:
        print("10. Inconsistent weather relationships (high temperatures with snow)")
    else:
        print("10. Inconsistent weather relationships: skipped (no 'conditions' column)")

    return dirty_df

In [46]:
# Data quality checks
def validate_data_quality(df, rules):
    """
    Run each rule's check against ``df`` and return one result record per rule.

    Each rule is a dict with a "name" and a "check" callable. The callable takes
    ``df`` and returns a dict with at least "passed"; a failing result must also
    carry "message" and may carry "affected_rows" and "examples". A check that
    raises is recorded as a failure instead of aborting the run.

    Returns:
        List of result dicts in rule order:
          - passing rule: {"rule", "passed": True}
          - failing rule: {"rule", "passed": False, "error", "affected_rows", "examples"}
          - check raised: {"rule", "passed": False, "error"}
    """
    banner = "=" * 50
    print("\n" + banner)
    print("DATA VALIDATION WITH ERROR MESSAGING")
    print(banner)

    results = []
    for rule in rules:
        name = rule["name"]
        check = rule["check"]

        try:
            outcome = check(df)
            if outcome["passed"]:
                results.append({"rule": name, "passed": True})
                print(f"QUALITY CHECK PASSED: {name}")
                continue

            failure = f"QUALITY CHECK FAILED: {name} - {outcome['message']}"
            print(failure)
            results.append({
                "rule": name,
                "passed": False,
                "error": failure,
                "affected_rows": outcome.get("affected_rows", None),
                "examples": outcome.get("examples", "No examples provided"),
            })
        except Exception as exc:  # report any broken check and keep validating
            failure = f"ERROR RUNNING QUALITY CHECK: {name} - {str(exc)}"
            print(failure)
            results.append({"rule": name, "passed": False, "error": failure})

    return results

# Define a function for hourly data completeness check with examples
def check_hourly_completeness(df):
    """
    Check that ``timestamp`` has a record for every hour between its min and max.

    Null timestamps are ignored. When hours are missing, up to five of them are
    listed in the message; expected and actual values are compared at hour
    granularity (``date_trunc('hour', ...)``).

    NOTE(review): if ``timestamp`` holds local wall-clock times from a time zone
    with DST, the skipped spring-forward hour will be reported as missing. Confirm
    how the timestamps were built before treating a single missing hour as a defect.

    Returns:
        dict with "passed" (expected == actual), "message" and "affected_rows"
        (number of missing hours, or 0 when none are missing).
    """
    timestamps = df.select("timestamp").where(F.col("timestamp").isNotNull())
    bounds_df = timestamps.agg(F.min("timestamp").alias("min_ts"),
                               F.max("timestamp").alias("max_ts"))
    bounds = bounds_df.collect()[0]
    min_ts, max_ts = bounds["min_ts"], bounds["max_ts"]

    if min_ts is None:
        # Empty frame or all-null timestamps: nothing to measure
        return {
            "passed": False,
            "message": "No non-null timestamps found; hourly completeness cannot be assessed.",
            "affected_rows": 0
        }

    # Expected number of hours (inclusive of both ends)
    expected_hours = int((max_ts.timestamp() - min_ts.timestamp()) / 3600) + 1
    actual_hours = timestamps.distinct().count()

    if expected_hours > actual_hours:
        # Generate the expected hourly grid inside Spark from the same min/max.
        # This avoids round-tripping timestamps through Python strings and time
        # zones, and avoids registering global temp views.
        expected_df = bounds_df.select(
            F.explode(
                F.sequence("min_ts", "max_ts", F.expr("INTERVAL 1 HOUR"))
            ).alias("expected_timestamp")
        )
        actual_hour_df = timestamps.select(
            F.date_trunc("hour", "timestamp").alias("actual_hour")
        ).distinct()

        missing_hours = (
            expected_df
            .join(
                actual_hour_df,
                F.date_trunc("hour", F.col("expected_timestamp")) == F.col("actual_hour"),
                "left_anti",
            )
            .orderBy("expected_timestamp")
            .limit(5)
            .collect()
        )

        example_str = "\n".join(
            f"  - Missing hour: {row['expected_timestamp']}" for row in missing_hours
        )
        message = (f"Expected {expected_hours} hourly records, but found {actual_hours}."
                   f"\nExamples of missing hours:\n{example_str}")
    else:
        message = f"Expected {expected_hours} hourly records, but found {actual_hours}"

    return {
        "passed": expected_hours == actual_hours,
        "message": message,
        "affected_rows": expected_hours - actual_hours if expected_hours > actual_hours else 0
    }

# Define a function to check for unusual hourly temperature changes
def check_unusual_temp_changes(df, max_change=10):
    """
    Flag rows whose temperature differs from the previous row's by more than ``max_change`` °C.

    Rows are ordered by ``timestamp`` in one global (unpartitioned) window, and
    each row is compared with the row before it. Pairs where either temperature
    is null are skipped. If the data has gaps, the previous row is not
    necessarily the previous hour.

    Args:
        df: Spark DataFrame with ``timestamp`` and ``temp`` columns.
        max_change: Largest allowed absolute change between consecutive rows in
            °C. Defaults to the previously hard-coded 10.

    Returns:
        dict with "passed", "message" and "affected_rows" (number of flagged rows).
    """
    window_spec = Window.orderBy("timestamp")

    # Temperature difference between consecutive rows
    temp_diff_df = df.withColumn(
        "prev_temp", F.lag("temp").over(window_spec)
    ).withColumn(
        "temp_change", F.abs(F.col("temp") - F.col("prev_temp"))
    )

    unusual_changes = temp_diff_df.filter(
        F.col("temp_change").isNotNull() & (F.col("temp_change") > max_change)
    )
    unusual_count = unusual_changes.count()

    if unusual_count > 0:
        examples = unusual_changes.select(
            "timestamp", "temp", "prev_temp", "temp_change"
        ).orderBy(F.desc("temp_change")).limit(5).collect()

        # One decimal place: raw float differences print as e.g. 15.000000000000002
        example_str = "\n".join(
            f"  - At {row['timestamp']}: Change from {row['prev_temp']:.1f}°C to "
            f"{row['temp']:.1f}°C (Δ{row['temp_change']:.1f}°C)"
            for row in examples
        )

        message = (f"Found {unusual_count} instances of unusual hourly temperature changes "
                   f"(>{max_change}°C).\nExamples:\n{example_str}")
    else:
        message = "No unusual hourly temperature changes detected."

    return {
        "passed": unusual_count == 0,
        "message": message,
        "affected_rows": unusual_count
    }

# Define a function for checking solar energy day/night patterns with examples
def check_solar_day_night_pattern(df, night_start_hour=20, night_end_hour=6, max_night_energy=1):
    """
    Flag night-time records whose ``solarenergy`` exceeds ``max_night_energy``.

    Night is the hour-of-day window from ``night_start_hour`` to ``night_end_hour``,
    both inclusive, taken from ``F.hour("timestamp")``. A window with
    start > end (the default 20 -> 6) wraps around midnight. The defaults
    reproduce the previous fixed 20:00-06:59 window, which may misfire in summer.
    Pass narrower hours for those months, since the hourly records carry no
    sunrise/sunset data.

    Args:
        df: Spark DataFrame with ``timestamp`` and ``solarenergy`` columns.
        night_start_hour: First night hour (0-23).
        night_end_hour: Last night hour (0-23).
        max_night_energy: Largest solar energy value (MJ/m²) tolerated at night.

    Returns:
        dict with "passed", "message" and "affected_rows" (number of flagged rows).
    """
    hour = F.hour("timestamp")
    if night_start_hour > night_end_hour:
        # Window wraps midnight, e.g. 20:00 -> 06:59
        is_night = (hour >= night_start_hour) | (hour <= night_end_hour)
    else:
        is_night = (hour >= night_start_hour) & (hour <= night_end_hour)

    invalid_night_records = df.filter(is_night).filter(F.col("solarenergy") > max_night_energy)
    invalid_count = invalid_night_records.count()

    summary = (f"Found {invalid_count} nighttime records with significant solar energy "
               f"(>{max_night_energy} MJ/m²)")

    if invalid_count > 0:
        examples = invalid_night_records.select(
            "timestamp", "solarenergy", F.hour("timestamp").alias("hour")
        ).orderBy(F.desc("solarenergy")).limit(5).collect()

        example_str = "\n".join(
            f"  - At {row['timestamp']} (hour {row['hour']}): Solar energy value of {row['solarenergy']} MJ/m²"
            for row in examples
        )
        message = f"{summary}.\nExamples:\n{example_str}"
    else:
        message = summary

    return {
        "passed": invalid_count == 0,
        "message": message,
        "affected_rows": invalid_count
    }

def check_windspeed_consistency(df):
    """
    Flag rows whose windspeed exceeds 20 km/h and is more than 5x the previous row's value.

    Rows are compared in global ``timestamp`` order. A previous value that is null
    or 0 yields no ratio, so such rows are never flagged.

    Returns:
        dict with "passed", "message" and "affected_rows" (number of flagged rows).
    """
    previous = F.lag("windspeed").over(Window.orderBy("timestamp"))
    has_usable_prev = F.col("prev_windspeed").isNotNull() & (F.col("prev_windspeed") != 0)

    with_ratio = df.withColumn("prev_windspeed", previous)
    with_ratio = with_ratio.withColumn(
        "windspeed_ratio",
        F.when(has_usable_prev, F.col("windspeed") / F.col("prev_windspeed")).otherwise(None)
    )

    is_spike = (
        F.col("windspeed_ratio").isNotNull()
        & (F.col("windspeed_ratio") > 5)
        & (F.col("windspeed") > 20)
    )
    spikes = with_ratio.filter(is_spike)
    spike_count = spikes.count()

    if spike_count == 0:
        message = "No unusual hourly windspeed changes detected."
    else:
        top_spikes = (
            spikes.select("timestamp", "windspeed", "prev_windspeed", "windspeed_ratio")
                  .orderBy(F.desc("windspeed_ratio"))
                  .limit(5)
                  .collect()
        )
        detail_lines = "\n".join(
            f"  - At {r['timestamp']}: Windspeed {r['windspeed']} km/h, previous {r['prev_windspeed']} km/h (ratio: {r['windspeed_ratio']:.2f})"
            for r in top_spikes
        )
        message = (f"Found {spike_count} instances where windspeed is >5x previous hour and >20 km/h."
                   f"\nExamples:\n{detail_lines}")

    return {
        "passed": spike_count == 0,
        "message": message,
        "affected_rows": spike_count
    }

# Modify lambda functions to include examples
def check_missing_timestamps(df):
    """
    Count rows whose ``timestamp`` is null, quoting up to five of them.

    Returns:
        dict with "passed", "message" and "affected_rows" (number of null-timestamp rows).
    """
    null_ts_rows = df.filter(F.col("timestamp").isNull())
    n_null = null_ts_rows.count()

    if n_null == 0:
        message = "Found 0 records with missing timestamps"
    else:
        sample_rows = null_ts_rows.limit(5).collect()
        listing = "\n".join(f"  - Row with missing timestamp: {r}" for r in sample_rows)
        message = f"Found {n_null} records with missing timestamps.\nExamples:\n{listing}"

    return {
        "passed": n_null == 0,
        "message": message,
        "affected_rows": n_null
    }

def check_temperature_range(df):
    """
    Flag temperatures outside the plausible -50 to 60 °C range.

    Null temperatures are not flagged. Up to five offending rows are quoted,
    ordered so that more extreme values tend to come first.

    Returns:
        dict with "passed", "message" and "affected_rows" (number of offending rows).
    """
    temp = F.col("temp")
    offenders = df.filter((temp < -50) | (temp > 60))
    n_offenders = offenders.count()
    headline = f"Found {n_offenders} temperature values outside reasonable range (-50 to 60°C)"

    if n_offenders > 0:
        # Lows sort by their value, highs by 1000 - value (larger highs earlier)
        severity = F.when(temp < -50, temp).otherwise(F.lit(1000) - temp)
        sample_rows = offenders.select("timestamp", "temp").orderBy(severity).limit(5).collect()
        details = "\n".join(
            f"  - At {r['timestamp']}: Temperature of {r['temp']}°C" for r in sample_rows
        )
        headline = f"{headline}.\nExamples:\n{details}"

    return {
        "passed": n_offenders == 0,
        "message": headline,
        "affected_rows": n_offenders
    }

def check_duplicates(df):
    """
    Count exact duplicate rows (all columns equal, nulls compared as equal).

    The reported count is the number of surplus copies, i.e. total rows minus
    distinct rows, the same quantity as ``df.count() - df.dropDuplicates().count()``.
    It is computed with a single group-by over all columns. Examples list each
    duplicated record once, together with how many copies it has.

    Returns:
        dict with "passed", "message" and "affected_rows" (number of surplus copies).
    """
    # Private column name so an existing "count" column in df is never clobbered
    copies_col = "_dup_copies"
    dup_groups = (
        df.groupBy(*df.columns)
          .agg(F.count(F.lit(1)).alias(copies_col))
          .filter(F.col(copies_col) > 1)
    )

    surplus = dup_groups.agg(F.sum(F.col(copies_col) - 1).alias("surplus")).collect()[0]["surplus"]
    duplicate_count = surplus or 0  # sum over no groups is null

    if duplicate_count > 0:
        examples = (
            dup_groups.select("timestamp", "temp", "windspeed", "solarenergy", copies_col)
                      .orderBy("timestamp")
                      .limit(5)
                      .collect()
        )

        example_str = "\n".join(
            f"  - Duplicate record at {row['timestamp']} ({row[copies_col]} copies): "
            f"temp={row['temp']}°C, windspeed={row['windspeed']} km/h, "
            f"solarenergy={row['solarenergy']} MJ/m²"
            for row in examples
        )

        message = f"Found {duplicate_count} duplicate records in the dataset.\nExamples:\n{example_str}"
    else:
        message = f"Found {duplicate_count} duplicate records in the dataset"

    return {
        "passed": duplicate_count == 0,
        "message": message,
        "affected_rows": duplicate_count
    }

def check_windspeed_range(df):
    """
    Flag windspeed values outside the plausible 0-150 km/h range.

    Null windspeeds are not flagged. Up to five offending rows are quoted,
    ordered so that more extreme values tend to come first.

    Returns:
        dict with "passed", "message" and "affected_rows" (number of offending rows).
    """
    ws = F.col("windspeed")
    offenders = df.filter((ws < 0) | (ws > 150))
    n_offenders = offenders.count()
    headline = f"Found {n_offenders} windspeed values outside reasonable range (0-150 km/h)"

    if n_offenders > 0:
        # Negative speeds sort by their value, high speeds by 1000 - value (larger first)
        severity = F.when(ws < 0, ws).otherwise(F.lit(1000) - ws)
        sample_rows = offenders.select("timestamp", "windspeed").orderBy(severity).limit(5).collect()
        details = "\n".join(
            f"  - At {r['timestamp']}: Windspeed of {r['windspeed']} km/h" for r in sample_rows
        )
        headline = f"{headline}.\nExamples:\n{details}"

    return {
        "passed": n_offenders == 0,
        "message": headline,
        "affected_rows": n_offenders
    }

def check_solarenergy_range(df, *, min_value=0, max_value=35):
    """Flag rows whose ``solarenergy`` lies outside a plausible range.

    Args:
        df: Spark DataFrame with ``timestamp`` and ``solarenergy`` columns.
        min_value: Lowest acceptable solar energy in MJ/m² (inclusive). Defaults to 0.
        max_value: Highest acceptable solar energy in MJ/m² (inclusive). Defaults to 35.

    Returns:
        dict with ``passed`` (True when no row is out of range), ``message``
        (a summary plus up to five of the most extreme offending rows) and
        ``affected_rows`` (the out-of-range row count). Null values are not
        counted, because Spark comparisons against null are never true.
    """
    solar = F.col("solarenergy")
    out_of_range = df.filter((solar < min_value) | (solar > max_value))
    out_of_range_count = out_of_range.count()

    range_label = f"({min_value}-{max_value} MJ/m²)"
    message = f"Found {out_of_range_count} solar energy values outside reasonable range {range_label}"

    if out_of_range_count > 0:
        # Rank by distance outside the range so the most extreme offenders on
        # either side are shown first. This replaces the fixed `1000 - value`
        # key, which breaks for large values or custom bounds. Timestamp
        # breaks ties so the examples are deterministic.
        excess = F.greatest(F.lit(min_value) - solar, solar - F.lit(max_value))
        examples = (
            out_of_range
            .select("timestamp", "solarenergy")
            .orderBy(F.desc(excess), F.asc("timestamp"))
            .limit(5)
            .collect()
        )

        example_str = "\n".join(
            f"  - At {row['timestamp']}: Solar energy of {row['solarenergy']} MJ/m²"
            for row in examples
        )
        message = f"{message}.\nExamples:\n{example_str}"

    return {
        "passed": out_of_range_count == 0,
        "message": message,
        "affected_rows": out_of_range_count
    }

# Define quality check rules with the updated functions.
# Each rule is a {"name": <label shown in reports>, "check": <function>} dict.
# The workflow cell passes this list to validate_data_quality(...).
quality_rules = [
    {"name": rule_name, "check": rule_check}
    for rule_name, rule_check in (
        ("No Missing Timestamps", check_missing_timestamps),
        ("Temperature Within Range", check_temperature_range),
        ("No Duplicate Records", check_duplicates),
        ("Hourly Data Completeness", check_hourly_completeness),
        ("No Unusual Hourly Temperature Changes", check_unusual_temp_changes),
        ("Windspeed Within Range", check_windspeed_range),
        ("Solar Energy Within Range", check_solarenergy_range),
        ("Solar Energy Day/Night Pattern", check_solar_day_night_pattern),
        ("Windspeed Consistency Check", check_windspeed_consistency),
    )
]

In [47]:
def enhance_weather_data(df):
    """Add categorical, calendar and cyclical time-feature columns to a Spark DataFrame.

    Requires a ``timestamp`` column and a ``temp`` column. ``conditions``,
    ``precip`` and ``windspeed`` are used only when present.

    Calendar features (hour_of_day, day_of_week, month, season, ...) are
    computed from ``timestamp`` as stored. NOTE(review): if the caller passes
    UTC-shifted timestamps, these reflect UTC rather than Basel local time.

    When the source value is null, the temperature, precipitation and wind
    categories are set to "unknown". Previously nulls fell through every
    ``when`` and landed in the final bucket ("hot", "heavy", "high wind").

    Returns:
        A new DataFrame with the added columns. An existing ``date`` column is
        replaced by a ``yyyy-MM-dd`` string derived from ``timestamp``.
    """
    import math

    print("Enhancing data with filterable columns...")

    ts = F.col("timestamp")
    temp = F.col("temp")

    # Temperature categories
    df = df.withColumn(
        "temp_category",
        F.when(temp.isNull(), "unknown")
        .when(temp < 0, "freezing")
        .when(temp < 10, "cold")
        .when(temp < 20, "cool")
        .when(temp < 30, "warm")
        .otherwise("hot")
    )

    # Time-based columns. F.dayofweek returns 1 = Sunday ... 7 = Saturday.
    df = (
        df.withColumn("hour_of_day", F.hour(ts))
        .withColumn("day_of_week", F.dayofweek(ts))
        .withColumn("is_weekend", F.when(F.col("day_of_week").isin([1, 7]), 1).otherwise(0))
        .withColumn("month", F.month(ts))
        .withColumn("day_of_month", F.dayofmonth(ts))
        .withColumn("day_of_year", F.dayofyear(ts))
        .withColumn("date", F.date_format(ts, "yyyy-MM-dd"))
    )

    # Meteorological seasons (Mar-May spring, Jun-Aug summer, Sep-Nov autumn).
    month = F.col("month")
    df = df.withColumn(
        "season",
        F.when(month.between(3, 5), "spring")
        .when(month.between(6, 8), "summer")
        .when(month.between(9, 11), "autumn")
        .otherwise("winter")
    )

    # Cyclical sin/cos encodings, so the end of each cycle sits next to its
    # start: 23:00 next to 00:00, Saturday (7) next to Sunday (1), December
    # next to January, Dec 31 next to Jan 1.
    # math.pi is the same double as Spark's pi(), but pi() needs Spark >= 3.5.
    cyclical_features = (
        ("hour", "hour_of_day", 24),
        ("day_of_week", "day_of_week", 7),
        ("month", "month", 12),
        ("day_of_year", "day_of_year", 365.25),
    )
    for prefix, source_col, period in cyclical_features:
        angle = F.col(source_col) * 2 * math.pi / period
        df = df.withColumn(f"{prefix}_sin", F.sin(angle))
        df = df.withColumn(f"{prefix}_cos", F.cos(angle))

    # Weather condition groups. The first match wins, so e.g. a condition
    # containing both "snow" and "rain" is grouped as "rainy".
    # NOTE(review): conditions matching none of these substrings (e.g.
    # "Overcast", if the source emits it) become "other" -- confirm intended.
    if "conditions" in df.columns:
        conditions = F.lower(F.col("conditions"))
        df = df.withColumn(
            "weather_group",
            F.when(conditions.like("%rain%"), "rainy")
            .when(conditions.like("%snow%"), "snowy")
            .when(conditions.like("%cloud%"), "cloudy")
            .when(conditions.like("%clear%"), "clear")
            .when(conditions.like("%sun%"), "sunny")
            .when(conditions.like("%fog%"), "foggy")
            .otherwise("other")
        )

    # Precipitation categories
    if "precip" in df.columns:
        precip = F.col("precip")
        df = df.withColumn(
            "precip_category",
            F.when(precip.isNull(), "unknown")
            .when(precip == 0, "none")
            .when(precip < 2, "light")
            .when(precip < 10, "moderate")
            .otherwise("heavy")
        )

    # Wind speed categories
    if "windspeed" in df.columns:
        windspeed = F.col("windspeed")
        df = df.withColumn(
            "wind_category",
            F.when(windspeed.isNull(), "unknown")
            .when(windspeed < 5, "calm")
            .when(windspeed < 15, "light breeze")
            .when(windspeed < 25, "moderate breeze")
            .when(windspeed < 35, "strong breeze")
            .otherwise("high wind")
        )

    print("Filterable columns and cyclical time encodings added successfully.")
    print("Added cyclical encodings for: hour of day, day of week, month, and day of year.")
    return df

# Assuming parquet_df is the DataFrame loaded from the Parquet file
# (presumably by an earlier cell), enrich it and persist the result to Drive.
enhanced_df = enhance_weather_data(parquet_df)
enhanced_path = "/content/drive/MyDrive/basel_weather_analysis/enhanced_data.parquet"
(
    enhanced_df.write
    .mode("overwrite")
    .parquet(enhanced_path)
)

print(f"Enhanced data saved to: {enhanced_path}")

Enhancing data with filterable columns...
Filterable columns and cyclical time encodings added successfully.
Added cyclical encodings for: hour of day, day of week, month, and day of year.
Enhanced data saved to: /content/drive/MyDrive/basel_weather_analysis/enhanced_data.parquet


In [48]:
# Calculate 96-hour moving averages
def calculate_moving_averages(df):
    """Add trailing 96-hour averages for the key weather metrics, then save and preview them.

    For each of ``temp``, ``solarenergy``, ``humidity``, ``windspeed`` and
    ``precip`` that exists in *df*, adds a ``*_96hr_avg`` column. The output
    names are kept as before (``solarenergy`` -> ``solar_96hr_avg``). Each
    average covers every row whose ``timestamp`` lies in [t - 95h, t]. The
    earliest rows therefore average over a partial window.

    The result is written to Drive as Parquet (overwriting) and a 5-row
    sample of the present columns is printed.

    Returns:
        The DataFrame with the added average columns.
    """
    print("\n" + "="*50)
    print("CALCULATING 96-HOUR MOVING AVERAGES")
    print("="*50)

    # Time-based window over epoch seconds. On gap-free hourly data it covers
    # exactly the current hour plus the 95 before it, the same as
    # rowsBetween(-95, 0). Unlike a row-based window, it does not silently
    # stretch over missing hours. With no partitionBy, Spark evaluates the
    # window in a single partition, which is fine for a few thousand rows.
    window_hours = 96
    window_spec = (
        Window.orderBy(F.col("timestamp").cast("long"))
        .rangeBetween(-(window_hours - 1) * 3600, 0)
    )

    # Source column -> output column, in the order they are added and shown.
    avg_columns = {
        "temp": "temp_96hr_avg",
        "solarenergy": "solar_96hr_avg",
        "humidity": "humidity_96hr_avg",
        "windspeed": "windspeed_96hr_avg",
        "precip": "precip_96hr_avg",
    }
    present = [(src, dst) for src, dst in avg_columns.items() if src in df.columns]

    moving_avg_df = df
    for source_col, avg_col in present:
        moving_avg_df = moving_avg_df.withColumn(avg_col, F.avg(source_col).over(window_spec))

    # Save data with moving averages
    moving_avg_path = "/content/drive/MyDrive/basel_weather_analysis/moving_avg_data.parquet"
    moving_avg_df.write.mode("overwrite").parquet(moving_avg_path)

    print(f"Data with 96-hour moving averages saved to: {moving_avg_path}")

    # Preview only the columns that actually exist, so a missing metric does
    # not make this select fail after the guarded additions above.
    print("Sample of data with 96-hour moving averages:")
    sample_columns = ["timestamp"]
    for source_col, avg_col in present:
        sample_columns.extend([source_col, avg_col])
    moving_avg_df.select(*sample_columns).show(5)

    return moving_avg_df

In [49]:
def implement_indexing_strategies(df):
    """Persist *df* in query-friendly layouts and register it as a SQL view.

    1. Writes *df* partitioned by ``month`` / ``day_of_week`` (one directory per
       combination), so filters on those columns can skip whole directories.
    2. Reads that layout back and rewrites it, still partitioned the same way,
       with rows inside each partition sorted by ``temp_category`` then
       ``weather_group``. Clustered values can let Parquet's row-group min/max
       statistics skip more data for filters on those columns.
    3. Registers *df* as the temporary view ``weather_data`` and runs an example
       aggregation over it.

    Expects the columns added by ``enhance_weather_data`` (``month``,
    ``day_of_week``, ``temp_category``, ``weather_group``, ``hour_of_day``,
    ``is_weekend``) plus ``windspeed`` and ``solarenergy``. Uses the notebook's
    global ``spark`` session. Output directories are overwritten.

    Returns:
        A status string.
    """
    print("\nImplementing indexing strategies...")

    # Strategy 1: Time-based Partitioning (Primary)
    print("1. Time-based Partitioning by month and day_of_week")
    partitioned_path = "/content/drive/MyDrive/basel_weather_analysis/partitioned_data.parquet"
    df.write.partitionBy("month", "day_of_week") \
        .mode("overwrite") \
        .parquet(partitioned_path)
    print(f"Time-partitioned data saved to: {partitioned_path}")

    # Strategy 2: Sorting within Partitions by Weather Categories (Secondary)
    # The previous global orderBy wrote an *unpartitioned* dataset, so the
    # "partitioned and sorted" output lost the partition layout. Instead,
    # co-locate each (month, day_of_week) group in one task, sort locally, and
    # write with partitionBy. The partition columns lead the sort key, so the
    # writer's required ordering is already satisfied and the secondary order
    # survives into each file.
    print("\n2. Sorting within partitions by temp_category and weather_group")
    partitioned_df = spark.read.parquet(partitioned_path)
    sorted_path = "/content/drive/MyDrive/basel_weather_analysis/partitioned_sorted_data.parquet"
    (
        partitioned_df
        .repartition("month", "day_of_week")
        .sortWithinPartitions("month", "day_of_week", "temp_category", "weather_group")
        .write.partitionBy("month", "day_of_week")
        .mode("overwrite")
        .parquet(sorted_path)
    )
    print(f"Partitioned and sorted data saved to: {sorted_path}")

    # Strategy 3: named SQL access. A temp view is not an index; it only makes
    # the DataFrame queryable by name from spark.sql.
    print("\n3. SQL-based Indexing (Temporary View)")
    df.createOrReplaceTempView("weather_data")
    print("Temporary view 'weather_data' created for SQL querying.")

    # Example query: Average windspeed and solar energy by hour for weekends in April
    print("\nExample query using the temporary view:")
    spark.sql("""
    SELECT
        hour_of_day,
        AVG(windspeed) as avg_windspeed,
        AVG(solarenergy) as avg_solarenergy
    FROM weather_data
    WHERE month = 4 AND is_weekend = 1  -- Weekend days in April
    GROUP BY hour_of_day
    ORDER BY hour_of_day
    """).show(24)

    return "Indexing strategies implemented successfully"

In [50]:
# Execute the complete workflow if data is available.
# Relies on parquet_df and on helper functions defined in earlier cells
# (assess_data_quality, create_dirty_data, validate_data_quality, ...).
if 'parquet_df' in locals():
    print("\n" + "="*50)
    print("EXECUTING COMPLETE DATA ENGINEERING WORKFLOW")
    print("="*50)

    # 1. Initial data quality assessment with local timestamps
    print("\nStep 1: Initial data quality assessment with local timestamps")
    print("Checking for time series completeness issues...")
    assessment_result = assess_data_quality(parquet_df)

    # 2. Identify and fix the missing hour due to DST
    print("\nStep 2: Addressing the missing hour due to Daylight Saving Time")
    print("Converting timestamps to UTC to create a continuous time series...")

    # Interpret the stored wall-clock timestamps as Europe/Zurich time and
    # convert them to UTC, which has no DST gap.
    utc_df = parquet_df.withColumn(
        "timestamp_utc",
        F.to_utc_timestamp(F.col("timestamp"), "Europe/Zurich")
    )

    # Make UTC the primary `timestamp`; keep the original as `timestamp_local`.
    analysis_df = utc_df.select(
        F.col("timestamp").alias("timestamp_local"),
        F.col("timestamp_utc").alias("timestamp"),
        *[F.col(c) for c in utc_df.columns if c not in ["timestamp", "timestamp_utc"]]
    )

    # Save the UTC version
    utc_path = "/content/drive/MyDrive/basel_weather_analysis/basel_weather_data_utc.parquet"
    analysis_df.write.mode("overwrite").parquet(utc_path)
    print(f"UTC-based data saved to: {utc_path}")

    # Verify the fix worked by reassessing data quality with UTC timestamps
    print("\nVerifying time series completeness with UTC timestamps:")
    utc_assessment = assess_data_quality(analysis_df)

    # 3. Create synthetic dirty data for testing quality checks
    print("\nStep 3: Creating synthetic dirty data for testing")
    dirty_df = create_dirty_data(analysis_df)

    # 4. Validate data quality with error messaging
    print("\nStep 4: Validating data quality")
    print("Original data with local timestamps:")
    original_validation = validate_data_quality(parquet_df, quality_rules)

    print("\nFixed data with UTC timestamps:")
    utc_validation = validate_data_quality(analysis_df, quality_rules)

    print("\nSynthetic dirty data:")
    dirty_validation = validate_data_quality(dirty_df, quality_rules)

    # 5. Enhance data with filterable columns.
    # NOTE(review): analysis_df's `timestamp` is UTC, so calendar features
    # (hour_of_day, is_weekend, season, ...) come out UTC-based. Confirm this
    # is intended; `timestamp_local` is still available for local-time features.
    print("\nStep 5: Enhancing data with filterable columns")
    enhanced_df = enhance_weather_data(analysis_df)

    # 6. Calculate 96-hour moving averages
    print("\nStep 6: Calculating 96-hour moving averages")
    moving_avg_df = calculate_moving_averages(enhanced_df)

    # 7. Implement indexing strategies
    print("\nStep 7: Implementing indexing strategies")
    indexing_result = implement_indexing_strategies(moving_avg_df)

    # 8. Demonstrate the impact of using UTC vs local time for analysis
    print("\nStep 8: Demonstrating the impact of UTC conversion on analysis")

    # Create a view for SQL comparison
    parquet_df.createOrReplaceTempView("local_time_data")
    analysis_df.createOrReplaceTempView("utc_time_data")

    print("\nComparison of hourly record counts around DST transition:")
    # Spring-forward date in Europe/Zurich for 2025 (last Sunday in March)
    dst_date = "2025-03-30"

    # Query showing the gap in local time
    print("\nLocal time data around DST transition:")
    spark.sql(f"""
    SELECT
        date_format(timestamp, 'yyyy-MM-dd') as date,
        hour(timestamp) as hour,
        count(*) as record_count
    FROM local_time_data
    WHERE date_format(timestamp, 'yyyy-MM-dd') = '{dst_date}'
    GROUP BY date_format(timestamp, 'yyyy-MM-dd'), hour(timestamp)
    ORDER BY date, hour
    """).show(24)

    # Query showing continuous data in UTC (selected by local date, grouped by UTC hour)
    print("\nUTC time data around DST transition:")
    spark.sql(f"""
    SELECT
        date_format(timestamp, 'yyyy-MM-dd') as date,
        hour(timestamp) as hour,
        count(*) as record_count
    FROM utc_time_data
    WHERE date_format(timestamp_local, 'yyyy-MM-dd') = '{dst_date}'
    GROUP BY date_format(timestamp, 'yyyy-MM-dd'), hour(timestamp)
    ORDER BY date, hour
    """).show(24)

    # Fetch both date bounds in a single Spark job.
    date_bounds = parquet_df.agg(F.min('timestamp'), F.max('timestamp')).collect()[0]

    print("\n" + "="*50)
    print("DATA ENGINEERING PROJECT SUMMARY")
    print("="*50)
    print(f"1. Collected {parquet_df.count()} hourly weather records for Basel, Switzerland")
    print(f"2. Date range: {date_bounds[0]} to {date_bounds[1]}")
    print("3. Data stored in Parquet format (columnar, non-CSV)")
    print("4. Identified and resolved a time series completeness issue caused by Daylight Saving Time")
    print("5. Implemented comprehensive data quality checks with clear error messaging")
    print("6. Created synthetic dirty data to test quality check robustness")
    print("7. Enhanced data with filterable columns for improved analysis capabilities")
    print("8. Calculated 96-hour moving averages for key weather metrics")
    print("9. Implemented and discussed multiple indexing strategies for optimized data access")
    print("\nProject successfully completed!")
else:
    print("Error: Weather data not available. Please check the data collection steps.")


EXECUTING COMPLETE DATA ENGINEERING WORKFLOW

Step 1: Initial data quality assessment with local timestamps
Checking for time series completeness issues...

DATA QUALITY ASSESSMENT REPORT

1. MISSING VALUES ANALYSIS:
+--------+-------------+----+---------+--------+---+------+----------+----+---------+----------+--------+---------+-------+--------+----------+----------+--------------+-----------+-------+----------+----+--------+------+----+-------+--------+---------+--------+--------+----------+---------+
|datetime|datetimeEpoch|temp|feelslike|humidity|dew|precip|precipprob|snow|snowdepth|preciptype|windgust|windspeed|winddir|pressure|visibility|cloudcover|solarradiation|solarenergy|uvindex|conditions|icon|stations|source|date|address|latitude|longitude|timezone|tzoffset|severerisk|timestamp|
+--------+-------------+----+---------+--------+---+------+----------+----+---------+----------+--------+---------+-------+--------+----------+----------+--------------+-----------+-------+-------

In [51]:
# Reload the enhanced dataset written earlier (enhanced_path) and inspect it:
# sample rows, schema, per-column summary statistics, and a monthly aggregate.
df = spark.read.parquet(enhanced_path)

print("Sample data from Parquet file:")
df.show(10)

print("\nSchema information:")
df.printSchema()

print("\nSummary statistics:")
summary_stats = df.describe()
summary_stats.show()

print("\nSample query - Average temperature by month:")
monthly_avg_temp = (
    df.groupBy("month")
    .agg(F.avg("temp").alias("avg_temp"))
    .orderBy("month")
)
monthly_avg_temp.show()

Sample data from Parquet file:
+--------+-------------+----+---------+--------+---+------+----------+----+---------+----------+--------+---------+-------+--------+----------+----------+--------------+-----------+-------+--------------------+-------------------+--------------------+------+----------+-----------------+--------+---------+-------------+--------+----------+-------------------+-------------+-----------+-----------+----------+-----+------------+-----------+------+--------------------+--------------------+-------------------+-------------------+------------------+------------------+------------------+------------------+-------------+---------------+-------------+
|datetime|datetimeEpoch|temp|feelslike|humidity|dew|precip|precipprob|snow|snowdepth|preciptype|windgust|windspeed|winddir|pressure|visibility|cloudcover|solarradiation|solarenergy|uvindex|          conditions|               icon|            stations|source|      date|          address|latitude|longitude|     timezone

In [52]:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import matplotlib.pyplot as plt
import seaborn as sns

# Load the moving-average dataset into pandas for plotting and widgets.
moving_avg_path = "/content/drive/MyDrive/basel_weather_analysis/moving_avg_data.parquet"
df = pd.read_parquet(moving_avg_path)

# Category-like columns that make sense as dashboard filters.
filterable_columns = [
    name for name in df.columns
    if name.endswith('_category') or name in ['season', 'weather_group', 'is_weekend']
]
row_total, column_total = df.shape

print(f"Successfully loaded {row_total} records with {column_total} columns")
print(f"Columns include: {', '.join(df.columns)}")
print(f"Filterable columns include: {filterable_columns}")


Successfully loaded 2495 records with 57 columns
Columns include: timestamp_local, timestamp, datetime, datetimeEpoch, temp, feelslike, humidity, dew, precip, precipprob, snow, snowdepth, preciptype, windgust, windspeed, winddir, pressure, visibility, cloudcover, solarradiation, solarenergy, uvindex, conditions, icon, stations, source, date, address, latitude, longitude, timezone, tzoffset, severerisk, temp_category, hour_of_day, day_of_week, is_weekend, month, day_of_month, day_of_year, season, hour_sin, hour_cos, day_of_week_sin, day_of_week_cos, month_sin, month_cos, day_of_year_sin, day_of_year_cos, weather_group, precip_category, wind_category, temp_96hr_avg, solar_96hr_avg, humidity_96hr_avg, windspeed_96hr_avg, precip_96hr_avg
Filterable columns include: ['temp_category', 'is_weekend', 'season', 'weather_group', 'precip_category', 'wind_category']


In [53]:
# Create an interactive dashboard with dropdown filters
import ipywidgets as widgets
from IPython.display import display
import plotly.io as pio
# Only affects Plotly figures; update_viz below draws with matplotlib.
pio.renderers.default = "colab"


def _category_options(column):
    """Return ['All'] followed by the sorted distinct non-null values of df[column].

    Nulls are dropped first: pandas yields them as None/NaN, and sorting a mix
    of None and str raises TypeError.
    """
    return ['All'] + sorted(df[column].dropna().unique().tolist())


def _make_dropdown(options, description):
    """Build a Dropdown that defaults to 'All' and uses the shared label style."""
    return widgets.Dropdown(
        options=options,
        value='All',
        description=description,
        style={'description_width': 'initial'}
    )


# Define filter widgets
temp_filter = _make_dropdown(_category_options('temp_category'), 'Temperature:')
weather_filter = _make_dropdown(_category_options('weather_group'), 'Weather:')
season_filter = _make_dropdown(_category_options('season'), 'Season:')
weekend_filter = _make_dropdown(['All', 'Weekday', 'Weekend'], 'Day type:')

# Function to update visualization based on filters
def update_viz(temp_cat, weather_group, season, weekend):
    """Plot temperature, solar energy and windspeed with their 96-hour averages for filtered rows.

    Each argument is a dropdown value, and 'All' disables that filter.
    ``weekend`` is 'Weekday' or 'Weekend' and maps to ``is_weekend`` 0 / 1.
    Reads the notebook's global pandas ``df`` and shows a 3-panel matplotlib
    figure. The moving averages were precomputed on the full series, so
    filtering only hides rows; it does not recompute the averages.
    """
    # Combine every active filter into one boolean mask instead of copying df
    # and re-slicing it once per filter.
    mask = pd.Series(True, index=df.index)
    if temp_cat != 'All':
        mask &= df['temp_category'] == temp_cat
    if weather_group != 'All':
        mask &= df['weather_group'] == weather_group
    if season != 'All':
        mask &= df['season'] == season
    if weekend != 'All':
        is_weekend_val = 1 if weekend == 'Weekend' else 0
        mask &= df['is_weekend'] == is_weekend_val

    # Line plots join points in row order, and Parquet read order is not
    # guaranteed to be chronological, so sort before drawing.
    # NOTE: lines still bridge periods that a filter removes entirely.
    filtered_df = df[mask].sort_values('timestamp')

    # Create a figure with three subplots stacked vertically
    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 15), sharex=True)

    # Temperature plot (top subplot)
    ax1.plot(filtered_df['timestamp'], filtered_df['temp'], label='Temperature', alpha=0.7)
    ax1.plot(filtered_df['timestamp'], filtered_df['temp_96hr_avg'], label='96-hour Moving Average',
            linestyle='--', linewidth=2, color='red')
    ax1.set_title(f"Temperature Data for Basel ({len(filtered_df)} records)")
    ax1.set_ylabel('Temperature (°C)')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Solar energy plot (middle subplot)
    ax2.plot(filtered_df['timestamp'], filtered_df['solarenergy'], label='Solar Energy', alpha=0.7, color='orange')
    ax2.plot(filtered_df['timestamp'], filtered_df['solar_96hr_avg'], label='96-hour Moving Average',
            linestyle='--', linewidth=2, color='darkred')
    ax2.set_title("Solar Energy Data")
    ax2.set_ylabel('Solar Energy (MJ/m²)')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Windspeed plot (bottom subplot)
    ax3.plot(filtered_df['timestamp'], filtered_df['windspeed'], label='Wind Speed', alpha=0.7, color='green')
    ax3.plot(filtered_df['timestamp'], filtered_df['windspeed_96hr_avg'], label='96-hour Moving Average',
            linestyle='--', linewidth=2, color='darkgreen')
    ax3.set_title("Wind Speed Data")
    ax3.set_xlabel('Date')
    ax3.set_ylabel('Wind Speed (km/h)')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()


# Map each update_viz parameter to the dropdown that controls it; ipywidgets
# calls update_viz again whenever one of these values changes.
dashboard_controls = dict(
    temp_cat=temp_filter,
    weather_group=weather_filter,
    season=season_filter,
    weekend=weekend_filter,
)
interactive_plot = widgets.interactive(update_viz, **dashboard_controls)

# Render the dropdowns together with the plot output area.
display(interactive_plot)

interactive(children=(Dropdown(description='Temperature:', options=('All', 'cold', 'cool', 'freezing', 'warm')…