In [None]:
import dask
import dask.dataframe as dd
import dask.bag as db
import pandas as pd
import os
import time
import glob
import multiprocessing as mp
from dask.distributed import Client, LocalCluster
from dask_cloudprovider.gcp import GCPCluster
from google.auth import default

In [None]:
# ✅ Step 1: Start a Local Cluster
# Use ~90% of the available CPUs, but never fewer than one worker:
# on a 1-CPU machine int(0.9 * 1) == 0, and a cluster with no workers
# would leave every later .compute() waiting forever.
N_WORKERS = max(1, int(0.9 * mp.cpu_count()))

local_cluster = LocalCluster(
    n_workers=N_WORKERS,
    processes=True,          # one process per worker
    threads_per_worker=1,
    memory_limit="2GB",      # per worker; NOTE(review): N_WORKERS * 2GB may exceed machine RAM
)
client = Client(local_cluster)
print(f"Dashboard available at: {client.dashboard_link}")

In [None]:
# ✅ Step 2: Define Data Processing Code

# Directory of per-station daily-summary files. Set the STATION_DATA_DIR
# environment variable to point elsewhere instead of editing this cell.
data_path = os.environ.get("STATION_DATA_DIR", "D:/daily-summaries-latest-test")

# Keep regular files only (a subdirectory would just become an error record),
# and sort so the processing order and the results list are reproducible.
station_files = sorted(
    path
    for path in glob.glob(os.path.join(data_path, "*"))
    if os.path.isfile(path)
)

print(f"Found {len(station_files)} station files")

In [None]:
# Function to process a single station file

# Columns every station file is expected to carry; absent ones only trigger a warning.
_REQUIRED_COLS = ['DATE', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'NAME',
                  'PRCP', 'SNOW', 'SNWD', 'TMAX', 'TMIN']
# Columns coerced to numeric; non-numeric entries become NaN.
_NUMERIC_COLS = ['LATITUDE', 'LONGITUDE', 'ELEVATION', 'PRCP', 'SNOW', 'SNWD', 'TMAX', 'TMIN']
# Monthly aggregation per column; insertion order is the output column order.
_MONTHLY_AGG = {
    'TMAX': 'mean',
    'TMIN': 'mean',
    'PRCP': 'sum',
    'SNOW': 'sum',
    'SNWD': 'mean',  # average snow depth
}


def _month_end_alias():
    """Return the month-end frequency alias accepted by the installed pandas.

    pandas >= 2.2 deprecates 'M' in favour of 'ME'; older releases reject 'ME'.
    """
    from pandas.tseries.frequencies import to_offset
    try:
        to_offset('ME')
        return 'ME'
    except ValueError:
        return 'M'


_MONTH_END = _month_end_alias()


def _read_station_csv(filename, station_id):
    """Load one station CSV, warn about missing columns, and coerce dtypes."""
    # DATE is parsed after reading. Passing parse_dates=['DATE'] to read_csv
    # raises when the column is absent, which would defeat the per-column
    # handling below.
    df = pd.read_csv(filename, na_values=['', 'NA', 'NULL'])

    missing_cols = [col for col in _REQUIRED_COLS if col not in df.columns]
    if missing_cols:
        print(f"Warning: File {station_id} is missing columns: {missing_cols}")

    if 'DATE' in df.columns:
        # Unparseable dates become NaT, which the monthly groupby drops.
        # Otherwise they would leave an object column that pd.Grouper rejects.
        df['DATE'] = pd.to_datetime(df['DATE'], errors='coerce')
    for col in _NUMERIC_COLS:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')
    return df


def _monthly_summary(df):
    """Return monthly aggregates per _MONTHLY_AGG, or None if nothing can be grouped."""
    if 'DATE' not in df.columns:
        return None
    agg_dict = {col: how for col, how in _MONTHLY_AGG.items() if col in df.columns}
    if not agg_dict:
        return None
    return (
        df.groupby(pd.Grouper(key='DATE', freq=_MONTH_END))
        .agg(agg_dict)
        .reset_index()
    )


def _station_metadata(df, station_id):
    """Return station descriptors plus whole-record climate statistics."""
    def first(col, default):
        # Guard header-only files, where .iloc[0] would raise IndexError.
        if col in df.columns and not df.empty:
            return df[col].iloc[0]
        return default

    def has_data(col):
        return col in df.columns and df[col].notna().any()

    metadata = {
        'station_id': station_id,
        'name': first('NAME', 'Unknown'),
        'latitude': first('LATITUDE', None),
        'longitude': first('LONGITUDE', None),
        'elevation': first('ELEVATION', None),
        'record_length': len(df),
    }

    if 'DATE' in df.columns:
        metadata['date_range'] = (df['DATE'].min(), df['DATE'].max())

    if has_data('TMAX'):
        metadata['avg_tmax'] = df['TMAX'].mean()
        metadata['max_tmax'] = df['TMAX'].max()

    if has_data('TMIN'):
        metadata['avg_tmin'] = df['TMIN'].mean()
        metadata['min_tmin'] = df['TMIN'].min()

    if has_data('PRCP'):
        metadata['total_precip'] = df['PRCP'].sum()
        metadata['max_precip'] = df['PRCP'].max()
        metadata['precip_days'] = (df['PRCP'] > 0).sum()

    if has_data('SNOW'):
        metadata['total_snow'] = df['SNOW'].sum()
        metadata['max_snow'] = df['SNOW'].max()
        metadata['snow_days'] = (df['SNOW'] > 0).sum()

    return metadata


def process_station(filename):
    """Summarise one station's daily-summaries CSV.

    Parameters
    ----------
    filename : str
        Path to the station CSV. Its basename is used as the station ID.

    Returns
    -------
    dict
        On success, the dict has these keys:

        - ``station_id``, ``name``, ``latitude``, ``longitude``,
          ``elevation`` and ``record_length``.
        - ``date_range`` (only when a DATE column exists).
        - Climate statistics (``avg_tmax``, ``max_tmax``, ``avg_tmin``,
          ``min_tmin``, ``total_precip``, ``max_precip``, ``precip_days``,
          ``total_snow``, ``max_snow``, ``snow_days``). Each group appears
          only when its column has at least one non-null value.
        - ``monthly_data``: a DataFrame of monthly aggregates, present when
          DATE and at least one aggregated column exist.

        On failure the dict is ``{'station_id': ..., 'error': <message>}``.
        The function never raises, so one bad file cannot abort a bag run.
    """
    station_id = os.path.basename(filename)

    try:
        df = _read_station_csv(filename, station_id)
        monthly_data = _monthly_summary(df)
        metadata = _station_metadata(df, station_id)
        if monthly_data is not None:
            metadata['monthly_data'] = monthly_data
        return metadata

    except Exception as e:
        # Deliberate best-effort: report the failure in the result instead of raising.
        print(f"Error processing {station_id}: {e}")
        return {
            'station_id': station_id,
            'error': str(e)
        }

In [None]:
# ✅ Step 3: Process Data Using Local Workers

# One bag element per station file; dask.bag partitions the list and the
# distributed Client from Step 1 (the default scheduler once created) runs
# process_station on the workers.
files_bag = db.from_sequence(station_files)

print("Starting computation...")
# perf_counter is monotonic, so wall-clock adjustments can't skew the timing.
start_time = time.perf_counter()

# Compute results
results = files_bag.map(process_station).compute()

end_time = time.perf_counter()
elapsed_time = end_time - start_time

# process_station reports failures as {'station_id', 'error'} dicts instead of
# raising, so count them explicitly. Otherwise they look like successes.
failed_stations = [r['station_id'] for r in results if 'error' in r]

print("Computation complete!")
print(f"Processed {len(results)} stations in {elapsed_time:.2f} seconds")
if failed_stations:
    print(f"{len(failed_stations)} station(s) failed, e.g. {failed_stations[:5]}")

In [None]:
# ✅ Step 4: Close Clusters
# Close the client connection first, then the cluster it is connected to.
for resource in (client, local_cluster):
    resource.close()