# Weather Data Engineering Pipeline - Google Colab Version
# =========================================================
# Run this notebook in Google Colab for a complete data engineering workflow.

In [None]:
# Cell 1: Setup and Installation
print("Installing dependencies...")
!pip install -q pyspark requests great-expectations plotly sqlalchemy

print("Setup complete!")

In [None]:
# Cell 2: Import Libraries
import requests
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import json
from datetime import datetime
import time
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import warnings
warnings.filterwarnings('ignore')

print("All libraries imported successfully!")

In [None]:
# Cell 3: Initialize Spark Session
# Build the session, or reuse one that is already running (getOrCreate).
# The chain is wrapped in parentheses rather than using backslash continuations.
spark = (
    SparkSession.builder
    .appName("WeatherPipeline")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Only show Spark log messages at ERROR level in the notebook output.
spark.sparkContext.setLogLevel("ERROR")
print(f"Spark version: {spark.version}")
print("Spark session initialized!")

In [None]:
# Cell 4: Configuration
# Get your free API key from: https://openweathermap.org/api
import os

# Prefer a key supplied through the OPENWEATHER_API_KEY environment variable so
# the credential does not have to live in the notebook. The literal is kept only
# as a backward-compatible fallback (also used when the variable is empty).
# NOTE(review): a key committed to a shared notebook should be treated as
# exposed -- rotate it and remove the fallback once the variable is in use.
API_KEY = os.environ.get("OPENWEATHER_API_KEY") or "34a91dc9a353902fd151448547fde35c"  # Replace with your API key

# Each entry is passed unchanged as the `q` query parameter of the request.
CITIES = [
    "London,UK", "New York,US", "Tokyo,JP", "Paris,FR",
    "Sydney,AU", "Mumbai,IN", "Singapore,SG", "Dubai,AE",
    "Toronto,CA", "Berlin,DE", "São Paulo,BR", "Moscow,RU"
]

# HTTPS: the API key is sent as a query parameter, so it must not travel over
# plain HTTP.
BASE_URL = "https://api.openweathermap.org/data/2.5/weather"

print(f"Pipeline configured for {len(CITIES)} cities")

In [None]:
# Cell 5: Extract - Fetch Weather Data from API
def _flatten_weather_payload(data):
    """Flatten one current-weather JSON payload into a single-level dict.

    Numeric fields are cast to ``float`` so every record has the same value
    types regardless of whether the API returned an int or a float.

    Optional fields fall back to defaults when absent: wind ``deg`` -> 0,
    ``visibility`` -> 10000, ``rain``/``snow`` ``1h`` -> 0.

    Raises:
        KeyError, IndexError, TypeError, ValueError: if a required field is
            missing or not convertible to ``float``.
    """
    main = data['main']
    sys_info = data['sys']
    coord = data['coord']
    wind = data['wind']
    weather = data['weather'][0]  # only the first weather entry is kept

    return {
        'city': data['name'],
        'country': sys_info['country'],
        'latitude': float(coord['lat']),
        'longitude': float(coord['lon']),
        'temperature': float(main['temp']),
        'feels_like': float(main['feels_like']),
        'temp_min': float(main['temp_min']),
        'temp_max': float(main['temp_max']),
        'pressure': float(main['pressure']),
        'humidity': float(main['humidity']),
        'weather_main': weather['main'],
        'weather_description': weather['description'],
        'wind_speed': float(wind['speed']),
        'wind_deg': float(wind.get('deg', 0)),
        'clouds': float(data['clouds']['all']),
        'visibility': float(data.get('visibility', 10000)),
        'rain_1h': float(data.get('rain', {}).get('1h', 0)),
        'snow_1h': float(data.get('snow', {}).get('1h', 0)),
        'sunrise': float(sys_info['sunrise']),
        'sunset': float(sys_info['sunset']),
        'timezone': float(data['timezone']),
        'timestamp': float(data['dt']),
        # Naive wall-clock time of the machine running the extraction.
        'extraction_time': datetime.now().isoformat()
    }


def extract_weather_data(api_key, cities, base_url):
    """Extract current weather data from OpenWeatherMap API.

    One GET request is issued per city with ``units=metric`` and a 10 second
    timeout. A city whose request or payload parsing fails is skipped and
    reported; it never aborts the whole run.

    Args:
        api_key: API key sent as the ``appid`` query parameter.
        cities: Iterable of city query strings (sent as ``q``).
        base_url: Endpoint URL to request.

    Returns:
        List of flat dicts, one per successfully fetched city, in input order.
        Empty when ``cities`` is empty or every request failed.
    """
    weather_data = []
    failed_cities = []

    print(f"Extracting data for {len(cities)} cities...")

    for city in cities:
        try:
            params = {
                'q': city,
                'appid': api_key,
                'units': 'metric'
            }
            response = requests.get(base_url, params=params, timeout=10)
            response.raise_for_status()
            weather_data.append(_flatten_weather_payload(response.json()))
            print(f"✓ {city}")

        except Exception as e:
            # Deliberate best-effort boundary: any per-city failure (network,
            # HTTP status, malformed payload) is recorded and the loop goes on.
            failed_cities.append(city)
            message = str(e)
            if api_key:
                # HTTP error messages can include the full request URL, which
                # carries the key as a query parameter; keep it out of the output.
                message = message.replace(str(api_key), '***')
            print(f"✗ {city}: {message}")

        # Rate limiting applies after failures too, so a failing endpoint is
        # not hit back-to-back.
        time.sleep(0.3)

    print(f"\nSuccessfully extracted: {len(weather_data)}/{len(cities)} cities")
    if failed_cities:
        print(f"Failed cities: {', '.join(failed_cities)}")

    return weather_data

# Run the extract step for the configured cities.
raw_data = extract_weather_data(
    api_key=API_KEY,
    cities=CITIES,
    base_url=BASE_URL,
)

# Pretty-print the first record, when there is one, as a quick sanity check.
if raw_data:
    print("\nSample raw data:", json.dumps(raw_data[0], indent=2), sep="\n")

In [None]:
# Cell 6: Load Raw Data into Spark DataFrame
# Stop early when extraction produced no records; the steps below need data.
if not raw_data:
    raise ValueError("No data extracted! Check your API key and internet connection.")

# Build the DataFrame straight from the list of flat dicts. No explicit schema
# is passed, so column types are left for Spark to infer from the values.
df_raw = spark.createDataFrame(raw_data)
print(f"Raw DataFrame created with {df_raw.count()} rows and {len(df_raw.columns)} columns")
print("\nSchema:")
df_raw.printSchema()
print("\nSample data:")
# truncate=False prints full cell values instead of shortening long strings.
df_raw.show(5, truncate=False)

In [None]:
# Cell 7: Transform - Data Processing with PySpark
def transform_weather_data(df):
    """Append derived time, metric, category, score and rank columns.

    Every derived column is computed from the raw input columns only, and the
    columns are appended in the order listed in ``derived_columns``.

    Args:
        df: Spark DataFrame holding the flattened raw weather records.

    Returns:
        A new DataFrame with the derived columns appended after the input
        columns.
    """
    # Shorthand for the raw columns that several expressions read.
    temperature = col('temperature')
    humidity = col('humidity')
    wind_speed = col('wind_speed')
    rain = col('rain_1h')
    snow = col('snow_1h')
    visibility = col('visibility')

    def epoch_text(column_name):
        """Return the from_unixtime() expression for an epoch-seconds column."""
        return from_unixtime(col(column_name))

    temp_category = (
        when(temperature < 0, 'Freezing')
        .when(temperature < 10, 'Cold')
        .when(temperature < 20, 'Cool')
        .when(temperature < 25, 'Moderate')
        .when(temperature < 30, 'Warm')
        .otherwise('Hot')
    )
    humidity_category = (
        when(humidity < 30, 'Low')
        .when(humidity < 60, 'Moderate')
        .otherwise('High')
    )
    wind_category = (
        when(wind_speed < 5, 'Calm')
        .when(wind_speed < 10, 'Light')
        .when(wind_speed < 20, 'Moderate')
        .otherwise('Strong')
    )

    # Simplified heat index.
    heat_index = temperature + 0.5 * (humidity / 100) * (temperature - 14.5)

    # Full score inside the comfortable band; otherwise penalise the distance
    # from 21 degrees and from 50 percent humidity.
    in_comfort_band = (
        (temperature.between(18, 24)) &
        (humidity.between(30, 60)) &
        (wind_speed < 10)
    )
    comfort_score = when(in_comfort_band, 100).otherwise(
        100 - abs(temperature - 21) * 3 - abs(humidity - 50) * 0.5
    )

    # Weighted sum of wind, rain and snow plus a fixed penalty for low visibility.
    weather_severity = (
        (wind_speed / 5) +
        (rain * 2) +
        (snow * 3) +
        when(visibility < 1000, 5).otherwise(0)
    )

    # Ranking windows span the whole DataFrame (no partitioning), highest first.
    by_temperature = Window.orderBy(temperature.desc())
    by_humidity = Window.orderBy(humidity.desc())

    derived_columns = [
        # Datetime conversions
        ('date', epoch_text('timestamp').cast('date')),
        ('datetime', epoch_text('timestamp').cast('timestamp')),
        ('hour', hour(epoch_text('timestamp'))),
        ('day_of_week', date_format(epoch_text('timestamp'), 'EEEE')),
        ('sunrise_time', epoch_text('sunrise').cast('timestamp')),
        ('sunset_time', epoch_text('sunset').cast('timestamp')),
        # Derived metrics
        ('temp_range', col('temp_max') - col('temp_min')),
        ('is_raining', when(rain > 0, True).otherwise(False)),
        ('is_snowing', when(snow > 0, True).otherwise(False)),
        ('visibility_km', visibility / 1000),
        # Categorizations
        ('temp_category', temp_category),
        ('humidity_category', humidity_category),
        ('wind_category', wind_category),
        # Comfort and severity scores
        ('heat_index', heat_index),
        ('comfort_score', comfort_score),
        ('weather_severity', weather_severity),
        # Rankings
        ('temp_rank', rank().over(by_temperature)),
        ('humidity_rank', rank().over(by_humidity)),
        # Hemisphere classification
        ('hemisphere',
         when(col('latitude') >= 0, 'Northern').otherwise('Southern')),
    ]

    df_transformed = df
    for column_name, expression in derived_columns:
        df_transformed = df_transformed.withColumn(column_name, expression)

    return df_transformed

# Run the transform step on the raw DataFrame.
df_processed = transform_weather_data(df_raw)
print(f"Transformed DataFrame: {df_processed.count()} rows, {len(df_processed.columns)} columns")

# Report which column names exist after the transform but not before it.
print("\nNew columns added during transformation:")
new_cols = set(df_processed.columns).difference(df_raw.columns)
print(*sorted(new_cols), sep=", ")

In [None]:
# Cell 8: Data Quality Checks
def perform_quality_checks(df):
    """Run null, range, duplicate and completeness checks on a DataFrame.

    Args:
        df: Spark DataFrame with at least the columns ``temperature``,
            ``humidity``, ``pressure``, ``city`` and ``country``.

    Returns:
        dict with the keys:
            ``null_check``: DataFrame of per-column null counts (not collected).
            ``temp_range``: rows with temperature below -50 or above 60.
            ``humidity_range``: rows with humidity below 0 or above 100.
            ``pressure_range``: rows with pressure below 900 or above 1100.
            ``duplicate_cities``: row count minus distinct (city, country) pairs.
            ``total_records``: total row count.
            ``complete_records``: rows left after ``dropna()``.
            ``completeness_pct``: complete / total * 100, or 0.0 when the
                DataFrame has no rows.
    """
    checks = {}

    # Null checks
    # NOTE: `sum` must be pyspark.sql.functions.sum (brought in by the
    # notebook's star import), not the Python builtin.
    null_counts = df.select([
        sum(col(c).isNull().cast("int")).alias(c)
        for c in df.columns
    ])
    checks['null_check'] = null_counts

    # Range checks
    checks['temp_range'] = df.filter(
        (col('temperature') < -50) | (col('temperature') > 60)
    ).count()

    checks['humidity_range'] = df.filter(
        (col('humidity') < 0) | (col('humidity') > 100)
    ).count()

    checks['pressure_range'] = df.filter(
        (col('pressure') < 900) | (col('pressure') > 1100)
    ).count()

    # Count once and reuse: every count() is a separate Spark action.
    total_records = df.count()

    # Duplicate checks
    checks['duplicate_cities'] = total_records - df.select('city', 'country').distinct().count()

    # Completeness check
    checks['total_records'] = total_records
    checks['complete_records'] = df.dropna().count()
    if total_records:
        checks['completeness_pct'] = (checks['complete_records'] / total_records) * 100
    else:
        # An empty DataFrame would otherwise raise ZeroDivisionError.
        checks['completeness_pct'] = 0.0

    return checks

# Run the quality checks on the transformed DataFrame and print the scalar results.
quality_results = perform_quality_checks(df_processed)

print("=== Data Quality Report ===\n")
print(f"Total records: {quality_results['total_records']}")
print(f"Complete records: {quality_results['complete_records']}")
print(f"Completeness: {quality_results['completeness_pct']:.2f}%\n")

# Each value below is a count of offending rows; 0 means the check passed.
print("Range validation issues:")
print(f"  Temperature outliers: {quality_results['temp_range']}")
print(f"  Humidity outliers: {quality_results['humidity_range']}")
print(f"  Pressure outliers: {quality_results['pressure_range']}")
print(f"  Duplicate cities: {quality_results['duplicate_cities']}")