In [4]:
from dotenv import load_dotenv
from pathlib import Path

notebook_dir = Path().absolute()  # Gets current directory
env_path = notebook_dir.parent.parent / '.env.local'
load_dotenv(dotenv_path=env_path)


True

In [10]:
import pandas as pd
import numpy as np
import requests
from datetime import datetime, timedelta
import time
from concurrent.futures import ThreadPoolExecutor
import os
import json
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

class NOAADataCollector:
    def __init__(self, api_key):
        self.base_url = "https://www.ncdc.noaa.gov/cdo-web/api/v2/"
        self.headers = {"token": api_key}
        
        # Configure retry strategy
        self.session = requests.Session()
        retries = Retry(
            total=5,  # number of retries
            backoff_factor=1,  # wait 1, 2, 4, 8, 16 seconds between retries
            status_forcelist=[429, 500, 502, 503, 504],  # retry on these status codes
            allowed_methods=["GET"]
        )
        self.session.mount('https://', HTTPAdapter(max_retries=retries))
        
        self.stations = {
            'PHILADELPHIA': 'GHCND:USW00013739',
            'PITTSBURGH': 'GHCND:USW00094823',
            'CHICAGO': 'GHCND:USW00094846',
            'COLUMBUS': 'GHCND:USW00014821',
            'WASHINGTON': 'GHCND:USW00013743'
        }
        
        self.datatypes = ['TMAX', 'TMIN', 'TAVG', 'PRCP', 'AWND']
        
    def fetch_with_retry(self, url, params=None, max_retries=5, initial_wait=1):
        """
        Fetch data with exponential backoff retry logic
        """
        attempt = 0
        while attempt < max_retries:
            try:
                response = self.session.get(url, headers=self.headers, params=params)
                
                if response.status_code == 429:  # Rate limit
                    wait_time = 60
                    print(f"Rate limit reached, waiting {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue
                    
                if response.status_code in [500, 502, 503, 504]:  # Server errors
                    wait_time = initial_wait * (2 ** attempt)
                    print(f"Server error {response.status_code}, waiting {wait_time} seconds...")
                    time.sleep(wait_time)
                    attempt += 1
                    continue
                    
                response.raise_for_status()
                return response.json()
                
            except requests.exceptions.RequestException as e:
                wait_time = initial_wait * (2 ** attempt)
                print(f"Request failed: {str(e)}, attempt {attempt + 1}/{max_retries}, waiting {wait_time} seconds...")
                time.sleep(wait_time)
                attempt += 1
        
        print(f"Failed to fetch data after {max_retries} attempts")
        return None
        
    def fetch_station_data_by_chunk(self, station_id, start_date, end_date):
        """
        Fetch data for a smaller time chunk
        """
        url = f"{self.base_url}data"
        params = {
            "datasetid": "GHCND",
            "stationid": station_id,
            "startdate": start_date,
            "enddate": end_date,
            "limit": 1000,
            "datatypeid": self.datatypes
        }
        
        all_data = []
        offset = 1
        
        while True:
            if offset > 1:
                params['offset'] = offset
                
            data = self.fetch_with_retry(url, params)
            if not data:
                break
                
            if "results" in data:
                all_data.extend(data["results"])
                print(f"Fetched {len(data['results'])} records for {station_id} - {start_date}")
                
                if len(data["results"]) < 1000:
                    break
                    
                offset += len(data["results"])
            else:
                break
                
            time.sleep(0.5)  # Respect rate limits
            
        return all_data
        
    def fetch_station_data_by_year(self, station_id, year):
        """
        Fetch data for a single station for one year, broken into quarters
        """
        quarters = [
            ("01-01", "03-31"),
            ("04-01", "06-30"),
            ("07-01", "09-30"),
            ("10-01", "12-31")
        ]
        
        all_data = []
        for start_month, end_month in quarters:
            start_date = f"{year}-{start_month}"
            end_date = f"{year}-{end_month}"
            
            print(f"Fetching {station_id} for period {start_date} to {end_date}")
            quarter_data = self.fetch_station_data_by_chunk(station_id, start_date, end_date)
            
            if quarter_data:
                all_data.extend(quarter_data)
            
            time.sleep(1)  # Brief pause between quarters
            
        return all_data

    def process_station_data(self, raw_data):
        """
        Process raw NOAA data into structured format
        """
        if not raw_data:
            return pd.DataFrame()
            
        # Convert to DataFrame
        df = pd.DataFrame(raw_data)
        
        # Extract date from the raw data
        df['date'] = pd.to_datetime(df['date'])
        
        # Pivot the data
        df_pivot = df.pivot_table(
            index='date',
            columns='datatype',
            values='value',
            aggfunc='first'  # Use first value if there are duplicates
        ).reset_index()
        
        # Rename columns
        column_mapping = {
            'TMAX': 'max_temperature',
            'TMIN': 'min_temperature',
            'TAVG': 'avg_temperature',
            'PRCP': 'precipitation',
            'AWND': 'avg_wind_speed'
        }
        
        df_pivot = df_pivot.rename(columns=column_mapping)
        
        return df_pivot
        
    def interpolate_hourly_data(self, daily_data):
        """
        Interpolate daily data to hourly values
        """
        if daily_data.empty:
            return pd.DataFrame()
            
        # Convert to hourly timestamps
        daily_data = daily_data.set_index('date')
        hourly_index = pd.date_range(
            start=daily_data.index.min(),
            end=daily_data.index.max(),
            freq='H'
        )
        
        # Resample to hourly and interpolate
        hourly_data = daily_data.reindex(hourly_index)
        
        # Temperature interpolation
        temp_cols = ['max_temperature', 'min_temperature', 'avg_temperature']
        existing_temp_cols = [col for col in temp_cols if col in hourly_data.columns]
        if existing_temp_cols:
            hourly_data[existing_temp_cols] = hourly_data[existing_temp_cols].interpolate(method='cubic')
        
        # Other variables interpolation
        other_cols = ['precipitation', 'avg_wind_speed']
        existing_other_cols = [col for col in other_cols if col in hourly_data.columns]
        if existing_other_cols:
            hourly_data[existing_other_cols] = hourly_data[existing_other_cols].interpolate(method='linear')
        
        # Add hour of day influence on temperature
        if 'max_temperature' in hourly_data.columns and 'min_temperature' in hourly_data.columns:
            hour_temp_factor = -np.cos(2 * np.pi * hourly_index.hour / 24)
            temp_range = (hourly_data['max_temperature'] - hourly_data['min_temperature']) / 2
            hourly_data['temperature'] = hourly_data['avg_temperature'] + hour_temp_factor * temp_range
        
        return hourly_data.reset_index().rename(columns={'index': 'timestamp'})
        
    def aggregate_station_data(self, all_station_data):
        """
        Aggregate data from all stations into a single dataset
        """
        if not all_station_data:
            raise ValueError("No station data available for aggregation")
            
        weights = {
            'PHILADELPHIA': 0.3,
            'CHICAGO': 0.25,
            'WASHINGTON': 0.2,
            'PITTSBURGH': 0.15,
            'COLUMBUS': 0.1
        }
        
        # Initialize with first available station
        first_station = next(iter(all_station_data))
        combined_data = all_station_data[first_station].copy()
        combined_data = combined_data.multiply(weights[first_station])
        
        # Add other stations
        for station, data in all_station_data.items():
            if station == first_station:
                continue
            if not data.empty:
                # Ensure indexes match
                data = data.set_index('timestamp')
                combined_data = combined_data.set_index('timestamp')
                combined_data = combined_data.add(data.multiply(weights[station]))
        
        return combined_data.reset_index()
    
    def save_data(self, data, output_path, backup=True):
        """
        Save processed data to CSV with backup
        """
        os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True)
        
        # Save main file
        data.to_csv(output_path, index=False)
        print(f"Data saved to {output_path}")
        
        # Create backup if requested
        if backup:
            backup_path = f"{output_path}.backup"
            data.to_csv(backup_path, index=False)
            print(f"Backup saved to {backup_path}")

def main():
    api_key = os.getenv('NOAA_API_KEY')
    if not api_key:
        raise ValueError("NOAA API key not found. Set NOAA_API_KEY environment variable.")
    
    collector = NOAADataCollector(api_key)
    
    # Set date range
    start_year = 2002
    end_year = 2018
    
    print("Fetching weather data...")
    all_station_data = {}
    
    # Process one station at a time
    for station_name, station_id in collector.stations.items():
        try:
            print(f"\nProcessing station: {station_name}")
            station_data = []
            
            for year in range(start_year, end_year + 1):
                try:
                    year_data = collector.fetch_station_data_by_year(station_id, year)
                    if year_data:
                        station_data.extend(year_data)
                        # Save intermediate results after each year
                        temp_df = collector.process_station_data(station_data)
                        temp_path = f"temp_{station_name.lower()}_{year}.csv"
                        temp_df.to_csv(temp_path, index=False)
                        print(f"Saved intermediate data for {station_name} year {year}")
                except Exception as e:
                    print(f"Error processing year {year} for station {station_name}: {str(e)}")
                    continue
            
            if station_data:
                processed_data = collector.process_station_data(station_data)
                hourly_data = collector.interpolate_hourly_data(processed_data)
                all_station_data[station_name] = hourly_data
                
                # Save individual station data
                station_path = f"station_{station_name.lower()}.csv"
                collector.save_data(hourly_data, station_path)
                
        except Exception as e:
            print(f"Error processing station {station_name}: {str(e)}")
            continue
    
    if all_station_data:
        print("\nAggregating station data...")
        try:
            combined_data = collector.aggregate_station_data(all_station_data)
            collector.save_data(combined_data, "noaa_data.csv")
        except Exception as e:
            print(f"Error in aggregation/saving: {str(e)}")
    else:
        print("No data collected from any station")

if __name__ == "__main__":
    main()

Fetching weather data...

Processing station: PHILADELPHIA
Fetching GHCND:USW00013739 for period 2002-01-01 to 2002-03-31
Fetched 450 records for GHCND:USW00013739 - 2002-01-01
Fetching GHCND:USW00013739 for period 2002-04-01 to 2002-06-30
Fetched 455 records for GHCND:USW00013739 - 2002-04-01
Fetching GHCND:USW00013739 for period 2002-07-01 to 2002-09-30
Fetched 460 records for GHCND:USW00013739 - 2002-07-01
Fetching GHCND:USW00013739 for period 2002-10-01 to 2002-12-31
Fetched 459 records for GHCND:USW00013739 - 2002-10-01
Saved intermediate data for PHILADELPHIA year 2002
Fetching GHCND:USW00013739 for period 2003-01-01 to 2003-03-31
Fetched 450 records for GHCND:USW00013739 - 2003-01-01
Fetching GHCND:USW00013739 for period 2003-04-01 to 2003-06-30
Fetched 454 records for GHCND:USW00013739 - 2003-04-01
Fetching GHCND:USW00013739 for period 2003-07-01 to 2003-09-30
Fetched 460 records for GHCND:USW00013739 - 2003-07-01
Fetching GHCND:USW00013739 for period 2003-10-01 to 2003-12-31
F

  hourly_index = pd.date_range(


Data saved to station_philadelphia.csv
Backup saved to station_philadelphia.csv.backup

Processing station: PITTSBURGH
Fetching GHCND:USW00094823 for period 2002-01-01 to 2002-03-31
Fetched 450 records for GHCND:USW00094823 - 2002-01-01
Fetching GHCND:USW00094823 for period 2002-04-01 to 2002-06-30
Fetched 455 records for GHCND:USW00094823 - 2002-04-01
Fetching GHCND:USW00094823 for period 2002-07-01 to 2002-09-30
Fetched 460 records for GHCND:USW00094823 - 2002-07-01
Fetching GHCND:USW00094823 for period 2002-10-01 to 2002-12-31
Fetched 460 records for GHCND:USW00094823 - 2002-10-01
Saved intermediate data for PITTSBURGH year 2002
Fetching GHCND:USW00094823 for period 2003-01-01 to 2003-03-31
Fetched 450 records for GHCND:USW00094823 - 2003-01-01
Fetching GHCND:USW00094823 for period 2003-04-01 to 2003-06-30
Fetched 454 records for GHCND:USW00094823 - 2003-04-01
Fetching GHCND:USW00094823 for period 2003-07-01 to 2003-09-30
Fetched 460 records for GHCND:USW00094823 - 2003-07-01
Fetchi

  hourly_index = pd.date_range(


Data saved to station_pittsburgh.csv
Backup saved to station_pittsburgh.csv.backup

Processing station: CHICAGO
Fetching GHCND:USW00094846 for period 2002-01-01 to 2002-03-31
Fetched 450 records for GHCND:USW00094846 - 2002-01-01
Fetching GHCND:USW00094846 for period 2002-04-01 to 2002-06-30
Fetched 455 records for GHCND:USW00094846 - 2002-04-01
Fetching GHCND:USW00094846 for period 2002-07-01 to 2002-09-30
Fetched 460 records for GHCND:USW00094846 - 2002-07-01
Fetching GHCND:USW00094846 for period 2002-10-01 to 2002-12-31
Fetched 460 records for GHCND:USW00094846 - 2002-10-01
Saved intermediate data for CHICAGO year 2002
Fetching GHCND:USW00094846 for period 2003-01-01 to 2003-03-31
Fetched 450 records for GHCND:USW00094846 - 2003-01-01
Fetching GHCND:USW00094846 for period 2003-04-01 to 2003-06-30
Fetched 455 records for GHCND:USW00094846 - 2003-04-01
Fetching GHCND:USW00094846 for period 2003-07-01 to 2003-09-30
Fetched 460 records for GHCND:USW00094846 - 2003-07-01
Fetching GHCND:U

  hourly_index = pd.date_range(


Data saved to station_chicago.csv
Backup saved to station_chicago.csv.backup

Processing station: COLUMBUS
Fetching GHCND:USW00014821 for period 2002-01-01 to 2002-03-31
Fetched 450 records for GHCND:USW00014821 - 2002-01-01
Fetching GHCND:USW00014821 for period 2002-04-01 to 2002-06-30
Fetched 455 records for GHCND:USW00014821 - 2002-04-01
Fetching GHCND:USW00014821 for period 2002-07-01 to 2002-09-30
Fetched 460 records for GHCND:USW00014821 - 2002-07-01
Fetching GHCND:USW00014821 for period 2002-10-01 to 2002-12-31
Fetched 460 records for GHCND:USW00014821 - 2002-10-01
Saved intermediate data for COLUMBUS year 2002
Fetching GHCND:USW00014821 for period 2003-01-01 to 2003-03-31
Fetched 450 records for GHCND:USW00014821 - 2003-01-01
Fetching GHCND:USW00014821 for period 2003-04-01 to 2003-06-30
Fetched 454 records for GHCND:USW00014821 - 2003-04-01
Fetching GHCND:USW00014821 for period 2003-07-01 to 2003-09-30
Fetched 460 records for GHCND:USW00014821 - 2003-07-01
Fetching GHCND:USW00

  hourly_index = pd.date_range(


Data saved to station_columbus.csv
Backup saved to station_columbus.csv.backup

Processing station: WASHINGTON
Fetching GHCND:USW00013743 for period 2002-01-01 to 2002-03-31
Fetched 450 records for GHCND:USW00013743 - 2002-01-01
Fetching GHCND:USW00013743 for period 2002-04-01 to 2002-06-30
Fetched 454 records for GHCND:USW00013743 - 2002-04-01
Fetching GHCND:USW00013743 for period 2002-07-01 to 2002-09-30
Fetched 460 records for GHCND:USW00013743 - 2002-07-01
Fetching GHCND:USW00013743 for period 2002-10-01 to 2002-12-31
Fetched 460 records for GHCND:USW00013743 - 2002-10-01
Saved intermediate data for WASHINGTON year 2002
Fetching GHCND:USW00013743 for period 2003-01-01 to 2003-03-31
Fetched 450 records for GHCND:USW00013743 - 2003-01-01
Fetching GHCND:USW00013743 for period 2003-04-01 to 2003-06-30
Fetched 455 records for GHCND:USW00013743 - 2003-04-01
Fetching GHCND:USW00013743 for period 2003-07-01 to 2003-09-30
Fetched 460 records for GHCND:USW00013743 - 2003-07-01
Fetching GHCND

  hourly_index = pd.date_range(


Data saved to station_washington.csv
Backup saved to station_washington.csv.backup

Aggregating station data...
Error in aggregation/saving: cannot perform __mul__ with this index type: DatetimeArray


In [11]:
import pandas as pd

def aggregate_saved_stations():
    weights = {
        'PHILADELPHIA': 0.3,
        'CHICAGO': 0.25,
        'WASHINGTON': 0.2,
        'PITTSBURGH': 0.15,
        'COLUMBUS': 0.1
    }
    
    # Load all station data
    all_station_data = {}
    for station in weights.keys():
        try:
            file_path = f"station_{station.lower()}.csv"
            df = pd.read_csv(file_path)
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            all_station_data[station] = df
        except Exception as e:
            print(f"Could not load {station}: {e}")
            continue
    
    # Initialize with first station
    first_station = next(iter(all_station_data))
    combined_data = all_station_data[first_station].copy()
    numeric_columns = combined_data.select_dtypes(include=['float64', 'int64']).columns
    
    # Multiply numeric columns by weight
    combined_data[numeric_columns] = combined_data[numeric_columns].multiply(weights[first_station])
    
    # Add other stations
    for station, data in all_station_data.items():
        if station == first_station:
            continue
        if not data.empty:
            data[numeric_columns] = data[numeric_columns].multiply(weights[station])
            combined_data = pd.merge(combined_data, data, on='timestamp', suffixes=('', f'_{station.lower()}'))
    
    # Save result
    combined_data.to_csv("noaa_data.csv", index=False)
    print("Aggregated data saved to noaa_data.csv")

# Run the aggregation
aggregate_saved_stations()

Aggregated data saved to noaa_data.csv
