# Data Acquisition: NOAA Integrated Surface Database (ISD)

This notebook loads operational-quality hourly surface observations from NOAA ISD for the primary production dataset.

## Objectives
1. Load station inventory and metadata
2. Load hourly observations for selected stations
3. Parse NOAA Global Hourly CSV format
4. Extract key meteorological variables (temperature, humidity, wind, pressure)
5. Handle missing data and station temporal coverage
6. Create cleaned dataset for model training

## Data Source
- **NOAA ISD**: Integrated Surface Database (Global Hourly)
- **Access**: Direct download from https://www.ncei.noaa.gov/data/global-hourly/access/
- **Format**: CSV files (one per station per year)
- **Coverage**: Global land stations, hourly observations
- **Variables**: 2m air temperature, relative humidity, dew point, wind speed/direction, surface pressure

## Two Paths to Load Data

### Option A: Load Pre-Downloaded CSV Files (Recommended)
If you've already downloaded CSV files from the NOAA website to `data/noaa-data/`:
1. Run cells 1-2 (imports and configuration)
2. **Skip to Section 3** and run cells 6-9 to load your local CSV files
3. Continue with Section 5 (Data Validation)

### Option B: Download via FTP (if network allows)
If you want to download data programmatically:
1. Run cells 1-5 for FTP-based download
2. Note: This may fail due to network restrictions

## Alternative: Standalone Download Script

If you encounter persistent network issues, use the standalone script:

```bash
python scripts/download_noaa_isd_data.py \
    --stations 725030-14732,722950-23174 \
    --years 2022,2023 \
    --output-dir data/raw/noaa_isd
```

See `scripts/README.md` for more details.

In [1]:
# Imports
import os
import sys
from pathlib import Path
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import time
from tqdm import tqdm
import ftplib
import gzip
import io
import re
import requests
from typing import Optional, List, Dict, Tuple

# Setup project path
current_dir = Path(os.getcwd()).resolve()

# Find project root by looking for src/ and notebooks/ directories
if (current_dir / 'src').exists() and (current_dir / 'notebooks').exists():
    project_root = current_dir
elif current_dir.name in ['01_data_acquisition', '02_data_preprocessing', '03_baselines', 
                           '04_gnn_models', '05_training', '06_analysis', '07_evaluation', '08_documentation']:
    project_root = current_dir.parent.parent
elif current_dir.name == 'notebooks':
    project_root = current_dir.parent
else:
    # Walk up to find project root
    for parent in current_dir.parents:
        if (parent / 'src').exists() and (parent / 'notebooks').exists():
            project_root = parent
            break
    else:
        project_root = current_dir

# Add project root to Python path
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Now we can import from src
from src.utils.config import RAW_DATA_DIR, PROCESSED_DATA_DIR

print(f"Project root: {project_root}")
print(f"Raw data directory: {RAW_DATA_DIR}")
print(f"Processed data directory: {PROCESSED_DATA_DIR}")

# Create directories if they don't exist
RAW_DATA_DIR.mkdir(parents=True, exist_ok=True)
PROCESSED_DATA_DIR.mkdir(parents=True, exist_ok=True)

Project root: C:\Users\Kata\Desktop\earth-sgnn
Raw data directory: C:\Users\Kata\Desktop\earth-sgnn\data\raw
Processed data directory: C:\Users\Kata\Desktop\earth-sgnn\data\processed


## 1. NOAA ISD Configuration

NOAA ISD data is accessed via FTP. We'll download:
- Station inventory (station metadata)
- Hourly observation files (fixed-width format)

In [3]:
# NOAA ISD FTP Configuration
NOAA_ISD_FTP_HOST = "ftp.ncei.noaa.gov"
NOAA_ISD_FTP_PATH = "/pub/data/noaa/"

# Date range for data download
# Note: ISD data is typically available with a few days delay
# Only download data for years that exist (current year and past years)
CURRENT_YEAR = datetime.now().year
# ISD data is typically available up to a few days ago, so use current year - 1 as max
# For prototyping, let's use 2022-2023 (recent years with complete data)
END_YEAR = min(CURRENT_YEAR - 1, 2023)  # Don't go beyond available data
START_YEAR = END_YEAR - 1  # 2 years of data

END_DATE = datetime(END_YEAR, 12, 31)  # End of the last complete year
START_DATE = datetime(START_YEAR, 1, 1)  # Start of first year

# Target region (can be expanded)
# For initial download, we'll focus on US stations
TARGET_REGION = "US"  # Options: "US", "global"

print(f"Start date: {START_DATE.strftime('%Y-%m-%d')}")
print(f"End date: {END_DATE.strftime('%Y-%m-%d')}")
print(f"Years to download: {START_YEAR} to {END_YEAR}")
print(f"Total days: {(END_DATE - START_DATE).days}")
print(f"Target region: {TARGET_REGION}")

Start date: 2022-01-01
End date: 2023-12-31
Years to download: 2022 to 2023
Total days: 729
Target region: US


## 2. Download Station Inventory

The station inventory contains metadata for all ISD stations including:
- Station ID (USAF-WBAN)
- Station name
- Latitude, longitude, elevation
- Country code
- Temporal coverage

In [None]:
def download_station_inventory(output_file: Optional[Path] = None) -> pd.DataFrame:
    """
    Download NOAA ISD station inventory from FTP.
    
    The inventory file is a fixed-width format file containing station metadata.
    
    Parameters:
    -----------
    output_file : Path, optional
        Path to save the inventory file. If None, saves to RAW_DATA_DIR.
    
    Returns:
    --------
    pd.DataFrame
        DataFrame with station metadata
    """
    if output_file is None:
        output_file = RAW_DATA_DIR / "noaa_isd_station_inventory.csv"
    
    print("Connecting to NOAA ISD FTP server...")
    
    try:
        ftp = ftplib.FTP(NOAA_ISD_FTP_HOST)
        ftp.login()  # Anonymous login
        ftp.cwd(NOAA_ISD_FTP_PATH)
        
        # List files to find inventory
        files = ftp.nlst()
        inventory_file = None
        
        # Look for inventory file (typically named isd-history.csv or similar)
        for f in files:
            if 'history' in f.lower() or 'inventory' in f.lower() or 'station' in f.lower():
                inventory_file = f
                break
        
        if inventory_file is None:
            # Try common names
            for name in ['isd-history.csv', 'isd-history.txt', 'isd-stations.txt']:
                try:
                    ftp.size(name)
                    inventory_file = name
                    break
                except:
                    continue
        
        if inventory_file is None:
            print("Warning: Could not find station inventory file automatically.")
            print("Available files:", files[:10])  # Show first 10
            return None
        
        print(f"Downloading station inventory: {inventory_file}")
        
        # Download file
        buffer = io.BytesIO()
        ftp.retrbinary(f'RETR {inventory_file}', buffer.write)
        buffer.seek(0)
        
        ftp.quit()
        
        # Handle .z compressed files (Unix compress format)
        if inventory_file.endswith('.z'):
            try:
                import zlib
                # Read compressed data
                compressed_data = buffer.read()
                # Try to decompress (zlib can handle some .z files, but not all)
                # For .z files, we might need a different approach
                # Let's try using gzip first (some .z files are actually gzip)
                buffer.seek(0)
                try:
                    import gzip
                    decompressed = gzip.decompress(compressed_data)
                    buffer = io.BytesIO(decompressed)
                except:
                    # If gzip fails, try zlib
                    try:
                        # Skip header if present and try decompression
                        decompressed = zlib.decompress(compressed_data)
                        buffer = io.BytesIO(decompressed)
                    except:
                        print("Warning: Could not decompress .z file. Trying alternative method...")
                        # Save to temp file and use system decompress if available
                        temp_file = RAW_DATA_DIR / "temp_inventory.z"
                        with open(temp_file, 'wb') as f:
                            f.write(compressed_data)
                        print(f"Saved compressed file to {temp_file}")
                        print("Note: You may need to decompress manually or install a .z decompression tool")
                        return None
            except ImportError:
                print("Warning: zlib not available. Cannot decompress .z file.")
                return None
        
        # Parse fixed-width format
        # ISD inventory format: USAF,WBAN,STATION NAME,CTRY,STATE,ICAO,LAT,LON,ELEV(M),BEGIN,END
        # Note: Format may vary, we'll try to parse it flexibly
        try:
            # Try reading as CSV first
            df = pd.read_csv(buffer, encoding='utf-8', low_memory=False)
        except:
            buffer.seek(0)
            # Try with different encoding
            try:
                df = pd.read_csv(buffer, encoding='latin-1', low_memory=False)
            except:
                buffer.seek(0)
                # Try reading as fixed-width text
                try:
                    content = buffer.read().decode('latin-1', errors='ignore')
                    lines = content.split('\n')
                    # ISD inventory is typically fixed-width or CSV
                    # Try parsing as CSV with different separators
                    if len(lines) > 0:
                        # Check if it's CSV-like
                        if ',' in lines[0] or '\t' in lines[0]:
                            sep = ',' if ',' in lines[0] else '\t'
                            df = pd.read_csv(io.StringIO(content), sep=sep, encoding='latin-1', 
                                            low_memory=False, on_bad_lines='skip')
                        else:
                            # Fixed-width format - parse manually
                            print("Attempting fixed-width parsing...")
                            # ISD inventory format (based on NOAA documentation):
                            # USAF: 0-5 (6 chars)
                            # WBAN: 7-11 (5 chars)
                            # STATION NAME: 13-42 (30 chars)
                            # CTRY: 43-44 (2 chars)
                            # STATE: 46-47 (2 chars)
                            # ICAO: 49-52 (4 chars)
                            # LAT: 57-63 (7 chars, +N/-S, decimal degrees)
                            # LON: 65-72 (8 chars, +E/-W, decimal degrees)
                            # ELEV(M): 74-80 (7 chars)
                            # BEGIN: 82-89 (8 chars, YYYYMMDD)
                            # END: 91-98 (8 chars, YYYYMMDD)
                            data = []
                            header_skipped = False
                            for line in lines:
                                if len(line.strip()) == 0:
                                    continue
                                
                                # Skip header rows (lines that don't start with digits or have "USAF" in them)
                                if not header_skipped:
                                    if 'USAF' in line.upper() or 'FEDERAL' in line.upper() or 'INVENTORY' in line.upper():
                                        continue
                                    if not (line[0:6].strip().isdigit() or len(line[0:6].strip()) == 0):
                                        continue
                                    header_skipped = True
                                
                                if len(line) < 80:  # Need at least 80 chars for full record
                                    continue
                                
                                try:
                                    usaf = line[0:6].strip()
                                    wban = line[7:12].strip() if len(line) > 12 else ''
                                    
                                    # Skip if USAF is not a valid station ID
                                    if not usaf.isdigit() or len(usaf) == 0:
                                        continue
                                    
                                    name = line[13:43].strip() if len(line) > 43 else ''
                                    ctry = line[43:45].strip() if len(line) > 45 else ''
                                    state = line[46:48].strip() if len(line) > 48 else ''
                                    icao = line[49:53].strip() if len(line) > 53 else ''
                                    
                                    # Parse latitude (position 57-63, format: +NN.NNN or -NN.NNN)
                                    lat_str = line[57:64].strip() if len(line) > 64 else ''
                                    lat = np.nan
                                    if lat_str:
                                        try:
                                            lat_val = float(lat_str)
                                            lat = lat_val
                                        except:
                                            pass
                                    
                                    # Parse longitude (position 65-72, format: +NNN.NNN or -NNN.NNN)
                                    lon_str = line[65:73].strip() if len(line) > 73 else ''
                                    lon = np.nan
                                    if lon_str:
                                        try:
                                            lon_val = float(lon_str)
                                            lon = lon_val
                                        except:
                                            pass
                                    
                                    # Parse elevation (position 74-80)
                                    elev_str = line[74:81].strip() if len(line) > 81 else ''
                                    elev = np.nan
                                    if elev_str:
                                        try:
                                            elev = float(elev_str)
                                        except:
                                            pass
                                    
                                    # Parse BEGIN date (position 82-89, YYYYMMDD)
                                    begin_str = line[82:90].strip() if len(line) > 90 else ''
                                    begin = None
                                    if begin_str and len(begin_str) == 8 and begin_str.isdigit():
                                        begin = begin_str
                                    
                                    # Parse END date (position 91-98, YYYYMMDD)
                                    end_str = line[91:99].strip() if len(line) > 99 else ''
                                    end = None
                                    if end_str and len(end_str) == 8 and end_str.isdigit():
                                        end = end_str
                                    
                                    data.append({
                                        'USAF': usaf,
                                        'WBAN': wban,
                                        'STATION NAME': name,
                                        'CTRY': ctry,
                                        'STATE': state,
                                        'ICAO': icao,
                                        'LAT': lat,
                                        'LON': lon,
                                        'ELEV(M)': elev,
                                        'BEGIN': begin,
                                        'END': end
                                    })
                                except Exception as e:
                                    continue
                            
                            if len(data) > 0:
                                df = pd.DataFrame(data)
                                # Filter out rows with invalid coordinates
                                df = df[df['LAT'].notna() & df['LON'].notna()]
                            else:
                                raise ValueError("Could not parse fixed-width format")
                except Exception as e:
                    print(f"Could not automatically parse inventory file: {e}")
                    print("Please download manually from: https://www.ncei.noaa.gov/data/global-hourly/access/")
                    return None
        
        # Clean column names
        df.columns = df.columns.str.strip()
        
        # Save to CSV
        df.to_csv(output_file, index=False)
        print(f"Station inventory saved to: {output_file}")
        print(f"Total stations: {len(df)}")
        
        return df
        
    except Exception as e:
        print(f"Error downloading station inventory: {e}")
        print("\nAlternative: Download manually from:")
        print("https://www.ncei.noaa.gov/data/global-hourly/access/")
        print("or")
        print("https://www.ncei.noaa.gov/pub/data/noaa/")
        return None

# Download station inventory
print("=" * 60)
print("Downloading NOAA ISD Station Inventory")
print("=" * 60)
station_inventory = download_station_inventory()

if station_inventory is not None:
    print("\nFirst few stations:")
    print(station_inventory.head())
    print(f"\nColumns: {list(station_inventory.columns)}")

Downloading NOAA ISD Station Inventory
Connecting to NOAA ISD FTP server...
Downloading station inventory: isd-inventory.txt.z


## 3. Load Pre-Downloaded NOAA Global Hourly Data

If you've already downloaded data from https://www.ncei.noaa.gov/data/global-hourly/access/2022/, 
use this section to load the CSV files directly instead of downloading via FTP.

In [4]:
# Path to pre-downloaded NOAA Global Hourly CSV files
NOAA_LOCAL_DATA_DIR = project_root / "data" / "noaa-data"

def parse_noaa_global_hourly_csv(file_path: Path) -> Optional[pd.DataFrame]:
    """
    Parse a NOAA Global Hourly CSV file.
    
    The CSV format has columns like:
    - STATION: Combined USAF+WBAN (e.g., "06279099999")
    - DATE: ISO timestamp (e.g., "2022-01-01T00:00:00")
    - LATITUDE, LONGITUDE, ELEVATION
    - NAME: Station name
    - TMP: Temperature "+NNNN,Q" (scaled by 10, Celsius)
    - DEW: Dew point "+NNNN,Q"
    - WND: "DIR,Q,TYPE,SPD,Q" (direction in degrees, speed in m/s * 10)
    - SLP: Sea level pressure "NNNNN,Q" (scaled by 10, hPa)
    
    Parameters:
    -----------
    file_path : Path
        Path to the CSV file
    
    Returns:
    --------
    pd.DataFrame or None
        Parsed observations with standardized columns
    """
    try:
        df = pd.read_csv(file_path, low_memory=False)
    except Exception as e:
        print(f"  Error reading {file_path.name}: {e}")
        return None
    
    if len(df) == 0:
        return None
    
    # Parse station info
    station_id = df['STATION'].iloc[0] if 'STATION' in df.columns else file_path.stem
    
    # Parse timestamp
    df['timestamp'] = pd.to_datetime(df['DATE'], errors='coerce')
    df = df.dropna(subset=['timestamp'])
    
    if len(df) == 0:
        return None
    
    # Parse temperature (TMP format: "+NNNN,Q" where NNNN is temp * 10 in Celsius)
    def parse_tmp(val):
        if pd.isna(val) or val == '' or '+9999' in str(val):
            return np.nan
        try:
            parts = str(val).split(',')
            temp_str = parts[0]
            temp_val = float(temp_str) / 10.0
            if -100 <= temp_val <= 100:
                return temp_val
        except:
            pass
        return np.nan
    
    # Parse dew point (same format as TMP)
    def parse_dew(val):
        return parse_tmp(val)  # Same parsing logic
    
    # Parse wind (WND format: "DIR,Q,TYPE,SPD,Q")
    def parse_wind_direction(val):
        if pd.isna(val) or val == '':
            return np.nan
        try:
            parts = str(val).split(',')
            if len(parts) >= 1:
                dir_val = int(parts[0])
                if 0 <= dir_val <= 360 and dir_val != 999:
                    return float(dir_val)
        except:
            pass
        return np.nan
    
    def parse_wind_speed(val):
        if pd.isna(val) or val == '':
            return np.nan
        try:
            parts = str(val).split(',')
            if len(parts) >= 4:
                speed_str = parts[3]
                speed_val = float(speed_str) / 10.0  # Convert from m/s * 10
                if 0 <= speed_val <= 200 and speed_str != '9999':
                    return speed_val
        except:
            pass
        return np.nan
    
    # Parse sea level pressure (SLP format: "NNNNN,Q" where NNNNN is pressure * 10 in hPa)
    def parse_slp(val):
        if pd.isna(val) or val == '' or '99999' in str(val):
            return np.nan
        try:
            parts = str(val).split(',')
            press_str = parts[0]
            press_val = float(press_str) / 10.0
            if 800 <= press_val <= 1100:
                return press_val
        except:
            pass
        return np.nan
    
    # Apply parsing
    result = pd.DataFrame({
        'station_id': station_id,
        'timestamp': df['timestamp'],
        'latitude': df['LATITUDE'].iloc[0] if 'LATITUDE' in df.columns else np.nan,
        'longitude': df['LONGITUDE'].iloc[0] if 'LONGITUDE' in df.columns else np.nan,
        'elevation': df['ELEVATION'].iloc[0] if 'ELEVATION' in df.columns else np.nan,
        'name': df['NAME'].iloc[0] if 'NAME' in df.columns else '',
        'temperature_2m': df['TMP'].apply(parse_tmp) if 'TMP' in df.columns else np.nan,
        'dewpoint_2m': df['DEW'].apply(parse_dew) if 'DEW' in df.columns else np.nan,
        'wind_direction_10m': df['WND'].apply(parse_wind_direction) if 'WND' in df.columns else np.nan,
        'wind_speed_10m': df['WND'].apply(parse_wind_speed) if 'WND' in df.columns else np.nan,
        'surface_pressure': df['SLP'].apply(parse_slp) if 'SLP' in df.columns else np.nan
    })
    
    # Calculate relative humidity from temperature and dew point
    def calc_rh(row):
        T = row['temperature_2m']
        Td = row['dewpoint_2m']
        if pd.isna(T) or pd.isna(Td):
            return np.nan
        try:
            e_sat = 6.112 * np.exp(17.67 * T / (T + 243.5))
            e_act = 6.112 * np.exp(17.67 * Td / (Td + 243.5))
            rh = 100.0 * (e_act / e_sat)
            if 0 <= rh <= 100:
                return rh
        except:
            pass
        return np.nan
    
    result['relative_humidity_2m'] = result.apply(calc_rh, axis=1)
    
    return result

# Check if local data exists
if NOAA_LOCAL_DATA_DIR.exists():
    csv_files = list(NOAA_LOCAL_DATA_DIR.glob("*.csv"))
    print(f"Found {len(csv_files)} CSV files in {NOAA_LOCAL_DATA_DIR}")
    
    if len(csv_files) > 0:
        print(f"\nSample filenames: {[f.name for f in csv_files[:5]]}")
else:
    csv_files = []
    print(f"Local data directory not found: {NOAA_LOCAL_DATA_DIR}")
    print("Please download data from: https://www.ncei.noaa.gov/data/global-hourly/access/2022/")

Found 1090 CSV files in C:\Users\Kata\Desktop\earth-sgnn\data\noaa-data

Sample filenames: ['01001099999.csv', '01001499999.csv', '01002099999.csv', '01003099999.csv', '01006099999.csv']


In [5]:
# Load all local CSV files
if csv_files and len(csv_files) > 0:
    print("=" * 60)
    print(f"Loading {len(csv_files)} NOAA Global Hourly CSV files...")
    print("=" * 60)
    
    all_data = []
    station_metadata = []
    failed_files = []
    
    # Process files with progress bar
    for file_path in tqdm(csv_files, desc="Loading CSV files"):
        try:
            df = parse_noaa_global_hourly_csv(file_path)
            if df is not None and len(df) > 0:
                all_data.append(df)
                # Extract metadata for first observation
                station_metadata.append({
                    'station_id': df['station_id'].iloc[0],
                    'name': df['name'].iloc[0],
                    'latitude': df['latitude'].iloc[0],
                    'longitude': df['longitude'].iloc[0],
                    'elevation': df['elevation'].iloc[0],
                    'n_observations': len(df),
                    'start_date': df['timestamp'].min(),
                    'end_date': df['timestamp'].max()
                })
        except Exception as e:
            failed_files.append((file_path.name, str(e)))
    
    if all_data:
        # Combine all data
        combined_df = pd.concat(all_data, ignore_index=True)
        selected_stations = pd.DataFrame(station_metadata)
        
        print(f"\n✓ Successfully loaded data from {len(all_data)} stations")
        print(f"✓ Total observations: {len(combined_df):,}")
        print(f"✓ Date range: {combined_df['timestamp'].min()} to {combined_df['timestamp'].max()}")
        
        if failed_files:
            print(f"\n⚠ Failed to parse {len(failed_files)} files")
            if len(failed_files) <= 5:
                for fname, err in failed_files:
                    print(f"  - {fname}: {err}")
    else:
        print("\nNo data could be loaded from CSV files.")
        combined_df = None
        selected_stations = None
else:
    print("\nNo CSV files found. Please download data first.")
    combined_df = None
    selected_stations = None

Loading 1090 NOAA Global Hourly CSV files...


Loading CSV files: 100%|██████████| 1090/1090 [03:14<00:00,  5.60it/s]



✓ Successfully loaded data from 1090 stations
✓ Total observations: 9,612,898
✓ Date range: 2022-01-01 00:00:00 to 2022-12-31 23:59:00


In [6]:
# Preview the loaded data
if combined_df is not None and len(combined_df) > 0:
    print("=" * 60)
    print("Data Preview")
    print("=" * 60)
    
    print("\nSample observations:")
    display(combined_df.head(10))
    
    print("\nColumn data types:")
    print(combined_df.dtypes)
    
    print("\nStation metadata sample:")
    if selected_stations is not None:
        display(selected_stations.head(10))

Data Preview

Sample observations:


Unnamed: 0,station_id,timestamp,latitude,longitude,elevation,name,temperature_2m,dewpoint_2m,wind_direction_10m,wind_speed_10m,surface_pressure,relative_humidity_2m
0,1001099999,2022-01-01 01:00:00,70.933333,-8.666667,9.0,"JAN MAYEN NOR NAVY, NO",-8.7,-11.1,358.0,13.9,1014.0,82.758801
1,1001099999,2022-01-01 02:00:00,70.933333,-8.666667,9.0,"JAN MAYEN NOR NAVY, NO",-9.0,-11.4,353.0,12.7,1014.1,82.718506
2,1001099999,2022-01-01 05:00:00,70.933333,-8.666667,9.0,"JAN MAYEN NOR NAVY, NO",-9.9,-12.7,350.0,10.5,1013.8,79.975301
3,1001099999,2022-01-01 07:00:00,70.933333,-8.666667,9.0,"JAN MAYEN NOR NAVY, NO",-10.5,-14.1,351.0,8.5,1013.4,74.841808
4,1001099999,2022-01-01 08:00:00,70.933333,-8.666667,9.0,"JAN MAYEN NOR NAVY, NO",-10.5,-15.8,349.0,7.9,1013.2,65.062245
5,1001099999,2022-01-01 09:00:00,70.933333,-8.666667,9.0,"JAN MAYEN NOR NAVY, NO",-10.7,-17.3,2.0,6.8,1013.2,58.317554
6,1001099999,2022-01-01 10:00:00,70.933333,-8.666667,9.0,"JAN MAYEN NOR NAVY, NO",-11.7,-15.8,343.0,8.9,1013.4,71.58905
7,1001099999,2022-01-01 11:00:00,70.933333,-8.666667,9.0,"JAN MAYEN NOR NAVY, NO",-11.5,-16.2,344.0,6.0,1013.3,68.148376
8,1001099999,2022-01-01 12:00:00,70.933333,-8.666667,9.0,"JAN MAYEN NOR NAVY, NO",-12.4,-17.6,340.0,8.2,1012.5,65.14394
9,1001099999,2022-01-01 13:00:00,70.933333,-8.666667,9.0,"JAN MAYEN NOR NAVY, NO",-12.5,-19.3,340.0,7.4,1011.9,56.839831



Column data types:
station_id                       int64
timestamp               datetime64[us]
latitude                       float64
longitude                      float64
elevation                      float64
name                               str
temperature_2m                 float64
dewpoint_2m                    float64
wind_direction_10m             float64
wind_speed_10m                 float64
surface_pressure               float64
relative_humidity_2m           float64
dtype: object

Station metadata sample:


Unnamed: 0,station_id,name,latitude,longitude,elevation,n_observations,start_date,end_date
0,1001099999,"JAN MAYEN NOR NAVY, NO",70.933333,-8.666667,9.0,4022,2022-01-01 01:00:00,2022-12-31 23:00:00
1,1001499999,"SORSTOKKEN, NO",59.791925,5.34085,48.76,6216,2022-01-02 11:20:00,2022-12-30 18:20:00
2,1002099999,"VERLEGENHUKEN, NO",80.05,16.25,8.0,99,2022-01-08 21:00:00,2022-12-31 20:00:00
3,1003099999,"HORNSUND, NO",77.0,15.5,12.0,237,2022-01-02 06:00:00,2022-12-31 06:00:00
4,1006099999,"EDGEOYA, NO",78.25,22.816667,14.0,123,2022-01-01 17:00:00,2022-12-29 21:00:00
5,1007099999,"NY ALESUND, SV",78.916667,11.933333,7.7,977,2022-01-01 12:00:00,2022-12-31 21:00:00
6,1008099999,"LONGYEAR, SV",78.246111,15.465556,26.82,18125,2022-01-01 00:20:00,2022-12-31 23:50:00
7,1009099999,"KARL XII OYA, SV",80.65,25.0,5.0,133,2022-01-03 21:00:00,2022-12-31 00:00:00
8,1010099999,"ANDOYA, NO",69.2925,16.144167,13.1,19535,2022-01-01 00:20:00,2022-12-31 23:50:00
9,1011099999,"KVITOYA, SV",80.066667,31.5,10.0,80,2022-01-03 22:00:00,2022-12-31 23:00:00


In [7]:
# ============================================================================
# RESTORE DATA FROM LOCAL CSV FILES (if FTP download cell set combined_df to None)
# ============================================================================
# This cell fixes the issue where the FTP download cell overwrites combined_df

# Check if we need to reload from the CSV files
_need_reload = False
try:
    if combined_df is None or len(combined_df) == 0:
        _need_reload = True
except NameError:
    _need_reload = True

if _need_reload:
    print("Checking for pre-loaded CSV data...")
    # Try to reload from the noaa-data directory
    _noaa_dir = project_root / "data" / "noaa-data"
    if _noaa_dir.exists():
        _csv_files = list(_noaa_dir.glob("*.csv"))
        if len(_csv_files) > 0:
            print(f"Found {len(_csv_files)} CSV files - reloading...")
            _all_data = []
            _station_meta = []
            for _fp in tqdm(_csv_files, desc="Reloading CSV files"):
                try:
                    _df = parse_noaa_global_hourly_csv(_fp)
                    if _df is not None and len(_df) > 0:
                        _all_data.append(_df)
                        _station_meta.append({
                            'station_id': _df['station_id'].iloc[0],
                            'name': _df['name'].iloc[0],
                            'latitude': _df['latitude'].iloc[0],
                            'longitude': _df['longitude'].iloc[0],
                            'elevation': _df['elevation'].iloc[0],
                            'n_observations': len(_df)
                        })
                except:
                    pass
            if _all_data:
                combined_df = pd.concat(_all_data, ignore_index=True)
                selected_stations = pd.DataFrame(_station_meta)
                print(f"\n✓ Restored {len(combined_df):,} observations from {len(_all_data)} stations")
            else:
                print("Could not reload data")
        else:
            print("No CSV files found in data/noaa-data/")
    else:
        print(f"Directory not found: {_noaa_dir}")
else:
    print(f"✓ Data already loaded: {len(combined_df):,} observations")

✓ Data already loaded: 9,612,898 observations


In [None]:
def select_stations(
    inventory: pd.DataFrame,
    region: str = "US",
    min_lat: Optional[float] = None,
    max_lat: Optional[float] = None,
    min_lon: Optional[float] = None,
    max_lon: Optional[float] = None,
    min_elevation: Optional[float] = None,
    max_elevation: Optional[float] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    max_stations: Optional[int] = None
) -> pd.DataFrame:
    """
    Select stations from inventory based on criteria.
    
    Parameters:
    -----------
    inventory : pd.DataFrame
        Station inventory DataFrame
    region : str
        Region filter (e.g., "US")
    min_lat, max_lat, min_lon, max_lon : float, optional
        Geographic bounding box
    min_elevation, max_elevation : float, optional
        Elevation range
    start_date, end_date : datetime, optional
        Required temporal coverage
    max_stations : int, optional
        Maximum number of stations to return
    
    Returns:
    --------
    pd.DataFrame
        Filtered station inventory
    """
    df = inventory.copy()
    
    # Filter by region (if country code column exists)
    if region is not None and 'CTRY' in df.columns:
        if region == "US":
            df = df[df['CTRY'] == 'US']
        # Add other region filters as needed
    
    # Geographic filters
    if 'LAT' in df.columns:
        if min_lat is not None:
            df = df[df['LAT'] >= min_lat]
        if max_lat is not None:
            df = df[df['LAT'] <= max_lat]
    
    if 'LON' in df.columns:
        if min_lon is not None:
            df = df[df['LON'] >= min_lon]
        if max_lon is not None:
            df = df[df['LON'] <= max_lon]
    
    # Elevation filter
    elev_col = None
    for col in ['ELEV(M)', 'ELEV', 'ELEVATION', 'ELEV_M']:
        if col in df.columns:
            elev_col = col
            break
    
    if elev_col:
        if min_elevation is not None:
            df = df[pd.to_numeric(df[elev_col], errors='coerce') >= min_elevation]
        if max_elevation is not None:
            df = df[pd.to_numeric(df[elev_col], errors='coerce') <= max_elevation]
    
    # Temporal coverage filter
    if start_date and 'BEGIN' in df.columns:
        # Parse BEGIN date and filter
        try:
            df['BEGIN_DATE'] = pd.to_datetime(df['BEGIN'], format='%Y%m%d', errors='coerce')
            df = df[df['BEGIN_DATE'] <= start_date]
        except:
            pass
    
    if end_date and 'END' in df.columns:
        # Parse END date and filter
        try:
            df['END_DATE'] = pd.to_datetime(df['END'], format='%Y%m%d', errors='coerce')
            df = df[(df['END_DATE'] >= end_date) | (df['END_DATE'].isna())]
        except:
            pass
    
    # Filter out invalid stations
    # Remove stations with invalid coordinates (0,0 or NaN)
    if 'LAT' in df.columns and 'LON' in df.columns:
        df = df[(df['LAT'] != 0) | (df['LON'] != 0)]  # Remove stations at (0,0)
        df = df[df['LAT'].notna() & df['LON'].notna()]  # Remove NaN coordinates
        # Remove stations with obviously invalid coordinates
        df = df[(df['LAT'].abs() <= 90) & (df['LON'].abs() <= 180)]
    
    # Remove stations with invalid station names (containing years or numbers that look like data)
    if 'STATION NAME' in df.columns:
        # Filter out rows where station name looks like data (contains "2020", "2021", etc.)
        df = df[~df['STATION NAME'].astype(str).str.contains(r'^\d{4}', na=False)]
        # Filter out rows where station name is mostly numbers/spaces
        df = df[df['STATION NAME'].astype(str).str.len() > 3]
    
    # Remove stations with invalid USAF/WBAN
    if 'USAF' in df.columns:
        df = df[df['USAF'].astype(str).str.len() >= 5]  # USAF should be at least 5 digits
    
    # Limit number of stations
    if max_stations and len(df) > max_stations:
        df = df.head(max_stations)
    
    return df.reset_index(drop=True)

# Select stations for initial download
# For prototyping, we'll select a subset of US stations
if station_inventory is not None and len(station_inventory) > 0:
    # First, check what country codes are available
    if 'CTRY' in station_inventory.columns:
        print(f"\nAvailable country codes: {station_inventory['CTRY'].value_counts().head(10)}")
    
    selected_stations = select_stations(
        station_inventory,
        region="US",
        max_stations=20  # Start with 20 stations for prototyping
    )
    
    # If no stations selected, try without region filter
    if len(selected_stations) == 0:
        print("\nNo stations found with US filter. Trying without region filter...")
        selected_stations = select_stations(
            station_inventory,
            region=None,  # No region filter
            max_stations=20
        )
    
    print(f"\nSelected {len(selected_stations)} stations")
    
    if len(selected_stations) > 0:
        print("\nSelected stations:")
        # Show available columns
        cols_to_show = ['USAF', 'WBAN', 'STATION NAME']
        if 'LAT' in selected_stations.columns:
            cols_to_show.append('LAT')
        if 'LON' in selected_stations.columns:
            cols_to_show.append('LON')
        if 'ELEV(M)' in selected_stations.columns:
            cols_to_show.append('ELEV(M)')
        if 'CTRY' in selected_stations.columns:
            cols_to_show.append('CTRY')
        
        available_cols = [c for c in cols_to_show if c in selected_stations.columns]
        if available_cols:
            print(selected_stations[available_cols].head(10))
        else:
            print(selected_stations.head(10))
    else:
        print("\nNo stations selected from inventory. Using predefined station list.")
        selected_stations = None
else:
    print("\nNote: Station inventory not available. Using predefined station list.")
    selected_stations = None

# Fallback to predefined stations if needed
# Also check if selected stations have valid data (not just inventory summary rows)
if selected_stations is None or len(selected_stations) == 0:
    use_predefined = True
else:
    # Check if stations look valid (have proper names, not year data)
    invalid_count = 0
    if 'STATION NAME' in selected_stations.columns:
        for name in selected_stations['STATION NAME'].astype(str):
            if name.startswith('202') or len(name.strip()) < 3:
                invalid_count += 1
    
    # If more than half are invalid, use predefined
    if invalid_count > len(selected_stations) * 0.5:
        print(f"\nWarning: {invalid_count}/{len(selected_stations)} stations appear invalid (inventory parsing issue).")
        use_predefined = True
    else:
        use_predefined = False

if use_predefined:
    print("\nUsing predefined station list (major US cities with known good station IDs).")
    selected_stations = pd.DataFrame({
        'USAF': ['725030', '722950', '725300', '722430', '722780'],
        'WBAN': ['14732', '23174', '14734', '12960', '23183'],
        'STATION NAME': ['NEW YORK', 'LOS ANGELES', 'CHICAGO', 'HOUSTON', 'PHOENIX'],
        'LAT': [40.78, 34.05, 41.98, 29.98, 33.43],
        'LON': [-73.97, -118.24, -87.90, -95.36, -112.01],
        'ELEV(M)': [10, 71, 182, 13, 331],
        'CTRY': ['US', 'US', 'US', 'US', 'US']
    })
    print(f"Using {len(selected_stations)} predefined stations")

In [None]:
def parse_isd_hourly_line(line: str) -> Optional[Dict]:
    """
    Parse a single line from ISD hourly data file.
    
    Handles both ISD-Lite and full ISD formats. Detects format based on line length.
    
    Full ISD format (typical line length ~400 chars):
    - Position 0-5: USAF (6 chars)
    - Position 7-11: WBAN (5 chars)
    - Position 13-16: Year (4 chars)
    - Position 17-18: Month (2 chars)
    - Position 19-20: Day (2 chars)
    - Position 21-22: Hour (2 chars)
    - Temperature: Look for "+NNNN" or "-NNNN" patterns in the line
    - Wind, pressure: Similar pattern matching
    
    ISD-Lite format (typical line length ~100 chars):
    - Similar positions but shorter format
    
    Parameters:
    -----------
    line : str
        Single line from ISD file
    
    Returns:
    --------
    dict or None
        Parsed observation data
    """
    if len(line) < 80:  # Minimum line length
        return None
    
    try:
        # Extract basic fields - same for both formats
        # USAF: positions 0-5 (6 chars)
        usaf = line[0:6].strip()
        # WBAN: positions 7-11 (5 chars, skip position 6 which may be a dash or space)
        wban = line[7:12].strip() if len(line) > 11 else ''
        # Year: positions 13-16
        year_str = line[13:17].strip() if len(line) > 16 else ''
        # Month: positions 17-18
        month_str = line[17:19].strip() if len(line) > 18 else ''
        # Day: positions 19-20
        day_str = line[19:21].strip() if len(line) > 20 else ''
        # Hour: positions 21-22
        hour_str = line[21:23].strip() if len(line) > 22 else ''
        
        # Validate and convert date/time
        if not (year_str.isdigit() and month_str.isdigit() and day_str.isdigit() and hour_str.isdigit()):
            return None
        
        year = int(year_str)
        month = int(month_str)
        day = int(day_str)
        hour = int(hour_str)
        
        # Validate date
        if not (1900 <= year <= 2100 and 1 <= month <= 12 and 1 <= day <= 31 and 0 <= hour <= 23):
            return None
        
        # Create timestamp
        try:
            timestamp = datetime(year, month, day, hour)
        except ValueError:
            return None
        
        # Detect format: full ISD has longer lines with more fields
        is_full_format = len(line) > 200
        
        # Parse temperature - look for patterns like +NNNN or -NNNN
        # In full ISD, temperature can appear in multiple places, look for the air temperature field
        temperature_2m = np.nan
        dewpoint_2m = np.nan
        surface_pressure = np.nan
        wind_direction_10m = np.nan
        wind_speed_10m = np.nan
        
        if is_full_format:
            # Full ISD format - use regex to find temperature patterns
            # Air temperature typically appears as +NNNN or -NNNN (in tenths of degrees C)
            # Look for pattern: sign + 4 digits, preceded by certain markers
            import re
            
            # Temperature: Look for +NNNN or -NNNN patterns (air temperature)
            # In full ISD, air temp is often around position 87-92 or in ADD section
            temp_match = re.search(r'([+-]\d{4,5})', line[80:120])  # Search in common temp area
            if temp_match:
                temp_str = temp_match.group(1)
                try:
                    temp_val = float(temp_str) / 10.0
                    if -100 <= temp_val <= 100:
                        temperature_2m = temp_val
                except:
                    pass
            
            # Dew point: similar pattern
            dew_match = re.search(r'([+-]\d{3,4})', line[90:130])
            if dew_match:
                dew_str = dew_match.group(1)
                try:
                    dew_val = float(dew_str) / 10.0
                    if -100 <= dew_val <= 100:
                        dewpoint_2m = dew_val
                except:
                    pass
            
            # Sea level pressure: Look for patterns like +NNNN (in tenths of hPa)
            # Pressure often appears around position 99-104
            press_match = re.search(r'([+-]\d{4,5})', line[95:115])
            if press_match:
                press_str = press_match.group(1)
                try:
                    press_val = float(press_str) / 10.0
                    if 500 <= press_val <= 1100:
                        surface_pressure = press_val
                except:
                    pass
            
            # Wind direction: Look for 3-digit patterns (degrees, 0-360)
            wind_dir_match = re.search(r'\b(\d{3})\b', line[60:70])
            if wind_dir_match:
                wind_dir_str = wind_dir_match.group(1)
                try:
                    wind_dir_val = float(wind_dir_str)
                    if 0 <= wind_dir_val <= 360 and wind_dir_str != '999':
                        wind_direction_10m = wind_dir_val
                except:
                    pass
            
            # Wind speed: Look for patterns like NNNN (in tenths of m/s)
            wind_speed_match = re.search(r'\b(\d{3,4})\b', line[65:75])
            if wind_speed_match:
                wind_speed_str = wind_speed_match.group(1)
                try:
                    wind_speed_val = float(wind_speed_str) / 10.0
                    if 0 <= wind_speed_val <= 200 and wind_speed_str != '9999':
                        wind_speed_10m = wind_speed_val
                except:
                    pass
        else:
            # ISD-Lite format - use fixed positions
            # Temperature: position 24-28
            temp_str = line[24:29].strip() if len(line) > 28 else ''
            if temp_str and temp_str not in ['+9999', '99999', '+999']:
                try:
                    temperature_2m = float(temp_str) / 10.0
                    if not (-100 <= temperature_2m <= 100):
                        temperature_2m = np.nan
                except:
                    temperature_2m = np.nan
            
            # Dew point: position 30-33
            dewpoint_str = line[30:34].strip() if len(line) > 33 else ''
            if dewpoint_str and dewpoint_str not in ['+999', '9999']:
                try:
                    dewpoint_2m = float(dewpoint_str) / 10.0
                    if not (-100 <= dewpoint_2m <= 100):
                        dewpoint_2m = np.nan
                except:
                    dewpoint_2m = np.nan
            
            # Pressure: position 35-38
            pressure_str = line[35:39].strip() if len(line) > 38 else ''
            if pressure_str and pressure_str not in ['+999', '9999']:
                try:
                    surface_pressure = float(pressure_str) / 10.0
                    if not (500 <= surface_pressure <= 1100):
                        surface_pressure = np.nan
                except:
                    surface_pressure = np.nan
            
            # Wind direction: position 40-43
            wind_dir_str = line[40:44].strip() if len(line) > 43 else ''
            if wind_dir_str and wind_dir_str not in ['999', '9999']:
                try:
                    wind_direction_10m = float(wind_dir_str)
                    if not (0 <= wind_direction_10m <= 360):
                        wind_direction_10m = np.nan
                except:
                    wind_direction_10m = np.nan
            
            # Wind speed: position 46-50
            wind_speed_str = line[46:51].strip() if len(line) > 50 else ''
            if wind_speed_str and wind_speed_str not in ['9999', '99999']:
                try:
                    wind_speed_10m = float(wind_speed_str) / 10.0
                    if not (0 <= wind_speed_10m <= 200):
                        wind_speed_10m = np.nan
                except:
                    wind_speed_10m = np.nan
        
        # Calculate relative humidity from temperature and dew point
        if not np.isnan(temperature_2m) and not np.isnan(dewpoint_2m):
            try:
                e_sat = 6.112 * np.exp(17.67 * temperature_2m / (temperature_2m + 243.5))
                e_act = 6.112 * np.exp(17.67 * dewpoint_2m / (dewpoint_2m + 243.5))
                relative_humidity_2m = 100.0 * (e_act / e_sat)
                if not (0 <= relative_humidity_2m <= 100):
                    relative_humidity_2m = np.nan
            except:
                relative_humidity_2m = np.nan
        else:
            relative_humidity_2m = np.nan
        
        # Only return if we have at least one valid measurement
        if (np.isnan(temperature_2m) and np.isnan(dewpoint_2m) and 
            np.isnan(surface_pressure) and np.isnan(wind_speed_10m) and 
            np.isnan(wind_direction_10m)):
            return None
        
        return {
            'station_id': f"{usaf}-{wban}",
            'timestamp': timestamp,
            'temperature_2m': temperature_2m,
            'dewpoint_2m': dewpoint_2m,
            'relative_humidity_2m': relative_humidity_2m,
            'wind_speed_10m': wind_speed_10m,
            'wind_direction_10m': wind_direction_10m,
            'surface_pressure': surface_pressure
        }
    except Exception as e:
        return None

def get_available_years() -> List[int]:
    """
    Check which years are available on the NOAA ISD HTTPS server.
    
    Returns:
    --------
    List[int]
        List of available years
    """
    # Try to get available years from HTTPS endpoint
    # NCEI typically has data from 1900s to present
    current_year = datetime.now().year
    # For now, return a reasonable range - we'll verify existence when downloading
    # Common years with data: 2000 to current
    return list(range(2000, current_year + 1))

def download_isd_station_data(
    usaf: str,
    wban: str,
    year: int,
    output_dir: Optional[Path] = None,
    max_retries: int = 3
) -> Optional[pd.DataFrame]:
    """
    Download ISD hourly data for a specific station and year using HTTPS.
    
    Uses NCEI HTTPS endpoint with retry logic and exponential backoff.
    
    Parameters:
    -----------
    usaf : str
        USAF station identifier
    wban : str
        WBAN station identifier
    year : int
        Year to download
    output_dir : Path, optional
        Directory to save raw files
    max_retries : int
        Maximum number of retry attempts for failed downloads
    
    Returns:
    --------
    pd.DataFrame or None
        Parsed hourly observations
    """
    if output_dir is None:
        output_dir = RAW_DATA_DIR / "noaa_isd_raw"
    output_dir.mkdir(parents=True, exist_ok=True)
    
    station_id = f"{usaf}-{wban}"
    filename = f"{usaf}-{wban}-{year}.gz"
    local_file = output_dir / filename
    
    # NCEI HTTPS endpoint for global-hourly data
    base_url = "https://www.ncei.noaa.gov/data/global-hourly/access"
    url = f"{base_url}/{year}/{filename}"
    
    # Download with retry logic
    for attempt in range(max_retries):
        try:
            # Use requests with streaming and timeout
            response = requests.get(
                url,
                stream=True,
                timeout=(30, 300),  # (connect timeout, read timeout) - 5 min for large files
                headers={'User-Agent': 'Mozilla/5.0 (compatible; ISD-Downloader/1.0)'}
            )
            response.raise_for_status()
            
            # Download to temporary file first, then rename on success
            temp_file = local_file.with_suffix('.tmp')
            total_size = 0
            expected_size = int(response.headers.get('Content-Length', 0))
            
            with open(temp_file, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
                        total_size += len(chunk)
            
            # Verify file size
            if expected_size > 0 and total_size != expected_size:
                print(f"  Warning: Downloaded size ({total_size}) != expected ({expected_size})")
                # Still try to use it if it's reasonable (> 1KB)
                if total_size < 1024:
                    temp_file.unlink()
                    raise ValueError(f"File too small: {total_size} bytes (likely error page)")
            
            # Verify file is not empty
            if total_size == 0:
                temp_file.unlink()
                raise ValueError("Downloaded file is empty")
            
            # Rename temp file to final name
            if temp_file.exists():
                if local_file.exists():
                    local_file.unlink()
                temp_file.rename(local_file)
            
            print(f"  Downloaded {filename} ({total_size:,} bytes)")
            break  # Success, exit retry loop
            
        except requests.exceptions.Timeout as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s
                print(f"  Timeout (attempt {attempt + 1}/{max_retries}), retrying in {wait_time}s...")
                time.sleep(wait_time)
                continue
            else:
                print(f"  Download timeout after {max_retries} attempts: {e}")
                if local_file.exists():
                    local_file.unlink()
                return None
                
        except requests.exceptions.RequestException as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                print(f"  Network error (attempt {attempt + 1}/{max_retries}), retrying in {wait_time}s...")
                time.sleep(wait_time)
                continue
            else:
                # Check if it's a 404 (file doesn't exist) vs other error
                if hasattr(e, 'response') and e.response is not None:
                    if e.response.status_code == 404:
                        # File doesn't exist - this is "no data", not a network error
                        return None
                print(f"  Download failed after {max_retries} attempts: {e}")
                if local_file.exists():
                    local_file.unlink()
                return None
                
        except Exception as e:
            print(f"  Unexpected error during download: {e}")
            if local_file.exists():
                local_file.unlink()
            return None
    
    # Verify file exists and has reasonable size before parsing
    if not local_file.exists():
        return None
    
    file_size = local_file.stat().st_size
    if file_size < 1024:  # Less than 1KB is suspicious
        print(f"  Warning: File size is very small ({file_size} bytes), may be an error page")
        # Still try to parse it - debugging will show what's in it
        
    # Parse file (handle both .gz and uncompressed)
    observations = []
    sample_lines = []
    try:
        if filename.endswith('.gz'):
            with gzip.open(local_file, 'rt', encoding='latin-1', errors='ignore') as f:
                line_count = 0
                for line in f:
                    line_count += 1
                    # Save first few lines for debugging
                    if line_count <= 3:
                        sample_lines.append(line.rstrip())
                    obs = parse_isd_hourly_line(line)
                    if obs:
                        observations.append(obs)
        else:
            with open(local_file, 'r', encoding='latin-1', errors='ignore') as f:
                line_count = 0
                for line in f:
                    line_count += 1
                    # Save first few lines for debugging
                    if line_count <= 3:
                        sample_lines.append(line.rstrip())
                    obs = parse_isd_hourly_line(line)
                    if obs:
                        observations.append(obs)
    except Exception as e:
        print(f"  Error parsing file: {e}")
        import traceback
        traceback.print_exc()
        return None
    
    # Debug: Show sample lines if no observations found
    if len(observations) == 0:
        print(f"  No valid observations found after parsing")
        if sample_lines:
            print(f"  Sample lines from file (first 3, length {len(sample_lines[0]) if sample_lines else 0}):")
            for i, line in enumerate(sample_lines):
                print(f"    Line {i+1}: {repr(line[:150])}")
        else:
            print(f"  File appears empty or unreadable")
        return None
    
    df = pd.DataFrame(observations)
    df['usaf'] = usaf
    df['wban'] = wban
    
    return df

# Check available years
print("=" * 60)
print("Checking Available Years on NOAA ISD Server")
print("=" * 60)
available_years = get_available_years()
print(f"Available years: {available_years[-10:]}")  # Show last 10 years

# Filter to only download years that exist and are in our range
years_to_download = [y for y in range(START_YEAR, END_YEAR + 1) if y in available_years]
print(f"Years to download: {years_to_download}")

if not years_to_download:
    print(f"\nWarning: No data available for years {START_YEAR}-{END_YEAR}")
    print(f"Available years: {available_years}")
    print("Adjusting date range to use available years...")
    if available_years:
        years_to_download = available_years[-2:]  # Use last 2 available years
        START_YEAR = min(years_to_download)
        END_YEAR = max(years_to_download)
        START_DATE = datetime(START_YEAR, 1, 1)
        END_DATE = datetime(END_YEAR, 12, 31)
        print(f"Using years: {years_to_download}")

# Download data for selected stations
print("\n" + "=" * 60)
print("Downloading NOAA ISD Hourly Data")
print("=" * 60)

all_data = []

for idx, station in tqdm(selected_stations.iterrows(), total=len(selected_stations), desc="Stations"):
    usaf = str(station.get('USAF', '')).zfill(6)
    wban = str(station.get('WBAN', '')).zfill(5)
    station_name = station.get('STATION NAME', f"{usaf}-{wban}")
    
    # Skip invalid stations
    if not usaf.isdigit() or len(usaf) < 5:
        print(f"\nSkipping invalid station: {station_name} (USAF: {usaf})")
        continue
    
    print(f"\nStation: {station_name} ({usaf}-{wban})")
    
    station_data_found = False
    for year in years_to_download:
        try:
            df_year = download_isd_station_data(usaf, wban, year)
            if df_year is not None and len(df_year) > 0:
                # Filter by date range
                df_year = df_year[
                    (df_year['timestamp'] >= START_DATE) & 
                    (df_year['timestamp'] <= END_DATE)
                ]
                if len(df_year) > 0:
                    all_data.append(df_year)
                    print(f"  {year}: {len(df_year)} observations")
                    station_data_found = True
        except Exception as e:
            print(f"  Error for {year}: {e}")
            continue
        time.sleep(0.5)  # Be polite to server
    
    if not station_data_found:
        print(f"  No data found for this station")

# Combine all data
if all_data:
    combined_df = pd.concat(all_data, ignore_index=True)
    print(f"\nTotal observations: {len(combined_df)}")
    print(f"Date range: {combined_df['timestamp'].min()} to {combined_df['timestamp'].max()}")
    print(f"Stations: {combined_df['station_id'].nunique()}")
else:
    print("\nNo data downloaded. Check station IDs and FTP connection.")
    combined_df = None

## 5. Data Validation and Quality Checks

Check data quality, missing values, and temporal coverage.

In [8]:
if combined_df is not None and len(combined_df) > 0:
    print("=" * 60)
    print("Data Quality Summary")
    print("=" * 60)
    
    # Basic statistics
    print(f"\nTotal observations: {len(combined_df):,}")
    print(f"Unique stations: {combined_df['station_id'].nunique()}")
    print(f"Date range: {combined_df['timestamp'].min()} to {combined_df['timestamp'].max()}")
    
    # Missing data analysis
    print("\nMissing data percentage:")
    missing_pct = (combined_df.isnull().sum() / len(combined_df) * 100).round(2)
    print(missing_pct[missing_pct > 0])
    
    # Data coverage by station
    print("\nObservations per station:")
    station_counts = combined_df.groupby('station_id').size().sort_values(ascending=False)
    print(station_counts.head(10))
    
    # Temporal coverage
    print("\nTemporal coverage (observations per day):")
    daily_counts = combined_df.groupby(combined_df['timestamp'].dt.date).size()
    print(f"Mean: {daily_counts.mean():.1f}")
    print(f"Min: {daily_counts.min()}")
    print(f"Max: {daily_counts.max()}")
    
    # Variable statistics
    print("\nVariable statistics:")
    numeric_cols = ['temperature_2m', 'relative_humidity_2m', 'wind_speed_10m', 
                    'wind_direction_10m', 'surface_pressure']
    print(combined_df[numeric_cols].describe())
else:
    print("No data available for validation.")

Data Quality Summary

Total observations: 9,612,898
Unique stations: 1090
Date range: 2022-01-01 00:00:00 to 2022-12-31 23:59:00

Missing data percentage:
temperature_2m           2.34
dewpoint_2m              3.34
wind_direction_10m      11.16
wind_speed_10m           6.06
surface_pressure        55.88
relative_humidity_2m     3.37
dtype: float64

Observations per station:
station_id
6235099999    40458
6242099999    40327
6060099999    38197
6030099999    34401
6110099999    27543
3969099999    26082
6201099999    26078
3962099999    26077
6203099999    26073
6211099999    26071
dtype: int64

Temporal coverage (observations per day):
Mean: 26336.7
Min: 19398
Max: 29683

Variable statistics:
       temperature_2m  relative_humidity_2m  wind_speed_10m  \
count    9.388317e+06          9.288997e+06    9.030249e+06   
mean     7.164884e+00          7.962010e+01    4.843662e+00   
std      8.497164e+00          1.566240e+01    3.685883e+00   
min     -5.990000e+01          4.715105e-04   

## 6. Save Processed Data

Save the cleaned data to parquet format for efficient storage and loading.

In [9]:
if combined_df is not None and len(combined_df) > 0:
    # Save raw data
    raw_data_file = RAW_DATA_DIR / "noaa_isd_raw_data.parquet"
    combined_df.to_parquet(raw_data_file, index=False)
    print(f"Raw data saved to: {raw_data_file}")
    print(f"File size: {raw_data_file.stat().st_size / (1024*1024):.2f} MB")
    
    # Also save as CSV for easy inspection (sample)
    csv_file = RAW_DATA_DIR / "noaa_isd_raw_data_sample.csv"
    combined_df.head(10000).to_csv(csv_file, index=False)
    print(f"Sample CSV saved to: {csv_file}")
    
    # Save station metadata
    if selected_stations is not None:
        metadata_file = RAW_DATA_DIR / "noaa_isd_station_metadata.csv"
        selected_stations.to_csv(metadata_file, index=False)
        print(f"Station metadata saved to: {metadata_file}")
    
    print("\n" + "=" * 60)
    print("Data acquisition complete!")
    print("=" * 60)
    print(f"\nNext steps:")
    print("1. Review data quality in preprocessing notebook")
    print("2. Handle missing values and outliers")
    print("3. Create graph structure from station coordinates")
else:
    print("No data to save. Please check:")
    print("1. Station IDs are correct")
    print("2. FTP connection is working")
    print("3. Data files exist for selected years")

Raw data saved to: C:\Users\Kata\Desktop\earth-sgnn\data\raw\noaa_isd_raw_data.parquet
File size: 78.41 MB
Sample CSV saved to: C:\Users\Kata\Desktop\earth-sgnn\data\raw\noaa_isd_raw_data_sample.csv
Station metadata saved to: C:\Users\Kata\Desktop\earth-sgnn\data\raw\noaa_isd_station_metadata.csv

Data acquisition complete!

Next steps:
1. Review data quality in preprocessing notebook
2. Handle missing values and outliers
3. Create graph structure from station coordinates
