## 1. Setup & Configuration

In [None]:
# Install required packages (run once)
# !pip install requests pandas python-dotenv matplotlib seaborn

In [1]:
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import time
import os
from dotenv import load_dotenv
import json

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Set display options
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)

print("‚úÖ Libraries imported successfully!")

‚úÖ Libraries imported successfully!


In [2]:
# Load environment variables from the project-level .env file
load_dotenv('../.env')

# Configuration
# Falls back to a placeholder so the notebook can still report a missing key.
API_KEY = os.getenv('OPENWEATHER_API_KEY', 'your_api_key_here')

# Islamabad, Pakistan coordinates
LAT = 33.6844
LON = 73.0479

print(f"üìç Location: Islamabad, Pakistan")
print(f"üìç Coordinates: Lat={LAT}, Lon={LON}")
print(f"üîë API Key configured: {'Yes ‚úÖ' if API_KEY != 'your_api_key_here' else 'No ‚ùå - Please set OPENWEATHER_API_KEY'}")
# Never echo any characters of the secret: notebook outputs are saved with the
# file and routinely committed/shared. Report only that a value is present.
print(f"üîë API Key (masked): {'*' * 8} ({len(API_KEY)} characters)")

üìç Location: Islamabad, Pakistan
üìç Coordinates: Lat=33.6844, Lon=73.0479
üîë API Key configured: Yes ‚úÖ
üîë API Key (masked): [redacted]


---
## 2. EXTRACT: Test API Connections

### 2.1 Test One Call 3.0 API (Weather Data)

In [3]:
# One Call 3.0 API - Current Weather
ONECALL_URL = "https://api.openweathermap.org/data/3.0/onecall"

def test_onecall_current():
    """Smoke-test the One Call 3.0 endpoint with a current-weather request.

    Sends LAT/LON/API_KEY with metric units and excludes every section except
    ``current``. Prints the HTTP status and a success/error line.

    Returns:
        The decoded JSON payload on HTTP 200; None on any other status or
        when the request (or JSON decoding) raises.
    """
    query = dict(
        lat=LAT,
        lon=LON,
        appid=API_KEY,
        units='metric',
        exclude='minutely,hourly,daily,alerts',
    )

    try:
        reply = requests.get(ONECALL_URL, params=query, timeout=10)
        print(f"Status Code: {reply.status_code}")

        # Guard clause: anything other than 200 is reported and abandoned.
        if reply.status_code != 200:
            print(f"\n‚ùå Error: {reply.json()}")
            return None

        payload = reply.json()
        print("\n‚úÖ One Call 3.0 API - SUCCESS!")
        return payload
    except Exception as err:
        print(f"\n‚ùå Exception: {err}")
        return None

weather_response = test_onecall_current()

Status Code: 200

‚úÖ One Call 3.0 API - SUCCESS!


In [4]:
# Inspect weather response structure (pretty-printed JSON), or explain why
# there is nothing to show when the request cell returned None.
if not weather_response:
    print("‚ö†Ô∏è No weather data to display. Check API key.")
else:
    print("üìä Weather Response Structure:")
    print(json.dumps(weather_response, indent=2))

üìä Weather Response Structure:
{
  "lat": 33.6844,
  "lon": 73.0479,
  "timezone": "Asia/Karachi",
  "timezone_offset": 18000,
  "current": {
    "dt": 1768047440,
    "sunrise": 1768011179,
    "sunset": 1768047416,
    "temp": 12.86,
    "feels_like": 10.75,
    "pressure": 1018,
    "humidity": 21,
    "dew_point": -7.93,
    "uvi": 0,
    "clouds": 0,
    "visibility": 10000,
    "wind_speed": 2.64,
    "wind_deg": 292,
    "wind_gust": 3.32,
    "weather": [
      {
        "id": 800,
        "main": "Clear",
        "description": "clear sky",
        "icon": "01n"
      }
    ]
  }
}


In [5]:
# Extract weather features from response
def parse_weather_current(data):
    """Flatten the ``current`` section of a One Call 3.0 response.

    Args:
        data: Decoded JSON response (dict), or None.

    Returns:
        None when ``data`` is empty or has no ``'current'`` key. Otherwise a
        dict with the keys ``unix_time``, ``datetime``, ``temp``,
        ``feels_like``, ``humidity``, ``pressure``, ``wind_speed``,
        ``wind_deg``, ``clouds``, ``visibility``, ``dew_point`` and ``uvi``.
        Fields missing from the payload are None. ``datetime`` is a naive
        datetime expressed in UTC, derived from ``dt``.
    """
    from datetime import timezone  # local import: only needed for the UTC conversion

    if not data or 'current' not in data:
        return None

    current = data['current']
    unix_time = current.get('dt')

    # A payload without 'dt' must not crash the parser (the old
    # utcfromtimestamp(None) raised TypeError). fromtimestamp(..., tz=utc)
    # replaces utcfromtimestamp, which is deprecated since Python 3.12; tzinfo
    # is stripped so callers keep receiving a naive UTC datetime as before.
    observed_at = None
    if unix_time is not None:
        observed_at = datetime.fromtimestamp(unix_time, tz=timezone.utc).replace(tzinfo=None)

    weather_features = {
        'unix_time': unix_time,
        'datetime': observed_at,
        'temp': current.get('temp'),
        'feels_like': current.get('feels_like'),
        'humidity': current.get('humidity'),
        'pressure': current.get('pressure'),
        'wind_speed': current.get('wind_speed'),
        'wind_deg': current.get('wind_deg'),
        'clouds': current.get('clouds'),
        'visibility': current.get('visibility'),
        'dew_point': current.get('dew_point'),
        'uvi': current.get('uvi'),
    }

    return weather_features

if weather_response:
    weather_features = parse_weather_current(weather_response)
    # parse_weather_current returns None when the payload has no 'current'
    # section; iterating over None would raise AttributeError.
    if weather_features is None:
        print("No 'current' section in the weather response - nothing to extract.")
    else:
        print("üå§Ô∏è Extracted Weather Features:")
        for key, value in weather_features.items():
            print(f"  {key}: {value}")

üå§Ô∏è Extracted Weather Features:
  unix_time: 1768047440
  datetime: 2026-01-10 12:17:20
  temp: 12.86
  feels_like: 10.75
  humidity: 21
  pressure: 1018
  wind_speed: 2.64
  wind_deg: 292
  clouds: 0
  visibility: 10000
  dew_point: -7.93
  uvi: 0


### 2.2 Test Air Pollution API (Pollutant Data)

In [4]:
# Air Pollution API - Current
# HTTPS is required: the API key travels in the query string and would be
# readable in transit over plain HTTP.
POLLUTION_URL = "https://api.openweathermap.org/data/2.5/air_pollution"

def test_pollution_current():
    """Smoke-test the Air Pollution endpoint with a current-conditions request.

    Sends LAT/LON/API_KEY and prints the HTTP status plus a success/error line.

    Returns:
        The decoded JSON payload on HTTP 200; None on any other status or
        when the request (or JSON decoding) raises.
    """
    params = {
        'lat': LAT,
        'lon': LON,
        'appid': API_KEY
    }

    try:
        response = requests.get(POLLUTION_URL, params=params, timeout=10)
        print(f"Status Code: {response.status_code}")

        if response.status_code == 200:
            data = response.json()
            print("\n‚úÖ Air Pollution API - SUCCESS!")
            return data
        else:
            print(f"\n‚ùå Error: {response.json()}")
            return None
    except Exception as e:
        # Best-effort probe: report the failure instead of aborting the notebook.
        print(f"\n‚ùå Exception: {e}")
        return None

pollution_response = test_pollution_current()

Status Code: 200

‚úÖ Air Pollution API - SUCCESS!


In [7]:
# Inspect pollution response structure (pretty-printed JSON), or explain why
# there is nothing to show when the request cell returned None.
if not pollution_response:
    print("‚ö†Ô∏è No pollution data to display. Check API key.")
else:
    print("üìä Pollution Response Structure:")
    print(json.dumps(pollution_response, indent=2))

üìä Pollution Response Structure:
{
  "coord": {
    "lon": 73.0479,
    "lat": 33.6844
  },
  "list": [
    {
      "main": {
        "aqi": 5
      },
      "components": {
        "co": 1474.22,
        "no": 0.09,
        "no2": 13.21,
        "o3": 132.26,
        "so2": 4.1,
        "pm2_5": 255.13,
        "pm10": 326.34,
        "nh3": 18.19
      },
      "dt": 1768047452
    }
  ]
}


In [8]:
# Extract pollution features from response
def parse_pollution_current(data):
    """Flatten the first entry of an Air Pollution API response.

    Args:
        data: Decoded JSON response (dict), or None.

    Returns:
        None when ``data`` is empty, has no ``'list'`` key, or the list is
        empty. Otherwise a dict with the keys ``unix_time``, ``datetime``,
        ``aqi``, ``pm2_5``, ``pm10``, ``no2``, ``so2``, ``co``, ``o3``,
        ``nh3`` and ``no``. Fields missing from the payload are None.
        ``datetime`` is a naive datetime expressed in UTC, derived from ``dt``.
    """
    from datetime import timezone  # local import: only needed for the UTC conversion

    if not data or 'list' not in data or len(data['list']) == 0:
        return None

    item = data['list'][0]
    components = item.get('components', {})
    main = item.get('main', {})
    unix_time = item.get('dt')

    # A record without 'dt' must not crash the parser (the old
    # utcfromtimestamp(None) raised TypeError). fromtimestamp(..., tz=utc)
    # replaces utcfromtimestamp, which is deprecated since Python 3.12; tzinfo
    # is stripped so callers keep receiving a naive UTC datetime as before.
    observed_at = None
    if unix_time is not None:
        observed_at = datetime.fromtimestamp(unix_time, tz=timezone.utc).replace(tzinfo=None)

    pollution_features = {
        'unix_time': unix_time,
        'datetime': observed_at,
        'aqi': main.get('aqi'),  # 1-5 scale
        'pm2_5': components.get('pm2_5'),
        'pm10': components.get('pm10'),
        'no2': components.get('no2'),
        'so2': components.get('so2'),
        'co': components.get('co'),
        'o3': components.get('o3'),
        'nh3': components.get('nh3'),
        'no': components.get('no'),
    }

    return pollution_features

if pollution_response:
    pollution_features = parse_pollution_current(pollution_response)
    # parse_pollution_current returns None when the payload has no (or an
    # empty) 'list'; iterating over None would raise AttributeError.
    if pollution_features is None:
        print("No pollution records in the response - nothing to extract.")
    else:
        print("üè≠ Extracted Pollution Features:")
        for key, value in pollution_features.items():
            print(f"  {key}: {value}")

üè≠ Extracted Pollution Features:
  unix_time: 1768047452
  datetime: 2026-01-10 12:17:32
  aqi: 5
  pm2_5: 255.13
  pm10: 326.34
  no2: 13.21
  so2: 4.1
  co: 1474.22
  o3: 132.26
  nh3: 18.19
  no: 0.09


### 2.3 Test Historical Data APIs

In [None]:
# One Call 3.0 Time Machine API - Historical Weather
TIMEMACHINE_URL = "https://api.openweathermap.org/data/3.0/onecall/timemachine"

def fetch_historical_weather(target_date):
    """Request historical weather for the instant given by ``target_date``.

    Args:
        target_date: datetime whose ``.timestamp()`` is sent as the ``dt``
            query parameter (truncated to whole seconds).

    Returns:
        The decoded JSON payload on HTTP 200; None on any other status or
        when the request (or JSON decoding) raises. Failures are printed.
    """
    query = dict(
        lat=LAT,
        lon=LON,
        dt=int(target_date.timestamp()),
        appid=API_KEY,
        units='metric',
    )

    try:
        reply = requests.get(TIMEMACHINE_URL, params=query, timeout=10)
        # Guard clause: report non-200 replies with their raw body.
        if reply.status_code != 200:
            print(f"‚ùå Error: {reply.status_code} - {reply.text}")
            return None
        return reply.json()
    except Exception as err:
        print(f"‚ùå Exception: {err}")
        return None

# Test with yesterday's date
yesterday = datetime.now() - timedelta(days=1)
print(f"üìÖ Fetching historical weather for: {yesterday.strftime('%Y-%m-%d')}")

hist_weather = fetch_historical_weather(yesterday)
if hist_weather:
    print("‚úÖ Historical weather data fetched successfully!")
    print(f"\nData keys: {hist_weather.keys()}")

In [None]:
# Air Pollution History API
# HTTPS is required: the API key travels in the query string and would be
# readable in transit over plain HTTP.
POLLUTION_HISTORY_URL = "https://api.openweathermap.org/data/2.5/air_pollution/history"

def fetch_historical_pollution(start_date, end_date):
    """Request pollution history between two datetimes in a single call.

    Args:
        start_date: datetime marking the start of the range.
        end_date: datetime marking the end of the range.
        Both are converted with ``.timestamp()`` and truncated to whole
        seconds before being sent as ``start`` / ``end``.

    Returns:
        The decoded JSON payload on HTTP 200; None on any other status or
        when the request (or JSON decoding) raises. Failures are printed.
    """
    start_unix = int(start_date.timestamp())
    end_unix = int(end_date.timestamp())

    params = {
        'lat': LAT,
        'lon': LON,
        'start': start_unix,
        'end': end_unix,
        'appid': API_KEY
    }

    try:
        response = requests.get(POLLUTION_HISTORY_URL, params=params, timeout=10)
        if response.status_code == 200:
            return response.json()
        else:
            print(f"‚ùå Error: {response.status_code} - {response.text}")
            return None
    except Exception as e:
        # Best-effort fetch: report the failure instead of aborting the notebook.
        print(f"‚ùå Exception: {e}")
        return None

# Test with last 3 days
end_date = datetime.now()
start_date = end_date - timedelta(days=3)
print(f"üìÖ Fetching pollution history: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")

hist_pollution = fetch_historical_pollution(start_date, end_date)
if hist_pollution:
    print(f"‚úÖ Historical pollution data fetched successfully!")
    print(f"üìä Number of records: {len(hist_pollution.get('list', []))}")

---
## 3. TRANSFORM: Data Processing & Merging

In [None]:
def parse_historical_weather(data):
    """Turn a Time Machine response into one DataFrame row per ``data`` entry.

    Args:
        data: Decoded JSON response (dict), or None.

    Returns:
        An empty DataFrame when ``data`` is falsy or has no ``'data'`` key.
        Otherwise a DataFrame with ``unix_time`` (from ``dt``), the ten
        weather fields, ``datetime`` (``unix_time`` interpreted as seconds)
        and ``hour_ts`` (``unix_time`` floored to a multiple of 3600).
    """
    if not data or 'data' not in data:
        return pd.DataFrame()

    weather_keys = (
        'temp', 'feels_like', 'humidity', 'pressure', 'wind_speed',
        'wind_deg', 'clouds', 'visibility', 'dew_point', 'uvi',
    )
    rows = [
        {'unix_time': entry.get('dt'), **{key: entry.get(key) for key in weather_keys}}
        for entry in data['data']
    ]

    frame = pd.DataFrame(rows)
    if frame.empty:
        return frame

    frame['datetime'] = pd.to_datetime(frame['unix_time'], unit='s')
    # Floor (not round) to the start of the hour; this is the merge key.
    frame['hour_ts'] = frame['unix_time'] // 3600 * 3600
    return frame

# Parse historical weather.
# Runs only when the Time Machine request above produced a payload
# (fetch_historical_weather returns None on failure); otherwise this cell
# does not assign weather_df.
if hist_weather:
    weather_df = parse_historical_weather(hist_weather)
    print(f"üå§Ô∏è Weather DataFrame Shape: {weather_df.shape}")
    display(weather_df.head())

In [None]:
def parse_historical_pollution(data):
    """Turn an Air Pollution history response into one DataFrame row per record.

    Args:
        data: Decoded JSON response (dict), or None.

    Returns:
        An empty DataFrame when ``data`` is falsy or has no ``'list'`` key.
        Otherwise a DataFrame with ``unix_time`` (from ``dt``), ``aqi``, the
        eight pollutant components, ``datetime`` (``unix_time`` interpreted as
        seconds) and ``hour_ts`` (``unix_time`` floored to a multiple of 3600).
    """
    if not data or 'list' not in data:
        return pd.DataFrame()

    pollutant_keys = ('pm2_5', 'pm10', 'no2', 'so2', 'co', 'o3', 'nh3', 'no')

    rows = []
    for entry in data['list']:
        readings = entry.get('components', {})
        row = {
            'unix_time': entry.get('dt'),
            'aqi': entry.get('main', {}).get('aqi'),
        }
        row.update((key, readings.get(key)) for key in pollutant_keys)
        rows.append(row)

    frame = pd.DataFrame(rows)
    if frame.empty:
        return frame

    frame['datetime'] = pd.to_datetime(frame['unix_time'], unit='s')
    # Floor (not round) to the start of the hour; this is the merge key.
    frame['hour_ts'] = frame['unix_time'] // 3600 * 3600
    return frame

# Parse historical pollution.
# Runs only when the history request above produced a payload
# (fetch_historical_pollution returns None on failure); otherwise this cell
# does not assign pollution_df.
if hist_pollution:
    pollution_df = parse_historical_pollution(hist_pollution)
    print(f"üè≠ Pollution DataFrame Shape: {pollution_df.shape}")
    display(pollution_df.head())

In [None]:
def merge_weather_pollution(weather_df, pollution_df):
    """
    Inner-join the weather and pollution frames on their ``hour_ts`` column.

    Columns present in both inputs (``unix_time``, ``datetime``) are resolved
    in favour of the weather frame. The result keeps only the known feature
    columns, in a fixed order: time columns, weather fields, pollution fields.

    Returns an empty DataFrame (after printing a warning) when either input
    is empty.
    """
    if weather_df.empty or pollution_df.empty:
        print("‚ö†Ô∏è One or both DataFrames are empty!")
        return pd.DataFrame()

    combined = pd.merge(
        weather_df,
        pollution_df,
        on='hour_ts',
        how='inner',
        suffixes=('_weather', '_pollution')
    )

    # Collapse the suffixed duplicates back to a single column, keeping the
    # weather-side value.
    for base in ('unix_time', 'datetime'):
        weather_col = f'{base}_weather'
        if weather_col in combined.columns:
            combined[base] = combined[weather_col]
            combined = combined.drop([weather_col, f'{base}_pollution'], axis=1, errors='ignore')

    preferred_order = [
        'datetime', 'unix_time', 'hour_ts',
        'temp', 'feels_like', 'humidity', 'pressure', 'wind_speed', 'wind_deg',
        'clouds', 'visibility', 'dew_point', 'uvi',
        'aqi', 'pm2_5', 'pm10', 'no2', 'so2', 'co', 'o3', 'nh3', 'no',
    ]
    available = [col for col in preferred_order if col in combined.columns]

    return combined[available]

print("üîó This function will be used once we have matching data from both APIs.")

---
## 4. Complete ETL Pipeline Test

In [None]:
def _extract_weather_frames(end_date, num_days):
    """Fetch and parse one Time Machine response per day, newest day first."""
    frames = []
    for day_offset in range(num_days):
        target_date = end_date - timedelta(days=day_offset + 1)
        print(f"  Fetching weather for {target_date.strftime('%Y-%m-%d')}...", end=" ")

        weather_data = fetch_historical_weather(target_date)
        if weather_data:
            df = parse_historical_weather(weather_data)
            if not df.empty:
                frames.append(df)
                print(f"‚úÖ ({len(df)} records)")
            else:
                print("‚ö†Ô∏è No data")
        else:
            print("‚ùå Failed")

        # Rate limiting: pause between requests, but not after the last one.
        if day_offset < num_days - 1:
            time.sleep(0.5)

    return frames


def _extract_pollution_frame(start_date, end_date):
    """Fetch the whole pollution range in one request and parse it."""
    pollution_data = fetch_historical_pollution(start_date, end_date)
    if not pollution_data:
        return pd.DataFrame()

    pollution_df = parse_historical_pollution(pollution_data)
    if not pollution_df.empty:
        print(f"  ‚úÖ Pollution data: {len(pollution_df)} records")
    return pollution_df


def fetch_and_merge_data(num_days=7):
    """
    Complete ETL pipeline to fetch and merge weather + pollution data.

    Weather is requested once per day (at the current time of day, going back
    ``num_days`` days); pollution is requested once for the whole range. The
    two are joined on their hourly timestamp by ``merge_weather_pollution``.

    NOTE(review): the number of weather rows per request depends on what the
    Time Machine endpoint returns for a single ``dt`` - confirm it yields
    enough hourly points, otherwise the inner join keeps few rows per day.

    Args:
        num_days: Number of historical days to fetch

    Returns:
        DataFrame with merged weather and pollution features; an empty
        DataFrame when either source produced no rows.
    """
    print(f"\n{'='*60}")
    print(f"üöÄ Starting ETL Pipeline for {num_days} days")
    print(f"{'='*60}\n")

    end_date = datetime.now()
    start_date = end_date - timedelta(days=num_days)

    # Step 1: Fetch Weather Data (day by day for Time Machine API)
    print("üì° Step 1: Extracting Weather Data...")
    all_weather = _extract_weather_frames(end_date, num_days)

    # Step 2: Fetch Pollution Data (range query)
    print(f"\nüì° Step 2: Extracting Pollution Data...")
    pollution_df = _extract_pollution_frame(start_date, end_date)

    # Step 3: Combine and Merge
    print(f"\nüîß Step 3: Transforming & Merging Data...")

    if all_weather:
        weather_df = pd.concat(all_weather, ignore_index=True)
        print(f"  Weather records: {len(weather_df)}")
    else:
        weather_df = pd.DataFrame()
        print("  ‚ö†Ô∏è No weather data collected")

    if not pollution_df.empty:
        pollution_df = pollution_df.reset_index(drop=True)
        print(f"  Pollution records: {len(pollution_df)}")
    else:
        pollution_df = pd.DataFrame()
        print("  ‚ö†Ô∏è No pollution data collected")

    # Merge
    if not weather_df.empty and not pollution_df.empty:
        merged_df = merge_weather_pollution(weather_df, pollution_df)
        print(f"  ‚úÖ Merged records: {len(merged_df)}")
    else:
        merged_df = pd.DataFrame()
        print("  ‚ùå Cannot merge - missing data")

    print(f"\n{'='*60}")
    print(f"‚úÖ ETL Pipeline Complete!")
    print(f"{'='*60}\n")

    return merged_df

# Run the pipeline (use small number for testing)
# merged_data = fetch_and_merge_data(num_days=3)
print("‚ö†Ô∏è Uncomment the line above to run the full ETL pipeline")

‚ö†Ô∏è Uncomment the line above to run the full ETL pipeline


---
## 5. Data Quality Checks

In [10]:
def data_quality_report(df, name="Dataset"):
    """
    Generate a data quality report for a DataFrame.

    Prints the shape, column dtypes, the columns that contain missing values,
    and displays ``df.describe()``.

    Safe to call on the empty frame the ETL pipeline returns on failure:
    percentages are reported as 0.0 when there are no rows, and the describe
    step is skipped when there are no columns (``DataFrame.describe`` raises
    ValueError on a column-less frame).

    Args:
        df: DataFrame to inspect.
        name: Label printed in the report header.

    Returns:
        DataFrame indexed by column name with ``Missing`` (null count) and
        ``Percentage`` (null share of rows, rounded to 2 decimals).
    """
    print(f"\n{'='*60}")
    print(f"üìä Data Quality Report: {name}")
    print(f"{'='*60}\n")

    print(f"üìè Shape: {df.shape[0]} rows √ó {df.shape[1]} columns")
    print(f"\nüìã Column Types:")
    print(df.dtypes)

    print(f"\nüîç Missing Values:")
    missing = df.isnull().sum()
    row_count = len(df)
    if row_count:
        missing_pct = (missing / row_count * 100).round(2)
    else:
        # No rows: avoid 0/0 (NaN) and report 0 % missing for every column.
        missing_pct = pd.Series(0.0, index=missing.index)
    missing_report = pd.DataFrame({
        'Missing': missing,
        'Percentage': missing_pct
    })
    print(missing_report[missing_report['Missing'] > 0])

    print(f"\nüìà Numeric Statistics:")
    if df.shape[1] == 0:
        print("No columns to describe.")
    else:
        display(df.describe())

    return missing_report

# Example usage (will work once we have data)
# quality_report = data_quality_report(merged_data, "Merged AQI Data")
print("‚ö†Ô∏è Run the ETL pipeline first, then call data_quality_report()")

‚ö†Ô∏è Run the ETL pipeline first, then call data_quality_report()


---
## 6. Data Visualization

In [9]:
def visualize_aqi_data(df):
    """
    Draw a 2x2 overview figure for a merged AQI DataFrame.

    Panels (each drawn only when its columns are present):
      top-left     PM2.5 against ``datetime``
      top-right    ``temp`` against PM2.5 (scatter)
      bottom-left  box plots of pm2_5 / pm10 / no2 / o3
      bottom-right correlation heatmap of all numeric columns

    Prints a warning and returns without plotting when ``df`` is empty.
    """
    if df.empty:
        print("‚ö†Ô∏è No data to visualize")
        return

    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    ax_trend, ax_scatter = axes[0, 0], axes[0, 1]
    ax_box, ax_corr = axes[1, 0], axes[1, 1]

    # 1. PM2.5 over time
    if 'pm2_5' in df.columns and 'datetime' in df.columns:
        ax_trend.plot(df['datetime'], df['pm2_5'], color='red', alpha=0.7)
        ax_trend.set_title('PM2.5 Concentration Over Time')
        ax_trend.set_xlabel('Date')
        ax_trend.set_ylabel('PM2.5 (Œºg/m¬≥)')
        ax_trend.tick_params(axis='x', rotation=45)

    # 2. Temperature vs PM2.5
    if 'temp' in df.columns and 'pm2_5' in df.columns:
        ax_scatter.scatter(df['temp'], df['pm2_5'], alpha=0.5, c='blue')
        ax_scatter.set_title('Temperature vs PM2.5')
        ax_scatter.set_xlabel('Temperature (¬∞C)')
        ax_scatter.set_ylabel('PM2.5 (Œºg/m¬≥)')

    # 3. Pollutant distribution
    available_pollutants = [
        name for name in ('pm2_5', 'pm10', 'no2', 'o3') if name in df.columns
    ]
    if available_pollutants:
        df[available_pollutants].boxplot(ax=ax_box)
        ax_box.set_title('Pollutant Distributions')
        ax_box.set_ylabel('Concentration')

    # 4. Correlation heatmap (needs at least two numeric columns)
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 1:
        sns.heatmap(df[numeric_cols].corr(), annot=False, cmap='coolwarm', ax=ax_corr)
        ax_corr.set_title('Feature Correlation Matrix')

    plt.tight_layout()
    plt.show()

# Example usage
# visualize_aqi_data(merged_data)
print("‚ö†Ô∏è Run the ETL pipeline first, then call visualize_aqi_data()")

‚ö†Ô∏è Run the ETL pipeline first, then call visualize_aqi_data()


---
## 7. LOAD: Save Data to Storage

In [None]:
def save_to_local(df, filename="aqi_data.csv", output_dir="../data/raw"):
    """
    Save DataFrame to local storage as CSV (without the index).

    Args:
        df: DataFrame to write.
        filename: Name of the CSV file inside ``output_dir``.
        output_dir: Target directory; created (with parents) when missing.
            Defaults to the notebook's raw-data folder. An empty value writes
            ``filename`` relative to the current working directory.

    Returns:
        The path the file was written to, as a string.
    """
    output_dir = os.fspath(output_dir)
    output_path = f"{output_dir}/{filename}" if output_dir else filename

    # os.makedirs('') raises, so only create a directory when there is one.
    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    df.to_csv(output_path, index=False)
    print(f"‚úÖ Data saved to: {output_path}")
    print(f"üìä Shape: {df.shape}")

    return output_path

# Example usage
# save_to_local(merged_data, "aqi_training_data.csv")
print("‚ö†Ô∏è Run the ETL pipeline first, then save the data")

---
## 8. Quick Test (Minimal API Calls)

Run this section to test the APIs with minimal calls.

In [None]:
def quick_api_test():
    """
    Quick test to verify both APIs are working.
    Makes only 2 API calls (1 per API).

    Returns:
        dict with boolean flags ``one_call_api`` and ``pollution_api``. A flag
        is True only when the response contains usable data: a ``current``
        section for weather, a non-empty ``list`` for pollution.
    """
    print("üß™ Quick API Test\n")

    results = {
        'one_call_api': False,
        'pollution_api': False
    }

    # Test 1: One Call 3.0 API
    print("1Ô∏è‚É£ Testing One Call 3.0 API...", end=" ")
    weather = test_onecall_current()
    if weather and 'current' in weather:
        results['one_call_api'] = True
        temp = weather['current'].get('temp', 'N/A')
        print(f"   ‚Üí Current temp: {temp}¬∞C")

    # Test 2: Air Pollution API
    print("\n2Ô∏è‚É£ Testing Air Pollution API...", end=" ")
    pollution = test_pollution_current()
    # An empty 'list' used to raise IndexError here; treat it as "no data",
    # matching the guard in parse_pollution_current.
    records = pollution.get('list') if pollution else None
    if records:
        results['pollution_api'] = True
        pm25 = records[0].get('components', {}).get('pm2_5', 'N/A')
        print(f"   ‚Üí Current PM2.5: {pm25} Œºg/m¬≥")

    # Summary
    print(f"\n{'='*40}")
    print("üìã Test Results:")
    print(f"   One Call 3.0 API: {'‚úÖ Working' if results['one_call_api'] else '‚ùå Failed'}")
    print(f"   Air Pollution API: {'‚úÖ Working' if results['pollution_api'] else '‚ùå Failed'}")
    print(f"{'='*40}")

    if all(results.values()):
        print("\nüéâ All APIs working! Ready for full ETL pipeline.")
    else:
        print("\n‚ö†Ô∏è Some APIs failed. Check your API key and subscription.")

    return results

# Run quick test
# api_results = quick_api_test()
print("‚ö†Ô∏è Uncomment the line above to run the quick API test")

---
## 📌 Summary

This notebook demonstrates the complete ETL pipeline for the Pearls AQI Predictor:

| Step | Description | Status |
|------|-------------|--------|
| **Extract** | Fetch data from One Call 3.0 & Air Pollution APIs | ✅ Implemented |
| **Transform** | Parse, clean, and merge data by timestamp | ✅ Implemented |
| **Load** | Save to local CSV (Feature Store ready) | ✅ Implemented |

### Next Steps:
1. Set your `OPENWEATHER_API_KEY` in `.env` file
2. Run `quick_api_test()` to verify API connectivity
3. Run `fetch_and_merge_data()` to collect training data
4. Run `data_quality_report()` to validate data
5. Proceed to feature engineering notebook