# Running Machine Learning tasks with DAT (Deployable Analysis environmenT) 

Problem at hand:
Train a machine learning model on sparse data, then use it to predict the target variables as maps in space and time.

![data](./figs/big.jpg)

This notebook shows a simple workflow to:

- Train a RandomForest model
- Preprocess the input data
- Run the model to predict the targets

This notebook and data are based on the research carried out by Qianqian Han: https://doi.org/10.5194/egusphere-egu24-5488


## Model training 

### Introducing `dask`, `dask-ml`, `MultiOutputRegressor`

In [1]:
import numpy as np
import pandas as pd

import dask
dask.config.set({'dataframe.query-planning': True})
import dask.dataframe as dd

from sklearn.ensemble import RandomForestRegressor
from sklearn.multioutput import MultiOutputRegressor

from dask_ml.model_selection import train_test_split
from dask_ml.preprocessing import OneHotEncoder

import joblib

In [2]:
import sys
sys.path.append('.')
from utils import training_testing_preprocess, igbp_to_landcover

In [6]:
# Training-stage locations, relative to the notebook.
parent_in_path = "./data"
parent_out_path = "./data"

data_paths = {
    "input_data": f"{parent_in_path}/model/training_testing_2014.csv",
    "igbp_table": f"{parent_in_path}/auxiliary/lccs_to_igbp_table.csv",
    "igbp_class": f"{parent_in_path}/auxiliary/IGBP11unique.csv",
}

# Columns read from the training CSV.
input_vars = [
    # model drivers
    'Rin', 'Rli', 'p', 'Ta', 'ea', 'u', 'CO2', 'LAI', 'Vcmo', 'hc', 'Precip_msr', 'SSM',
    # land-cover class column, one-hot encoded during preprocessing
    'IGBP_veg_long',
    # target variables
    'Rntot', 'LEtot', 'Htot', 'Gtot', 'Actot', 'SIF685', 'SIF740',
]

In [7]:
# Lazily read only the needed CSV columns and apply the project preprocessing.
input_df = training_testing_preprocess(
    dd.read_csv(data_paths["input_data"], usecols=input_vars)
)

# IGBP classes: the unique values of column '0' in the class table.
igbp_class = pd.read_csv(data_paths["igbp_class"])['0'].unique()

# One-hot encode the IGBP land-cover column with a dense dask-ml encoder.
encoder = OneHotEncoder(sparse_output=False)
input_df = igbp_to_landcover(input_df, encoder, igbp_class)



In [8]:
# Features: the driver variables followed by the 11 one-hot IGBP columns.
igbp_columns = [f'IGBP_veg_long{i}' for i in range(1, 12)]
x_vars = ['Rin', 'Rli', 'p', 'Ta', 'ea', 'u', 'CO2', 'LAI', 'Vcmo', 'hc', 'Precip_msr', 'SSM'] + igbp_columns

# Targets, predicted jointly by a single multi-output model.
y_vars = ['LEtot', 'Htot', 'Rntot', 'Gtot', 'Actot', 'SIF685', 'SIF740']

x, y = input_df[x_vars], input_df[y_vars]

# Shuffled 75/25 train/test split with a fixed seed.
x_train, x_test, y_train, y_test = train_test_split(
    x, y, test_size=0.25, random_state=0, shuffle=True
)

# MultiOutputRegressor fits one forest per target; the forests are fitted in
# parallel (n_jobs=7), each forest itself single-threaded (n_jobs=1).
base_forest = RandomForestRegressor(
    n_estimators=10,
    max_depth=20,
    random_state=0,
    n_jobs=1,
    min_samples_split=10,
    min_samples_leaf=4,
)
regressor = MultiOutputRegressor(base_forest, n_jobs=7)

regressor

In [10]:
# Materialise the training split as in-memory pandas objects for scikit-learn.
# x_train and y_train come from the same dask graph (CSV read, preprocessing,
# split); computing them in one dask.compute call evaluates that shared graph
# once instead of once per collection.
x_train_values, y_train_values = dask.compute(x_train, y_train)

# Route joblib's parallelism (the MultiOutputRegressor n_jobs) through dask.
# NOTE(review): joblib's 'dask' backend needs an active dask.distributed Client;
# none is created in the cells shown here — confirm one is started beforehand.
with joblib.parallel_backend('dask'):
    regressor.fit(x_train_values, y_train_values)

CPU times: user 3.26 s, sys: 617 ms, total: 3.87 s
Wall time: 32.2 s


In [11]:
# Persist the fitted regressor so the prediction stage can reload it.
model_out_path = f"{parent_out_path}/model/model_multi.joblib"
joblib.dump(regressor, model_out_path)
print("model is saved")

model is saved


## Preparing data

### Introducing `xr.open_mfdataset` and `xr.Dataset.to_zarr`

#### Skip running during the presentation

In [None]:
import xarray as xr
from functools import partial

In [None]:
import sys
sys.path.append('.')
from utils import era5_preprocess, co2_preprocess, fix_coords, fix_time

In [None]:
parent_in_path = "./data/global"

# Raw global inputs: glob patterns for multi-file sources, single files otherwise.
data_paths = {
    "era5land": f"{parent_in_path}/era5land/*.nc",
    "lai": f"{parent_in_path}/lai_v2/*.nc",
    "ssm": f"{parent_in_path}/ssm/GlobalGSSM11km2014_20240214.tif",
    "co2": f"{parent_in_path}/co2/CAMS_CO2_2003-2020.nc",
    "landcover": f"{parent_in_path}/igbp/landcover10km_global.nc",
    "vcmax": f"{parent_in_path}/vcmax/TROPOMI_Vmax_Tg_mean10km_global.nc",
    "canopyheight": f"{parent_in_path}/canopy_height/canopy_height_11kmEurope20230921_10km.nc",
}

# Regional (EU) zarr stores are written under this directory.
parent_out_path = "./data/EU"

# Region of interest (EU) as [lon_min, lat_min, lon_max, lat_max]; this is the
# order in which the slicing step indexes it.
bbox = [-31.28903052, 34.93055094, 68.93136141, 81.85192337]

# Time window to extract.
start_time = "2014-1-31"
end_time = "2014-02-10"

In [None]:
chunks = 500

# Bind the time window so xr.open_mfdataset can call the CO2 preprocessor with a
# single dataset argument.
co2_partial_func = partial(co2_preprocess, start_time=start_time, end_time=end_time)

for data_path, file_pattern in data_paths.items():

    # Choose the per-file preprocessing for each source. The branches must be
    # mutually exclusive: with two independent `if`s, ERA5-Land also fell into
    # the final `else`, was re-opened with `fix_coords`, and the
    # `era5_preprocess` result was thrown away.
    if data_path == "era5land":
        ds = xr.open_mfdataset(file_pattern, preprocess=era5_preprocess)
    elif data_path == "co2":
        ds = xr.open_mfdataset(file_pattern, preprocess=co2_partial_func)
        # Map longitudes into [-180, 180) so they can be sliced with `bbox`.
        ds = ds.assign_coords(longitude=(((ds.longitude + 180) % 360) - 180))
    else:
        ds = xr.open_mfdataset(file_pattern, preprocess=fix_coords)

    # convert day of year
    ds = fix_time(ds, start_time)

    # Sort coordinates ascending so the label-based slices below select the bbox.
    ds_sorted = ds.sortby(['longitude', 'latitude'])
    masked_ds = ds_sorted.sel(longitude=slice(bbox[0], bbox[2]), latitude=slice(bbox[1], bbox[3]), time=slice(start_time, end_time))

    masked_ds = masked_ds.chunk(chunks=chunks)

    # save data to zarr
    out_path = f"{parent_out_path}/{data_path}_{start_time}_{end_time}_EU.zarr"
    masked_ds.to_zarr(out_path, mode='w')
    print(f"{out_path} is saved")
    print("=======================================")

## Interpolation and variable derivation
#### Skip running during the presentation

In [None]:
import xarray as xr
import numpy as np
import pandas as pd
import dask.array as da
from PyStemmusScope import variable_conversion as vc
import sys
sys.path.append('.')
from utils import interpolation, era5land_accumulated_vars, map_landcover_to_igbp, landcover_to_igbp
from dask_ml.preprocessing import OneHotEncoder

start_time = "2014-1-31"
end_time = "2014-02-10"

# Read and write under the same notebook-relative directory. The intermediate
# "all_data" store written by the interpolation step is then exactly the store
# that the variable-derivation step reads back (no user-specific absolute paths).
parent_in_path = "./data"
parent_out_path = "./data"

data_paths = {"era5land": f"{parent_in_path}/EU/era5land_{start_time}_{end_time}_EU.zarr",
              "lai": f"{parent_in_path}/EU/lai_{start_time}_{end_time}_EU.zarr",
              "ssm": f"{parent_in_path}/EU/ssm_{start_time}_{end_time}_EU.zarr",
              "co2": f"{parent_in_path}/EU/co2_{start_time}_{end_time}_EU.zarr",
              "landcover": f"{parent_in_path}/EU/landcover_{start_time}_{end_time}_EU.zarr",
              "vcmax": f"{parent_in_path}/EU/vcmax_{start_time}_{end_time}_EU.zarr",
              "canopyheight": f"{parent_in_path}/EU/canopyheight_{start_time}_{end_time}_EU.zarr",
              # written by the interpolation step below, then read back
              "all_data": f"{parent_out_path}/EU/all_data_{start_time}_{end_time}_EU.zarr",
              "igbp_table": f"{parent_in_path}/auxiliary/lccs_to_igbp_table.csv",
              "igbp_class": f"{parent_in_path}/auxiliary/IGBP11unique.csv",
             }

# Name of the data variable taken from each per-source store.
# "__xarray_dataarray_variable__" is the name xarray gives an unnamed DataArray on save.
variable_names = {"lai": "LAI",
                  "ssm": "band_data",
                  "co2": "co2",
                  "canopyheight": "__xarray_dataarray_variable__",
                  "vcmax": "__xarray_dataarray_variable__",
                  "landcover": "lccs_class"}

# interpolation
# Each source is passed to `interpolation` together with the ERA5-Land
# time/longitude/latitude coordinates. The result is stored as a new variable of
# the ERA5-Land dataset.
# NOTE(review): presumably `interpolation` (utils) regrids `ds` onto `other_coords` — confirm.
era5land = xr.open_zarr(data_paths["era5land"])
other_coords = {"time": era5land.time, "longitude": era5land.longitude, "latitude": era5land.latitude}

chunks = {"time": -1, "longitude": 500, "latitude": 500}
for name, source_var in variable_names.items():
    ds = xr.open_zarr(data_paths[name]).chunk(chunks)
    ds_interpolated = interpolation(ds, other_coords)
    era5land[name] = ds_interpolated[source_var]

era5land = era5land.chunk(chunks)

# save to zarr
out_path = data_paths["all_data"]
encoding = {var: {'chunks': (era5land.sizes["time"], 500, 500)} for var in era5land.data_vars}
era5land.to_zarr(out_path, mode='w', encoding=encoding)
print(f"{out_path} is saved")

# variable derivation
# read back the store written just above
chunks = {"time": 100, "longitude": 100, "latitude": 100}
all_data = xr.open_zarr(data_paths["all_data"])
all_data = all_data.chunk(chunks)

# variable calculations
all_data = era5land_accumulated_vars(all_data, "ssrd", "Rin", 3600)
all_data = era5land_accumulated_vars(all_data, "strd", "Rli", 3600)
all_data = era5land_accumulated_vars(all_data, "tp", "Precip_msr", 0.001) # to mm
all_data["p"] = all_data["sp"] / 100  # Pa -> hPa
all_data["Ta"] = all_data["t2m"] - 273.15  # K -> degC
all_data["ea"] = vc.calculate_es(all_data["d2m"] - 273.15)*10 # *10 is for kPa -> hPa
all_data["u"] = (all_data["u10"] ** 2 + all_data["v10"] ** 2) ** 0.5  # magnitude of (u10, v10)
all_data["ssm"] = all_data["ssm"] / 1000

# convert landcover to IGBP
# lookup tables
igbp_table = pd.read_csv(data_paths["igbp_table"])
igbp_class = pd.read_csv(data_paths["igbp_class"])['0'].unique()

# define one hot encoding for IGBP using dask-ml functions
encoder = OneHotEncoder(sparse_output=False)

# Unsorted categories are not yet supported by dask-ml
igbp_stemmus_scope = np.sort(igbp_table["IGBP_STEMMUS_SCOPE"].to_numpy().reshape(-1,1))
encoder = encoder.fit(igbp_stemmus_scope)
# Maps each lccs_class value to the first remaining column of igbp_table.
# NOTE(review): assumes that column is IGBP_STEMMUS_SCOPE — confirm the table layout.
lookup_table = igbp_table.set_index("lccs_class").T.to_dict('records')[0]

ds = landcover_to_igbp(all_data, "landcover", encoder, lookup_table, igbp_class)
ds = ds.chunk(chunks)

# rename some variables to the names used by the trained model
rename_vars = {"co2": "CO2", "lai": "LAI", "canopyheight": "hc", "ssm": "SSM", "vcmax": "Vcmo"}
ds = ds.rename(rename_vars)

# save to zarr
out_path = f"{parent_out_path}/EU/model_input_{start_time}_{end_time}_EU.zarr"
encoding = {var: {'chunks': (50, 50, 50)} for var in ds.data_vars}
ds.to_zarr(out_path, mode='w', encoding=encoding)
print(f"{out_path} is saved")

## Model prediction

### Introducing `xr.map_blocks`

In [1]:
import xarray as xr
import numpy as np
import dask.array as da

from joblib import load

In [2]:
import sys
sys.path.append('.')
from utils import arr_to_ds

In [3]:
start_time = "2014-1-31"
end_time = "2014-02-10"

# Notebook-relative locations shared with the training and preparation stages,
# which write the model and the model input under ./data. There are no
# user-specific absolute scratch-path overrides.
parent_in_path = "./data"
parent_out_path = "./data"

chunks = {"time": 100, "longitude": 500, "latitude": 500}

model_input = xr.open_zarr(f"{parent_in_path}/EU/model_input_{start_time}_{end_time}_EU.zarr")
model_input = model_input.chunk(chunks)

In [4]:
# load model
path_model = f"{parent_in_path}/model/model_multi.joblib"
with open(path_model, 'rb') as f:
    model = load(f)
model

In [5]:
# Model features, in the same order as the training features.
input_vars = (
    ['Rin', 'Rli', 'p', 'Ta', 'ea', 'u', 'CO2', 'LAI', 'Vcmo', 'hc', 'Precip_msr', 'SSM']
    + [f'IGBP_veg_long{i}' for i in range(1, 12)]
)

# select input/output data
input_ds = model_input[input_vars]

# Predicted variables, in the order the model was trained to output them.
output_vars = ['LEtot', 'Htot', 'Rntot', 'Gtot', 'Actot', 'SIF685', 'SIF740']

In [6]:
def predictFlux(input_ds, model, output_vars):
    """Predict the output variables for one block of gridded model input.

    Designed to be called by ``xr.map_blocks`` once per chunk of ``input_ds``.

    Parameters
    ----------
    input_ds : xr.Dataset
        One block of model input; its data variables are the model features.
    model : fitted estimator
        Any fitted regressor exposing ``predict`` (here the multi-output model).
    output_vars : list of str
        Names for the model's output columns, in the model's output order.

    Returns
    -------
    xr.Dataset
        The Dataset built by ``arr_to_ds`` from the prediction array,
        ``input_ds`` and ``output_vars``. It is presumably one variable per name
        on ``input_ds``'s grid — see utils.
    """
    # One row per grid point. Keep only the data variables as feature columns:
    # ``to_dataframe`` also emits non-index coordinates as columns, and the model
    # was never trained on those.
    feature_names = list(input_ds.data_vars)
    df_features = input_ds.to_dataframe()[feature_names].reset_index(drop=True)

    # Convert the nan value as 0 for the calculation.
    # NOTE(review): points with missing inputs still get a prediction, computed
    # from zero-filled features; consider masking them in the output.
    df_features = df_features.fillna(0)

    LEH = model.predict(df_features)

    output_ds = arr_to_ds(LEH, input_ds, output_vars)
    return output_ds

In [7]:
# define output template
output_temp = xr.Dataset()
ds_shape = (input_ds.sizes['time'], input_ds.sizes['latitude'], input_ds.sizes['longitude'])

for var in output_vars:
    output_temp[var] = xr.DataArray(
        name = var,
        data=da.zeros(ds_shape),
        dims=input_ds.dims,
        coords=input_ds.coords,
    )
output_temp = output_temp.chunk(chunks) # the same chunk as input

In [None]:
# result
LEH = xr.map_blocks(
    predictFlux,
    input_ds,
    kwargs={
        "model": model, 
        "output_vars": output_vars, 
    },
    template=output_temp,
)
LEH = LEH.chunk(chunks)

# save data
out_path = f"{parent_out_path}/EU/predicted_{start_time}_{end_time}_EU.zarr"
encoding = {var: {'chunks': (50, 50, 50)} for var in LEH.data_vars}
LEH.to_zarr(out_path, mode='w', encoding=encoding)
print(f"{out_path} is saved")

In [None]:
# Map of predicted LEtot at the third time step.
LEH["LEtot"].isel(time=2).plot()

In [None]:
# Map of predicted Rntot at the third time step.
LEH["Rntot"].isel(time=2).plot()

In [None]:
# Time series of predicted LEtot at the grid cell nearest to (52.36 N, 4.90 E).
# Exact float matching would raise a KeyError unless that point lies exactly on the grid.
LEH.LEtot.sel(latitude=52.36, longitude=4.90, method="nearest").plot()