In [1]:
import os
import shutil

import dask.dataframe as dd
# Source CSVs for the climatology / SNOTEL / terrain training tables
file_path1 = '../data/training_ready_climatology_data.csv'
file_path2 = '../data/training_ready_snotel_data.csv'
file_path3 = '../data/training_ready_terrain_data.csv'

# Lazily load each table as a Dask DataFrame (read in file_path1..3 order)
df1, df2, df3 = (dd.read_csv(path) for path in (file_path1, file_path2, file_path3))

# Cast the coordinate columns to float in every frame, since they are used
# as join keys in the next cell
df1, df2, df3 = (
    frame.astype({'lat': float, 'lon': float}) for frame in (df1, df2, df3)
)

# The SNOTEL table spells its date column "Date"; align it with the others
df2 = df2.rename(columns={"Date": "date"})





In [2]:
# Join climatology (df1) with SNOTEL (df2) on location and date.
# dd.merge defaults to how='inner', so rows without a match in both are dropped.
# NOTE(review): lat/lon are float join keys compared for exact equality;
# coordinates written with different rounding in the two CSVs will not match.
merged_df1 = dd.merge(df1, df2, on=['lat', 'lon', 'date'])

# Attach terrain (df3) by location only (again an inner join), so each
# (lat, lon, date) row receives the terrain columns for its location.
# NOTE(review): assumes df3 has one row per (lat, lon); otherwise rows multiply.
merged_df2 = dd.merge(merged_df1, df3, on=['lat', 'lon'])

In [3]:
merged_df2.to_csv('../data/model_training_data.csv', single_file=True, index=False)  # returns the list of written paths (shown as cell output)

['/Users/gokulprathin/swe-workflow-book/book/data/model_training_data.csv']

In [4]:
# Reduce the merged training table to the model's columns and write the result
input_csv = '../data/model_training_data.csv'
output_csv = '../data/model_training_cleaned.csv'

# Columns kept for model training
selected_columns = [
    'date', 'lat', 'lon',
    'etr', 'pr', 'rmax', 'rmin', 'tmmn', 'tmmx', 'vpd', 'vs',
    'elevation', 'slope', 'curvature', 'aspect', 'eastness', 'northness',
    'Snow Water Equivalent (in) Start of Day Values',
]

# Read only those columns, then shorten the long SNOTEL SWE header to swe_value
df = dd.read_csv(input_csv, usecols=selected_columns).rename(
    columns={"Snow Water Equivalent (in) Start of Day Values": "swe_value"}
)

df.to_csv(output_csv, index=False, single_file=True)

['/Users/gokulprathin/swe-workflow-book/book/data/model_training_cleaned.csv']

In [5]:
import dask.dataframe as dd
import os
import pandas as pd

# Configuration for the station-level merge pipeline in the cells below
homedir = os.path.expanduser('~')  # NOTE(review): not referenced in the visible cells
working_dir = "../data"            # directory holding all inputs and outputs
work_dir = working_dir             # alias; later cells use both names
final_output_name = "final_merged_data_3yrs_all_active_stations_v1.csv"
# Block size for dd.read_csv and partition size for repartition; tune to your hardware
chunk_size = '10MB'

In [6]:
# Input files for the merge pipeline, all under working_dir
amsr_file = f'{working_dir}/all_snotel_cdec_stations_active_in_westus.csv_amsr_dask.csv'
snotel_file = f'{working_dir}/all_snotel_cdec_stations_active_in_westus.csv_swe_restored_dask_all_vars.csv'
gridmet_file = f'{working_dir}/training_all_active_snotel_station_list_elevation.csv_gridmet.csv'
terrain_file = f'{working_dir}/training_all_active_snotel_station_list_elevation.csv_terrain_4km_grid_shift.csv'
fsca_file = f'{working_dir}/fsca_final_training_all.csv'
final_final_output_file = f'{work_dir}/{final_output_name}'

# NOTE(review): this check only reports that the final output already exists.
# Nothing is actually skipped: the read/merge/clean/sort cells that follow run
# unconditionally and overwrite this file. The result is exposed as
# `final_output_exists` so those cells can be gated on it if skipping is intended.
final_output_exists = os.path.exists(final_final_output_file)
if final_output_exists:
    print(f"The file '{final_final_output_file}' exists. Skipping")

The file '../data/final_merged_data_3yrs_all_active_stations_v1.csv' exists. Skipping


In [7]:
# Lazily read the AMSR SWE table in blocks of `chunk_size` bytes
# (plain CSV; no compression option is passed)
amsr = dd.read_csv(amsr_file, blocksize=chunk_size)
print("amsr.columns = ", amsr.columns)

amsr.columns =  Index(['date', 'lat', 'lon', 'AMSR_SWE'], dtype='object')


In [8]:
snotel = dd.read_csv(
    snotel_file,
    blocksize=chunk_size,  # lazily read in chunk_size byte blocks
)
print("snotel.columns = ", snotel.columns)

snotel.columns =  Index(['station_name', 'date', 'lat', 'lon', 'swe_value', 'change_in_swe_inch',
       'snow_depth', 'air_temperature_observed_f'],
      dtype='object')


In [9]:
# Load the GridMET table and discard its "Unnamed: 0" column
# (presumably a saved pandas index)
gridmet = (
    dd.read_csv(gridmet_file, blocksize=chunk_size)
    .drop(columns=["Unnamed: 0"])
)
print("gridmet.columns = ", gridmet.columns)

gridmet.columns =  Index(['day', 'lat', 'lon', 'air_temperature_tmmn',
       'potential_evapotranspiration', 'mean_vapor_pressure_deficit',
       'relative_humidity_rmax', 'relative_humidity_rmin',
       'precipitation_amount', 'air_temperature_tmmx', 'wind_speed'],
      dtype='object')


In [10]:
# Load terrain attributes, align the coordinate column names with the other
# frames, and keep only the columns needed for the final output.
# NOTE(review): both 'elevation' and 'Elevation' are kept -- confirm which one
# downstream code is meant to use.
terrain = (
    dd.read_csv(terrain_file, blocksize=chunk_size)
    .rename(columns={"latitude": "lat", "longitude": "lon"})
    [[
        "stationTriplet", "elevation", "lat", "lon",
        'Elevation', 'Slope', 'Aspect', 'Curvature', 'Northness', 'Eastness',
    ]]
)
print("terrain.columns = ", terrain.columns)

terrain.columns =  Index(['stationTriplet', 'elevation', 'lat', 'lon', 'Elevation', 'Slope',
       'Aspect', 'Curvature', 'Northness', 'Eastness'],
      dtype='object')


In [11]:
# Load the fractional snow cover table and align its coordinate column names
coordinate_renames = {"latitude": "lat", "longitude": "lon"}
snowcover = dd.read_csv(fsca_file, blocksize=chunk_size).rename(columns=coordinate_renames)
print("snowcover.columns = ", snowcover.columns)


snowcover.columns =  Index(['date', 'lat', 'lon', 'fSCA'], dtype='object')


In [12]:
# Rebalance every frame to partitions of roughly `chunk_size` in memory.
# NOTE(review): per the Dask docs, repartition(partition_size=...) measures
# partition memory up front, i.e. it triggers a pass over each frame here.
# The fSCA frame is bound to a new name, snow_cover, which the merge cell uses.
amsr, snotel, gridmet, terrain, snow_cover = (
    frame.repartition(partition_size=chunk_size)
    for frame in (amsr, snotel, gridmet, terrain, snowcover)
)

# GridMET names its date column 'day'; align it with the 'date' merge key
gridmet = gridmet.rename(columns={'day': 'date'})
print("all the dataframes are partitioned")

all the dataframes are partitioned


In [13]:
# Merge the sources one after another with outer joins, so rows present in any
# single source are kept. Exact duplicate rows are dropped after each step, and
# every intermediate result is checkpointed to CSV.
# NOTE(review): the frames are lazy, so each checkpoint write recomputes the whole
# graph from the source CSVs (later steps redo earlier merges); persisting or
# re-reading the checkpoints would avoid that if memory allows.
# NOTE(review): lat/lon are exact-match join keys; sources that round coordinates
# differently will yield unmatched, NaN-padded rows instead of joined ones.


def _merge_and_checkpoint(left, right, on, suffix):
    """Outer-merge ``right`` into ``left`` and checkpoint the result to CSV.

    Exact duplicate rows are dropped (first occurrence kept). The result is
    written as one CSV file, without the index, to
    ``<working_dir>/<final_output_name><suffix>.csv``.

    Returns
    -------
    tuple
        ``(merged, path)``: the still-lazy merged Dask DataFrame and the
        checkpoint file path.
    """
    merged = dd.merge(left, right, on=on, how='outer').drop_duplicates(keep='first')
    path = os.path.join(working_dir, f"{final_output_name}{suffix}.csv")
    merged.to_csv(path, single_file=True, index=False)
    print(f"intermediate file saved to {path}")
    return merged, path


# (progress message, frame merged in, join keys, checkpoint suffix), in order
merge_steps = [
    ("start to merge amsr and snotel", snotel, ['lat', 'lon', 'date'], "_snotel"),
    ("start to merge gridmet", gridmet, ['lat', 'lon', 'date'], "_gridmet"),
    # terrain has no 'date' column: its attributes attach to every date at a location
    ("start to merge terrain", terrain, ['lat', 'lon'], "_terrain"),
    ("start to merge snowcover", snow_cover, ['lat', 'lon', 'date'], "_snow_cover"),
]

merged_df = amsr
for message, right_df, keys, suffix in merge_steps:
    print(message)
    merged_df, checkpoint_file = _merge_and_checkpoint(merged_df, right_df, keys, suffix)

# The last checkpoint already holds exactly merged_df as one CSV, so copy it to the
# final name instead of recomputing every merge again to write identical rows.
output_file = os.path.join(working_dir, final_output_name)
shutil.copyfile(checkpoint_file, output_file)
print(f'Merge completed. {output_file}')

start to merge amsr and snotel


intermediate file saved to ../data/final_merged_data_3yrs_all_active_stations_v1.csv_snotel.csv
start to merge gridmet


intermediate file saved to ../data/final_merged_data_3yrs_all_active_stations_v1.csv_gridmet.csv
start to merge terrain


intermediate file saved to ../data/final_merged_data_3yrs_all_active_stations_v1.csv_terrain.csv
start to merge snowcover


intermediate file saved to ../data/final_merged_data_3yrs_all_active_stations_v1.csv_snow_cover.csv


Merge completed. ../data/final_merged_data_3yrs_all_active_stations_v1.csv


In [14]:

# Re-read the merged output, drop exact duplicate rows, and replace the file.
# stationTriplet / station_name are pinned to object dtype, presumably so Dask
# does not infer a numeric dtype from rows sampled while those columns are empty.
final_csv = f'{work_dir}/{final_output_name}'
tmp_csv = f'{final_csv}.tmp'
df = dd.read_csv(final_csv, dtype={'stationTriplet': 'object',
       'station_name': 'object'})
df = df.drop_duplicates(keep='first')
# Write to a temporary file and then swap it in. Writing straight over the file
# that this lazy frame is still reading from is unsafe, and a failed write would
# destroy the only copy of the merged data. os.replace within one directory
# is atomic.
df.to_csv(tmp_csv, single_file=True, index=False)
os.replace(tmp_csv, final_csv)
print('Data cleaning completed.')

Data cleaning completed.


In [15]:
def sort_training_data(input_training_csv, sorted_training_csv, blocksize='10MB'):
    """Sort a training CSV by date, lat and lon and write it to a new CSV.

    Parameters
    ----------
    input_training_csv : str
        Path of the CSV to sort.
    sorted_training_csv : str
        Destination path; written as a single CSV file without the index.
    blocksize : str or int, optional
        Block size passed to ``dd.read_csv``. The default, ``'10MB'``, is the
        value this function previously hard-coded.
    """
    # assume_missing=True makes Dask treat integer-looking columns as float so
    # missing values do not break dtype inference; the two ID-like columns are
    # pinned to object dtype explicitly.
    ddf = dd.read_csv(
        input_training_csv,
        assume_missing=True,
        blocksize=blocksize,
        dtype={'stationTriplet': 'object', 'station_name': 'object'},
    )

    # Materialize the partitions once, presumably so the sort below does not
    # re-read the CSV. NOTE(review): this holds the whole dataset in memory.
    ddf = ddf.persist()

    # Global sort by date, then lat, then lon
    sorted_ddf = ddf.sort_values(by=['date', 'lat', 'lon'])

    sorted_ddf.to_csv(sorted_training_csv, index=False, single_file=True)
    print(f"sorted training data is saved to {sorted_training_csv}")

final_final_output_file = f'{work_dir}/{final_output_name}'
sort_training_data(final_final_output_file, f'{work_dir}/{final_output_name}_sorted.csv')

sorted training data is saved to ../data/final_merged_data_3yrs_all_active_stations_v1.csv_sorted.csv
