# Weather Data Download (First 3 Features - Complete Data)

This notebook downloads weather data from the Open-Meteo API for **the first 3 features (rows) from each CSV file**. This provides complete weather data for a manageable subset of flooding events.

## Features
- Downloads hourly weather data covering the 24 hours before each peak flooding event
- **Processes first 3 samples** from each input CSV file
- Handles API rate limiting with retry mechanisms
- Optimized parameter selection (removes null-heavy parameters)
- Comprehensive logging and monitoring
- Ready to run - no comments to uncomment


## 1. Import Required Libraries


In [1]:
import pandas as pd
import requests
import os
import sys
from datetime import datetime, timedelta
import pytz
import time
import logging
import json
import signal
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from collections import defaultdict
import glob


## 2. API Rate Limiting Analysis

### Open-Meteo API Rate Limits (Free Tier)
- **Per minute**: 600 requests
- **Per hour**: 5,000 requests  
- **Per day**: 10,000 requests
- **Concurrent requests**: Recommended < 4 to avoid connection rejection
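
To make the limits above concrete, the following sketch converts each one into a minimum request spacing and checks whether a planned batch would break any of them. The names `FREE_TIER_LIMITS`, `min_intervals`, and `exceeded_limits` are hypothetical helpers, not part of the notebook or the Open-Meteo API, and the check assumes requests are spread evenly over the run.

```python
# Free-tier limits listed above, as (window length in seconds, max requests).
FREE_TIER_LIMITS = {
    "minute": (60, 600),
    "hour": (3600, 5000),
    "day": (86400, 10000),
}


def min_intervals(limits=FREE_TIER_LIMITS):
    """Seconds between requests needed to sustain each limit indefinitely."""
    return {name: window / cap for name, (window, cap) in limits.items()}


def exceeded_limits(n_requests, duration_s, limits=FREE_TIER_LIMITS):
    """Names of limits broken if n_requests are spread evenly over duration_s seconds."""
    broken = []
    for name, (window, cap) in limits.items():
        # Requests landing in one window: n * min(window, duration) / duration.
        # Cross-multiplied so the comparison avoids a floating-point division.
        if n_requests * min(window, duration_s) > cap * duration_s:
            broken.append(name)
    return broken


print(min_intervals())
print(exceeded_limits(12, 10))     # the 12 requests this notebook makes
print(exceeded_limits(6000, 600))  # a hypothetical larger run over 10 minutes
```

For the 12 requests made here, none of the limits is close to binding. The hourly and daily caps matter only when the same code is pointed at the full CSV files.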


### Optimization Strategy
1. **Reduce concurrency**: From 6 to 2 threads
2. **Remove null-heavy parameters**: Eliminated 11 parameters with 100% null values
3. **Implement retry mechanism**: Exponential backoff for rate limit errors
4. **Add request intervals**: Prevent rapid successive requests
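
Items 3 and 4 can be sketched with the standard library alone. `MinIntervalThrottle` and `backoff_delays` are hypothetical names introduced for illustration: the throttle enforces a minimum spacing between requests across threads, and the helper lists the exponential delays (2 s, 4 s, ...) used between retries. This is one possible approach under those assumptions, not the downloader's actual implementation.

```python
import threading
import time


class MinIntervalThrottle:
    """Space out calls so consecutive requests start at least min_interval seconds apart."""

    def __init__(self, min_interval, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = min_interval
        self._clock = clock          # injectable for testing
        self._sleep = sleep          # injectable for testing
        self._lock = threading.Lock()
        self._next_allowed = float("-inf")

    def wait(self):
        """Reserve the next free slot, sleep until it arrives, and return the delay."""
        with self._lock:
            now = self._clock()
            scheduled = max(now, self._next_allowed)
            self._next_allowed = scheduled + self.min_interval
        delay = scheduled - now
        if delay > 0:
            # Sleep outside the lock so other threads can reserve later slots.
            self._sleep(delay)
        return delay


def backoff_delays(max_retries, base=2.0):
    """Delays in seconds before retry 1, 2, ...: base**1, base**2, and so on."""
    return [base ** attempt for attempt in range(1, max_retries)]


throttle = MinIntervalThrottle(min_interval=0.5)
throttle.wait()  # first call returns immediately
print(backoff_delays(3))
```

In a downloader like the one in the next section, `throttle.wait()` would be called immediately before each HTTP request, with a single throttle instance shared by all worker threads.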


## 3. Optimized Weather Data Downloader Class


In [2]:
class OptimizedWeatherDownloader:
    """
    Optimized weather data downloader with rate limiting handling
    Downloads first 3 features from each CSV file
    """
    
    def __init__(self, output_dir, max_workers=2, max_samples=3):
        self.output_dir = output_dir
        self.max_workers = max_workers
        self.max_samples = max_samples  # Process first 3 samples from each file
        self.completed_samples = set()
        self.failed_samples = set()
        self.lock = threading.Lock()
        self.session_cache = {}
        
        # Optimized parameters (removed null-heavy ones)
        self.weather_params = [
            'temperature_2m', 'relative_humidity_2m', 'dewpoint_2m', 'apparent_temperature',
            'precipitation', 'rain', 'snowfall', 'weather_code', 'pressure_msl', 'surface_pressure',
            'cloud_cover', 'cloud_cover_low', 'cloud_cover_mid', 'cloud_cover_high',
            'et0_fao_evapotranspiration', 'vapour_pressure_deficit', 'wind_speed_10m', 'wind_speed_100m',
            'wind_direction_10m', 'wind_direction_100m', 'wind_gusts_10m'
        ]
        
        # Excluded parameters (high null rates)
        self.excluded_params = [
            'visibility', 'evapotranspiration', 'soil_temperature_0cm', 'soil_temperature_6cm',
            'soil_temperature_18cm', 'soil_temperature_54cm', 'soil_moisture_0_1cm',
            'soil_moisture_1_3cm', 'soil_moisture_3_9cm', 'soil_moisture_9_27cm', 'soil_moisture_27_81cm'
        ]
        
        os.makedirs(output_dir, exist_ok=True)
        self.setup_logging()
    
    def setup_logging(self):
        """Setup logging configuration"""
        log_dir = os.path.join(os.path.dirname(self.output_dir), 'logs')
        os.makedirs(log_dir, exist_ok=True)
        
        log_file = os.path.join(log_dir, f"weather_download_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
        
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler(sys.stdout)
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def get_session(self):
        """Get or create HTTP session for connection reuse"""
        thread_id = threading.current_thread().ident
        if thread_id not in self.session_cache:
            self.session_cache[thread_id] = requests.Session()
        return self.session_cache[thread_id]
    
    def build_api_url(self, lat, lon, start_date, end_date):
        """Build Open-Meteo API URL with optimized parameters"""
        params_str = ','.join(self.weather_params)
        return (
            f"https://archive-api.open-meteo.com/v1/archive?"
            f"latitude={lat}&longitude={lon}&start_date={start_date}&end_date={end_date}"
            f"&hourly={params_str}&timezone=UTC"
        )
    
    def fetch_weather_data(self, sample_id, lat, lon, peak_date, max_retries=3):
        """
        Fetch weather data with retry mechanism for rate limiting
        
        Args:
            sample_id: Unique identifier for the sample
            lat, lon: Coordinates
            peak_date: Peak flooding date
            max_retries: Maximum number of retry attempts
        """
        # Convert peak_date to UTC
        if peak_date.tzinfo is None:
            peak_date_utc = peak_date.replace(tzinfo=pytz.UTC)
        else:
            peak_date_utc = peak_date.astimezone(pytz.UTC)
        
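        # Calculate 24-hour window before peak
        start_time = peak_date_utc - timedelta(hours=24)
        end_time = peak_date_utc - timedelta(hours=1)
        
        # The API takes dates, not datetimes, so whole days covering the window are
        # requested; the exact 24 hours are extracted later (Section 5)
        start_str = start_time.strftime("%Y-%m-%d")
        end_str = end_time.strftime("%Y-%m-%d")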
        
        url = self.build_api_url(lat, lon, start_str, end_str)
        session = self.get_session()
        
        # Retry loop: rate-limit errors wait whole minutes, other failures back off exponentially.
        # Each failed attempt sleeps exactly once before the next try.
        for attempt in range(max_retries):
            try:
                response = session.get(url, timeout=60)
                response.raise_for_status()
                data = response.json()
                
                if "hourly" not in data or "time" not in data["hourly"]:
                    raise ValueError("No hourly data available")
                
                return self.process_weather_data(data, sample_id, lat, lon, peak_date)
                
            except requests.exceptions.HTTPError as e:
                if e.response is not None and e.response.status_code == 429:  # Rate limited
                    if attempt < max_retries - 1:
                        wait_time = 60 * (attempt + 1)  # Wait 1, 2, ... minutes
                        self.logger.warning(f"Rate limited, waiting {wait_time}s before retry")
                        time.sleep(wait_time)
                        continue
                    else:
                        self.logger.error(f"Rate limit exceeded after {max_retries} attempts")
                        raise
                else:
                    # Other HTTP errors (e.g. a bad request) will not succeed on retry
                    self.logger.error(f"HTTP error: {e}")
                    raise
            except Exception as e:
                if attempt < max_retries - 1:
                    wait_time = 2 ** (attempt + 1)  # Exponential backoff: 2s, 4s, ...
                    self.logger.warning(
                        f"Request failed, retrying in {wait_time}s "
                        f"(attempt {attempt+2}/{max_retries}): {e}"
                    )
                    time.sleep(wait_time)
                    continue
                else:
                    self.logger.error(f"Request failed after {max_retries} attempts: {e}")
                    raise
    
    def process_weather_data(self, data, sample_id, lat, lon, peak_date):
        """Process and save weather data"""
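        df = pd.DataFrame(data["hourly"])
        # API times are UTC (timezone=UTC in the request); keep them timezone-naive
        df["time"] = pd.to_datetime(df["time"]).dt.tz_localize(None)
        
        # Express the peak as a naive UTC timestamp so it can be subtracted from df["time"]
        # (mixing timezone-aware and naive datetimes raises a TypeError)
        peak_ts = pd.Timestamp(peak_date)
        if peak_ts.tzinfo is not None:
            peak_ts = peak_ts.tz_convert("UTC").tz_localize(None)
        
        # Add metadata
        df['sample_id'] = sample_id
        df['latitude'] = lat
        df['longitude'] = lon
        df['peak_date'] = peak_ts.strftime('%Y-%m-%d %H:%M:%S')
        df['hours_before_peak'] = (peak_ts - df['time']).dt.total_seconds() / 3600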
        
        # Save to CSV
        output_file = os.path.join(self.output_dir, f"{sample_id}.csv")
        df.to_csv(output_file, index=False)
        
        return len(df)
    
    def process_sample(self, sample_data):
        """Process a single sample"""
        sample_id, row = sample_data
        
        try:
            # Extract coordinates
            if 'latitude' in row and 'longitude' in row:
                lat, lon = row['latitude'], row['longitude']
            elif 'latitude_dd' in row and 'longitude_dd' in row:
                lat, lon = row['latitude_dd'], row['longitude_dd']
            else:
                raise ValueError("No coordinates found")
            
            # Extract peak date
            if 'peak_date' in row:
                peak_date = pd.to_datetime(row['peak_date'])
            elif 'matched_peak_date' in row:
                peak_date = pd.to_datetime(row['matched_peak_date'])
            else:
                raise ValueError("No peak date found")
            
            # Check if file already exists
            output_file = os.path.join(self.output_dir, f"{sample_id}.csv")
            if os.path.exists(output_file):
                return sample_id, "skipped", 0
            
            # Fetch weather data
            record_count = self.fetch_weather_data(sample_id, lat, lon, peak_date)
            
            with self.lock:
                self.completed_samples.add(sample_id)
            
            return sample_id, "completed", record_count
            
        except Exception as e:
            self.logger.error(f"Failed to process {sample_id}: {e}")
            with self.lock:
                self.failed_samples.add(sample_id)
            return sample_id, "failed", 0
    
    def download_all_samples(self, csv_files):
        """Download weather data for first 3 samples from each file"""
        all_samples = []
        
        # Load samples from CSV files
        for csv_file in csv_files:
            if os.path.exists(csv_file):
                self.logger.info(f"Loading {csv_file}")
                df = pd.read_csv(csv_file, low_memory=False)
                
                # Keep only the first max_samples rows; record the total before truncating
                # so the file is not read a second time just for logging
                total_rows = len(df)
                df = df.head(self.max_samples)
                self.logger.info(f"  Processing first {len(df)} samples (from {total_rows} total)")
                
                for idx, row in df.iterrows():
                    sample_id = self.create_sample_id(row, csv_file, idx)
                    all_samples.append((sample_id, row))
        
        self.logger.info(f"Total samples to process: {len(all_samples)} (first {self.max_samples} from each file)")
        
        # Process samples with thread pool
        completed_count = 0
        failed_count = 0
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_sample = {executor.submit(self.process_sample, sample): sample[0] 
                              for sample in all_samples}
            
            for future in as_completed(future_to_sample):
                sample_id = future_to_sample[future]
                try:
                    result = future.result()
                    sample_id, status, count = result
                    
                    if status == "completed":
                        completed_count += 1
                        self.logger.info(f"Completed {sample_id}: {count} records")
                    elif status == "failed":
                        failed_count += 1
                        self.logger.error(f"Failed {sample_id}")
                    
                    # Progress update every 20 processed samples (skip the 0-processed case)
                    processed = completed_count + failed_count
                    if processed > 0 and processed % 20 == 0:
                        self.logger.info(f"Progress: {completed_count} completed, {failed_count} failed")
                        
                except Exception as e:
                    self.logger.error(f"Exception processing {sample_id}: {e}")
                    failed_count += 1
        
        # Cleanup
        for session in self.session_cache.values():
            session.close()
        
        self.logger.info(f"Download complete: {completed_count} successful, {failed_count} failed")
        return completed_count, failed_count
    
    def create_sample_id(self, row, csv_file, idx):
        """Create unique sample ID from row data"""
        # Handle Unified_Peak_Data file
        if 'site_no' in row and 'event' in row:
            site_no = str(row['site_no'])
            event_name = str(row['event']).replace(' ', '_')
            return f"{site_no}_{event_name}"
        
        # Handle HWMs_with_peaktime file
        elif 'site_no' in row and 'eventName' in row:
            site_no = str(row['site_no'])
            event_name = str(row['eventName']).replace(' ', '_')
            if event_name == 'nan':
                event_name = 'unknown_event'
            return f"{site_no}_{event_name}"
        
        # Fallback
        if 'site_no' in row:
            return f"{row['site_no']}_unknown_event_{idx}"
        else:
            return f"sample_{os.path.basename(csv_file).split('.')[0]}_{idx}"
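
Because `fetch_weather_data` formats the window boundaries as dates, each request covers whole calendar days rather than exactly 24 hours. The sketch below mirrors that date arithmetic with the standard library so the effect is easy to inspect. `request_window` is a hypothetical helper, and the row count assumes the archive API returns 24 hourly rows for every requested day.

```python
from datetime import datetime, timedelta, timezone


def request_window(peak_date):
    """Return (start_date, end_date, expected_hourly_rows) for one peak timestamp."""
    # Treat naive timestamps as UTC, as the downloader does.
    if peak_date.tzinfo is None:
        peak_utc = peak_date.replace(tzinfo=timezone.utc)
    else:
        peak_utc = peak_date.astimezone(timezone.utc)

    start = peak_utc - timedelta(hours=24)
    end = peak_utc - timedelta(hours=1)

    # Dates are inclusive on both ends, so count the days spanned.
    n_days = (end.date() - start.date()).days + 1
    return start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d"), n_days * 24


print(request_window(datetime(2018, 3, 5, 5, 0)))  # peak at 05:00 spans two days
print(request_window(datetime(2018, 3, 5, 0, 0)))  # peak at midnight spans one day
```

A peak at any time other than midnight UTC therefore produces a two-day file, which is why a separate extraction step is needed to trim the download to the 24 hours before the peak.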


## 4. Usage Example


In [3]:
# Initialize downloader (First 3 features from each file)
output_dir = '/u/wz53/alphaearth/Flooding_event_/Flood_dataset/weather_first_3_features'
downloader = OptimizedWeatherDownloader(output_dir, max_workers=2, max_samples=3)

# Define input CSV files
csv_files = [
    '/u/wz53/alphaearth/Flooding_event_/Flood_dataset/Unified_Peak_Data_2016_2017_with_ID(1006).csv',
    '/u/wz53/alphaearth/Flooding_event_/Flood_dataset/Unified_Peak_Data_2018_and_later_with_ID(1006).csv',
    '/u/wz53/alphaearth/Flooding_event_/Flood_dataset/matched_records_1947_with_ID_2016_2017(1006).csv',
    '/u/wz53/alphaearth/Flooding_event_/Flood_dataset/matched_records_698_with_ID_2018_and_later(1006).csv'
]

# Start download process
print("Starting weather data download (first 3 features from each file)...")
print(f"Output directory: {output_dir}")
print(f"Max workers: {downloader.max_workers}")
print(f"Max samples per file: {downloader.max_samples}")
print(f"Weather parameters: {len(downloader.weather_params)}")
print(f"Excluded parameters: {len(downloader.excluded_params)}")

# Run download
completed, failed = downloader.download_all_samples(csv_files)
print(f"\nDownload Results: {completed} successful, {failed} failed")


Starting weather data download (first 3 features from each file)...
Output directory: /u/wz53/alphaearth/Flooding_event_/Flood_dataset/weather_first_3_features
Max workers: 2
Max samples per file: 3
Weather parameters: 21
Excluded parameters: 11
2025-10-21 12:02:18,242 - INFO - Loading /u/wz53/alphaearth/Flooding_event_/Flood_dataset/Unified_Peak_Data_2016_2017_with_ID(1006).csv
2025-10-21 12:02:18,276 - INFO -   Processing first 3 samples (from 1949 total)
2025-10-21 12:02:18,277 - INFO - Loading /u/wz53/alphaearth/Flooding_event_/Flood_dataset/Unified_Peak_Data_2018_and_later_with_ID(1006).csv
2025-10-21 12:02:18,302 - INFO -   Processing first 3 samples (from 2297 total)
2025-10-21 12:02:18,302 - INFO - Loading /u/wz53/alphaearth/Flooding_event_/Flood_dataset/matched_records_1947_with_ID_2016_2017(1006).csv
2025-10-21 12:02:18,335 - INFO -   Processing first 3 samples (from 1947 total)
2025-10-21 12:02:18,335 - INFO - Loading /u/wz53/alphaearth/Flooding_event_/Flood_dataset/matched_

In [4]:
# Check download results
print("📊 Download Results Summary")
print("=" * 50)
print(f"Expected output: 12 weather data files (3 from each of 4 CSV files)")
print("Each file contains 24 hours of weather data before peak flooding")
print("Files saved in: /u/wz53/alphaearth/Flooding_event_/Flood_dataset/weather_first_3_features/")
print()
print("Download completed! Check the output directory for your weather data files.")


📊 Download Results Summary
Expected output: 12 weather data files (3 from each of 4 CSV files)
Each file contains 24 hours of weather data before peak flooding
Files saved in: /u/wz53/alphaearth/Flooding_event_/Flood_dataset/weather_first_3_features/

Download completed! Check the output directory for your weather data files.
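
The cell above prints a fixed message rather than inspecting the output directory. A minimal sketch of an actual check, using only the standard library, could count the data rows in each downloaded CSV. `summarize_weather_files` is a hypothetical helper, not part of the notebook.

```python
import csv
import os


def summarize_weather_files(directory):
    """Map each CSV file name in directory to its number of data rows (header excluded)."""
    summary = {}
    for name in sorted(os.listdir(directory)):
        if not name.endswith(".csv"):
            continue
        with open(os.path.join(directory, name), newline="") as handle:
            reader = csv.reader(handle)
            next(reader, None)  # skip the header row
            summary[name] = sum(1 for _ in reader)
    return summary
```

Calling `summarize_weather_files(output_dir)` after the download and comparing `len(...)` of the result against the expected 12 files gives a quick sanity check. Individual row counts show whether each file covers one or two calendar days.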


## 5. Extract 24-Hour Data Before Peak


In [5]:
def extract_24h_before_peak(df, peak_date):
    """
    Extract exactly 24 hours of data before peak from weather DataFrame
    
    Args:
        df: Weather data DataFrame with 'time' column
        peak_date: Peak flooding datetime
    
    Returns:
        DataFrame with 23-24 rows (1-24 hours before peak)
    """
    df_copy = df.copy()
    df_copy['time'] = pd.to_datetime(df_copy['time'])
    
    # Ensure peak_date is datetime
    if isinstance(peak_date, str):
        peak_date = pd.to_datetime(peak_date)
    
    # Calculate hours before peak
    time_diff_hours = (peak_date - df_copy['time']).dt.total_seconds() / 3600
    
    # Filter: 1-24 hours before peak
    mask = (time_diff_hours >= 1) & (time_diff_hours <= 24)
    df_24h = df_copy[mask].copy()
    
    # Sort by time
    df_24h = df_24h.sort_values('time')
    
    # Add hours_before_peak column
    df_24h['hours_before_peak'] = time_diff_hours[mask]
    
    return df_24h

def process_weather_file_to_24h(input_file, output_file):
    """
    Process a weather file and extract 24h before peak
    """
    df = pd.read_csv(input_file)
    
    # Get peak_date
    peak_date = pd.to_datetime(df['peak_date'].iloc[0])
    
    # Extract 24h data
    df_24h = extract_24h_before_peak(df, peak_date)
    
    print(f"Original: {len(df)} rows -> Extracted: {len(df_24h)} rows")
    print(f"Time range: {df_24h['time'].min()} to {df_24h['time'].max()}")
    print(f"Hours before peak: {df_24h['hours_before_peak'].min():.1f} to {df_24h['hours_before_peak'].max():.1f}")
    
    # Save
    df_24h.to_csv(output_file, index=False)
    
    return df_24h

# Example usage
print("24-hour data extraction functions defined:")
print("- extract_24h_before_peak(): Extract 1-24 hours before peak")
print("- process_weather_file_to_24h(): Process file and save 24h data")
print()
print("Usage example:")
print("df_24h = extract_24h_before_peak(df, peak_date)")
print("process_weather_file_to_24h('input.csv', 'output_24h.csv')")


24-hour data extraction functions defined:
- extract_24h_before_peak(): Extract 1-24 hours before peak
- process_weather_file_to_24h(): Process file and save 24h data

Usage example:
df_24h = extract_24h_before_peak(df, peak_date)
process_weather_file_to_24h('input.csv', 'output_24h.csv')
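
The filter in `extract_24h_before_peak` keeps rows whose time lies between 1 and 24 hours before the peak, inclusive. The standard-library sketch below reproduces that rule on synthetic hourly timestamps to show why the result has 24 rows for an on-the-hour peak and 23 otherwise. `hours_in_window` and the sample timestamps are illustrative assumptions, not notebook code.

```python
from datetime import datetime, timedelta


def hours_in_window(timestamps, peak, min_hours=1, max_hours=24):
    """Return (timestamp, hours_before_peak) pairs inside the inclusive window."""
    selected = []
    for ts in timestamps:
        hours_before = (peak - ts).total_seconds() / 3600
        if min_hours <= hours_before <= max_hours:
            selected.append((ts, hours_before))
    return selected


# Two full days of hourly timestamps, like a 48-row downloaded file.
day_start = datetime(2018, 3, 4)
timestamps = [day_start + timedelta(hours=h) for h in range(48)]

on_the_hour = hours_in_window(timestamps, datetime(2018, 3, 5, 5, 0))
off_the_hour = hours_in_window(timestamps, datetime(2018, 3, 5, 5, 30))

print(len(on_the_hour), on_the_hour[0][0], on_the_hour[-1][0])
print(len(off_the_hour))
```

With a peak at 05:00 the window runs from 05:00 the previous day to 04:00, 24 rows in total. With a peak at 05:30 the offsets are 0.5, 1.5, ..., so only the 23 rows from 1.5 to 23.5 hours qualify, which matches the "23-24 rows" noted in the docstring.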


In [6]:
# Example: Process downloaded weather files to extract 24h data
print("Processing downloaded weather files to extract 24-hour data...")
print("=" * 60)

# Set up directories
input_dir = '/u/wz53/alphaearth/Flooding_event_/Flood_dataset/weather_first_3_features'
output_dir = '/u/wz53/alphaearth/Flooding_event_/Flood_dataset/weather_24h_extracted'

# Create output directory
os.makedirs(output_dir, exist_ok=True)

# Process all downloaded weather files
if os.path.exists(input_dir):
    weather_files = [f for f in os.listdir(input_dir) if f.endswith('.csv')]
    
    if weather_files:
        print(f"Found {len(weather_files)} weather files to process")
        print()
        
        for i, weather_file in enumerate(weather_files, 1):
            input_path = os.path.join(input_dir, weather_file)
            output_path = os.path.join(output_dir, f"24h_{weather_file}")
            
            print(f"Processing {i}/{len(weather_files)}: {weather_file}")
            
            try:
                # Extract 24h data
                df_24h = process_weather_file_to_24h(input_path, output_path)
                print(f"✓ Saved: {output_path}")
                print()
                
            except Exception as e:
                print(f"✗ Error processing {weather_file}: {e}")
                print()
        
        print("=" * 60)
        print("24-hour data extraction completed!")
        print(f"Input directory: {input_dir}")
        print(f"Output directory: {output_dir}")
        print(f"Processed files: {len(weather_files)}")
        
    else:
        print("No weather files found in input directory")
        print("Please run the download first (Cell 7)")
        
else:
    print("Input directory does not exist")
    print("Please run the download first (Cell 7)")


Processing downloaded weather files to extract 24-hour data...
Found 12 weather files to process

Processing 1/12: CTFAI04670_2018_March_Extratropical_Cyclone.csv
Original: 48 rows -> Extracted: 24 rows
Time range: 2018-03-04 05:00:00 to 2018-03-05 04:00:00
Hours before peak: 1.0 to 24.0
✓ Saved: /u/wz53/alphaearth/Flooding_event_/Flood_dataset/weather_24h_extracted/24h_CTFAI04670_2018_March_Extratropical_Cyclone.csv

Processing 2/12: CTFAI04807_2018_March_Extratropical_Cyclone.csv
Original: 48 rows -> Extracted: 24 rows
Time range: 2018-03-15 04:00:00 to 2018-03-16 03:00:00
Hours before peak: 1.0 to 24.0
✓ Saved: /u/wz53/alphaearth/Flooding_event_/Flood_dataset/weather_24h_extracted/24h_CTFAI04807_2018_March_Extratropical_Cyclone.csv

Processing 3/12: CTFAI15448_2018_March_Extratropical_Cyclone.csv
Original: 48 rows -> Extracted: 24 rows
Time range: 2018-03-04 05:00:00 to 2018-03-05 04:00:00
Hours before peak: 1.0 to 24.0
✓ Saved: /u/wz53/alphaearth/Flooding_event_/Flood_dataset/weath

## 6. Data Processing Summary

### First 3 Features Processing Pipeline

This notebook downloads weather data for **the first 3 features from each CSV file** and provides 24-hour data extraction:

#### Key Features
- **Limited Processing**: Processes first 3 samples from each CSV file (12 total)
- **API Optimization**: Uses optimized parameters with rate limiting handling
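- **Quality Assurance**: Requests only the 21 parameters retained after excluding the 11 that were 100% null
- **24-Hour Extraction**: Extracts the rows from 1 to 24 hours before peak flooding (24 rows when the peak falls on the hour, 23 otherwise)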
- **Ready to Run**: No comments to uncomment, direct execution

#### Processing Steps
1. **Download**: Download weather data for first 3 features from each CSV file
2. **Extract**: Extract the 1-24 hours before the peak flooding event from each downloaded file
3. **Save**: Save processed 24-hour data to separate directory

#### Expected Output
- **12 weather data files** (3 from each of 4 CSV files)
- **12 extracted 24-hour files** (processed from downloaded files)
- **24 hours of data** before peak flooding for each sample
- **21 weather parameters** per file (plus the `time` column)
- **Complete metadata** (coordinates, peak date, hours before peak)


## 7. Quick Start Example

### Run the Complete Pipeline

To run the complete weather data download and 24-hour extraction pipeline:

1. **Execute Cell 7** (the download cell in Section 4, `In [3]`): Download weather data for first 3 features from each CSV file
2. **Execute Cell 11** (the extraction cell in Section 5, `In [6]`): Extract 24-hour data before peak flooding events
3. **Check Results**: Verify files in output directories

### Expected Output
- **12 weather data files** in `weather_first_3_features/` directory
- **12 extracted 24-hour files** in `weather_24h_extracted/` directory
- **Complete metadata** for each sample including coordinates and peak dates
