# Create Weather DataFrame 2020 with Dask

Creates a wide-format dataframe with weather data from 2020 including TOBS, TMAX, TMIN, PRCP, SNOW, SNWD measurements.
Each row represents a station-year-measurement combination with 365 daily value columns.


In [None]:
import s3fs
import dask.dataframe as dd
import pandas as pd
import numpy as np
from datetime import datetime
from weather_transformation import transform_long_to_wide, display_dataframe_info, save_wide_dataframe

# Import cluster management utilities
from cluster_utils import setup_dask_cluster

# Start a Dask cluster through the project helper: 20 workers with 4GB each,
# dashboard on port 8790. The helper returns the cluster, a connected client,
# and a cleanup summary.
cluster, client, cleanup_summary = setup_dask_cluster(
    n_workers=20, memory_per_worker='4GB', dashboard_port=8790, worker_port=8791,
)

# Route all subsequent Dask computations through the distributed scheduler
import dask
dask.config.set(scheduler='distributed')

# Note: this notebook loads a single year (2020) × 6 measurements = 6
# YEAR/ELEMENT partitions (see the range(2020, 2021) loop in the loading cell).
# NOTE(review): the earlier "10-30 minutes" runtime estimate was written
# alongside a 76-year figure — re-measure for the single-year run.
print(f"Starting weather data processing at {datetime.now():%Y-%m-%d %H:%M:%S}")


Checking for existing Dask clusters...
No current client found
All existing clusters closed.
Setting up new cluster with 20 workers...


2025-10-26 17:13:11,578 - distributed.scheduler - INFO - State start
2025-10-26 17:13:11,580 - distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:44521
2025-10-26 17:13:11,580 - distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8790
2025-10-26 17:13:11,609 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:42519'
2025-10-26 17:13:11,611 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:43069'
2025-10-26 17:13:11,612 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:39829'
2025-10-26 17:13:11,614 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:37603'
2025-10-26 17:13:11,615 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:34973'
2025-10-26 17:13:11,617 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:33801'
2025-10-26 17:13:11,619 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:39843'
2025-10-26 17:13:11,621 - 

✓ Dask cluster created with 20 workers
✓ Dashboard available at: http://127.0.0.1:8790/status
✓ Total workers: 20
✓ Cores per worker: {'tcp://127.0.0.1:34645': 1, 'tcp://127.0.0.1:35267': 1, 'tcp://127.0.0.1:36027': 1, 'tcp://127.0.0.1:36379': 1, 'tcp://127.0.0.1:36821': 1, 'tcp://127.0.0.1:37135': 1, 'tcp://127.0.0.1:38967': 1, 'tcp://127.0.0.1:39623': 1, 'tcp://127.0.0.1:39691': 1, 'tcp://127.0.0.1:40257': 1, 'tcp://127.0.0.1:40307': 1, 'tcp://127.0.0.1:40627': 1, 'tcp://127.0.0.1:40665': 1, 'tcp://127.0.0.1:41957': 1, 'tcp://127.0.0.1:43691': 1, 'tcp://127.0.0.1:44079': 1, 'tcp://127.0.0.1:45499': 1, 'tcp://127.0.0.1:46243': 1, 'tcp://127.0.0.1:46625': 1, 'tcp://127.0.0.1:46713': 1}
✓ Total memory available: 80.0 GB
✓ Cluster test computation successful: 4964.67
Starting weather data processing at 2025-10-26 17:13:12


2025-10-26 22:51:54,773 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:59020 closed before handshake completed
2025-10-26 22:51:54,780 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:59036 closed before handshake completed
2025-10-26 22:51:54,789 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:59052 closed before handshake completed
2025-10-26 22:51:54,795 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:59054 closed before handshake completed
2025-10-26 22:51:54,804 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:33370 closed before handshake completed
2025-10-26 22:51:54,816 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:33380 closed before handshake completed
2025-10-26 22:51:54,827 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:33396 closed before handshake completed
2025-10-26 22:51:54,838 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:33410 closed bef

In [2]:
# Load weather data for year 2020 using Dask
s3 = s3fs.S3FileSystem(anon=True)  # anonymous (unsigned) access to the bucket
bucket_path = 's3://noaa-ghcn-pds/parquet/by_year/'
measurements = ['TOBS', 'TMAX', 'TMIN', 'PRCP', 'SNOW', 'SNWD']

# Collect the parquet file paths of every YEAR/ELEMENT partition
# (this may take a few minutes)
all_files = []
print("Collecting file paths...")
for year in range(2020, 2021):
    for measurement in measurements:
        file_path = f"{bucket_path}YEAR={year}/ELEMENT={measurement}/"
        try:
            files = s3.glob(f"{file_path}*.parquet")
        except Exception as err:
            # Best effort: a partition that cannot be listed is skipped, but
            # it is reported instead of being swallowed silently. Catching
            # Exception (not a bare except) lets Ctrl-C still interrupt.
            print(f"  Warning: could not list {file_path}: {err!r}")
            continue
        # Re-add the s3:// protocol prefix before handing paths to Dask
        all_files.extend(f"s3://{f}" for f in files)
    if year % 10 == 0:  # Progress indicator
        print(f"  Processed years up to {year}...")

print(f"Found {len(all_files)} parquet files to process")

# Fail early with a clear message rather than letting read_parquet choke
# on an empty path list.
if not all_files:
    raise FileNotFoundError(
        f"No parquet files found under {bucket_path} for measurements {measurements}"
    )

# Build the Dask dataframe lazily; the row count and the station count below
# each trigger an actual computation over the data.
df_long = dd.read_parquet(all_files, storage_options={'anon': True})
print(f"Loaded {len(df_long):,} rows from {df_long['ID'].nunique().compute():,} stations")


Collecting file paths...
  Processed years up to 2020...
Found 62 parquet files to process
Loaded 29,323,503 rows from 42,653 stations


In [3]:
# Preview the first rows of the long-format data to check the column layout.
# This must stay the last expression in the cell so the notebook renders it.
df_long.head()


Unnamed: 0,ID,DATE,DATA_VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME,YEAR,ELEMENT
0,ASN00007195,20200101,0,,,a,,2020,PRCP
1,ASN00007197,20200101,0,,,a,,2020,PRCP
2,ASN00007600,20200101,0,,,a,,2020,PRCP
3,ASN00008002,20200101,0,,,a,,2020,PRCP
4,ASN00007159,20200101,0,,,a,,2020,PRCP


In [4]:
# Reshape the long-format observations into the wide layout using the shared
# helper from weather_transformation (used in place of inline pivot code).
# NOTE(review): presumably aggfunc='mean' resolves duplicate observations and
# fill_missing_days=True pads absent days — confirm against the helper.
df_wide = transform_long_to_wide(
    df_long,
    aggfunc='mean',
    fill_missing_days=True,
)


2025-10-26 22:51:54,741 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:34174 closed before handshake completed
2025-10-26 22:51:54,748 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:34188 closed before handshake completed
2025-10-26 22:51:54,757 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:34202 closed before handshake completed
2025-10-26 22:51:54,764 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:34212 closed before handshake completed
2025-10-26 22:51:54,998 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:55548 closed before handshake completed
2025-10-26 22:51:55,011 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:55558 closed before handshake completed
2025-10-26 22:51:55,023 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:55574 closed before handshake completed
2025-10-26 22:51:55,036 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:55588 closed bef

In [None]:

# Display information about the transformed (wide-format) dataframe using the
# project helper imported from weather_transformation.
# NOTE(review): what exactly is reported is defined by that helper — see its source.
display_dataframe_info(df_wide)


In [None]:

# Display first few rows of the processed (wide-format) dataframe.
# This must stay the last expression in the cell so the notebook renders it.
df_wide.head()


In [None]:
# Persist the wide dataframe through the project helper, then log when the
# run finished.
output_file = 'weather_2020_wide.parquet'
save_wide_dataframe(df_wide, output_file)

print(f"\nProcessing completed at {datetime.now():%Y-%m-%d %H:%M:%S}")


## Summary

Dataframe saved as `weather_2020_wide.parquet` with:
- **Index**: `ID`, `YEAR`, `ELEMENT` (where ELEMENT is TOBS, TMAX, TMIN, PRCP, SNOW, or SNWD)
- **Columns**: `day_1` through `day_365` 
- **Values**: Weather observations (units vary by measurement type)
- **Time Range**: 2020 (1 year)
- **Processed using Dask DataFrames** for efficient handling of large datasets

### Measurement Types:
- **TOBS**: Temperature at observation time (tenths of degrees C)
- **TMAX**: Maximum temperature (tenths of degrees C)  
- **TMIN**: Minimum temperature (tenths of degrees C)
- **PRCP**: Precipitation (tenths of mm)
- **SNOW**: Snowfall (mm)
- **SNWD**: Snow depth (mm)


In [None]:
# Clean up cluster
# Hands the cluster and client objects returned by setup_dask_cluster back to
# the project helper for shutdown. Run this last: Dask computations issued
# afterwards will presumably no longer have workers to run on — confirm
# against cluster_utils.
from cluster_utils import close_dask_cluster
close_dask_cluster(cluster, client)
