# ML on ESDC using PyTorch including Transfer Learning
A DeepESDL example notebook

## Linear Regression for prediction of missing land surface temperature values from air temperature values
This notebook demonstrates how to implement Machine Learning on the Earth System Data Cube using the ML library PyTorch, how to safe the model and how to reload it for a second task (Transfer Learning). The workflow is self-contained and based on a generic use case to showcase data loading, sampling strategies, model training, model evaluation and visualisation.

Please, also refer to the DeepESDL documentation and visit the platform's website for further information!

ScaDS.AI, 2023

**This notebook runs with the python environment deepesdl-ml-transfer-learning, please checkout the documentation for help on [changing the environment](https://deepesdl.readthedocs.io/en/v2022.12.1/guide/jupyterlab/).**

### Import necessary libraries


In [1]:
import math
import numpy as np
import xarray as xr
from xcube.core.store import new_data_store

import pandas as pd
import dask.array as da
import torch
from torch.utils.data import TensorDataset, DataLoader
from torch import nn
from torch.nn.functional import normalize

import nbimporter

# add path, if mltools not installed
import sys
sys.path.append('../mltools')

### Load Data (Earth System Data Cube)
We load the ESDC (*.zarr) from the s3 data store (lazy load). The ESDC consists of three dimensions (longitude, latitude, time). Out of many available cube variables, which are dask arrays, we load two ("land_surface_temperature", "air_temperature_2m"). 

In [2]:
data_store = new_data_store("s3", root="esdl-esdc-v2.1.1", storage_options=dict(anon=True))
dataset = data_store.open_data('esdc-8d-0.083deg-184x270x270-2.1.1.zarr')
ds = dataset[['land_surface_temperature', 'air_temperature_2m']]
ds

Unnamed: 0,Array,Chunk
Bytes,63.96 GiB,51.17 MiB
Shape,"(1840, 2160, 4320)","(184, 270, 270)"
Dask graph,1280 chunks in 2 graph layers,1280 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 63.96 GiB 51.17 MiB Shape (1840, 2160, 4320) (184, 270, 270) Dask graph 1280 chunks in 2 graph layers Data type float32 numpy.ndarray",4320  2160  1840,

Unnamed: 0,Array,Chunk
Bytes,63.96 GiB,51.17 MiB
Shape,"(1840, 2160, 4320)","(184, 270, 270)"
Dask graph,1280 chunks in 2 graph layers,1280 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,63.96 GiB,51.17 MiB
Shape,"(1840, 2160, 4320)","(184, 270, 270)"
Dask graph,1280 chunks in 2 graph layers,1280 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 63.96 GiB 51.17 MiB Shape (1840, 2160, 4320) (184, 270, 270) Dask graph 1280 chunks in 2 graph layers Data type float32 numpy.ndarray",4320  2160  1840,

Unnamed: 0,Array,Chunk
Bytes,63.96 GiB,51.17 MiB
Shape,"(1840, 2160, 4320)","(184, 270, 270)"
Dask graph,1280 chunks in 2 graph layers,1280 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [3]:
ds = xr.open_zarr('lst_small.zarr')

### Assign a random train/test split

In [4]:
from mltools.data_assignment import rand
xds = ds.assign({"split": rand})
xds

(('time', 'lat', 'lon'), dask.array<lt, shape=(10, 2160, 4320), dtype=bool, chunksize=(10, 270, 270), chunktype=numpy.ndarray>)


Unnamed: 0,Array,Chunk
Bytes,355.96 MiB,2.78 MiB
Shape,"(10, 2160, 4320)","(10, 270, 270)"
Dask graph,128 chunks in 2 graph layers,128 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 355.96 MiB 2.78 MiB Shape (10, 2160, 4320) (10, 270, 270) Dask graph 128 chunks in 2 graph layers Data type float32 numpy.ndarray",4320  2160  10,

Unnamed: 0,Array,Chunk
Bytes,355.96 MiB,2.78 MiB
Shape,"(10, 2160, 4320)","(10, 270, 270)"
Dask graph,128 chunks in 2 graph layers,128 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,355.96 MiB,2.78 MiB
Shape,"(10, 2160, 4320)","(10, 270, 270)"
Dask graph,128 chunks in 2 graph layers,128 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 355.96 MiB 2.78 MiB Shape (10, 2160, 4320) (10, 270, 270) Dask graph 128 chunks in 2 graph layers Data type float32 numpy.ndarray",4320  2160  10,

Unnamed: 0,Array,Chunk
Bytes,355.96 MiB,2.78 MiB
Shape,"(10, 2160, 4320)","(10, 270, 270)"
Dask graph,128 chunks in 2 graph layers,128 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,88.99 MiB,711.91 kiB
Shape,"(10, 2160, 4320)","(10, 270, 270)"
Dask graph,128 chunks in 2 graph layers,128 chunks in 2 graph layers
Data type,bool numpy.ndarray,bool numpy.ndarray
"Array Chunk Bytes 88.99 MiB 711.91 kiB Shape (10, 2160, 4320) (10, 270, 270) Dask graph 128 chunks in 2 graph layers Data type bool numpy.ndarray",4320  2160  10,

Unnamed: 0,Array,Chunk
Bytes,88.99 MiB,711.91 kiB
Shape,"(10, 2160, 4320)","(10, 270, 270)"
Dask graph,128 chunks in 2 graph layers,128 chunks in 2 graph layers
Data type,bool numpy.ndarray,bool numpy.ndarray


### Model set up

Select cuda device if available to use GPU ressources

In [5]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

Using cpu device


#### Define model, loss and error

In [6]:
# model, loss and optimizer
class Model(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, hidden_size)
        self.fc4 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        x = self.fc4(x)
        return x

reg_model = Model(input_size=1, hidden_size=1, output_size=1)
mse_loss = nn.MSELoss()
optimizer = torch.optim.SGD(reg_model.parameters(), lr=0.0001)

Get range (min, max) and statistics (mean, std) of data variables for normalization or standardization.

In [7]:
from mltools.statistics import get_range, get_statistics

at_range = get_range(ds, 'air_temperature_2m')
lst_range = get_range(ds, 'land_surface_temperature')

at_stat = get_statistics(ds, 'air_temperature_2m')
lst_stat = get_statistics(ds, 'land_surface_temperature')

### Train model

We iterate through the chunks of the ESDC. The data will be preprocessed by flattening, removing NaNs, normalization or standardization. Further, we will split the data into a training and testing fraction. We generate a train data loader and a test data loader and perform a linear regression. The train and test errors are returned during model training.

In [8]:
from mltools.cube_utilities import iter_data_var_blocks
from mltools.statistics import standardize
from mltools.torch_training import train_one_epoch, test

for chunk in iter_data_var_blocks(xds): 
    ### preprocessing 
    # flatten
    cf = {x: chunk[x].ravel() for x in chunk.keys()}
    # drop nans
    lst = cf['land_surface_temperature']
    cfn = {x: cf[x][~np.isnan(lst)] for x in cf.keys()}

    if len(cfn['land_surface_temperature']) > 0:
        #X = normalize(cfn['air_temperature_2m'], 'air_temperature_2m')
        #y = normalize(cfn['land_surface_temperature'], 'land_surface_temperature')
        X = standardize(cfn['air_temperature_2m'],*at_stat)
        y = standardize(cfn['land_surface_temperature'], *lst_stat)
        
        
        ### get train/test data 
        X_train = X[cfn['split']==True]
        X_test  = X[cfn['split']==False]
        y_train = y[cfn['split']==True]
        y_test  = y[cfn['split']==False]
        
        inputs  = torch.tensor(X_train)
        outputs =  torch.tensor(y_train)
        
        train_ds = TensorDataset(inputs, outputs)
        test_ds  = TensorDataset(torch.tensor(X_test), torch.tensor(y_test))
        
        trainloader = DataLoader(train_ds, batch_size=50, shuffle=True)
        testloader  = DataLoader(test_ds, batch_size=50, shuffle=True)
        
        ### train model 
        for i in range(3):
            reg_model,train_pred,loss = train_one_epoch(i, trainloader, reg_model, mse_loss, optimizer, device)
            print(f"Training Error: Avg loss: {loss:>8f}")
            test_pred, test_loss = test(testloader, reg_model, mse_loss, device)
            print(f"Test Error: Avg loss: {test_loss:>8f} \n")

Training Error: Avg loss: 0.076649
Test Error: Avg loss: 0.076265 

Training Error: Avg loss: 0.078450
Test Error: Avg loss: 0.075798 

Training Error: Avg loss: 0.075208
Test Error: Avg loss: 0.075414 

Training Error: Avg loss: 0.135137
Test Error: Avg loss: 0.134245 

Training Error: Avg loss: 0.131426
Test Error: Avg loss: 0.132345 

Training Error: Avg loss: 0.132978
Test Error: Avg loss: 0.130305 

Training Error: Avg loss: 0.136137
Test Error: Avg loss: 0.136575 

Training Error: Avg loss: 0.131909
Test Error: Avg loss: 0.132185 

Training Error: Avg loss: 0.128278
Test Error: Avg loss: 0.127005 

Training Error: Avg loss: 0.080998
Test Error: Avg loss: 0.081088 

Training Error: Avg loss: 0.076586
Test Error: Avg loss: 0.077018 

Training Error: Avg loss: 0.073148
Test Error: Avg loss: 0.072519 

Training Error: Avg loss: 0.084229
Test Error: Avg loss: 0.085255 

Training Error: Avg loss: 0.080813
Test Error: Avg loss: 0.080335 

Training Error: Avg loss: 0.076077
Test Error: A

### Save pre-trained model

In [9]:
torch.save(reg_model.state_dict(), 'trained_model.pt')

### Load pre-trained model and set up
We load the pre-trained model weights into a modified model. The last layer of the pre-trained model is replaced by a new one.
The modified model is then trained on a second task.

In [10]:
# Define the modified model
class ModifiedModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, hidden_size)
        # no layer 4

        # Add a new layer
        self.fc5 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        x = self.fc5(x) # This is the new layer
        return x

# Create an instance of the modified model
reg_model = ModifiedModel(input_size=1, hidden_size=1, output_size=1)

# Load the pre-trained model weights
# strict = False: ignores non matching keys
reg_model.load_state_dict(torch.load('trained_model.pt'), strict=False)
reg_model.eval()

mse_loss = nn.MSELoss()

optimizer = torch.optim.SGD(reg_model.parameters(), lr=0.01)

# use gpu if available
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

Using cpu device


#### Load Data
Here we use the same ESDC data as before. Normally you would use other data.

In [11]:
data_store = new_data_store("s3", root="esdl-esdc-v2.1.1", storage_options=dict(anon=True))
dataset = data_store.open_data('esdc-8d-0.083deg-184x270x270-2.1.1.zarr')
#dataset = xr.open_zarr('lst_small.zarr')
ds = dataset[['land_surface_temperature', 'air_temperature_2m']]
ds

Unnamed: 0,Array,Chunk
Bytes,355.96 MiB,2.78 MiB
Shape,"(10, 2160, 4320)","(10, 270, 270)"
Dask graph,128 chunks in 2 graph layers,128 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 355.96 MiB 2.78 MiB Shape (10, 2160, 4320) (10, 270, 270) Dask graph 128 chunks in 2 graph layers Data type float32 numpy.ndarray",4320  2160  10,

Unnamed: 0,Array,Chunk
Bytes,355.96 MiB,2.78 MiB
Shape,"(10, 2160, 4320)","(10, 270, 270)"
Dask graph,128 chunks in 2 graph layers,128 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,355.96 MiB,2.78 MiB
Shape,"(10, 2160, 4320)","(10, 270, 270)"
Dask graph,128 chunks in 2 graph layers,128 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 355.96 MiB 2.78 MiB Shape (10, 2160, 4320) (10, 270, 270) Dask graph 128 chunks in 2 graph layers Data type float32 numpy.ndarray",4320  2160  10,

Unnamed: 0,Array,Chunk
Bytes,355.96 MiB,2.78 MiB
Shape,"(10, 2160, 4320)","(10, 270, 270)"
Dask graph,128 chunks in 2 graph layers,128 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


### Assign random train/test split

In [12]:
from mltools.data_assignment import rand
xds = ds.assign({"split": rand})

(('time', 'lat', 'lon'), dask.array<lt, shape=(10, 2160, 4320), dtype=bool, chunksize=(10, 270, 270), chunktype=numpy.ndarray>)


Get range (min, max) and statistics (mean, std) of data variables for normalization or standardization.

In [13]:
from mltools.statistics import get_range, get_statistics

at_range = get_range(ds, 'air_temperature_2m')
lst_range = get_range(ds, 'land_surface_temperature')

at_stat = get_statistics(ds, 'air_temperature_2m')
lst_stat = get_statistics(ds, 'land_surface_temperature')

### Train pre-trained model

In [14]:
from mltools.cube_utilities import iter_data_var_blocks
from mltools.statistics import standardize
from mltools.torch_training import test, train_one_epoch

for chunk in iter_data_var_blocks(xds): 
    ### preprocessing 
    # flatten
    cf = {x: chunk[x].ravel() for x in chunk.keys()}
    # drop nans
    lst = cf['land_surface_temperature']
    cfn = {x: cf[x][~np.isnan(lst)] for x in cf.keys()}

    if len(cfn['land_surface_temperature']) > 0:
        #X = normalize(cfn['air_temperature_2m'], 'air_temperature_2m')
        #y = normalize(cfn['land_surface_temperature'], 'land_surface_temperature')
        X = standardize(cfn['air_temperature_2m'],*at_stat)
        y = standardize(cfn['land_surface_temperature'], *lst_stat)
               
        ### get train/test data 
        X_train = X[cfn['split']==True]
        X_test  = X[cfn['split']==False]
        y_train = y[cfn['split']==True]
        y_test  = y[cfn['split']==False]
        
        inputs  = torch.tensor(X_train)
        outputs =  torch.tensor(y_train)
        
        train_ds = TensorDataset(inputs, outputs)
        test_ds  = TensorDataset(torch.tensor(X_test), torch.tensor(y_test))
        
        trainloader = DataLoader(train_ds, batch_size=50, shuffle=True)
        testloader  = DataLoader(test_ds, batch_size=50, shuffle=True)
        
        ### train model 
        for i in range(3):
            reg_model,train_pred,loss = train_one_epoch(i, trainloader, reg_model, mse_loss, optimizer, device)
            print(f"Training Error: Avg loss: {loss:>8f}")
            test_pred, test_loss = test(testloader, reg_model, mse_loss, device)
            print(f"Test Error: Avg loss: {test_loss:>8f} \n")

Training Error: Avg loss: 0.034492
Test Error: Avg loss: 0.034739 

Training Error: Avg loss: 0.034933
Test Error: Avg loss: 0.034965 

Training Error: Avg loss: 0.034783
Test Error: Avg loss: 0.034735 

Training Error: Avg loss: 0.040882
Test Error: Avg loss: 0.041278 

Training Error: Avg loss: 0.041230
Test Error: Avg loss: 0.040728 

Training Error: Avg loss: 0.040803
Test Error: Avg loss: 0.040657 

Training Error: Avg loss: 0.037781
Test Error: Avg loss: 0.037870 

Training Error: Avg loss: 0.037651
Test Error: Avg loss: 0.037864 

Training Error: Avg loss: 0.037636
Test Error: Avg loss: 0.037932 

Training Error: Avg loss: 0.033016
Test Error: Avg loss: 0.033087 

Training Error: Avg loss: 0.033689
Test Error: Avg loss: 0.033062 

Training Error: Avg loss: 0.033083
Test Error: Avg loss: 0.033182 

Training Error: Avg loss: 0.057400
Test Error: Avg loss: 0.057069 

Training Error: Avg loss: 0.057204
Test Error: Avg loss: 0.057110 

Training Error: Avg loss: 0.057020
Test Error: A