Weather Data Downloader

This script downloads sample weather data from NOAA's Global Historical Climatology Network (GHCN),
processes it, and loads it into a PostgreSQL database for analysis.

In [2]:
!pip install pandas numpy requests sqlalchemy matplotlib seaborn 

Collecting requests
  Downloading requests-2.32.3-py3-none-any.whl.metadata (4.6 kB)
Collecting matplotlib
  Downloading matplotlib-3.9.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Collecting seaborn
  Downloading seaborn-0.13.2-py3-none-any.whl.metadata (5.4 kB)
Collecting charset-normalizer<4,>=2 (from requests)
  Downloading charset_normalizer-3.4.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (35 kB)
Collecting idna<4,>=2.5 (from requests)
  Downloading idna-3.10-py3-none-any.whl.metadata (10 kB)
Collecting urllib3<3,>=1.21.1 (from requests)
  Downloading urllib3-2.4.0-py3-none-any.whl.metadata (6.5 kB)
Collecting certifi>=2017.4.17 (from requests)
  Downloading certifi-2025.4.26-py3-none-any.whl.metadata (2.5 kB)
Collecting contourpy>=1.0.1 (from matplotlib)
  Downloading contourpy-1.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.4 kB)
Collecting cycler>=0.10 (from matplotlib)
  Downloading cycler-0.12.1

In [3]:
import os
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import gzip
import shutil
from sqlalchemy import create_engine, text
import matplotlib.pyplot as plt
import seaborn as sns
import io
import time
import logging

In [4]:
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('weather_data_downloader')

In [5]:
# Database connection parameters
DB_USER = 'postgres'
DB_PASSWORD = 'postgres'
DB_HOST = 'localhost'  # Use 'postgres' if running within Docker network
DB_PORT = '5432'
DB_NAME = 'weather_data'

# Create connection string
connection_string = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

# Data storage directory
DATA_DIR = os.path.join(os.getcwd(), 'data')
os.makedirs(DATA_DIR, exist_ok=True)

In [6]:
def download_file(url, local_path):
    """
    Download a file from a URL to a local path
    """
    try:
        logger.info(f"Downloading {url} to {local_path}")
        start_time = time.time()
        
        # Stream the download to handle large files efficiently
        with requests.get(url, stream=True) as r:
            r.raise_for_status()
            with open(local_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        
        elapsed_time = time.time() - start_time
        logger.info(f"Download completed in {elapsed_time:.2f} seconds")
        return True
    except Exception as e:
        logger.error(f"Error downloading file: {e}")
        return False

def extract_gz_file(gz_path, output_path):
    """
    Extract a gzipped file
    """
    try:
        logger.info(f"Extracting {gz_path} to {output_path}")
        with gzip.open(gz_path, 'rb') as f_in:
            with open(output_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        logger.info("Extraction completed")
        return True
    except Exception as e:
        logger.error(f"Error extracting file: {e}")
        return False

def download_ghcn_stations():
    """
    Download and process the GHCN stations inventory file
    """
    # URL for the stations inventory file
    stations_url = "https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt"
    
    # Local path for the downloaded file
    stations_file = os.path.join(DATA_DIR, "ghcnd-stations.txt")
    
    # Download the file
    if not download_file(stations_url, stations_file):
        return None
    
    try:
        # The stations file has a fixed width format, so we need to specify column widths
        # Format details: https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/readme.txt
        colspecs = [
            (0, 11),    # ID
            (12, 20),   # LATITUDE
            (21, 30),   # LONGITUDE
            (31, 37),   # ELEVATION
            (38, 68),   # STATION NAME
            (69, 75),   # STATE (optional)
            (76, 79)    # COUNTRY
        ]
        
        # Column names
        columns = ['station_id', 'latitude', 'longitude', 'elevation', 'name', 'state', 'country']
        
        # Read the fixed width file
        logger.info("Processing stations file")
        df = pd.read_fwf(stations_file, colspecs=colspecs, names=columns)
        
        # Clean up the data
        df['latitude'] = pd.to_numeric(df['latitude'], errors='coerce')
        df['longitude'] = pd.to_numeric(df['longitude'], errors='coerce')
        df['elevation'] = pd.to_numeric(df['elevation'], errors='coerce')
        
        # Fill missing values for state
        df['state'] = df['state'].fillna('')
        
        # Trim whitespace from string columns
        for col in ['station_id', 'name', 'state', 'country']:
            df[col] = df[col].str.strip()
        
        logger.info(f"Processed {len(df)} stations")
        return df
    except Exception as e:
        logger.error(f"Error processing stations file: {e}")
        return None

def download_ghcn_data(year, month=None, sample_size=100):
    """
    Download GHCN daily data for a specific year and optionally filter by month
    
    Args:
        year (int): Year to download data for
        month (int, optional): Month to filter data by (1-12)
        sample_size (int, optional): Number of random stations to sample
    
    Returns:
        pandas.DataFrame: Processed weather data
    """
    # URL for the yearly data file
    data_url = f"https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/{year}.csv.gz"
    
    # Local paths for the downloaded and extracted files
    gz_file = os.path.join(DATA_DIR, f"{year}.csv.gz")
    csv_file = os.path.join(DATA_DIR, f"{year}.csv")
    
    # Download the file
    if not download_file(data_url, gz_file):
        return None
    
    # Extract the file
    if not extract_gz_file(gz_file, csv_file):
        return None
    
    try:
        # The data file contains millions of records, so we'll read in chunks
        logger.info(f"Processing {year} data file")
        
        # Column names based on GHCN documentation
        columns = ['station_id', 'date', 'element', 'value', 'mflag', 'qflag', 'sflag', 'obstime']
        
        # Read a sample of the data
        # If month is specified, filter for that month
        if month:
            month_str = f"{month:02d}"
            
            # Read the entire file first (this is inefficient but simpler for demonstration)
            df = pd.read_csv(csv_file, names=columns, header=None)
            
            # Extract year and month from date column
            df['year'] = df['date'].astype(str).str[:4]
            df['month'] = df['date'].astype(str).str[4:6]
            
            # Filter for the specified month
            df = df[df['month'] == month_str]
            
            # Sample a subset of stations
            if sample_size:
                unique_stations = df['station_id'].unique()
                if len(unique_stations) > sample_size:
                    sampled_stations = np.random.choice(unique_stations, sample_size, replace=False)
                    df = df[df['station_id'].isin(sampled_stations)]
            
            logger.info(f"Processed {len(df)} records for {year}-{month}")
        else:
            # If no month specified, just take a random sample of rows
            # Count lines in file
            with open(csv_file, 'r') as f:
                line_count = sum(1 for _ in f)
            
            # Take a random sample
            sample_indices = np.random.choice(line_count, min(100000, line_count), replace=False)
            sample_indices = sorted(sample_indices)
            
            # Read only the sampled rows
            df = pd.read_csv(csv_file, skiprows=lambda i: i not in sample_indices, names=columns, header=None)
            logger.info(f"Sampled {len(df)} records from {year}")
        
        # Convert date string to date object
        df['date'] = pd.to_datetime(df['date'], format='%Y%m%d')
        
        # Convert value to float and handle the scale factor
        # GHCN stores temperatures in tenths of degrees C and precipitation in tenths of mm
        df['value'] = pd.to_numeric(df['value'], errors='coerce') / 10.0
        
        # Clean up the data
        for col in ['mflag', 'qflag', 'sflag', 'obstime']:
            if col in df.columns:
                df[col] = df[col].fillna('')
        
        return df
    except Exception as e:
        logger.error(f"Error processing {year} data file: {e}")
        return None

def create_database_schema(engine):
    """
    Create the database schema for the weather data
    """
    try:
        logger.info("Creating database schema")
        
        # Create stations table
        create_stations_table = """
        CREATE TABLE IF NOT EXISTS weather_stations (
            station_id VARCHAR(20) PRIMARY KEY,
            latitude FLOAT,
            longitude FLOAT,
            elevation FLOAT,
            name VARCHAR(100),
            state VARCHAR(50),
            country VARCHAR(50)
        );
        """
        
        # Create weather data table
        create_weather_table = """
        CREATE TABLE IF NOT EXISTS weather_readings (
            id SERIAL PRIMARY KEY,
            station_id VARCHAR(20) REFERENCES weather_stations(station_id),
            date DATE,
            element VARCHAR(10),
            value FLOAT,
            mflag VARCHAR(1),
            qflag VARCHAR(1),
            sflag VARCHAR(1),
            obstime VARCHAR(4)
        );
        """
        
        # Create index on station_id, date, and element
        create_index = """
        CREATE INDEX IF NOT EXISTS idx_station_date_element 
        ON weather_readings (station_id, date, element);
        """
        
        # Execute the SQL statements
        with engine.connect() as conn:
            conn.execute(text(create_stations_table))
            conn.execute(text(create_weather_table))
            conn.execute(text(create_index))
            conn.commit()
        
        logger.info("Database schema created")
        return True
    except Exception as e:
        logger.error(f"Error creating database schema: {e}")
        return False

def load_data_to_db(engine, stations_df, weather_df):
    """
    Load the processed data into the database
    """
    try:
        logger.info("Loading data to database")
        
        # Load stations data
        if stations_df is not None:
            stations_df.to_sql('weather_stations', engine, if_exists='replace', index=False)
            logger.info(f"Loaded {len(stations_df)} stations to database")
        
        # Load weather data
        if weather_df is not None:
            # To avoid primary key conflicts, we'll use append mode and let the database assign IDs
            weather_df.to_sql('weather_readings', engine, if_exists='append', index=False)
            logger.info(f"Loaded {len(weather_df)} weather readings to database")
        
        return True
    except Exception as e:
        logger.error(f"Error loading data to database: {e}")
        return False

def explore_data(engine):
    """
    Run basic exploratory queries on the data
    """
    try:
        logger.info("Running exploratory queries")
        
        # Query for station count
        station_query = "SELECT COUNT(*) FROM weather_stations;"
        with engine.connect() as conn:
            result = conn.execute(text(station_query))
            station_count = result.scalar()
            logger.info(f"Total stations in database: {station_count}")
        
        # Query for weather readings count
        readings_query = "SELECT COUNT(*) FROM weather_readings;"
        with engine.connect() as conn:
            result = conn.execute(text(readings_query))
            readings_count = result.scalar()
            logger.info(f"Total weather readings in database: {readings_count}")
        
        # Query for elements distribution
        elements_query = """
        SELECT element, COUNT(*) as count
        FROM weather_readings
        GROUP BY element
        ORDER BY count DESC;
        """
        with engine.connect() as conn:
            result = conn.execute(text(elements_query))
            logger.info("Elements distribution:")
            for row in result:
                logger.info(f"  {row[0]}: {row[1]}")
        
        # Query for date range
        date_query = """
        SELECT MIN(date), MAX(date)
        FROM weather_readings;
        """
        with engine.connect() as conn:
            result = conn.execute(text(date_query))
            min_date, max_date = result.fetchone()
            logger.info(f"Date range: {min_date} to {max_date}")
        
        return True
    except Exception as e:
        logger.error(f"Error exploring data: {e}")
        return False

def generate_sample_visualizations(engine):
    """
    Generate some sample visualizations from the data
    """
    try:
        logger.info("Generating sample visualizations")
        
        # Query for temperature data
        temp_query = """
        SELECT 
            r.date, 
            s.name as station_name,
            s.latitude,
            s.longitude,
            r.element, 
            r.value
        FROM 
            weather_readings r
        JOIN 
            weather_stations s ON r.station_id = s.station_id
        WHERE 
            r.element IN ('TMAX', 'TMIN')
            AND r.qflag = ''
        ORDER BY 
            r.date, s.name, r.element;
        """
        
        # Create a dataframe from the query
        temp_df = pd.read_sql(temp_query, engine)
        
        if len(temp_df) == 0:
            logger.warning("No temperature data found for visualization")
            return False
        
        # Create output directory for visualizations
        vis_dir = os.path.join(os.getcwd(), 'visualizations')
        os.makedirs(vis_dir, exist_ok=True)
        
        # 1. Temperature time series for a few stations
        plt.figure(figsize=(12, 8))
        
        # Get the top 5 stations with the most data
        top_stations = temp_df['station_name'].value_counts().head(5).index
        
        # Plot for each station
        for station in top_stations:
            station_data = temp_df[temp_df['station_name'] == station]
            
            # Get TMAX and TMIN data
            tmax_data = station_data[station_data['element'] == 'TMAX']
            tmin_data = station_data[station_data['element'] == 'TMIN']
            
            if len(tmax_data) > 0:
                plt.plot(tmax_data['date'], tmax_data['value'], 'o-', label=f"{station} (Max)")
            
            if len(tmin_data) > 0:
                plt.plot(tmin_data['date'], tmin_data['value'], 'o--', label=f"{station} (Min)")
        
        plt.title('Temperature Trends by Station', fontsize=16)
        plt.xlabel('Date', fontsize=12)
        plt.ylabel('Temperature (°C)', fontsize=12)
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.tight_layout()
        plt.savefig(os.path.join(vis_dir, 'temperature_trends.png'))
        
        # 2. Distribution of temperature values
        plt.figure(figsize=(12, 8))
        
        # Plot histograms for TMAX and TMIN
        tmax_data = temp_df[temp_df['element'] == 'TMAX']
        tmin_data = temp_df[temp_df['element'] == 'TMIN']
        
        if len(tmax_data) > 0:
            sns.histplot(tmax_data['value'], kde=True, label='TMAX')
        
        if len(tmin_data) > 0:
            sns.histplot(tmin_data['value'], kde=True, label='TMIN')
        
        plt.title('Distribution of Temperature Values', fontsize=16)
        plt.xlabel('Temperature (°C)', fontsize=12)
        plt.ylabel('Frequency', fontsize=12)
        plt.legend()
        plt.savefig(os.path.join(vis_dir, 'temperature_distribution.png'))
        
        # 3. Temperature by latitude (if we have geographic data)
        if 'latitude' in temp_df.columns and not temp_df['latitude'].isna().all():
            plt.figure(figsize=(12, 8))
            
            # Average temperature by latitude
            temp_by_lat = temp_df.groupby(['latitude', 'element'])['value'].mean().reset_index()
            
            # Plot for TMAX and TMIN
            tmax_by_lat = temp_by_lat[temp_by_lat['element'] == 'TMAX']
            tmin_by_lat = temp_by_lat[temp_by_lat['element'] == 'TMIN']
            
            if len(tmax_by_lat) > 0:
                plt.scatter(tmax_by_lat['latitude'], tmax_by_lat['value'], label='TMAX', alpha=0.7)
            
            if len(tmin_by_lat) > 0:
                plt.scatter(tmin_by_lat['latitude'], tmin_by_lat['value'], label='TMIN', alpha=0.7)
            
            plt.title('Temperature by Latitude', fontsize=16)
            plt.xlabel('Latitude', fontsize=12)
            plt.ylabel('Average Temperature (°C)', fontsize=12)
            plt.legend()
            plt.savefig(os.path.join(vis_dir, 'temperature_by_latitude.png'))
        
        logger.info(f"Visualizations saved to {vis_dir}")
        return True
    except Exception as e:
        logger.error(f"Error generating visualizations: {e}")
        return False

In [7]:
def main():
    """
    Main function to download and process weather data
    """
    try:
        logger.info("Starting weather data download and processing")
        
        # Create SQLAlchemy engine
        engine = create_engine(connection_string)
        
        # Create database schema
        create_database_schema(engine)
        
        # Download and process stations data
        stations_df = download_ghcn_stations()
        
        # Download weather data for a recent year and month
        # You can change the year and month as needed
        current_year = datetime.now().year
        current_month = datetime.now().month
        
        # Use previous month to ensure data is available
        if current_month == 1:
            year = current_year - 1
            month = 12
        else:
            year = current_year
            month = current_month - 1
        
        weather_df = download_ghcn_data(year, month, sample_size=50)
        
        # Load data to database
        load_data_to_db(engine, stations_df, weather_df)
        
        # Run exploratory queries
        explore_data(engine)
        
        # Generate visualizations
        generate_sample_visualizations(engine)
        
        logger.info("Weather data processing completed successfully")
    except Exception as e:
        logger.error(f"Error in main function: {e}")

In [8]:
if __name__ == "__main__":
    main()

2025-04-27 16:18:44,332 - weather_data_downloader - INFO - Starting weather data download and processing
2025-04-27 16:18:44,371 - weather_data_downloader - INFO - Creating database schema
2025-04-27 16:18:44,387 - weather_data_downloader - INFO - Database schema created
2025-04-27 16:18:44,388 - weather_data_downloader - INFO - Downloading https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt to /home/calvin/projects/zoomcamp/de-zoomcamp-2025/weather-analysis/jupyter/notebooks/data/ghcnd-stations.txt
2025-04-27 16:19:14,785 - weather_data_downloader - ERROR - Error downloading file: 503 Server Error: Service Unavailable for url: https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt
2025-04-27 16:19:14,786 - weather_data_downloader - INFO - Downloading https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2025.csv.gz to /home/calvin/projects/zoomcamp/de-zoomcamp-2025/weather-analysis/jupyter/notebooks/data/2025.csv.gz
2025-04-27 16:19:45,175 - weather_data_down