# NOAA Weather Station Data Tool

This notebook retrieves information about all NOAA weather stations including their geographical information (latitude, longitude, elevation, etc.) and saves the data to a DataFrame. It can also fetch historical weather data for these stations.

## Overview

This tool is designed to:
1. Fetch all NOAA weather stations with geographical data
2. Retrieve historical weather data for these stations
3. Create pandas DataFrames that can be converted to temp tables in Databricks

## Features

* Retrieves metadata for all NOAA weather stations
* Gets historical weather data for stations (temperature, precipitation, snow, wind)
* Exports data to CSV files for analysis
* Handles NOAA API pagination and rate limits
* Easily configurable via parameters at the top of the notebook

## Requirements

* Python 3.6+
* pandas
* requests

Install required packages with:
```
pip install pandas requests
```

## Getting Started

1. You need a NOAA API token. Register at: https://www.ncdc.noaa.gov/cdo-web/token
2. Enter your token in the configuration section below
3. Adjust other parameters as needed
4. Run the notebook cells sequentially

For Databricks, copy the entire notebook to your Databricks workspace and use the temp table creation cells.

## Setup and Configuration

First, we'll import the necessary libraries and set up configuration parameters.

In [None]:
# Import required libraries
import os
import time
import json
from datetime import datetime, timedelta
import pandas as pd
import requests

# Configuration parameters - modify these as needed
# =================================================

# Your NOAA API token - replace with your actual token
NOAA_API_TOKEN = "YOUR_NOAA_API_TOKEN_HERE"

# Historical data parameters
GET_HISTORICAL_DATA = True  # Set to False to only get station information
DAYS_OF_HISTORICAL_DATA = 90  # Number of days of historical data to fetch
MAX_STATIONS = 100  # Maximum number of stations to get historical data for (0 for all)

# Data types to retrieve
# TMAX: Maximum temperature
# TMIN: Minimum temperature
# PRCP: Precipitation
# SNOW: Snowfall
# AWND: Average daily wind speed
DATA_TYPES = "TMAX,TMIN,PRCP,SNOW,AWND"

# API parameters
API_LIMIT = 1000  # Number of results per API request
API_RATE_LIMIT_DELAY = 0.2  # Seconds to wait between API calls (NOAA limits to 5 requests/sec)

## NOAA Data Fetcher Class

Now we'll create a class to handle fetching data from the NOAA API.

In [None]:
class NoaaDataFetcher:
    """Class to fetch NOAA station and historical weather data."""
    
    BASE_URL = "https://www.ncdc.noaa.gov/cdo-web/api/v2"
    
    def __init__(self, token):
        """Initialize with an API token."""
        self.token = token
        if not self.token:
            raise ValueError("NOAA API token is required.")
        self.headers = {"token": self.token}
    
    def get_stations(self, limit=1000, offset=1):
        """
        Fetch weather stations data from NOAA API.
        
        Args:
            limit: Number of stations to retrieve per request
            offset: Starting offset for pagination
            
        Returns:
            List of station data dictionaries
        """
        stations = []
        url = f"{self.BASE_URL}/stations"
        
        while True:
            params = {
                "limit": limit,
                "offset": offset,
                # Include stations with data in GHCND dataset
                "datasetid": "GHCND",
                # Sort by station ID
                "sortfield": "id"
            }
            
            print(f"Fetching stations {offset} to {offset + limit - 1}...")
            response = requests.get(url, headers=self.headers, params=params)
            
            if response.status_code != 200:
                print(f"Error retrieving stations: {response.status_code}")
                print(response.text)
                break
                
            data = response.json()
            if "results" not in data or not data["results"]:
                break
                
            stations.extend(data["results"])
            print(f"Retrieved {len(data['results'])} stations")
            
            # If we've received fewer results than the limit, we've reached the end
            if len(data["results"]) < limit:
                break
                
            offset += limit
            # Respect NOAA API rate limits
            time.sleep(API_RATE_LIMIT_DELAY)
        
        print(f"Total stations retrieved: {len(stations)}")
        return stations

    def get_historical_data(self, station_id, start_date, end_date, datatype):
        """
        Fetch historical weather data for a specific station.
        
        Args:
            station_id: NOAA station identifier
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format
            datatype: Comma-separated list of data types to retrieve
            
        Returns:
            DataFrame with historical weather data or None if error
        """
        url = f"{self.BASE_URL}/data"
        params = {
            "datasetid": "GHCND",  # Global Historical Climatology Network Daily
            "stationid": station_id,
            "startdate": start_date,
            "enddate": end_date,
            "datatypeid": datatype,
            "limit": 1000,
            "units": "standard"
        }
        
        try:
            response = requests.get(url, headers=self.headers, params=params)
            
            if response.status_code != 200:
                print(f"Error retrieving data for station {station_id}: {response.status_code}")
                return None
                
            data = response.json()
            if "results" not in data or not data["results"]:
                print(f"No data available for station {station_id}")
                return None
                
            # Convert to DataFrame
            df = pd.json_normalize(data["results"])
            # Add station ID as a column
            df["station_id"] = station_id
            return df
            
        except Exception as e:
            print(f"Error processing data for station {station_id}: {str(e)}")
            return None

    def process_stations_to_df(self, stations):
        """
        Convert stations list to a pandas DataFrame with clean columns.
        
        Args:
            stations: List of station dictionaries from the API
            
        Returns:
            DataFrame with station information
        """
        # Convert to DataFrame
        df = pd.DataFrame(stations)
        
        # Handle nested location data
        if 'elevation' in df.columns:
            df['elevation_meters'] = df['elevation']
        
        if 'location' in df.columns:
            # Extract latitude and longitude from the location dictionary
            df['latitude'] = df['location'].apply(lambda x: x.get('latitude') if isinstance(x, dict) else None)
            df['longitude'] = df['location'].apply(lambda x: x.get('longitude') if isinstance(x, dict) else None)
            df.drop('location', axis=1, inplace=True)
        
        # Convert dates to datetime
        for date_col in ['mindate', 'maxdate']:
            if date_col in df.columns:
                df[date_col] = pd.to_datetime(df[date_col])
        
        return df

## Fetch Station Data

Now we'll use the NoaaDataFetcher class to retrieve data for all NOAA weather stations.

In [None]:
# Initialize fetcher with configured token
fetcher = NoaaDataFetcher(token=NOAA_API_TOKEN)

# Get all stations
stations = fetcher.get_stations(limit=API_LIMIT)

# Convert to DataFrame
stations_df = fetcher.process_stations_to_df(stations)

# Display the first few rows of the stations DataFrame
stations_df.head()

## Station Data Overview

Let's explore the station data to understand what information we have.

In [None]:
# Display DataFrame info
print("DataFrame Information:")
print(f"Number of stations: {len(stations_df)}")
print("\nColumns:")
for col in stations_df.columns:
    print(f"- {col}")

# Show basic statistics for numerical columns
print("\nBasic Statistics:")
stations_df.describe()

## Fetch Historical Weather Data

Now we'll fetch historical weather data for the stations. This can be time-consuming for a large number of stations, so we can limit the number of stations to process.

In [None]:
# Only run this cell if GET_HISTORICAL_DATA is True
if GET_HISTORICAL_DATA:
    # Calculate date range
    end_date = datetime.now().strftime("%Y-%m-%d")
    start_date = (datetime.now() - timedelta(days=DAYS_OF_HISTORICAL_DATA)).strftime("%Y-%m-%d")
    
    all_historical_data = []
    
    # Limit number of stations if specified
    station_ids = stations_df['id'].tolist()
    if MAX_STATIONS > 0:
        station_ids = station_ids[:MAX_STATIONS]
    
    print(f"Fetching historical data for {len(station_ids)} stations...")
    print(f"Date range: {start_date} to {end_date}")
    
    for i, station_id in enumerate(station_ids):
        print(f"Processing station {i+1}/{len(station_ids)}: {station_id}")
        hist_data = fetcher.get_historical_data(station_id, start_date, end_date, DATA_TYPES)
        
        if hist_data is not None:
            all_historical_data.append(hist_data)
        
        # Respect NOAA API rate limits
        time.sleep(API_RATE_LIMIT_DELAY)
    
    if all_historical_data:
        # Combine all historical data
        historical_df = pd.concat(all_historical_data, ignore_index=True)
        print(f"Retrieved historical data: {len(historical_df)} records from {len(station_ids)} stations")
        
        # Display the first few rows
        historical_df.head()
    else:
        print("No historical data was retrieved.")
else:
    print("Historical data retrieval is disabled. Set GET_HISTORICAL_DATA = True to enable.")

## Historical Data Overview

If we've retrieved historical data, let's explore it to understand the structure and contents.

In [None]:
# Only run this cell if GET_HISTORICAL_DATA is True and we have historical data
if GET_HISTORICAL_DATA and 'historical_df' in locals() and len(historical_df) > 0:
    # Display DataFrame info
    print("Historical DataFrame Information:")
    print(f"Number of records: {len(historical_df)}")
    print("\nColumns:")
    for col in historical_df.columns:
        print(f"- {col}")
    
    # Show basic statistics
    print("\nBasic Statistics:")
    historical_df.describe()
    
    # Distribution by data type
    print("\nDistribution by Data Type:")
    print(historical_df['datatype'].value_counts())
    
    # Distribution by station
    print("\nTop 10 Stations by Record Count:")
    print(historical_df['station_id'].value_counts().head(10))
else:
    print("No historical data available for analysis.")

## Prepare Data for Databricks

Now we'll prepare the data to be easily converted to temporary tables in Databricks. This includes ensuring proper data types and organizing the data structure.

In [None]:
# Function to prepare station data for Databricks
def prepare_stations_for_databricks(df):
    """Prepare station DataFrame for use in Databricks."""
    # Make a copy to avoid modifying the original
    df_prep = df.copy()
    
    # Ensure proper data types
    if 'latitude' in df_prep.columns:
        df_prep['latitude'] = pd.to_numeric(df_prep['latitude'], errors='coerce')
    if 'longitude' in df_prep.columns:
        df_prep['longitude'] = pd.to_numeric(df_prep['longitude'], errors='coerce')
    if 'elevation_meters' in df_prep.columns:
        df_prep['elevation_meters'] = pd.to_numeric(df_prep['elevation_meters'], errors='coerce')
    
    # Add data collection timestamp
    df_prep['data_collection_ts'] = datetime.now()
    
    return df_prep

# Function to prepare historical data for Databricks
def prepare_historical_for_databricks(df):
    """Prepare historical DataFrame for use in Databricks."""
    if df is None or len(df) == 0:
        return None
    
    # Make a copy to avoid modifying the original
    df_prep = df.copy()
    
    # Convert values to appropriate numeric types
    if 'value' in df_prep.columns:
        df_prep['value'] = pd.to_numeric(df_prep['value'], errors='coerce')
    
    # Ensure date is in proper datetime format
    if 'date' in df_prep.columns:
        df_prep['date'] = pd.to_datetime(df_prep['date'], errors='coerce')
    
    # Add data collection timestamp
    df_prep['data_collection_ts'] = datetime.now()
    
    return df_prep

# Prepare the station data
stations_db_df = prepare_stations_for_databricks(stations_df)
print(f"Prepared stations DataFrame for Databricks: {len(stations_db_df)} rows")

# Prepare historical data if available
if GET_HISTORICAL_DATA and 'historical_df' in locals() and len(historical_df) > 0:
    historical_db_df = prepare_historical_for_databricks(historical_df)
    print(f"Prepared historical DataFrame for Databricks: {len(historical_db_df)} rows")
else:
    print("No historical data to prepare for Databricks.")

## Create Temp Tables in Databricks

When you copy this notebook to Databricks, you can use the following code to create temporary tables from the DataFrames.

In [None]:
# In Databricks, uncomment and run this code:

# Create temp table from stations DataFrame
# stations_db_df.createOrReplaceTempView("noaa_stations")
# display(spark.sql("SELECT * FROM noaa_stations LIMIT 10"))

# Create temp table from historical DataFrame if available
# if 'historical_db_df' in locals() and historical_db_df is not None:
#     historical_db_df.createOrReplaceTempView("noaa_historical_data")
#     display(spark.sql("SELECT * FROM noaa_historical_data LIMIT 10"))

## Example Queries for Analysis

Here are some example queries you can run in Databricks to analyze the NOAA weather data.

In [None]:
# In Databricks, you can run SQL queries like these:

# 1. Find stations with the highest elevation
# display(spark.sql("""
#     SELECT id, name, latitude, longitude, elevation_meters
#     FROM noaa_stations
#     WHERE elevation_meters IS NOT NULL
#     ORDER BY elevation_meters DESC
#     LIMIT 20
# """))

# 2. Get average temperatures by station
# display(spark.sql("""
#     SELECT 
#         s.id, 
#         s.name,
#         s.latitude,
#         s.longitude,
#         AVG(CASE WHEN h.datatype = 'TMAX' THEN h.value / 10 END) as avg_max_temp_celsius,
#         AVG(CASE WHEN h.datatype = 'TMIN' THEN h.value / 10 END) as avg_min_temp_celsius
#     FROM noaa_stations s
#     JOIN noaa_historical_data h ON s.id = h.station_id
#     WHERE h.datatype IN ('TMAX', 'TMIN')
#     GROUP BY s.id, s.name, s.latitude, s.longitude
#     HAVING avg_max_temp_celsius IS NOT NULL
#     ORDER BY avg_max_temp_celsius DESC
#     LIMIT 20
# """))

# 3. Analyze precipitation patterns
# display(spark.sql("""
#     SELECT 
#         h.date,
#         COUNT(DISTINCT h.station_id) as station_count,
#         AVG(CASE WHEN h.datatype = 'PRCP' THEN h.value / 10 END) as avg_precipitation_mm
#     FROM noaa_historical_data h
#     WHERE h.datatype = 'PRCP'
#     GROUP BY h.date
#     ORDER BY h.date
# """))

## Saving Data For Further Use

If you need to save the data for use outside of Databricks, you can export it to various formats.

In [None]:
# Save to CSV files locally
# Not needed in Databricks but useful for local analysis
stations_df.to_csv("noaa_stations.csv", index=False)
print(f"Saved {len(stations_df)} stations to noaa_stations.csv")

if GET_HISTORICAL_DATA and 'historical_df' in locals() and len(historical_df) > 0:
    historical_df.to_csv("noaa_historical_data.csv", index=False)
    print(f"Saved historical data to noaa_historical_data.csv")
    
# In Databricks, you might want to save to Delta tables instead:
# stations_db_df.write.format("delta").mode("overwrite").saveAsTable("noaa_stations")
# if 'historical_db_df' in locals() and historical_db_df is not None:
#     historical_db_df.write.format("delta").mode("overwrite").saveAsTable("noaa_historical_data")