In [None]:
import pandas as pd
import numpy as np
import ast
import os
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import concurrent.futures
import time
import random

# All data paths in this notebook are relative to the project root. When the
# kernel was started inside notebooks_clean/, step up one directory first.
_in_notebook_dir = os.path.basename(os.getcwd()) == "notebooks_clean"
if _in_notebook_dir:
    os.chdir("..")
print(f"Current Working Directory: {os.getcwd()}")

## **1. Load & Clean Data**

In [None]:
# Raw inputs. These frames are not modified below, so re-running the cell is safe.
traffic_df = pd.read_csv('datasets/traffic_dataset.csv')
incident_df = pd.read_csv('datasets/incidents.csv')
graph_df = pd.read_csv('datasets/graph_dataset.csv')

# Clean Graph Data: strip a trailing "-inbound"/"-outbound" from camera_name so
# both directions of a camera share one name, then average their coordinates.
# NOTE(review): 'first' keeps an arbitrary one of the directional sensor_ids;
# confirm that is the intended sensor to attach incidents to.
graph_merged = (
    graph_df
    .assign(camera_name=graph_df['camera_name'].str.replace(r'-(inbound|outbound)$', '', regex=True))
    .groupby('camera_name')
    .agg({
        'latitude': 'mean',
        'longitude': 'mean',
        'sensor_id': 'first'
    })
    .reset_index()
)

# Merge Traffic with Graph (left join: cameras missing from the graph get NaN coords/sensor)
suro_df = pd.merge(traffic_df, graph_merged, on='camera_name', how='left')

# Parse Timestamps: each value is a stringified sequence whose first two items
# are the hour and the minute.
suro_df['timestamp'] = suro_df['timestamp'].apply(ast.literal_eval)
suro_df['hour'] = suro_df['timestamp'].apply(lambda x: x[0])
suro_df['minute'] = suro_df['timestamp'].apply(lambda x: x[1])

# Create Datetime Column. The date comes from the first three "-"-separated
# parts of the image filename. The time of day is added arithmetically rather
# than by formatting "date H:M:00" and re-parsing it: unpadded values such as
# "7:5:00" can break pandas' format inference.
suro_df['date'] = suro_df['img_jpg'].apply(lambda x: "-".join(x.split("-")[:3]))
suro_df['datetime'] = (
    pd.to_datetime(suro_df['date'])
    + pd.to_timedelta(suro_df['hour'], unit='h')
    + pd.to_timedelta(suro_df['minute'], unit='m')
)

print(f"Merged Data Shape: {suro_df.shape}")
suro_df.head(2)

## **2. Merge Incidents (Time-Based)**
Flag each traffic record whose sensor had an incident at, or up to 10 minutes before, the recording time. Matching uses a backward `merge_asof` grouped by sensor.

In [None]:
# Prepare Incident Data: one row per (incident, affected sensor).
# incident_df is not modified, so re-running this cell does not try to
# literal_eval an already-parsed list (which raises ValueError).
incident_exploded = (
    incident_df
    .assign(affected_sensors=incident_df['affected_sensors'].apply(ast.literal_eval))
    .explode('affected_sensors')
    .rename(columns={'affected_sensors': 'sensor_id'})
)
incident_exploded['datetime'] = pd.to_datetime(incident_exploded['time'])

# Clean for Merge: the join keys must be non-null and share an integer dtype.
suro_clean = suro_df.dropna(subset=['sensor_id', 'datetime']).copy()
suro_clean['sensor_id'] = suro_clean['sensor_id'].astype(int)
suro_sorted = suro_clean.sort_values(['sensor_id', 'datetime']).reset_index(drop=True)

# Drop NaNs *before* casting: an empty affected_sensors list explodes to NaN,
# which astype(int) cannot represent.
inc_clean = incident_exploded.dropna(subset=['sensor_id', 'datetime']).copy()
inc_clean['sensor_id'] = inc_clean['sensor_id'].astype(int)
inc_sorted = inc_clean.sort_values(['sensor_id', 'datetime']).reset_index(drop=True)
inc_cols = inc_sorted[['sensor_id', 'datetime', 'incident_id']]

# Execute Merge. by='sensor_id' matches incidents per sensor in a single pass.
# It also keeps one 'sensor_id' column; a plain merge on 'datetime' would
# suffix it into sensor_id_x / sensor_id_y. merge_asof needs both frames
# sorted globally by the `on` key. Stable sorts let us restore the
# (sensor_id, datetime) row order afterwards.
print("Merging incidents...")
suro_final = (
    pd.merge_asof(
        suro_sorted.sort_values('datetime', kind='mergesort'),
        inc_cols.sort_values('datetime', kind='mergesort'),
        on='datetime',
        by='sensor_id',
        direction='backward',
        tolerance=pd.Timedelta('10min'),
    )
    .sort_values(['sensor_id', 'datetime'], kind='mergesort')
    .reset_index(drop=True)
)
# 1 if an incident on the same sensor occurred at, or up to 10 minutes before, the record.
suro_final['incident_flag'] = suro_final['incident_id'].notna().astype(int)
suro_final = suro_final.drop(columns=['incident_id'])

print(f"Incidents mapped. Flag count: {suro_final['incident_flag'].sum()}")

## **3. Enrich with Weather Data (Open-Meteo API)**
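Fetch hourly weather for each camera and date from the Open-Meteo archive API. Fetching uses automatic HTTP retries, a CSV checkpoint cache (so an interrupted run resumes where it left off), and a small thread pool.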

In [None]:
CACHE_FILE = 'datasets/weather_checkpoint.csv'
MAX_WORKERS = 2
BATCH_SIZE = 50
# Checkpoint CSV columns. Fetched rows are later appended without a header,
# so they must follow this order.
CACHE_COLUMNS = ['camera_name', 'api_date', 'hour', 'temperature_2m',
                 'precipitation', 'rain', 'snowfall', 'wind_speed_10m']

# Prepare Request Keys: one API request per (camera, calendar day).
# NOTE(review): fetch_weather queries the API in UTC; confirm 'date' is a UTC date too.
suro_final['api_date'] = pd.to_datetime(suro_final['date']).dt.strftime('%Y-%m-%d')
unique_requests = (
    suro_final[['camera_name', 'latitude', 'longitude', 'api_date']]
    .drop_duplicates()
    .assign(key=lambda d: d['camera_name'] + '_' + d['api_date'])
)

# Load Cache: collect the camera/day keys already fetched. Columns are read as
# str so the keys are built exactly like the request keys. A numeric-looking
# camera_name would otherwise be parsed as a number and break the '+'.
processed_keys = set()
if os.path.exists(CACHE_FILE):
    try:
        processed = pd.read_csv(CACHE_FILE, usecols=['camera_name', 'api_date'], dtype=str)
        processed_keys = set(processed['camera_name'] + '_' + processed['api_date'])
        print(f"Loaded {len(processed_keys)} cached weather records.")
    except (OSError, ValueError) as exc:
        # Unreadable or malformed checkpoint (missing columns, parse error):
        # fall back to re-fetching, but report it instead of failing silently.
        print(f"WARNING: could not read {CACHE_FILE}: {exc}. Re-fetching all requests.")
else:
    pd.DataFrame(columns=CACHE_COLUMNS).to_csv(CACHE_FILE, index=False)

pending = unique_requests[~unique_requests['key'].isin(processed_keys)]
print(f"Pending requests: {len(pending)}")

# Setup Session: retry throttling (429) and transient server errors with exponential backoff.
session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
session.mount('https://', HTTPAdapter(max_retries=retries))

def fetch_weather(row):
    """Fetch one day of hourly weather for one camera from the Open-Meteo archive.

    Uses the module-level ``session`` for the HTTP request.

    Args:
        row: a record with 'camera_name', 'latitude', 'longitude' and
            'api_date' ('YYYY-MM-DD') entries, e.g. a row of ``pending``.

    Returns:
        A list of 24 dicts (hours 0-23). Each dict has keys camera_name,
        api_date, hour, followed by the five weather variables, in the
        checkpoint-CSV column order. Returns [] on any failure (non-200
        status, network error, malformed payload). The failure is printed
        and nothing is cached, so a later run retries the request.
    """
    # Order matters: it defines both the requested variables and the dict/CSV column order.
    variables = ('temperature_2m', 'precipitation', 'rain', 'snowfall', 'wind_speed_10m')
    try:
        # Random jitter between requests, presumably to avoid API rate limiting (429).
        time.sleep(random.uniform(0.5, 1.5))
        params = {
            'latitude': row['latitude'],
            'longitude': row['longitude'],
            'start_date': row['api_date'],
            'end_date': row['api_date'],
            'hourly': ','.join(variables),
            'timezone': 'UTC'
        }
        resp = session.get("https://archive-api.open-meteo.com/v1/archive", params=params, timeout=30)
        if resp.status_code != 200:
            # Responses the retry adapter hands back (e.g. 4xx) would otherwise vanish silently.
            print(f"HTTP {resp.status_code} fetching {row['camera_name']} {row['api_date']}")
            return []
        hourly = resp.json().get('hourly', {})
        # With timezone='UTC' a single day has exactly 24 hourly values.
        return [
            {
                'camera_name': row['camera_name'],
                'api_date': row['api_date'],
                'hour': h,
                **{var: hourly[var][h] for var in variables},
            }
            for h in range(24)
        ]
    except Exception as e:
        # Best-effort: log and skip so one bad request does not stop the whole batch.
        print(f"Error fetching {row['camera_name']} {row['api_date']}: {e}")
    return []

# Execute: fetch pending requests on a small thread pool. Results are appended
# to the checkpoint CSV after every BATCH_SIZE completed requests and after the
# last one, so an interrupted run loses at most the current unsaved batch.
if len(pending) > 0:
    # Must match the header the checkpoint file was created with.
    cache_columns = ['camera_name', 'api_date', 'hour', 'temperature_2m',
                     'precipitation', 'rain', 'snowfall', 'wind_speed_10m']
    n_pending = len(pending)
    buffer = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(fetch_weather, row): row for _, row in pending.iterrows()}
        # n_done counts completed requests (1-based), so the final message reads N/N.
        for n_done, future in enumerate(concurrent.futures.as_completed(futures), start=1):
            buffer.extend(future.result() or [])
            if buffer and (n_done % BATCH_SIZE == 0 or n_done == n_pending):
                # Explicit columns keep the header-less append aligned with the CSV header.
                pd.DataFrame(buffer, columns=cache_columns).to_csv(
                    CACHE_FILE, mode='a', header=False, index=False)
                buffer = []
                print(f"Saved progress: {n_done}/{n_pending}")

# Merge Weather. The cache is deduplicated to one row per (camera, date, hour),
# so the left merge cannot multiply traffic rows. Key columns are read as str,
# and 'hour' is cast to the traffic dtype. This lets a header-only (empty)
# cache or numeric-looking camera names still merge instead of raising a
# dtype-mismatch error.
# NOTE(review): weather hours are UTC (timezone='UTC' in the request); confirm
# the traffic 'hour' is UTC as well, otherwise the join is shifted.
weather_keys = ['camera_name', 'api_date', 'hour']
weather_data = (
    pd.read_csv(CACHE_FILE, dtype={'camera_name': str, 'api_date': str})
    .drop_duplicates(subset=weather_keys)
    .astype({'hour': suro_final['hour'].dtype})
)

# Drop weather columns left over from a previous run of this cell, so a re-run
# does not produce *_x / *_y duplicates.
weather_cols = [c for c in weather_data.columns if c not in weather_keys]
n_rows_before = len(suro_final)
suro_final = (
    suro_final
    .drop(columns=weather_cols, errors='ignore')
    .merge(weather_data, on=weather_keys, how='left')
    .drop(columns=['api_date'])
)
assert len(suro_final) == n_rows_before, "weather merge changed the number of traffic rows"

suro_final.to_csv("datasets/suro_dataset_final.csv", index=False)
print("Final dataset saved to datasets/suro_dataset_final.csv")
suro_final.head()