In [15]:
# Environment used: earth_analytics_python (found here: /ihme/homes/nhashmeh/miniconda3/envs/earth-analytics-python/bin/python)

# Load libraries
import numpy as np
import pandas as pd
from shapely.geometry import Point
from joblib import load
import geopandas as gpd
import os
import xarray as xr
from datetime import datetime, timedelta
from joblib import load
import getpass
import subprocess
import dask

import sklearn
from sklearn.ensemble import RandomForestRegressor
import xgboost as xgb
from sklearn.tree import DecisionTreeRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import median_absolute_error
from sklearn.metrics import explained_variance_score
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import cross_val_score 
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import RandomizedSearchCV

os.getenv('SLURM_ARRAY_TASK_ID')

# Step 1: Merge datasets
#### Here we launch a script that will combine ERA5 data, climate station data, and LCZ data. Things to know:
- ERA5 data: 9km per pixel, downloaded at the 3-hour frequency level from the copernicus database. We then averaged these to daily values. The daily values can be found here: /mnt/share/erf/ERA5/three_hourly_temp_9km/daily_temps/
#
- Climate stations: Daily summary values from climate stations across the globe. We make use of the temperature averages as well as the cliamte station elevations in our model. Source: https://www.ncei.noaa.gov/metadata/geoportal/rest/metadata/item/gov.noaa.ncdc:C00652/html
#
- LCZ: Local Climate Zones that basically represent a categorical spectrum of urban development (and lack thereof). This dataset is originally at 100m. Source: https://lcz-generator.rub.de/global-lcz-map
#
- Output: year-month files containing table of points with climate station daily average temperatures, climate station elevation, daily averages of ERA5 temperature, and LCZ value. There are lat/lon and time variables here as well. Since climate stations are points, these basically dictate the spatial coverage of our new output. This table will be used to train the ML model.
#
- Resources/time: This takes a long time (how long?) and at least 500 GB of resources (750 GB?).
#
- Worker script: merge_era5_stations_script_v3.py

In [None]:
era5_path = '/mnt/share/erf/ERA5/three_hourly_temp_9km/daily_temps/'

for file in os.listdir(era5_path):

    year = file.split('_')[0] # get year

    asdf = ['sbatch', '-J', '{}'.format(year), '-e', '/share/temp/sgeoutput/{}/errors/%x.e%j'.format(getpass.getuser()),
        '-o', '/share/temp/sgeoutput/{}/output/%x.o%j'.format(getpass.getuser()), '-A', 'proj_diet',
        '--mem=750G', '-c', '5', '-t', '240', '-C', 'archive', '-p', 'all.q', #'--wait',
        '--wrap', '/ihme/homes/nhashmeh/miniconda3/envs/earth-analytics-python/bin/python ' + '/ihme/homes/nhashmeh/downscale_temperature/merge_era5_stations_script_v3.py ' + file]
    # subprocess.call(asdf)
    # # break

# Step 2: Creating the model
#### Before we can create the model, the outputs from step 1 need to be combined into a single dataframe.

In [4]:
combine_outputs = False # set to True if you want this to run

if combine_outputs == True:
    filepath = '/mnt/share/erf/ERA5/merged_era5_with_stations/'
    outpath = '/mnt/share/erf/ERA5/merged_era5_with_stations/ML_training/'

    # ML_df = pd.DataFrame()
    ML_df = []

    counter = 0

    now = datetime.now()
    current_time = now.strftime("%I:%M:%S %p")
    print("Start", current_time)

    for count, file in enumerate(os.listdir(filepath), 1):
        
        if file.split('.')[-1] != 'feather':
            continue

        temp_df = pd.read_feather(filepath + file)

        ML_df.append(temp_df)

        if count % 50 == 0:
            print(str(count) + " files have been concatenated")
            
    ML_df_final = pd.concat(ML_df).reset_index(drop=True)

    # ML_df_final.to_feather(outpath + 'ML_df_all_v3.feather')

    now = datetime.now()
    current_time = now.strftime("%I:%M:%S %p")
    print("End", current_time)

else:
    print("This file already exists, no need to run this script.")

This file already exists, no need to run this script.


#### Now that we have one table containing all of our temperature data, we can set it up for model training.

In [10]:
# Load in the data if it isn't already loaded
if 'ML_df_final' not in globals():
    ML_df_final = pd.read_feather('/mnt/share/erf/ERA5/merged_era5_with_stations/ML_training/ML_df_all_v3.feather')

# some final processing steps
test_new = ML_df_final

# capture cyclical behavior of annual data using sine/cosine
test_new['day_of_year_sin'] = np.sin(2 * np.pi * test_new['day_of_year']/test_new['total_days_in_year'])
test_new['day_of_year_cos'] = np.cos(2 * np.pi * test_new['day_of_year']/test_new['total_days_in_year'])

# converting years to be relative to 1990 (so instead of 1990-2023, its 0-33) 
test_new['year'] = test_new['year'] - 1990

# removing LCZ values of 0.0 (these represent the ocean)
test_new = test_new[test_new['band_1'] != 0.0]

# removing rows with -999.0 and -999.9 values, which come from climate station errors
values_to_remove = [-999.0, -999.9]  # replace with your actual values
test_new = test_new[~test_new['elevation'].isin(values_to_remove)]

# renaming the band_1 column to lcz_value
test_new.rename(columns={'band_1':'lcz_value'}, inplace=True)

# creating a dummy variable for the lcz_value (this is our only dummy variable)
test_new_one_dummy = pd.get_dummies(test_new, columns=['lcz_value'])

#### Machine learning testing

In [16]:
run_ML_testing = False

if run_ML_testing == True:

    # split the data into features (X) and target (y)
    X_1 = test_new_one_dummy.drop(['temp', 'day_of_year'], axis=1)  # all columns except 'temp' (this is the predictor) and 'day_of_year' (already captured in sine/cosine)
    y_1 = test_new_one_dummy['temp']  # 'temp' column

    # split the data into train and test sets
    X_train_1, X_test_1, y_train_1, y_test_1 = train_test_split(X_1, y_1, test_size=0.2, random_state=42)

    # scale the features
    scaler_1 = StandardScaler()
    X_train_1 = scaler_1.fit_transform(X_train_1)
    X_test_1 = scaler_1.transform(X_test_1)

    ### Initialize the model ###
    model_1 = DecisionTreeRegressor(max_depth=20)

    ### Train the model ###
    model_1.fit(X_train_1, y_train_1) # time varies based on model type and parameters (DTR with max_depth = 20 is ~ 20 minutes)

    # Make predictions
    predictions_1 = model_1.predict(X_test_1)

    # Metrics
    model_score_1 = model_1.score(X_test_1, y_test_1)
    print(f"Model Score: {model_score_1}")

    # calculate the mean absolute error of the model
    mae_1 = mean_absolute_error(y_test_1, predictions_1)
    print(f"Mean Absolute Error: {mae_1}")

    # Mean Squared Error (MSE)
    mse_1 = mean_squared_error(y_test_1, predictions_1)
    print(f"Mean Squared Error: {mse_1}")

    # Root Mean Squared Error (RMSE)
    rmse_1 = np.sqrt(mse_1)
    print(f"Root Mean Squared Error: {rmse_1}")

    # Median Absolute Error (MedAE)
    medae_1 = median_absolute_error(y_test_1, predictions_1)
    print(f"Median Absolute Error: {medae_1}")

    # Explained Variance Score
    evs_1 = explained_variance_score(y_test_1, predictions_1)
    print(f"Explained Variance Score: {evs_1}")

Model Score: 0.9816103832645074
Mean Absolute Error: 1.1688360668087523
Mean Squared Error: 2.912941802343664
Root Mean Squared Error: 1.706734250650541
Median Absolute Error: 0.8451612903225403
Explained Variance Score: 0.9816103842954629


#### Once we're happy with our model's in-sample performance, we can retrain the model without splitting up the data into train/test sets so that our final model is trained on all of our data.

In [None]:
run_ML_full = False

if run_ML_full == True:
    
    # scale the features
    scaler = StandardScaler()
    X_1_scaled = scaler.fit_transform(X_1)

    # Initialize the model
    model = DecisionTreeRegressor(max_depth=20)

    # Train the model
    model.fit(X_1_scaled, y_1)

    # # Save the model to a file
    # dump(model, 'validation_model_one_dummy_v3.joblib') 

    # # Save the scaler to a file
    # dump(scaler, 'validation_scaler_one_dummy_v3.joblib')

# Step 3: Assemble the 1-km grid
#### We need to create a global grid at the desired resolution (1 km), fill it with our variables, and use our model ro predict temperatures at that scale. First create the empty grid:

In [None]:
# extent of our new grid (0.0089 degrees is approximately 1 km at the equator)
new_lat = np.arange(-90, 90, 0.00898315)
new_lon = np.arange(0, 359.9, 0.00898315)

# create a new xarray dataset with the new lat/lon
ds = xr.Dataset(
    coords={
        'latitude': (['latitude'], new_lat),
        'longitude': (['longitude'], new_lon),
    }
)

# convert the dataset to a geodataframe
ds_df = ds.to_dataframe().reset_index()
ds_df = gpd.GeoDataFrame(ds_df, geometry=gpd.points_from_xy(ds_df.longitude, ds_df.latitude))

#### Now we fill the empty grid with our variables. First, the LCZ data:
#### TODO: (THIS SHOULD BE A NON-INTERACTIVE PROCESS SINCE IT TAKES SEVERAL HOURS TO RUN)

In [None]:
lcz_filepath = "/mnt/share/erf/ERA5/merged_era5_with_stations/LCZ_data/lcz_resampled_mode.tif"

# Open the raster file
raster = xr.open_rasterio(lcz_filepath)

# Convert the DataArray to Dataset with a name
raster_lcz = raster.to_dataset(name='lcz_value')

raster_lcz = raster_lcz.rename({'x': 'longitude', 'y': 'latitude'})

raster_lcz = raster_lcz.assign_coords(longitude=(((raster_lcz.longitude + 180) % 360) - 180))
raster_lcz = raster_lcz.sortby(raster_lcz.longitude)
raster_lcz = raster_lcz.assign_coords(longitude=(raster_lcz.longitude.where(raster_lcz.longitude >= 0, raster_lcz.longitude + 360)))

lcz_df = raster_lcz.to_dataframe().reset_index()
lcz_df.drop(columns='band', inplace=True)

lcz_df = gpd.GeoDataFrame(lcz_df, geometry=gpd.points_from_xy(lcz_df.longitude, lcz_df.latitude))

# now we join the empty grid with the LCZ data
# WARNING: THIS TAKES SEVERAL HOURS -- IT MIGHT BE BETTER TO RUN THIS NON-INTERACTIVELY
result = gpd.sjoin_nearest(ds_df, lcz_df)

# remove rows where the lcz_value is 0 (these are the ocean)
result = result[result['lcz_value'] != 0]

# result.to_feather('/mnt/share/erf/ERA5/merged_era5_with_stations/grid_full_no_ocean.feather', index=False)

#### Now we want to add elevation. Since elevation was originally a variable associated with the climate stations, we need a new source for elevation data in our empty grid. We opted to use the SRTM global dataset. Since this is very high resolution and distributed across many files, we are going to break up our grid into smaller chunks (5x5 degrees) to run merges in parallel.

In [None]:
grid_filepath = '/mnt/share/erf/ERA5/merged_era5_with_stations/grid_setup/grid_full_no_ocean.feather'

grid_csv = pd.read_csv(grid_filepath)

now = datetime.now()
current_time = now.strftime("%I:%M:%S %p")
print("Start", current_time)

# Define your latitude and longitude boundaries for your chunks.
lat_bounds = list(range(-60, 81, 5))
lon_bounds = list(range(0, 361, 5))

# Load the entire dataset (if it's too large, consider loading in chunks and processing each chunk separately)
# data = pd.read_csv('file.csv')
data = grid_csv

# Split the dataset into chunks by latitude and longitude
for i in range(len(lat_bounds)-1):
    print("Searching between latitudes " + str(lat_bounds[i]) + " and " + str(lat_bounds[i+1]))
    for j in range(len(lon_bounds)-1):
        chunk = data[(data['latitude_left'] >= lat_bounds[i]) & (data['latitude_left'] < lat_bounds[i+1]) & 
                     (data['longitude_left'] >= lon_bounds[j]) & (data['longitude_left'] < lon_bounds[j+1])]
        
        if len(chunk) > 0:
            now = datetime.now()
            current_time = now.strftime("%I:%M:%S %p")
            print("bounds satisfied, saving chunk at time", current_time)
            
            filename = "chunk_lat" + str(lat_bounds[i]) + "_lat" + str(lat_bounds[i+1]) + "_lon" + str(lon_bounds[j]) + "_lon" + str(lon_bounds[j+1]) + ".feather"

            # chunk.reset_index(drop=True).to_feather("/mnt/share/erf/ERA5/merged_era5_with_stations/grid_setup/chunks_by_latlon_v3/" + filename)

#### Now that our data is chunked, we can add elevations.

In [None]:
chunks_path = "/mnt/share/erf/ERA5/merged_era5_with_stations/grid_setup/chunks_by_latlon_v3/"

for file in os.listdir(chunks_path):

    chunk_number = file.split('.')[0] # get year

    asdf = ['sbatch', '-J', '{}'.format(chunk_number), '-e', '/share/temp/sgeoutput/{}/errors/%x.e%j'.format(getpass.getuser()),
        '-o', '/share/temp/sgeoutput/{}/output/%x.o%j'.format(getpass.getuser()), '-A', 'proj_diet',
        '--mem=300G', '-c', '5', '-t', '360', '-p', 'all.q', #'--wait',
        '--wrap', '/ihme/homes/nhashmeh/miniconda3/envs/earth-analytics-python/bin/python ' + '/ihme/homes/nhashmeh/downscale_temperature/grid_setup/grid_setup_with_elevation_v3.py ' + file]
    # subprocess.call(asdf)

#### TODO: THERE WERE FIXES NEEDED AFTER THIS STEP. SHOULD INCORPORATE THESE INTO THE ABOVE
#### TODO: UPDATE THE WORKER SCRIPT TO OUTPUT THE FILES AS COMPRESSED NETCDF FILES?

In [None]:
chunks_elev_path = "/mnt/share/erf/ERA5/merged_era5_with_stations/grid_setup/chunks_with_elevation_and_era5_v3/"

for file in os.listdir(chunks_elev_path):

    chunk_number = file.split('.')[0] # get chunk

    asdf = ['sbatch', '-J', '{}'.format(chunk_number), '-e', '/share/temp/sgeoutput/{}/errors/%x.e%j'.format(getpass.getuser()),
        '-o', '/share/temp/sgeoutput/{}/output/%x.o%j'.format(getpass.getuser()), '-A', 'proj_erf',
        '--mem=100G', '-c', '5', '-t', '360', '-p', 'all.q', #'--wait',
        '--wrap', '/ihme/homes/nhashmeh/miniconda3/envs/earth-analytics-python/bin/python ' + '/ihme/homes/nhashmeh/downscale_temperature/grid_setup/fixes_v3.py ' + file]
    # subprocess.call(asdf)

#### Now we add the ERA5 data. The outputs of this script are now ready for predictions.

#### TODO: UPDATE THE WORKER SCRIPT TO OUTPUT THE FILES AS COMPRESSED NETCDF FILES?

In [None]:
chunks_elev_path = "/mnt/share/erf/ERA5/merged_era5_with_stations/grid_setup/chunks_with_elevation_v3/"
era5_path = "/mnt/share/erf/ERA5/three_hourly_temp_9km/daily_temps/"

for file in os.listdir(chunks_elev_path):

    chunk_number = file.split('.')[0] # get chunk

    for era5_file in os.listdir(era5_path):

        year = era5_file.split('_')[0]

        jname = chunk_number + '_' + year

        asdf = ['sbatch', '-J', '{}'.format(jname), '-e', '/share/temp/sgeoutput/{}/errors/%x.e%j'.format(getpass.getuser()),
            '-o', '/share/temp/sgeoutput/{}/output/%x.o%j'.format(getpass.getuser()), '-A', 'proj_erf',
            '--mem=250G', '-c', '5', '-t', '480', '-p', 'all.q', #'--wait',
            '--wrap', '/ihme/homes/nhashmeh/miniconda3/envs/earth-analytics-python/bin/python ' + '/ihme/homes/nhashmeh/downscale_temperature/grid_setup/grid_setup_with_era5_v3.py ' + file + ' ' + era5_file]
        # subprocess.call(asdf)

# Step 4: Run predictions

#### The final two steps are: 
#### 1. Run predictions for each chunk
#### 2. Recompile chunks with predictions into a daily output.

#### This can be done in the launch_regional_predictions.ipynb, but maybe I should bring those cells here...

#### Run predictions for each chunk:

In [None]:
##########################
year = 1999 # CHANGE THIS FOR EACH YEAR YOU WANT TO RUN
##########################

path_to_chunks = '/mnt/share/erf/ERA5/merged_era5_with_stations/grid_setup/chunks_with_elevation_and_era5_v3/'
path_to_chunks_year = path_to_chunks + str(year) + '/'
output_path = '/mnt/share/erf/ERA5/merged_era5_with_stations/grid_setup/predictions/year_' + str(year) + '_predictions/'

list_of_files = [name for name in os.listdir(path_to_chunks_year) if os.path.isfile(os.path.join(path_to_chunks_year, name))]

# count the number of files in this directory
n_files = len([name for name in os.listdir(path_to_chunks_year) if os.path.isfile(os.path.join(path_to_chunks_year, name))])

# check the number of files and the list of files
print(n_files)
print(list_of_files)

# make output_path if it doesn't exist
if not os.path.exists(output_path):
    os.makedirs(output_path)

# Create a job array with one job per file
sbatch_command = ['sbatch', '-J', 'regional_predictions', '--array=0-' + str(n_files-1) + '%150',
                  '-e', '/share/temp/sgeoutput/{}/errors/%x.e%j'.format(getpass.getuser()),
                  '-o', '/share/temp/sgeoutput/{}/output/%x.o%j'.format(getpass.getuser()), '-A', 'proj_erf',
                  '--mem=50G', '-c', '5', '-t', '240', '-p', 'all.q',
                  '--wrap', '/ihme/homes/nhashmeh/miniconda3/envs/earth-analytics-python/bin/python ' + 
                      '/mnt/share/code/nhashmeh/downscaling/temperature/regional_predictions.py ' + 
                      '${SLURM_ARRAY_TASK_ID}' + ' ' + str(year) + ' ' + path_to_chunks_year + ' ' + output_path]

print(' '.join(sbatch_command))  # Print the sbatch command
# subprocess.Popen(sbatch_command)

#### Recompile chunks with predictions into a daily output:

In [None]:
## This launches a script for each day of the year. It does the following:
## 1. Reads in each file from the initial prediction output (a 5x5 degree chunk of data for a whole year)
## 2. Extracts the data for the day of interest, does some processing, and appended to a larger list
## 3. All data is concatenated and then converted to an xarray Dataset
## 4. Writes the xarray Dataset to a netCDF file using some compression parameters

year = 2000

# if year is a leap year, the number of days in the year is 366, otherwise it is 365
if year % 4 == 0:
    num_days = 366
else:
    num_days = 365

# create a list of dates for the year of interest
day_list = np.arange(1, num_days + 1) 

input_path = '/mnt/share/erf/ERA5/merged_era5_with_stations/grid_setup/predictions/year_' + str(year) + '_predictions/'
output_path = '/mnt/share/erf/ERA5/merged_era5_with_stations/grid_setup/predictions/by_day_nc/' + str(year) + '/'

# make output_path if it doesn't exist
if not os.path.exists(output_path):
    os.makedirs(output_path)

for day in day_list:

    sbatch_command = ['sbatch', '-J', str(year) + '_' + str(day),
                      '-e', '/share/temp/sgeoutput/{}/errors/%x.e%j'.format(getpass.getuser()),
                      '-o', '/share/temp/sgeoutput/{}/output/%x.o%j'.format(getpass.getuser()), '-A', 'proj_erf',
                      '--mem=50G', '-c', '5', '-t', '240', '-p', 'all.q',
                      '--wrap', '/ihme/homes/nhashmeh/miniconda3/envs/earth-analytics-python/bin/python ' + 
                        #   '/mnt/share/code/nhashmeh/downscaling/temperature/compile_by_day_nc.py ' + 
                      '/mnt/share/code/nhashmeh/downscaling/temperature/daily_preds_nc.py ' + 
                          '${SLURM_ARRAY_TASK_ID}' + ' ' + str(day) + ' ' + str(year) + ' ' + input_path + ' ' + output_path]

    print(' '.join(sbatch_command))  # Print the sbatch command
    # subprocess.Popen(sbatch_command)

## That is the end of the pipeline!
#### TODO: Address problematic outliers in final data. I think this should be addressed right before the chunks with predictions are compiled into daily outputs. However, its easier to fix it from here rather than rerunning that part of the pipeline.