In [None]:
import dask.dataframe as dd
import os
import pandas as pd
from dask import delayed

data_dir = "C:/Users/hopeh/Desktop/DS_Bootcamp/Flight_times_project/weather_data/ghcnd_all/ghcnd_all/usa_weather"
# Inclusive date window of observations to keep.
start_date = pd.to_datetime('2022-05-01')
end_date = pd.to_datetime('2024-04-30')

# Fixed-width layout of one GHCN-Daily ".dly" record (269 characters, one
# station / month / element per line):
#   ID (11) | YEAR (4) | MONTH (2) | ELEMENT (4) |
#   31 x [VALUE (5) | MFLAG (1) | QFLAG (1) | SFLAG (1)]
# The per-day fields are interleaved day by day, so the names and widths are
# built in that same order.
colnames = ["ID", "YEAR", "MONTH", "ELEMENT"]
col_widths = [11, 4, 2, 4]
for _day in range(1, 32):
    colnames += [f"VALUE{_day}", f"MFLAG{_day}", f"QFLAG{_day}", f"SFLAG{_day}"]
    col_widths += [5, 1, 1, 1]

# Define dtypes for each column. VALUE columns are read as text and converted
# by clean_and_convert_values(); flag columns stay as text.
dtypes = {'ID': 'object', 'YEAR': 'int32', 'MONTH': 'int32', 'ELEMENT': 'object'}
dtypes.update({name: 'object' for name in colnames[4:]})

# Sentinel stored in VALUE fields when there is no observation for that day
# (including days that do not exist in the month, e.g. February 30).
MISSING_VALUE = -9999

# Elements kept in the output, and the fixed schema of every result frame so
# that all partitions agree on columns and dtypes even when they are empty.
OUTPUT_ELEMENTS = ("TMAX", "TMIN", "TAVG", "PRCP", "SNOW", "AWND")
OUTPUT_DTYPES = {'STATION': 'object', 'DATE': 'object'}
OUTPUT_DTYPES.update({name: 'float64' for name in OUTPUT_ELEMENTS})


# Function to clean and convert VALUE columns
def clean_and_convert_values(df):
    """Return a copy of *df* with VALUE1..VALUE31 converted to float64.

    Text that cannot be parsed as a number, and the MISSING_VALUE sentinel,
    both become NaN. The input frame is not modified.
    """
    df = df.copy()
    for day in range(1, 32):
        value_col = f"VALUE{day}"
        numeric = pd.to_numeric(df[value_col], errors='coerce').astype('float64')
        df[value_col] = numeric.where(numeric != MISSING_VALUE)
    return df


# Function to filter U.S. stations
def filter_us_stations(df):
    """Return the rows of *df* whose ID starts with "US".

    Rows with a missing ID are dropped (na=False) instead of producing a
    null in the boolean mask.
    """
    return df[df['ID'].str.startswith("US", na=False)]


# Function to process each chunk of the data
def process_chunk(chunk):
    """Turn wide monthly records into one output row per daily observation.

    *chunk* is expected to have the columns named in ``colnames`` with the
    VALUE columns already numeric (see clean_and_convert_values). Only rows
    whose ELEMENT is one of OUTPUT_ELEMENTS are used, and only observations
    dated within [start_date, end_date] are emitted.

    Returns a DataFrame with columns STATION, DATE ('YYYY-MM-DD' string) and
    one float64 column per element in OUTPUT_ELEMENTS. Each row carries the
    value for its own element; the other element columns are NaN. Values are
    passed through in the units stored in the file. The schema is the same
    for an empty result.
    """
    first_month = (start_date.year, start_date.month)
    last_month = (end_date.year, end_date.month)

    records = []
    for _, row in chunk.iterrows():
        element = row.get('ELEMENT')
        if element not in OUTPUT_ELEMENTS:
            continue

        year, month = int(row['YEAR']), int(row['MONTH'])
        # Cheap rejection of whole months outside the window before looking
        # at any of the 31 day slots.
        if not first_month <= (year, month) <= last_month:
            continue

        for day in range(1, 32):
            raw_value = row.get(f"VALUE{day}")
            if pd.isna(raw_value):
                continue
            value = float(raw_value)
            if value == MISSING_VALUE:
                continue
            try:
                date = pd.Timestamp(year=year, month=month, day=day)
            except ValueError:
                # Day slot that does not exist in this month.
                continue
            if start_date <= date <= end_date:
                records.append({
                    'STATION': row['ID'],
                    'DATE': date.strftime('%Y-%m-%d'),
                    element: value,
                })

    return pd.DataFrame(records, columns=list(OUTPUT_DTYPES)).astype(OUTPUT_DTYPES)

# Function to process each file
def process_file(file_path):
    """Parse one fixed-width station file and return its filtered observations.

    The file is read with pandas using the module-level ``col_widths``,
    ``colnames`` and ``dtypes``, then passed through filter_us_stations,
    clean_and_convert_values and process_chunk. This function is itself run
    as a dask ``delayed`` task, so the file is read with plain pandas rather
    than building and computing a nested dask collection inside the task.

    Always returns a DataFrame with the columns STATION, DATE, TMAX, TMIN,
    TAVG, PRCP, SNOW, AWND (measurements as float64), so that every result
    has the same schema. On any error the problem is printed together with
    the file path and an empty frame with that schema is returned
    (best-effort: one bad file does not abort the whole run).
    """
    print(f"Processing file: {file_path}")
    output_dtypes = {
        'STATION': 'object',
        'DATE': 'object',
        'TMAX': 'float64',
        'TMIN': 'float64',
        'TAVG': 'float64',
        'PRCP': 'float64',
        'SNOW': 'float64',
        'AWND': 'float64',
    }
    try:
        df = pd.read_fwf(file_path, widths=col_widths, header=None, names=colnames, skip_blank_lines=True, dtype=dtypes)
        df = filter_us_stations(df)
        df = clean_and_convert_values(df)
        result_df = process_chunk(df)
        # Normalise columns and dtypes so a file with no matching rows still
        # yields the same schema as one with data.
        return result_df.reindex(columns=list(output_dtypes)).astype(output_dtypes)
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return pd.DataFrame(columns=list(output_dtypes)).astype(output_dtypes)

# List all files
all_dly_files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.dly')]

# Use Dask's delayed to process files in parallel
all_data = [delayed(process_file)(file) for file in all_dly_files]

# Combine all data and repartition.
# The collection is built exactly once: from_delayed() without `meta` runs the
# first task to infer the schema, so building it twice processes the first
# file an extra time for nothing.
num_partitions = 50
all_data_combined = dd.from_delayed(all_data).repartition(npartitions=num_partitions)


# Save the combined data to CSV
all_data_combined.to_csv('ghcn_daily_filtered_us.csv', index=False, single_file=True)
print("Saved final data.")


Processing file: C:/Users/hopeh/Desktop/DS_Bootcamp/Flight_times_project/weather_data/ghcnd_all/ghcnd_all/usa_weather\US009052008.dly
Processing file: C:/Users/hopeh/Desktop/DS_Bootcamp/Flight_times_project/weather_data/ghcnd_all/ghcnd_all/usa_weather\US009052008.dly
Processing file: C:/Users/hopeh/Desktop/DS_Bootcamp/Flight_times_project/weather_data/ghcnd_all/ghcnd_all/usa_weather\US009052008.dly
Processing file: C:/Users/hopeh/Desktop/DS_Bootcamp/Flight_times_project/weather_data/ghcnd_all/ghcnd_all/usa_weather\US10boyd005.dly
Processing file: C:/Users/hopeh/Desktop/DS_Bootcamp/Flight_times_project/weather_data/ghcnd_all/ghcnd_all/usa_weather\US10keit006.dly
Processing file: C:/Users/hopeh/Desktop/DS_Bootcamp/Flight_times_project/weather_data/ghcnd_all/ghcnd_all/usa_weather\US10york034.dly
Processing file: C:/Users/hopeh/Desktop/DS_Bootcamp/Flight_times_project/weather_data/ghcnd_all/ghcnd_all/usa_weather\US10boon011.dly
Processing file: C:/Users/hopeh/Desktop/DS_Bootcamp/Flight_tim