# Code to build ML techniques to predict seven days of SST over the Mediterranean Sea

#### example of code to generate LSTM, CNN and RForest models to predict seven days of SST in 16 regions of the Mediterranean Sea. ML techniques are trained using 1981-2016 as the training period and 2017-2021 as the testing period.

Import relevant libraries

In [8]:
import pandas as pd
import tensorflow as tf
import xarray as xr
from tensorflow.keras.models import Sequential
from sklearn.ensemble import RandomForestRegressor
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Conv1D
from keras.layers import MaxPooling1D
from tensorflow.keras.layers import Dense, BatchNormalization, Dropout, Activation
from tensorflow.keras.layers import LSTM, GRU
from sklearn.metrics import mean_squared_error
from tensorflow.keras.optimizers import Adam, SGD, RMSprop
from sklearn.metrics import mean_squared_error
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from numpy import *
import statsmodels.api as sm

The following lines of code are specific for CMCC supercomputer in order to support parallel computation of the code on the CMCC cluster. You can find information on how to use the Dask.distributed and dask_jobqueue libraries at https://distributed.dask.org/en/stable/ and https://jobqueue.dask.org/en/latest/install.html. In particular we are asking for 36 cores, each of which starts 9 worker processes, and 2 jobs (i.e. 72 cores and 18 workers). If you do not have a cluster to run the code, you can skip this part and go to the 'Load data' section. Inevitably, the code will slow down and you can fall into memory errors. 

In [12]:
import os
import dask
import dask.distributed
from dask.distributed import Client

from dask_jobqueue import LSFCluster
cluster = LSFCluster(
    queue='p_medium',
    cores=36,
    processes=9,
    memory="85 GB",
    project="0456",
    walltime="04:00",
    job_extra=['-x'],
    #use_stdin=False,
    local_directory=os.environ['HOME']+"/dask-worker-space"
)



In [14]:
cluster.scale(jobs=2)

In [16]:
client = Client(cluster)
client

Task exception was never retrieved
future: <Task finished name='Task-4280' coro=<_wrap_awaitable() done, defined at /home/bill/Documents/Projects/french_hackathon/venv/lib/python3.11/site-packages/distributed/deploy/spec.py:124> exception=RuntimeError('Command exited with non-zero exit code.\nExit code: 127\nCommand:\nbsub< /tmp/tmptrc0ugqb.sh 2> /dev/null\nstdout:\n\nstderr:\n\n')>
Traceback (most recent call last):
  File "/home/bill/Documents/Projects/french_hackathon/venv/lib/python3.11/site-packages/distributed/deploy/spec.py", line 125, in _wrap_awaitable
    return await aw
           ^^^^^^^^
  File "/home/bill/Documents/Projects/french_hackathon/venv/lib/python3.11/site-packages/distributed/deploy/spec.py", line 74, in _
    await self.start()
  File "/home/bill/Documents/Projects/french_hackathon/venv/lib/python3.11/site-packages/dask_jobqueue/core.py", line 426, in start
    out = await self._submit_job(fn)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/bill/Documents/Pr

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.LSFCluster
Dashboard: http://192.168.1.10:8787/status,

0,1
Dashboard: http://192.168.1.10:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://192.168.1.10:41827,Workers: 0
Dashboard: http://192.168.1.10:8787/status,Total threads: 0
Started: 1 minute ago,Total memory: 0 B


Define useful functions 

In [18]:
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
        n_vars = 1 if type(data) is list else data.shape[1]
        df = pd.DataFrame(data)
        cols, names = list(), list()
        # input sequence (t-n, ... t-1)
        for i in range(n_in, 0, -1):
                cols.append(df.shift(i))
                names += [('var%d(t-%d)' % (j+1, i)) for j in range(n_vars)]
        # forecast sequence (t, t+1, ... t+n)
        for i in range(0, n_out):
                cols.append(df.shift(-i))
                if i == 0:
                        names += [('var%d(t)' % (j+1)) for j in range(n_vars)]
                else:
                        names += [('var%d(t+%d)' % (j+1, i)) for j in range(n_vars)]
        # put it all together
        agg = pd.concat(cols, axis=1)
        agg.columns = names
        # drop rows with NaN values
        if dropnan:
                agg.dropna(inplace=True)
        return agg


# transform series into train and test sets for supervised learning
def prepare_data(series, In_test, En_test, n_lag, n_seq, exp):
        # extract raw values
        raw_values = series
        raw_values = raw_values.reshape(len(raw_values), 1)
        # transform into supervised learning problem X, y
        supervised = series_to_supervised(raw_values, n_lag, n_seq)
        supervised_values = supervised.values
        # split into train and test sets
        if exp=='Q' or exp=='REXP':
                train = supervised_values[:In_test]
        elif exp=='A':
                train = supervised_values[En_test:]
        else:
                train1=supervised_values[:In_test]
                train2=supervised_values[En_test:]
                train=np.concatenate((train1,train2),0)
        test = supervised_values[In_test:En_test]
        return train, test


def evaluate_forecasts(test, forecasts, n_lag, n_seq):
        a=list()
        for i in range(n_seq):
                actual = [row[i] for row in test]
                predicted = [forecast[i] for forecast in forecasts]
                rmse = np.sqrt(mean_squared_error(actual, predicted))
                print('t+%d RMSE: %f' % ((i+1), rmse))
                a.append(rmse)
        return a
    
def mysel_daily(ds):
    ds = ds.isel(lat=slice(2390,2750), lon=slice(3490,4700))
    return ds

Extract time-series for SST (ESA CCI SST) and predictors (ERA5). The SST dataset used in this study is the European Space Agency (ESA) Climate Change Initiative SST dataset v2.1 (Merchant et al., 2019) and it is freely available at the CEDA catalogue here:
https://catalogue.ceda.ac.uk/uuid/62c0f97b1eac4e0197a674870afe1ee6) from September 1981 to December 2016 and in the
Copernicus CDS here: https://cds.climate.copernicus.eu/cdsapp#!/dataset/satellite-seasurfacetemperature?tab=overview from January 2017 to December 2021. The relevant atmospheric variables are taken from European Centre for Medium-Range  Weather Forecasts (ECMWF) ERA5 dataset (Hersbach et al., 2020) at https://cds.climate.copernicus.eu/cdsapp#!/dataset/
reanalysis-era5-single-levels?tab=overview as daily mean (see the manuscript for variables details).

In [19]:
##ESA CCI dataset
ds = xr.open_mfdataset("./*-ESACCI-L4_GHRSST-SSTdepth-OSTIA-GLOB_CDR2.1-v02.0-fv01.0.nc", combine='by_coords', parallel=True,preprocess=mysel_daily)

OSError: no files to open

In [None]:
####define limits of MedFS regions 
name=['1','2','3','4','5','6','7','8','9','10','11','12','13','14','15','16']
lat1=np.array([35, 34.8,34.8,    36.6,  41.2,  32,  36.6, 40,  42.5,30, 35.2, 33.5,30])
lat2=np.array([39, 39.4,39.4,  41.2,44.8, 36.6,  40.6,  42.5,  46,  35.2, 41, 37,33.5])
lon1=np.array([-9, -1, 5,  9.25,  9.25, 9.25, 16.4,14.8,12.2,  21, 21, 26.3, 26.3])
lon2=np.array([-1, 5, 9.25, 15, 12.1,  15, 21,  21,  21, 26.3, 28, 33,33])

In [None]:
###Save SST for areas
for ii in range(0,len(name)):
    ESA=ds.where((ds.lat>lat1[ii]) & (ds.lat<lat2[ii])).where((ds.lon>lon1[ii]) & (ds.lon<lon2[ii]))
    savesst=prova.mean("lon").mean("lat") 
    np.save('./SST_'+name[ii]+'.npy',savesst)

In [None]:
##define ERA5 variables
nameF=['SLP','GEO','WIND','SLP','SENS','LAT','INC']
namevar=['msl','z','vel','msl','sh','lh','inc']

In [None]:
for ii in range(0, len(nameF)):
    ERA5=xr.open_mfdataset("./ERA5_"+nameF[ii]+"/*.nc", combine='by_coords', drop_variables=['month'], parallel=True)
    ERA5=ERA5.interp(longitude=ds.lon, latitude=ds.lat)
    ERA5=ERA5.sel(time=slice("1981-09-01", "2021-12-31"))
    ERA5=ERA5.rename({'longitude': 'lon'})
    ERA5=ERA5.rename({'latitude': 'lat'})
    ERA5= xr.where(ds.mask[0,:,:]==1, ERA5[namevar[ii]], np.nan)
    for xx in range(0,len(name)):
        tosave=ERA5.where((ERA5.lat>lat1[xx]) & (ERA5.lat<lat2[xx])).where((ERA5.lon>lon1[xx]) & (ERA5.lon<lon2[xx]))
        tosave=tosave.mean("lon").mean("lat")
        np.save('./ERA5_'+nameF[ii]+'_'+name[xx]+'.npy',tosave)

Define experiment name (e.g. in this case Reference Experiments (REXP), and test time window indices

In [3]:
itest=12906 ##index of 1January2017
ftest=14732 ##index of 31December2021
ML=['LSTM','CNN', 'RF']

For each region and for each ML technique, the following codes load the data, prepare the data, train and test the model

In [None]:
for ii in range(0,len(name)):
    print(name[ii])
    for mm in range(0,len(ML)):
            SST = np.load('./SST_'+name[ii]+'.npy')
            SST=SST-273.15
            #Wind speed at 10m
            WIND = np.load('./ERA5_WIND_'+name[ii]+'.npy')
            #Geopotential height at 500hPa
            GEO = np.load('./ERA5_GEO_'+name[ii]+'.npy')
            #Mean Sea Level pressure
            SLP = np.load('./ERA5_SLP_'+name[ii]+'MFS.npy')
            #latent heat flux
            LAT = np.load('./ERA5_LAT_'+name[ii]+'.npy')
            #Sensible heat
            SENS = np.load('./ERA5_SENS_'+name[ii]+'.npy')
            #Incoming Solar Radiation 
            INC = np.load('./ERA5_INC_'+name[ii]+'MFS.npy')
            
            ##binary values to identify seasonality
            time=pd.date_range(start="1981-09-01",end="2021-12-31",freq='D')
            MM=np.array(time.month).astype('float64')

            n_lag = 7
            n_seq = 7
            ES= ftest
            IS = itest
            
            
            # Normalize the data
            scaler = MinMaxScaler()
            WIND =WIND.reshape(-1, 1)
            WIND = scaler.fit_transform(WIND)
            SLP =SLP.reshape(-1, 1)
            SLP = scaler.fit_transform(SLP)
            LAT =LAT.reshape(-1, 1)
            LAT = scaler.fit_transform(LAT)
            SENS =SENS.reshape(-1, 1)
            SENS = scaler.fit_transform(SENS)
            INC =INC.reshape(-1, 1)
            INC = scaler.fit_transform(INC)
            GEO =GEO.reshape(-1, 1)
            GEO = scaler.fit_transform(GEO)
            SST =SST.reshape(-1, 1)
            SST = scaler.fit_transform(SST)
            sst_min = scaler.data_min_[0]
            sst_max = scaler.data_max_[0]
            
            # prepare data
            train, test = prepare_data(SST, IS, ES, n_lag, n_seq, exp)
            trainW, testW=prepare_data(WIND, IS, ES, n_lag, n_seq, exp)
            trainSLP, testSLP=prepare_data(SLP, IS, ES, n_lag, n_seq, exp)
            trainHEAT, testHEAT=prepare_data(LAT, IS, ES, n_lag, n_seq, exp)
            trainHEATS, testHEATS=prepare_data(SENS, IS, ES, n_lag, n_seq, exp)
            trainHEATINC, testHEATINC=prepare_data(INC, IS, ES, n_lag, n_seq, exp)
            trainGEO, testGEO=prepare_data(GEO, IS, ES, n_lag, n_seq, exp)
            trainMM, testMM=prepare_data(MM, IS, ES,n_lag, n_seq, exp)
            
            if ML[mm]=='LSTM':
                ###reorder data
                X = np.stack((train[:, 0:n_lag], trainW[:, 0:n_lag],trainHEAT[:, 0:n_lag], trainHEATS[:, 0:n_lag], trainHEATINC[:, 0:n_lag], trainSLP[:, 0:n_lag], trainGEO[:, 0:n_lag],trainMM[:, 0:n_lag]),2)
                y=train[:,n_lag:]
                X_test = np.stack((test[:, 0:n_lag], testW[:, 0:n_lag],testHEAT[:, 0:n_lag], testHEATS[:, 0:n_lag], testHEATINC[:, 0:n_lag],testSLP[:, 0:n_lag],testGEO[:, 0:n_lag],testMM[:, 0:n_lag]),2)
                y_test=test[:,n_lag:]
            
                model = Sequential()
                model.add(LSTM(60, input_shape=(X.shape[1], X.shape[2])))
                model.add(Dense(n_seq,activation='linear'))
                model.compile(loss='mae', optimizer='adam')

                history = model.fit(X, y, epochs=200, batch_size=150, validation_split=0.3, verbose=1, shuffle=False)
                yhat = model.predict(X_test)
                yhat = yhat * (sst_max - sst_min) + sst_min
                y_test= y_test* (sst_max - sst_min) + sst_min
                a=evaluate_forecasts(y_test, list(yhat), n_lag, n_seq)

            if ML[mm]=='CNN':
                
                X = np.stack((train[:, 0:n_lag], trainW[:, 0:n_lag],trainHEAT[:, 0:n_lag], trainHEATS[:, 0:n_lag], trainHEATINC[:, 0:n_lag], trainSLP[:, 0:n_lag], trainGEO[:, 0:n_lag],trainMM[:, 0:n_lag]),2)
                y=train[:,n_lag:]
                X_test = np.stack((test[:, 0:n_lag], testW[:, 0:n_lag],testHEAT[:, 0:n_lag], testHEATS[:, 0:n_lag], testHEATINC[:, 0:n_lag],testSLP[:, 0:n_lag],testGEO[:, 0:n_lag],testMM[:, 0:n_lag]),2)
                y_test=test[:,n_lag:]
                
                #model
                model = Sequential()
                model.add(Conv1D(filters=64, kernel_size=2, activation='relu', input_shape=(X.shape[1], X.shape[2])))
                model.add(MaxPooling1D(pool_size=2))
                model.add(Flatten())
                model.add(Dense(y.shape[1]))
                model.compile(optimizer='adam', loss='mae')

                # fit network
                history = model.fit(X, y, epochs=200, batch_size=150, validation_split=0.3, verbose=1, shuffle=False)
                yhat = model.predict(X_test)
                yhat = yhat * (sst_max - sst_min) + sst_min
                y_test= y_test* (sst_max - sst_min) + sst_min
                a=evaluate_forecasts(y_test, list(yhat), n_lag, n_seq)
                
                
            if ML[mm]=='RF':
                ###reorder data
                X = np.stack((train[:, 0:n_lag], trainW[:, 0:n_lag],trainHEAT[:, 0:n_lag], trainHEATS[:, 0:n_lag], trainHEATINC[:, 0:n_lag], trainSLP[:, 0:n_lag], trainGEO[:, 0:n_lag],trainMM[:, 0:n_lag]),2)
                y=train[:,n_lag:]
                X_test = np.stack((test[:, 0:n_lag], testW[:, 0:n_lag],testHEAT[:, 0:n_lag], testHEATS[:, 0:n_lag], testHEATINC[:, 0:n_lag],testSLP[:, 0:n_lag],testGEO[:, 0:n_lag],testMM[:, 0:n_lag]),2)
                y_test=test[:,n_lag:]
                X_tr=X.reshape([y.shape[0],X.shape[1] * X.shape[2]])
                Y_tr=y
                X_ts=X_test.reshape([y_test.shape[0],X.shape[1] * X.shape[2]])

        
                # Instantiate model with 1000 decision trees
                rf = RandomForestRegressor(n_estimators = 100, random_state = 42, verbose=1)
                rf.fit(X_tr, Y_tr)
                yhat = rf.predict(X_ts)
                yhat = yhat * (sst_max - sst_min) + sst_min
                y_test= y_test* (sst_max - sst_min) + sst_min
                a=evaluate_forecasts(y_test, list(yhat), n_lag, n_seq)
                