# 2. Feature Pipeline

## 2.1. Setup

### 2.1.1. Import Libraries and Initialize Hopsworks Connection

In [None]:
# Standard imports
import os
import sys
import json
import time
from datetime import date, datetime, timedelta
import warnings
from pathlib import Path
warnings.filterwarnings("ignore", module="IPython")

#  Establish project root directory
def find_project_root(start: Path):
    """Return the closest directory at or above ``start`` that holds a ``pyproject.toml``.

    The search checks ``start`` first and then each of its ancestors, nearest
    first. When no directory contains the marker file, ``start`` itself is
    returned unchanged.
    """
    candidates = (start, *start.parents)
    located = next(
        (folder for folder in candidates if (folder / "pyproject.toml").exists()),
        None,
    )
    return start if located is None else located

# Locate the project root starting from the current working directory and report it.
root_dir = find_project_root(Path().absolute())
print("Project root dir:", root_dir)

# Put the root on the import path (once) so the project-local imports below resolve.
root_dir_entry = str(root_dir)
if root_dir_entry not in sys.path:
    sys.path.append(root_dir_entry)

# Third-party imports
import requests
import pandas as pd
import great_expectations as gx
import hopsworks
from urllib3.exceptions import ProtocolError  
from requests.exceptions import ConnectionError, Timeout
from confluent_kafka import KafkaException
import numpy as np

#  Project imports
from utils import cleaning, config, feature_engineering, fetchers, hopsworks_admin, incremental, metadata

# Load project settings; the API key and GitHub username are stored as secret
# values and are unwrapped here into plain strings.
settings = config.HopsworksSettings()
HOPSWORKS_API_KEY = settings.HOPSWORKS_API_KEY.get_secret_value()
GITHUB_USERNAME = settings.GH_USERNAME.get_secret_value()

# Log in to Hopsworks with the API key and fetch the project's feature store
# handle, which later cells use to obtain the feature groups.
project = hopsworks.login(api_key_value=HOPSWORKS_API_KEY)
fs = project.get_feature_store()

### 2.1.2. Repository management

In [None]:
# Ask the project helper for the repository directory for this GitHub user.
# NOTE(review): judging by its name the helper clones the repo or updates an
# existing checkout and returns its path — confirm in utils.hopsworks_admin.
repo_dir = hopsworks_admin.clone_or_update_repo(GITHUB_USERNAME)
# Switch the working directory so relative paths in later cells resolve against it.
os.chdir(repo_dir)

### 2.1.3. Configure API Keys and Secrets

In [None]:
# Reference date for this pipeline run.
today = date.today()

# The AQICN key is required by the air-quality fetch step; stop early without it.
if settings.AQICN_API_KEY is None:
    print("AQICN_API_KEY missing.")
    sys.exit(1)

AQICN_API_KEY = settings.AQICN_API_KEY.get_secret_value()

secrets = hopsworks.get_secrets_api()

# Best-effort removal of a previously stored copy before the secret is
# re-created below. A failure here is still tolerated, but it is reported
# instead of being silently discarded, so that a later create_secret error
# can be traced back to its cause.
try:
    secret = secrets.get_secret("AQICN_API_KEY")
    if secret is not None:
        secret.delete()
except Exception as exc:
    print(
        "Could not remove existing AQICN_API_KEY secret "
        f"({type(exc).__name__}): {exc}"
    )

# Store the current key value as a Hopsworks secret.
secrets.create_secret("AQICN_API_KEY", AQICN_API_KEY)

## 2.2. Get Feature Groups

In [None]:
# Obtain the air-quality and weather feature group handles from the project helper.
# NOTE(review): the section title says "Get", the helper says "create" — presumably
# it is get-or-create; confirm in utils.hopsworks_admin.
air_quality_fg, weather_fg = hopsworks_admin.create_feature_groups(fs)

## 2.3. Load Sensor Locations from Feature Group

In [None]:
# Read the air_quality feature group once; the same frame is reused below
# instead of issuing a second, identical read.
aq_data = air_quality_fg.read()

if aq_data.empty:
    print("‚ö†Ô∏è No air quality data found. Run pipeline 1 (backfill) first.")
    sys.exit(1)

# Kept as a separate name so code referring to `existing_aq_data` keeps working.
existing_aq_data = aq_data
existing_sensors = set(existing_aq_data["sensor_id"].unique())
print(f"üìã Found {len(existing_sensors)} sensors in feature store")

# Build sensor location dictionary:
#   sensor_id -> (latitude, longitude, city, street, country)
# The values are positional tuples; they do not carry an aqicn_url field.
# Only the first row seen for each sensor contributes its location fields.
location_columns = ["sensor_id", "latitude", "longitude", "city", "street", "country"]
unique_locations = existing_aq_data[location_columns].drop_duplicates(subset=["sensor_id"])
sensor_locations = {
    sensor_id: (latitude, longitude, city, street, country)
    for sensor_id, latitude, longitude, city, street, country
    in unique_locations.itertuples(index=False, name=None)
}
print(f"üìç Loaded locations for {len(sensor_locations)} existing sensors")

## 2.4. Data Collection
Loop through all sensors to fetch air quality data and weather forecasts for the missing dates, then format the data to match the feature group schemas.

### 2.4.1. Initialize Processing

In [None]:
# Report how many sensors the collection steps below will iterate over.
print(f"üîç Processing {len(sensor_locations)} sensor locations.")

### 2.4.2. Load Historical Air Quality Data (Last 4 Days)

In [None]:
# Start of the look-back window: four days before today.
historical_start = today - timedelta(days=4)
try:
    historical_df = air_quality_fg.read()
    if historical_df.empty:
        # Nothing stored yet: fall back to a column-less empty frame.
        historical_df = pd.DataFrame()
    else:
        # Work with timezone-naive timestamps so they compare against plain dates.
        historical_df["date"] = pd.to_datetime(historical_df["date"]).dt.tz_localize(None)
        today_dt = pd.to_datetime(today)
        historical_start_dt = pd.to_datetime(historical_start)

        # Keep rows inside [historical_start, today], reduced to the pm25 columns ...
        not_too_old = historical_df["date"] >= historical_start_dt
        not_in_future = historical_df["date"] <= today_dt
        historical_df = historical_df.loc[
            not_too_old & not_in_future, ["date", "sensor_id", "pm25"]
        ]

        # ... and only for sensors that have a known location.
        known_sensor = historical_df["sensor_id"].isin(sensor_locations.keys())
        historical_df = historical_df[known_sensor]
except Exception as e:
    # Any failure while reading or filtering degrades to an empty frame.
    print(f"‚ö†Ô∏è Error reading historical data: {e}")
    historical_df = pd.DataFrame()

### 2.4.3. Identify Missing Dates for Backfill

In [None]:
# Calendar dates that already have at least one air-quality row (for any sensor).
# pd.to_datetime keeps this working when the column is not already
# datetime-typed, matching how the other cells normalise "date"; a set gives
# constant-time membership tests in the comprehension below.
existing_dates = set(pd.to_datetime(air_quality_fg.read()["date"]).dt.date)

today = date.today()
start_date = today - timedelta(days=7)  # Check last 7 days for missing data

# One entry per calendar day from start_date through today, both ends included.
expected_dates = pd.date_range(start=start_date, end=today, freq="D").date
missing_dates = [d for d in expected_dates if d not in existing_dates]

formatted = ", ".join(d.isoformat() for d in missing_dates)
print(f"üìÖ Missing dates to backfill: {formatted}")



In [None]:
# # Initialize containers for results
# aq_list = []
# weather_dict = {}  # sensor_id -> weather_df

# # Determine missing dates
# existing_dates = air_quality_fg.read()["date"].dt.date.unique()

# today = datetime.today().date()
# start_date = today - timedelta(days=7)  # or however far back you want to check

# expected_dates = pd.date_range(start=start_date, end=today, freq="D").date
# missing_dates = [d for d in expected_dates if d not in existing_dates]

### 2.4.4. Prepare Historical Data Window

In [None]:
# Anchor the window on the earliest missing date. When nothing is missing
# (e.g. the pipeline already ran today) min() would raise ValueError on the
# empty list, so fall back to today; the fetch/insert loops then simply have
# no dates to process.
window_anchor = min(missing_dates) if missing_dates else today

# Keep three extra days before the anchor as history for the backfilled dates.
historical_cutoff = pd.to_datetime(window_anchor) - pd.Timedelta(days=3)
historical = air_quality_fg.read()
historical["date"] = pd.to_datetime(historical["date"]).dt.tz_localize(None)
# .copy() makes the filtered frame independent of the full read, because its
# "date" column is assigned to again in a later cell.
historical = historical[historical["date"] >= historical_cutoff].copy()

### 2.4.5. Track Existing Sensor-Date Pairs

In [None]:
# Reduce the historical window to its identifying columns and add the plain
# calendar date of every timestamp.
existing = historical.loc[:, ["sensor_id", "date"]].copy()
existing["date_only"] = existing["date"].dt.date

# (sensor_id, calendar date) pairs already present; used to skip re-fetching them.
existing_keys = {
    (sensor, day_only)
    for sensor, day_only in zip(existing["sensor_id"], existing["date_only"])
}

### 2.4.6. Initialize Data Containers

In [None]:
# Air-quality frames collected for this run. The list starts with the
# historical window, so frames appended later come after it.
all_aq_rows = [historical]
# Weather frames are collected from scratch.
all_weather_rows = []

### 2.4.7. Fetch Missing Air Quality Data

In [None]:
# sensor_locations maps sensor_id -> (latitude, longitude, city, street, country).
# Those values are positional tuples, so they must be unpacked rather than
# indexed by name, and they carry no aqicn_url. The URL is therefore looked up
# per sensor from the feature group data read earlier.
# NOTE(review): assumes the air_quality feature group exposes an "aqicn_url"
# column (the insert step casts a column of that name) — confirm.
url_by_sensor = (
    existing_aq_data.drop_duplicates(subset=["sensor_id"])
    .set_index("sensor_id")["aqicn_url"]
    .to_dict()
)

for count, (sensor_id, meta) in enumerate(sensor_locations.items(), start=1):
    print(f"Fetching air quality for sensor {sensor_id}, {count}/{len(sensor_locations)}")
    latitude, longitude, city, street, country = meta
    aqicn_url = url_by_sensor.get(sensor_id)

    for day in missing_dates:
        # Skip any sensor date combination that already exists
        if (sensor_id, day) in existing_keys:
            continue
        try:
            aq_df = fetchers.get_pm25(
                aqicn_url, country, city,
                street, day, AQICN_API_KEY
            )
            # Nothing usable came back for this sensor/day.
            if aq_df.empty or aq_df["pm25"].isna().all():
                continue

            aq_df["sensor_id"] = int(sensor_id)
            aq_df["pm25"] = pd.to_numeric(aq_df["pm25"], errors="coerce")
            aq_df["date"] = pd.to_datetime(aq_df["date"]).dt.tz_localize(None)

            # Add metadata columns
            aq_df["city"] = city
            aq_df["street"] = street
            aq_df["country"] = country
            aq_df["aqicn_url"] = aqicn_url
            aq_df["latitude"] = latitude
            aq_df["longitude"] = longitude

            # Drop the fetcher's own "url" column if it is present.
            aq_df = aq_df.drop(columns=["url"], errors="ignore")

            all_aq_rows.append(aq_df)

        except Exception as e:
            # One failing sensor/day must not abort the whole backfill.
            print(f"‚ùå Sensor {sensor_id} on {day}: {type(e).__name__}")
            continue

print(f"üìä Collected {len(all_aq_rows)} air quality dataframes")

### 2.4.8. Fetch Missing Weather Forecast Data

In [None]:
for count, (sensor_id, meta) in enumerate(sensor_locations.items(), start=1):
    print(f"Fetching weather for sensor {sensor_id}, {count}/{len(sensor_locations)}")

    # sensor_locations values are positional tuples
    # (latitude, longitude, city, street, country); indexing them by name
    # raises TypeError, so take the coordinates by position.
    latitude, longitude = meta[0], meta[1]

    for day in missing_dates:
        try:
            # Fetch 7-day weather forecast starting from the missing date
            weather_df = fetchers.get_weather_forecast(
                sensor_id=sensor_id,
                latitude=latitude,
                longitude=longitude,
                start_date=day,
                end_date=day + timedelta(days=6)
            )

            if weather_df.empty:
                continue

            weather_df["sensor_id"] = int(sensor_id)
            # Truncate timestamps to midnight and strip any timezone.
            weather_df["date"] = pd.to_datetime(weather_df["date"]).dt.normalize().dt.tz_localize(None)

            all_weather_rows.append(weather_df)

        except Exception as e:
            # One failing sensor/day must not abort the remaining fetches.
            print(f"‚ùå Weather for sensor {sensor_id} on {day}: {type(e).__name__}")
            continue

print(f"üìä Collected {len(all_weather_rows)} weather dataframes")

### 2.4.9. Clean and Align Data Structure

In [None]:
# Frames that survive validation, re-shaped to the historical frame's layout.
cleaned_aq_rows = []
expected_cols = historical.columns.tolist()

for i, df in enumerate(all_aq_rows):
    # A frame is unusable when it is empty, has no pm25 column, or pm25 is all NaN.
    unusable = df.empty or "pm25" not in df.columns or df["pm25"].isna().all()
    if unusable:
        print(f"‚ö†Ô∏è Skipping empty or invalid df[{i}]")
        continue

    df["date"] = pd.to_datetime(df["date"]).dt.tz_localize(None)

    # Require at least three columns in common with the historical layout.
    shared_cols = set(expected_cols).intersection(df.columns)
    if len(shared_cols) < 3:
        print(f"‚ö†Ô∏è Skipping malformed df[{i}] with columns: {list(df.columns)}")
        continue

    # Same columns, same order; columns the frame lacks are filled with NaN.
    aligned = df.reindex(columns=expected_cols, fill_value=np.nan)

    # Column count must agree with the historical layout after alignment.
    if aligned.shape[1] != len(expected_cols):
        print(f"‚ùå Still malformed after alignment: df[{i}] shape={aligned.shape}")
        continue

    # Cast each column to the historical dtype; a failed cast is reported and
    # that column keeps its current dtype.
    for col in expected_cols:
        if col not in historical.columns:
            continue
        try:
            aligned[col] = aligned[col].astype(historical[col].dtype, errors="raise")
        except Exception as e:
            print(f"‚ö†Ô∏è Could not cast column '{col}' in df[{i}]: {e}")

    cleaned_aq_rows.append(aligned)

# Report whether every cleaned frame has exactly the historical column index.
print("üìã Column names match:", all(df.columns.equals(historical.columns) for df in cleaned_aq_rows))

# Report every column whose dtype still differs from the historical frame.
no_mismatch = True
for i, df in enumerate(cleaned_aq_rows):
    mismatched = []
    for col in df.columns:
        if col not in historical.columns:
            continue
        if df[col].dtype != historical[col].dtype:
            mismatched.append((col, df[col].dtype, historical[col].dtype))
    if not mismatched:
        continue
    print("üìã Dtype mismatch:")
    print(f"  df[{i}] mismatches: {mismatched}")
    no_mismatch = False

if not no_mismatch:
    print("‚ö†Ô∏è Some dtypes do not match historical data.")
else:
    print("üìã All dtypes match historical data.")

### 2.4.10. Combine and Clean Weather Data

In [None]:
if not all_weather_rows:
    # Nothing was fetched: downstream cells test `all_weather.empty`.
    all_weather = pd.DataFrame()
    print("‚ö†Ô∏è No weather data collected")
else:
    # Stack every fetched frame and order the rows by sensor, then date.
    all_weather = (
        pd.concat(all_weather_rows, ignore_index=True)
        .sort_values(["sensor_id", "date"])
        .reset_index(drop=True)
    )
    all_weather["date"] = pd.to_datetime(all_weather["date"]).dt.tz_localize(None)

    # The same sensor/date pair can appear in several fetched frames; keep its first row.
    all_weather = all_weather.drop_duplicates(subset=["sensor_id", "date"], keep="first")

    print(f"üå§Ô∏è Total weather records: {len(all_weather)}")
    print(f"üìÖ Weather date range: {all_weather['date'].min()} to {all_weather['date'].max()}")

## 2.5. Combine Data and Add Engineered Features

In [None]:
# Combine the historical window with the cleaned frames.
all_aq = pd.concat([historical, *cleaned_aq_rows], ignore_index=True)
all_aq["date"] = pd.to_datetime(all_aq["date"]).dt.tz_localize(None)

# `historical` is also the first element of all_aq_rows, so an aligned copy of
# it normally sits in cleaned_aq_rows too and every historical row would appear
# twice. Keep a single row per sensor and timestamp so the lag and rolling
# features below are computed over one observation per key.
# NOTE(review): assumes (sensor_id, date) identifies a row in the air_quality
# feature group — confirm against its primary key.
all_aq = all_aq.drop_duplicates(subset=["sensor_id", "date"], keep="first")
all_aq = all_aq.sort_values(["sensor_id", "date"]).reset_index(drop=True)

# Add engineered features
all_aq = feature_engineering.add_rolling_window_feature(all_aq, window_days=3)
all_aq = feature_engineering.add_lagged_features(all_aq, lags=[1, 2, 3])

# Pass sensor_locations dict to nearby sensor feature
all_aq = feature_engineering.add_nearby_sensor_feature(all_aq, sensor_locations, n_closest=3)

## 2.6. Insert Data to Feature Groups

### 2.6.1. Batch Insert Air Quality Data by Date

In [None]:
# Column dtypes applied to each day's rows before insertion.
aq_insert_dtypes = {
    "sensor_id": "int32",
    "pm25": "float64",
    "pm25_lag_1d": "float64",
    "pm25_lag_2d": "float64",
    "pm25_lag_3d": "float64",
    "pm25_rolling_3d": "float64",
    "pm25_nearby_avg": "float64",
    "city": "string",
    "street": "string",
    "country": "string",
    "aqicn_url": "string",
    "latitude": "float64",
    "longitude": "float64",
}
# Name fragments that mark a column as an engineered feature.
engineered_markers = ("lag", "rolling", "nearby")

for day in missing_dates:
    # Rows for this calendar day that have a pm25 reading.
    on_this_day = all_aq["date"].dt.date == day
    day_rows = all_aq[on_this_day].copy().dropna(subset=["pm25"])

    # Drop rows in which any engineered feature is missing.
    engineered_cols = [
        c for c in day_rows.columns
        if any(marker in c for marker in engineered_markers)
    ]
    day_rows = day_rows.dropna(subset=engineered_cols, how="any")

    if day_rows.empty:
        print(f"‚ö†Ô∏è No valid rows for {day}")
        continue

    # Convert types to match feature group schema
    day_rows = day_rows.astype(aq_insert_dtypes)

    # Ensure correct column order
    fg_columns = [f.name for f in air_quality_fg.features]
    day_rows = day_rows[fg_columns]

    air_quality_fg.insert(day_rows)
    print(f"‚úÖ Inserted {len(day_rows)} rows for {day}")

### 2.6.2. Verify Air Quality Insertion

In [None]:
# Show today's rows from the in-memory `all_aq` frame (not read back from the
# feature group), limited to the key and a few feature columns.
preview_columns = ["sensor_id", "date", "pm25", "pm25_lag_1d", "pm25_rolling_3d", "pm25_nearby_avg"]
is_today = all_aq["date"].dt.date == today
print(all_aq[is_today][preview_columns])

In [None]:
# Show yesterday's rows from the in-memory `all_aq` frame, all columns.
yesterday = today - timedelta(days=1)
print(all_aq[all_aq["date"].dt.date == yesterday])

### 2.6.3. Batch Insert Weather Forecast Data

In [None]:
if not all_weather.empty:
    # Convert types to match feature group schema
    all_weather = all_weather.astype({
        "sensor_id": "int32",
        "temperature_2m_mean": "float64",
        "precipitation_sum": "float64",
        "wind_speed_10m_max": "float64",
        "wind_direction_10m_dominant": "float64",
    })

    # Ensure correct column order
    weather_fg_columns = [f.name for f in weather_fg.features]
    all_weather = all_weather[weather_fg_columns]

    # Insert in smaller batches to avoid connection issues
    batch_size = 100
    max_retries = 3
    total_inserted = 0

    # Errors treated as transient. `ConnectionError` and `Timeout` are the
    # requests exceptions imported at the top of the file; requests' Timeout
    # does not derive from the built-in TimeoutError, so both are listed.
    retryable_errors = (ProtocolError, ConnectionError, Timeout, TimeoutError, KafkaException)

    for i in range(0, len(all_weather), batch_size):
        batch = all_weather.iloc[i:i+batch_size]

        for attempt in range(max_retries):
            try:
                weather_fg.insert(batch)
            except retryable_errors:
                if attempt < max_retries - 1:
                    # Exponential backoff between attempts: 1s, then 2s.
                    wait_time = 2 ** attempt
                    print(f"‚ö†Ô∏è Connection error on weather batch {i//batch_size + 1}, retrying in {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    # Out of retries: keep the batch on disk and move on to the next one.
                    print(f"‚ùå Failed weather batch {i//batch_size + 1}")
                    failed_file = f"{root_dir}/failed_weather_batch_{today}_{i}.csv"
                    batch.to_csv(failed_file, index=False)
                    print(f"üíæ Saved to {failed_file}")
            else:
                total_inserted += len(batch)
                print(f"‚úÖ Weather batch {i//batch_size + 1}: {len(batch)} records (total: {total_inserted}/{len(all_weather)})")
                break

    print(f"üå§Ô∏è Total weather inserted: {total_inserted}/{len(all_weather)} records")
else:
    print("‚ö†Ô∏è No weather data to insert")

### 2.6.4. Print Processing Summary

In [None]:
# print(f"\nüìä Summary: ‚úÖ {successful} successful, ‚è≠Ô∏è {skipped} skipped, ‚ùå {failed} failed")

## 2.7. Inspect Inserted Data

In [None]:
# Summarise the in-memory air-quality frame, if this run produced a non-empty one.
aq_available = 'all_aq' in locals() and not all_aq.empty
if aq_available:
    print(f"‚úÖ Air quality records inserted: {len(all_aq)}")
    print("\nüìã Sample air quality data:", all_aq.head(), sep="\n")
    print("\nüîß Air quality data types:", all_aq.dtypes, sep="\n")
    print("\nüìÖ Date range:")
    print(f"From {all_aq['date'].min()} to {all_aq['date'].max()}")

# Same summary for the in-memory weather frame.
weather_available = 'all_weather' in locals() and not all_weather.empty
if weather_available:
    print(f"\nüå§Ô∏è Weather records inserted: {len(all_weather)}")
    print("\nüìã Sample weather data:", all_weather.head(), sep="\n")
    print("\nüîß Weather data types:", all_weather.dtypes, sep="\n")
    print("\nüìÖ Unique weather dates:", all_weather['date'].unique(), sep="\n")