# How to work with ERA5 land hourly and Eurostat data on Earth Data Hub
### Forecast of crop production

***
This notebook will provide you guidance on how to access and use the [`reanalysis-era5-land-no-antartica-v0.zarr`](https://earthdatahub.com/collections/era5/datasets/reanalysis-era5-land) dataset on Earth Data Hub in combination with Eurostat crop production.

Eurostat crop production is available for NUTS levels 0, 1, 2 and starting with the year 2000. The time lag for updates to the database is about two years. Detailed data on NUTS level 2 stop with 2015.

The crops of interest here are Cereals (excluding rice) for the production of grain (including seed) for which annual yield in t/ha will be used.

In this tutorial, we try to use air temperature and precipitation as a predictor for annual crop yields.

The goal is to train and evaluate a neural network to predict annual crop yields from air temperature and precipitation.


## What you will learn:

* how to access and preview the dataset
* select and reduce the data
* set up and train a neural network
* plot the results

## Installations for Google Colab

Install required packages for Google Colab.

In [None]:
!pip install zarr
!pip install s3fs==2023.6.0
!pip install cartopy==0.21.0

## Software Requirements

Load required packages.

In [None]:
%matplotlib inline
import xarray as xr
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import sklearn
import sklearn.ensemble
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import scipy.stats
from scipy.stats import pearsonr
from tqdm import tqdm
import xarray as xr
import pandas as pd
import numpy as np
from cartopy import crs as ccrs
import cartopy.feature as cfeature
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader


## Data access and preview
***

Xarray and Dask work together following a lazy principle. This means when you access and manipulate a Zarr store the data is in not immediately downloaded and loaded in memory. Instead, Dask constructs a task graph that represents the operations to be performed. A smart user will reduce the amount of data that needs to be downloaded before the computation takes place (e.g., when the `.compute()` or `.plot()` methods are called).

To preview the data, only the dataset metadata must be downloaded. Xarray does this automatically:

***

Data access with HTTPS:

In [None]:
# import with https

# ERA5 land
#dataset = "https://earthdatahub.com/stores/ecmwf-era5-land/reanalysis-era5-land-no-antartica-v1.zarr"
# ERA5 single levels land + ocean
dataset = "https://earthdatahub.com/stores/ecmwf-era5-single-levels/reanalysis-era5-single-levels-v0.zarr"
ds_era5 = xr.open_dataset(
    dataset,
    chunks={},
    engine="zarr",
    storage_options={"client_kwargs": {"trust_env": "true"}},
)

ds_era5

Eurostat data, spatial extents should be latitude [26.0, 73.9], longitude [-27.0, 44.9]

In [None]:
dataset = "https://earthdatahub.com/stores/eurostat/apro_cpshr-20000101-20240101.zarr"

ds_eurostat = xr.open_dataset(
    dataset,
    engine="zarr",
    chunks={}
)

ds_eurostat

## Working with data

Datasets on EDH are typically very large and remotely hosted. Typical use cases imply a selection of the data followed by one or more reduction steps to be performed in a local or distributed Dask environment.

The structure of a workflow that uses EDH data looks like this:
1. data selection
2. data reduction
3. (optional) visualization

The Eurostat dataset used here starts with the year 2000, while the ERA5 data start with 1940, thus we select only the time period from 2000 onwards.

In [None]:
# appropriate time period
ds_era5 = ds_era5.sel(valid_time=slice("2000-01-01", "2024-12-31"))

Note that latitudes are decreasing from 90 to -90 degrees and longitudes are increasing from 0 to 360 degrees. Longitudes need to be wrapped to -180, 180.

In [None]:
# wrap longitudes from 0, 360 to -180, 180

# from https://docs.xarray.dev/en/stable/generated/xarray.DataArray.assign_coords.html
ds_era5 = ds_era5.assign_coords(longitude=(((ds_era5.longitude + 180) % 360) - 180))

#ds_era5.assign_coords(longitude=(xr.where(ds_era5['longitude'] > 180, ds_era5['longitude'] - 360, ds_era5['longitude'])))
# sort the data, needed!
ds_era5 = ds_era5.reindex({ 'longitude' : np.sort(ds_era5['longitude'])})

ds_era5

## Air temperature anomaly

### 1. Data selection

From the original dataset we extract the air temperature (variable `t2m`) and perform a geographical selection corresponding to Europe. This greatly reduces the amount of data that will be downloaded from EDH.

Convert from Kelvin to Celsius.

Select temperature (t2m) and precipitation (tp)

In [None]:
# geographical selection
# Eurostat extent: latitude [26.0, 73.9], longitude [-27.0, 44.9]
ds_t2m = ds_era5.t2m.sel(**{"latitude": slice(72, 36), "longitude": slice(-10, 32)})
# convert from Kelvin to Celsius
ds_t2m = ds_t2m.astype("float32") - 273.15
ds_t2m.attrs["units"] = "C"
ds_t2m

At this point, no data has been downloaded yet, nor loaded in memory.

### 2. Data reduction

We compute the monthly air temperature averages in the selected region over the reference period.

Monthly averages:

In [None]:
%%time

ds_t2m_monthly = ds_t2m.resample(valid_time="1M").mean(dim="valid_time")
ds_t2m_monthly = ds_t2m_monthly.compute()
ds_t2m_monthly

Long-term monthly averages

In [None]:
ds_t2m_monthly_lt = ds_t2m_monthly.groupby("valid_time.month").mean("valid_time")

Monthly anomalies

In [None]:
#climatology = ds.groupby("time.month").mean("valid_time")
#anomalies = ds.groupby("time.month") - climatology

climatology = ds_t2m_monthly.groupby("valid_time.month").mean("valid_time")
anomalies = ds_t2m_monthly.groupby("valid_time.month") - climatology
anomalies

Growing season from May to September: anomalies

In [None]:
ds_t2m_anom_season = anomalies.sel(valid_time=anomalies.valid_time.dt.month.isin([5, 6, 7, 8, 9]))

Averages for each year

In [None]:
#ds_t2m_anom_year = ds_t2m_anom_season.groupby("valid_time.year")

# or

ds_t2m_anom_year = ds_t2m_anom_season.resample(valid_time="1Y").mean(dim="valid_time")

After that, we can compute the monthly temperature anomalies in the same area. Calling `compute()` on the result will trigger the download and computation, needed to load the data to the neural network.

We can mesure the time it takes, should be about 4 min:

In [None]:
%%time

ds_t2m_anom_year = ds_t2m_anom_year.compute()
ds_t2m_anom_year

## Precipitation anomaly

Same procedure as for temperature.

### 1. Data selection

In [None]:
# geographical selection
ds_tp = ds_era5.tp.sel(**{"latitude": slice(72, 36), "longitude": slice(-10, 32)})
# convert from meters to millimeters
ds_tp = ds_tp.astype("float32") * 1000.0
ds_tp

### 2. Data reduction

Instead of monthly averages, we compute the monthly precipitation sums in the selected region over the reference period.

Monthly sums:

In [None]:
%%time

ds_tp_monthly = ds_tp.resample(valid_time="1M").reduce(np.sum)
ds_tp_monthly = ds_tp_monthly.compute()
ds_tp_monthly

Long-term monthly averages

In [None]:
ds_tp_monthly_lt = ds_tp_monthly.groupby("valid_time.month").mean("valid_time")

Monthly anomalies

In [None]:
#climatology = ds.groupby("time.month").mean("valid_time")
#anomalies = ds.groupby("time.month") - climatology

climatology = ds_tp_monthly.groupby("valid_time.month").mean("valid_time")
anomalies = ds_tp_monthly.groupby("valid_time.month") - climatology


Growing season from May to September: anomalies

In [None]:
ds_tp_anom_season = anomalies.sel(valid_time=anomalies.valid_time.dt.month.isin([5, 6, 7, 8, 9]))

Averages for each year

In [None]:
#ds_tp_anom_year = ds_tp_anom_season.groupby("valid_time.year")

# or

ds_tp_anom_year = ds_tp_anom_season.resample(valid_time="1Y").mean(dim="valid_time")

After that, we can compute the monthly precipitation anomalies in the same area. Calling compute() on the result will trigger the download and computation, needed to load the data to the neural network.

We can mesure the time it takes, should be about 4 min:


In [None]:
%%time

ds_tp_anom_year = ds_tp_anom_year.compute()
ds_tp_anom_year

## Crop yields

Annual crop yields on NUTS2 level are calculated as t/ha from harvested production and harvested area.

In [None]:
# TODO
ds_eurostat = ds_eurostat.sel(**{"lat": slice(36, 72), "lon": slice(-10, 32)})
#  && !np.isnan(x.L2_A_C1000_AR)
ds_eurostat = ds_eurostat.assign(cropyield = lambda x: x.L2_A_C1000_PR_HU_EU.where((x.L2_A_C1000_AR != 0) & (np.isnan(x.L2_A_C1000_AR) == False), 0) / x.L2_A_C1000_AR)
ds_eurostat

In [None]:
#ds_eurostat = ds_eurostat.compute()
ds_cropyield = ds_eurostat.cropyield
ds_cropyield.compute()
ds_cropyield

### 3. Visualization
We can plot crop yields for a given year, e.g. 2015, on a map.

**! This crashes the Google Colab session !**

In [None]:
if False:
  ds_cropyield_2015 = ds_cropyield.sel(time=["2015-01-01"])

  _, ax = plt.subplots(
      figsize=(6, 6),
      subplot_kw={"projection":  ccrs.Miller()},
  )
  ds_cropyield_2015.plot(
      ax=ax,
      cmap="Blues",
      transform=ccrs.PlateCarree(),
      cbar_kwargs={"orientation": "horizontal", "pad": 0.05, "aspect": 40, "label": "Sea Surface Height anomaly [m]"},
  )
  ax.coastlines()
  ax.add_feature(cfeature.BORDERS)
  ax.set_title("Crop yields 2015")
  plt.show()

## Data preparation for AI

The two xarrays must have the same spatial resolution and extents. Therefore we need to adjust one xarray to the other.

In [None]:
# interpolate ERA5 data to match eurostat data

ds_t2m_anom_year_interp = ds_t2m_anom_year.interp(latitude=ds_cropyield["lat"], longitude=ds_cropyield["lon"])
ds_tp_anom_year_interp = ds_tp_anom_year.interp(latitude=ds_cropyield["lat"], longitude=ds_cropyield["lon"])


In [None]:
ds_t2m_anom_year_interp

We have already computed monthly temperature and precipitation anomalies from the ERA5 dataset and now need to train a network.

Helper functions to manage input data:

In [None]:
# Scaffold code to load in data.  This code cell is mostly data wrangling

def assemble_predictors(ds_t2m, ds_tp,
                        start_date, end_date):
  """
  inputs
  ------

      ds_t2m            xarray : input xarray dataset with temperature
      ds_tp             xarray : input xarray dataset with precipitation
      start_date           str : the start date from which to extract sst
      end_date             str : the end date

  outputs
  -------
      Returns the predictors (np array of temperature and precipitation anomalies)

  """

  t2m = ds_t2m.sel(valid_time=slice(start_date, end_date))
  tp = ds_tp.sel(valid_time=slice(start_date, end_date))

  # t2m and tp are (num_samples, lat, lon) arrays
  # the line below combines them to (num_samples, num_params, lat, lon)
  # with num_params = 2
  # combine two xarray data arrays to one numpy array
  combined_arr = np.stack((t2m, tp), axis=1)

  num_samples = combined_arr.shape[0]

  combined_arr[np.isnan(combined_arr)] = 0

  return combined_arr.astype(np.float32)

def assemble_predictors_predictands(ds_t2m, ds_tp, ds_crop,
                                    start_date, end_date):
  """
  inputs
  ------

      ds_t2m            xarray : input xarray dataset with temperature
      ds_tp             xarray : input xarray dataset with precipitation
      ds_crop           xarray : input xarray dataset with crop yields
      start_date           str : the start date from which to extract sst
      end_date             str : the end date

  outputs
  -------
      Returns a tuple of the predictors (np array of temperature and precipitation anomalies)
      and the predictands (np array of crop yields).

  """

  X = assemble_predictors(ds_t2m, ds_tp, start_date, end_date)

  cropyields = ds_crop.sel(time=slice(start_date, end_date))

  # convert xarray data array to numpy array
  cropyields = cropyields.to_numpy()
  cropyields[np.isnan(cropyields)] = 0
  y = cropyields

  return X.astype(np.float32), y.astype(np.float32)


class ERA5Dataset(Dataset):
    def __init__(self, predictors, predictands):
        self.predictors = predictors
        self.predictands = predictands
        assert self.predictors.shape[0] == self.predictands.shape[0], \
               "The number of predictors must equal the number of predictands!"

    def __len__(self):
        return self.predictors.shape[0]

    def __getitem__(self, idx):
        return self.predictors[idx], self.predictands[idx]



## Train a Simple Convolutional Neural Network to model crop yields

Let's define a simple convolutional neural network architecture.  This architecture has two convolutional layers followed by two transposed convolutional layers.  The output of the final layer is a 2-D array, since we are trying to model one value for each pixel: the crop yield.

### Define the network

In [None]:
class CNN(nn.Module):
    def __init__(self, num_params=2, print_feature_dimension=False):
        """
        inputs
        -------
            num_params                  (int) : the number of parameters
                                                in the predictor
            print_feature_dimension    (bool) : whether or not to print
                                                out the dimension of the features
                                                extracted from the conv layers
        """
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(num_params, 16, 3)
        self.conv2 = nn.Conv2d(16, 64, 3)
        self.deconv1 = nn.ConvTranspose2d(64, 16, 3)
        self.deconv2 = nn.ConvTranspose2d(16, 1, 3)
        self.print_layer = Print()
        self.print_feature_dimension = print_feature_dimension

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.deconv1(x))
        x = F.relu(self.deconv2(x))
        if self.print_feature_dimension:
          x = self.print_layer(x)
        return x

class Print(nn.Module):
    """
    This class prints out the size of the features
    """
    def forward(self, x):
        print(x.size())
        return x

Next, let's define a method that trains our neural network.

In [None]:
#import numpy.ma as ma

def train_network(net, criterion, optimizer, trainloader, testloader,
                  experiment_name, num_epochs=40):
  """
  inputs
  ------

      net               (nn.Module)   : the neural network architecture
      criterion         (nn)          : the loss function (i.e. root mean squared error)
      optimizer         (torch.optim) : the optimizer to use update the neural network
                                        architecture to minimize the loss function
      trainloader       (torch.utils.data.DataLoader): dataloader that loads the
                                        predictors and predictands
                                        for the train dataset
      testloader        (torch.utils.data. DataLoader): dataloader that loads the
                                        predictors and predictands
                                        for the test dataset
  outputs
  -------
      predictions (np.array), and saves the trained neural network as a .pt file
  """
  device = "cuda:0" if torch.cuda.is_available() else "cpu"
  net = net.to(device)
  best_loss = np.infty
  train_losses, test_losses = [], []

  for epoch in range(num_epochs):
    for mode, data_loader in [('train', trainloader), ('test', testloader)]:
      # Set the model to train mode to allow its weights to be updated
      # while training
      if mode == 'train':
        net.train()

      # Set the model to eval model to prevent its weights from being updated
      # while testing
      elif mode == 'test':
        net.eval()

      running_loss = 0.0
      for i, data in enumerate(data_loader):
          # get a mini-batch of predictors and predictands
          batch_predictors, batch_predictands = data
          training_mask = batch_predictands > 0
          batch_predictands_masked = batch_predictands * training_mask
          batch_predictors_masked = batch_predictors * training_mask

          batch_predictands = batch_predictands_masked.to(device)
          batch_predictors = batch_predictors_masked.to(device)

          # zero the parameter gradients
          optimizer.zero_grad()

          # calculate the predictions of the current neural network
          predictions = net(batch_predictors).squeeze()

          # quantify the quality of the predictions using a
          # loss function (aka criterion) that is differentiable
          loss = criterion(predictions, batch_predictands)

          if mode == 'train':
            # the 'backward pass: calculates the gradients of each weight
            # of the neural network with respect to the loss
            loss.backward()

            # the optimizer updates the weights of the neural network
            # based on the gradients calculated above and the choice
            # of optimization algorithm
            optimizer.step()

          # Save the model weights that have the best performance!

          running_loss += loss.item()

      if running_loss < best_loss and mode == 'test':
          best_loss = running_loss
          torch.save(net, '{}.pt'.format(experiment_name))
      print('{} Set: Epoch {:02d}. loss: {:3f}'.format(mode, epoch+1, \
                                            running_loss/len(data_loader)))
      if mode == 'train':
          train_losses.append(running_loss/len(data_loader))
      else:
          test_losses.append(running_loss/len(data_loader))

  return "{}.pt".format(experiment_name), train_losses, test_losses

  net = torch.load('{}.pt'.format(experiment_name))
  net.eval()
  net.to(device)

  # the remainder of this function calculates the predictions of the best
  # saved model
  #predictions = np.asarray([])
  predictions = []
  for i, data in enumerate(testloader):
    batch_predictors, batch_predictands = data
    batch_predictands = batch_predictands.to(device)
    batch_predictors = batch_predictors.to(device)

    batch_predictions = net(batch_predictors).squeeze()
    # Edge case: if there is 1 item in the batch, batch_predictions becomes a float
    # not a Tensor. the if statement below converts it to a Tensor
    # so that it is compatible with np.concatenate
    if len(batch_predictions.size()) == 0:
      batch_predictions = torch.Tensor([batch_predictions])
    predictions.append(batch_predictions.detach().cpu().numpy())
  return predictions, train_losses, test_losses


A method to use the trained network for predictions (inference).

In [None]:
def predict(model_name, testloader):
  """
  inputs
  ------

      model_name        (string)      : filename of saved model
      testloader        (torch.utils.data. DataLoader): dataloader that loads the
                                        predictors for the test dataset
  outputs
  -------
      predictions (np.array)
  """

  device = "cuda:0" if torch.cuda.is_available() else "cpu"
  net = torch.load(model_name)
  net.eval()
  net.to(device)

  # the remainder of this function calculates the predictions of the best
  # saved model
  #predictions = np.asarray([])
  predictions = None
  for i, data in enumerate(testloader):
    batch_predictors, batch_predictands = data
    batch_predictands = batch_predictands.to(device)
    batch_predictors = batch_predictors.to(device)

    batch_predictions = net(batch_predictors).squeeze()
    # Edge case: if there is 1 item in the batch, batch_predictions becomes a float
    # not a Tensor. the if statement below converts it to a Tensor
    # so that it is compatible with np.concatenate
    if len(batch_predictions.size()) == 0:
      batch_predictions = torch.Tensor([batch_predictions])
    if predictions is None:
     predictions = batch_predictions.detach().cpu().numpy()
    else:
      predictions = np.concatenate([predictions, batch_predictions.detach().cpu().numpy()])
  return predictions


### Actual training

Prepare data for the network and train it.

The earliest possible start date is 2000-01-01, the latest possible end date is 2015-12-31.

In [None]:
%%time

# Assemble numpy arrays corresponding to predictors and predictands
train_start_date = '2000-01-01'
train_end_date = '2011-12-31'

test_start_date = '2012-01-01'
test_end_date = '2015-12-31'

train_predictors, train_predictands = assemble_predictors_predictands(ds_t2m_anom_year_interp, ds_tp_anom_year_interp, ds_cropyield,
                                                                      train_start_date, train_end_date)

print("train_predictors: %d" % train_predictors.shape[0])
print("train_predictands: %d" % train_predictands.shape[0])

test_predictors, test_predictands = assemble_predictors_predictands(ds_t2m_anom_year_interp, ds_tp_anom_year_interp, ds_cropyield,
                                                                    test_start_date, test_end_date)

print("test_predictors: %d" % test_predictors.shape[0])
print("test_predictands: %d" % test_predictands.shape[0])

# Convert the numpy ararys into ERA5Dataset, which is a subset of the
# torch.utils.data.Dataset class.  This class is compatible with
# the torch dataloader, which allows for data loading for a CNN
train_dataset = ERA5Dataset(train_predictors, train_predictands)
test_dataset = ERA5Dataset(test_predictors, test_predictands)

# Create a torch.utils.data.DataLoader from the ERA5Dataset() created earlier!
# the similarity between the name DataLoader and Dataset in the pytorch API is unfortunate...
trainloader = DataLoader(train_dataset, batch_size=2)
testloader = DataLoader(test_dataset, batch_size=2)
net = CNN(num_params=2, print_feature_dimension=False)
optimizer = optim.Adam(net.parameters(), lr=0.0001)

# train the model and make predictions for the test time period
experiment_name = "simpleCNN_{}_{}".format(train_start_date, train_end_date)
model_name, train_losses, test_losses = train_network(net, nn.MSELoss(),
                  optimizer, trainloader, testloader, experiment_name, num_epochs=100)

## Results of NN training

Plot train and test losses:

In [None]:
plt.plot(train_losses, label='Train Loss')
plt.plot(test_losses, label='Test Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Performance of {} Neural Network During Training'.format(experiment_name))
plt.legend(loc='best')
plt.show()

If test loss does not look satisfactory, try reducing the number of parameters of the network. You could define your own network architecture, which uses a different number of parameters.

Alternatively, try increasing the number of training samples by using a longer time period or a larger area.

Create predictions

In [None]:
predictions = predict(model_name, testloader)

Correlation and root mean squared error:

In [None]:
test_predictands_1d = test_predictands.reshape(4 * 361 * 421)
predictions_1d = predictions.reshape(4 * 361 * 421)
corr, _ = pearsonr(test_predictands_1d, predictions_1d)
#corr, _ = pearsonr(test_predictands, predictions)

rmse = mean_squared_error(test_predictands_1d, predictions_1d) ** 0.5
print(corr)
print("RMSE: {:.2f}".format(rmse))


Show observations and predictions:

In [None]:
fig = plt.figure(figsize=(12, 12))
# observations
ax1 = fig.add_subplot(1, 2, 1)
plot1 = ax1.imshow(test_predictands[0, :, :])
ax1.title.set_text("observed")
ax1_axes = plot1.axes
ax1_axes.invert_yaxis()
# predictions
ax2 = fig.add_subplot(1, 2, 2)
plot2 = ax2.imshow(predictions[0, :, :])
ax2.title.set_text("predicted")
ax2_axes = plot2.axes
ax2_axes.invert_yaxis()

plt.show()

# **Improvements and limitations**

The most important limitation here is the small amount of training data available from Eurostat. Unfortunately, this can not be changed.

Possible improvements could be
* tiling of the training data to e.g. 128x128 or 256x256 pixels, but this would make the code in this notebook more complex than it alread is
* masking out oceans and inland waterbodies in the predictions because crops are not planted in these areas and the results are thus confusing / unrealistic
