# Data processing

Download, save, visualize and load dataset.



In [1]:
import xarray as xr
import matplotlib.pyplot as plt
import numpy as np
import time
from pathlib import Path
import os

In [2]:
# Root directory of the dataset. Paths below are built by plain string
# concatenation, so the trailing slash is required.
DATA_DIR = "../../data/"
# Grid resolution label.
# NOTE(review): not referenced elsewhere in the visible code (preprocess_data
# hard-codes '5.625deg') -- confirm whether it is still needed.
resolution = "5.625deg"

## 1. geopotential_500, temperature_850

### The data

- Test: years 2017 and 2018
- Validation: year 2016
- Train: year 2015

In [3]:
from torch.utils.data import Dataset
from torchvision import transforms

In [4]:
def preprocess_data(DATA_DIR, train_years, val_years, test_years):
    """Split the raw fields by time range and dump every time step as ``.npy``.

    Opens all ``*.nc`` files under ``<DATA_DIR>5.625deg/geopotential_500/`` and
    ``<DATA_DIR>5.625deg/temperature_850/``, concatenates the ``z`` and ``t``
    variables along ``level`` and flattens ``(lat, lon)`` into one ``v`` axis.
    For each of the ``train`` / ``val`` / ``test`` splits it then writes:

    * ``<DATA_DIR><split>/<i>.npy``: the i-th time step, laid out as (v, level)
    * ``<DATA_DIR><split>/mean.npy`` and ``std.npy``: per-level statistics

    The normalisation statistics are computed once, from the training split
    only, and the same values are written to every split directory. This keeps
    validation/test data out of the statistics while every directory still
    holds identical ``mean.npy`` / ``std.npy`` files.

    Args:
        DATA_DIR (str): root data directory; must end with a path separator
            because paths are built by string concatenation.
        train_years, val_years, test_years: selectors forwarded to
            ``DataArray.sel(time=...)`` for the respective split.

    Returns:
        tuple of str: output directories in the order (train, val, test).
    """
    time_slices = {'train': train_years, 'val': val_years, 'test': test_years}

    zpath = DATA_DIR + '5.625deg/geopotential_500/'
    tpath = DATA_DIR + '5.625deg/temperature_850/'

    z = xr.open_mfdataset(zpath + '*.nc', combine='by_coords')['z'].assign_coords(level=1)
    t = xr.open_mfdataset(tpath + '*.nc', combine='by_coords')['t'].assign_coords(level=1)

    data = (xr.concat([z, t], 'level')
              .stack(v=('lat', 'lon'))
              .transpose('time', 'v', 'level')
              .drop('level'))

    # Statistics come from the training years only and are computed a single
    # time instead of once per split.
    train_data = data.sel(time=time_slices['train'])
    mean = train_data.mean(('time', 'v')).compute().values
    std = train_data.std('time').mean('v').compute().values

    data_paths = []
    for set_name, years in time_slices.items():
        # Create directory
        out_path = DATA_DIR + set_name + "/"
        Path(out_path).mkdir(parents=True, exist_ok=True)
        data_paths.append(out_path)

        np.save(out_path + 'mean.npy', mean)
        np.save(out_path + 'std.npy', std)

        # Save one file per time step; iterating a DataArray walks its first
        # dimension, which is 'time' after the transpose above.
        for i, array in enumerate(data.sel(time=years)):
            np.save(out_path + str(i) + '.npy', array.values)

    return tuple(data_paths)
    

In [5]:
class WeatherBenchDataset(Dataset):
    """Dataset of (state, state ``delta_t`` steps later) pairs read from ``.npy`` files.

    The directory is expected to contain consecutively numbered sample files
    ``0.npy``, ``1.npy``, ... and, unless ``mean``/``std`` are passed in,
    ``mean.npy`` and ``std.npy``.
    """

    def __init__(self, data_path, delta_t, mean=None, std=None):
        """Index the sample files and load the normalisation statistics.

        Args:
            data_path (str): directory holding the samples; must end with a
                path separator because file names are appended directly.
            delta_t (int): offset, in number of samples, between an input and
                its label.
            mean: normalisation mean; loaded from ``mean.npy`` when ``None``.
            std: normalisation std; loaded from ``std.npy`` when ``None``.
        """
        self.delta_t = delta_t

        self.mean = np.load(data_path + 'mean.npy') if mean is None else mean
        self.std = np.load(data_path + 'std.npy') if std is None else std

        # Count only the numbered sample files, so that the statistics files
        # or any stray entry in the directory cannot skew the sample count.
        total_samples = sum(
            1 for name in os.listdir(data_path)
            if name.endswith('.npy') and name[:-len('.npy')].isdigit()
        )
        self.datafiles = [data_path + str(i) + '.npy' for i in range(total_samples)]

        # The last ``delta_t`` files have no label available.
        self.n_samples = max(len(self.datafiles) - self.delta_t, 0)

    def __len__(self):
        """Return the number of (sample, label) pairs."""
        return self.n_samples

    def __getitem__(self, idx):
        """Return the normalised sample and label for ``idx`` as ``torch.Tensor``.

        Both tensors have shape [n_vertex, n_features]. The label is the file
        located ``self.delta_t`` positions after the sample.
        """
        X = torch.Tensor((np.load(self.datafiles[idx]) - self.mean) / self.std)
        # Use the instance's offset rather than whatever module-level
        # ``delta_t`` happens to exist.
        y = torch.Tensor((np.load(self.datafiles[idx + self.delta_t]) - self.mean) / self.std)

        return X, y

### The graphs

Bandwidth = [dim1/2, dim2/2]  
sampling = 'SOFT'

In [6]:
from deepsphere.utils.laplacian_funcs import prepare_laplacian
from deepsphere.utils.samplings import equiangular_dimension_unpack
from pygsp.graphs.sphereequiangular import SphereEquiangular

In [7]:
def compute_laplacian(nodes, ratio, laplacian_type):
    """Build the prepared Laplacian of a SOFT-sampled equiangular sphere graph.

    Args:
        nodes: forwarded to ``equiangular_dimension_unpack`` together with
            ``ratio`` to obtain the two grid dimensions.
        ratio: see ``nodes``.
        laplacian_type: forwarded to ``SphereEquiangular.compute_laplacian``.

    Returns:
        The result of ``prepare_laplacian`` applied to the graph Laplacian.
    """
    size1, size2 = equiangular_dimension_unpack(nodes, ratio)

    # The bandwidth along each axis is half the grid dimension, truncated.
    graph = SphereEquiangular(bandwidth=[int(size1/2), int(size2/2)], sampling="SOFT")
    graph.compute_laplacian(laplacian_type)

    return prepare_laplacian(graph.L)

In [8]:
def get_equiangular_laplacians2(nodes, ratio, depth, laplacian_type, pool_size):
    """Build one prepared Laplacian per resolution level of the network.

    Args:
        nodes: forwarded with ``ratio`` to ``equiangular_dimension_unpack`` to
            obtain the two grid dimensions of the input signal.
        ratio: see ``nodes``.
        depth (int): number of levels, i.e. length of the returned list.
        laplacian_type ["combinatorial", "normalized"]: the type of the laplacian.
        pool_size: size of the pooling kernel; both grid dimensions are
            divided by ``pool_size`` once more at every level.
    Returns:
        list: Laplacians ordered by level; index 0 is built from the un-pooled
        grid dimensions.
    """
    dim1, dim2 = equiangular_dimension_unpack(nodes, ratio)

    def _laplacian_at(level):
        # Bandwidth is half of the grid dimension at this level, truncated.
        shrink = 2*pool_size**level
        graph = SphereEquiangular(bandwidth=[int(dim1/shrink), int(dim2/shrink)], sampling="SOFT")
        graph.compute_laplacian(laplacian_type)
        return prepare_laplacian(graph.L)

    return [_laplacian_at(level) for level in range(depth)]

### The models

https://github.com/ArcaniteSolutions/deepsphere

In [9]:
from torch import nn

from deepsphere.models.spherical_unet.encoder import SphericalChebBN2
from deepsphere.models.spherical_unet.utils import SphericalChebBNPool
from deepsphere.models.spherical_unet.decoder import SphericalChebBNPoolConcat, SphericalChebBNPoolCheb


from deepsphere.layers.chebyshev import SphericalChebConv
from deepsphere.utils.laplacian_funcs import get_equiangular_laplacians
from deepsphere.layers.samplings.equiangular_pool_unpool import Equiangular, reformat
from deepsphere.utils.samplings import equiangular_calculator
import torch.nn.functional as F

In [10]:
class EquiangularMaxPool2(nn.MaxPool1d):
    """Max pooling for equiangular signals.

    Subclasses ``nn.MaxPool1d`` for its constructor/attributes, but the pooling
    itself is performed with ``F.max_pool2d`` on the signal rearranged by
    ``equiangular_calculator``.
    """

    def __init__(self, ratio, kernel_size, return_indices=True):
        """Initialization
        Args:
            ratio (float): ratio between latitude and longitude dimensions of the data
            kernel_size: pooling kernel size, forwarded to ``nn.MaxPool1d``
            return_indices (bool): whether ``forward`` also returns the pooling indices
        """
        super().__init__(kernel_size=kernel_size, return_indices=return_indices)
        self.ratio = ratio

    def forward(self, x):
        """Pool ``x`` and, if requested, also return the indices needed for unpooling.
        Args:
            x (:obj:`torch.tensor`): batch x pixels x features
        Returns:
            :obj:`torch.tensor` or tuple: the pooled signal passed through
            ``reformat``; when ``return_indices`` is set, a
            ``(pooled, indices)`` tuple instead.
        """
        grid, _ = equiangular_calculator(x, self.ratio)
        # Move the feature axis forward so max_pool2d sees the two grid axes last.
        grid = grid.permute(0, 3, 1, 2)

        if not self.return_indices:
            return reformat(F.max_pool2d(grid, self.kernel_size))

        pooled, indices = F.max_pool2d(grid, self.kernel_size, return_indices=self.return_indices)
        return reformat(pooled), indices


class EquiangularMaxUnpool2(nn.MaxUnpool1d):
    """Max unpooling for equiangular signals.

    Subclasses ``nn.MaxUnpool1d`` for its constructor/attributes, but the
    unpooling itself is performed with ``F.max_unpool2d``.
    """

    def __init__(self, ratio, kernel_size):
        """Initialization
        Args:
            ratio (float): ratio between latitude and longitude dimensions of the data
            kernel_size: pooling kernel size; the same value is used along both grid axes
        """
        super().__init__(kernel_size=(kernel_size, kernel_size))
        self.ratio = ratio

    def forward(self, x, indices):
        """Undo a previous equiangular max pooling using the stored indices.
        Args:
            x (:obj:`torch.tensor`): batch x pixels x features
            indices: indices returned by the matching max-pooling call
        Returns:
            :obj:`torch.tensor`: the unpooled signal passed through ``reformat``
        """
        grid, _ = equiangular_calculator(x, self.ratio)
        # Move the feature axis forward so max_unpool2d sees the two grid axes last.
        unpooled = F.max_unpool2d(grid.permute(0, 3, 1, 2), indices, self.kernel_size)
        return reformat(unpooled)

In [11]:
class SphericalConvNet(nn.Module):
    """Five stacked spherical Chebyshev convolutions on a single graph.

    No pooling is involved: every layer uses the same Laplacian.
    """

    def __init__(self, nodes, ratio, depth, channels_in, channels_out, laplacian_type, kernel_size):
        """Initialization.
        Args:
            nodes: forwarded to ``compute_laplacian`` (the notebook passes the
                number of vertices of one sample)
            ratio (float): forwarded to ``compute_laplacian``
            depth (int): accepted but not used by this class
            channels_in (int): number of input features
            channels_out (int): number of output features
            laplacian_type: forwarded to ``compute_laplacian``
            kernel_size (int): passed to every ``SphericalChebConv`` layer
        """
        super().__init__()

        hidden = 64
        self.kernel_size = kernel_size
        self.laplacian = compute_laplacian(nodes, ratio, laplacian_type)

        self.conv1 = SphericalChebConv(channels_in, hidden, self.laplacian, self.kernel_size)
        self.conv2 = SphericalChebConv(hidden, hidden, self.laplacian, self.kernel_size)
        self.conv3 = SphericalChebConv(hidden, hidden, self.laplacian, self.kernel_size)
        self.conv4 = SphericalChebConv(hidden, hidden, self.laplacian, self.kernel_size)
        self.conv5 = SphericalChebConv(hidden, channels_out, self.laplacian, self.kernel_size)

    def forward(self, x):
        """Forward Pass.
        Args:
            x (:obj:`torch.Tensor`): input to be forwarded.
        Returns:
            :obj:`torch.Tensor`: output of the last convolution (no activation applied)
        """
        # ELU after each of the first four layers; the last one stays linear.
        for layer in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = F.elu(layer(x))
        return self.conv5(x)

In [12]:
class Encoder2(nn.Module):
    """Encoder half of the spherical UNet: four convolutions, three poolings."""

    def __init__(self, channels_in, kernel_size, ratio, laplacians, pool_size, pooling="max"):
        """Initialization.
        Args:
            channels_in (int): number of input features
            kernel_size (int): passed to every ``SphericalChebConv`` layer
            ratio (float): forwarded to ``EquiangularMaxPool2``
            laplacians (list): one Laplacian per level; indices 0 to 3 are used
            pool_size: pooling kernel size
            pooling (str): accepted but not used by this class
        """
        super().__init__()

        self.kernel_size = kernel_size

        # (in, out) channel counts of conv1..conv4; layer k uses laplacians[k-1].
        channel_plan = [(channels_in, 64), (64, 128), (128, 128), (128, 128)]
        for level, (c_in, c_out) in enumerate(channel_plan):
            layer = SphericalChebConv(c_in, c_out, laplacians[level], self.kernel_size)
            setattr(self, 'conv{}'.format(level + 1), layer)

        self.pool = EquiangularMaxPool2(ratio, pool_size, return_indices=True)

    def forward(self, x):
        """Forward Pass.
        Args:
            x (:obj:`torch.Tensor`): input [batch x vertices x channels/features]
        Returns:
            tuple: ``(x_enc2, idx2, x_enc3, idx3, x_enc4, idx4)``, i.e. the
            feature maps of levels 2 to 4, each followed by the pooling indices
            that produced its input. The level-1 feature map is not returned.
        """
        level1 = F.relu(self.conv1(x))

        pooled, idx2 = self.pool(level1)
        level2 = F.relu(self.conv2(pooled))

        pooled, idx3 = self.pool(level2)
        level3 = F.relu(self.conv3(pooled))

        pooled, idx4 = self.pool(level3)
        # No activation on the deepest level.
        level4 = self.conv4(pooled)

        return level2, idx2, level3, idx3, level4, idx4

class Decoder2(nn.Module):
    """Decoder half of the spherical UNet: unpool, convolve, concatenate skip."""

    def __init__(self, channels_out, kernel_size, ratio, laplacians, pool_size, pooling="max"):
        """Initialization.
        Args:
            channels_out (int): number of output features
            kernel_size (int): passed to every ``SphericalChebConv`` layer
            ratio (float): forwarded to ``EquiangularMaxUnpool2``
            laplacians (list): one Laplacian per level; indices 0 to 2 are used
            pool_size: pooling kernel size used by the encoder
            pooling (str): accepted but not used by this class
        """
        super().__init__()
        self.kernel_size = kernel_size
        k = self.kernel_size

        self.unpool = EquiangularMaxUnpool2(ratio, pool_size)

        # Convolutions applied right after each unpooling step.
        self.deconv3 = SphericalChebConv(128, 128, laplacians[2], k)
        self.deconv2 = SphericalChebConv(128, 64, laplacians[1], k)
        self.deconv1 = SphericalChebConv(64, channels_out, laplacians[0], k)

        # Convolutions applied after concatenating the encoder feature map;
        # the input width is the sum of both concatenated tensors.
        self.conv3 = SphericalChebConv(128+128, 128, laplacians[2], k)
        self.conv2 = SphericalChebConv(128+64, 64, laplacians[1], k)

        # Registered but not applied in ``forward``.
        self.softmax = nn.Softmax(dim=2)

    def forward(self, x_enc2, idx2, x_enc3, idx3, x_enc4, idx4):
        """Forward Pass.
        Args:
            x_enc* (:obj:`torch.Tensor`): encoder feature maps of levels 2 to 4.
            idx*: pooling indices returned by the encoder for the same levels.
        Returns:
            :obj:`torch.Tensor`: output after forward pass.
        """
        # Level 4 -> level 3
        up = F.relu(self.deconv3(self.unpool(x_enc4, idx4)))
        merged = torch.cat((up, x_enc3), dim=2)
        merged = F.relu(self.conv3(merged))

        # Level 3 -> level 2
        up = F.relu(self.deconv2(self.unpool(merged, idx3)))
        merged = torch.cat((up, x_enc2), dim=2)
        merged = F.relu(self.conv2(merged))

        # Level 2 -> level 1 (no skip connection at this level)
        return F.relu(self.deconv1(self.unpool(merged, idx2)))
    
class SphericalUNet2(nn.Module):
    """Spherical UNet assembled from ``Encoder2`` and ``Decoder2``."""

    def __init__(self, nodes, ratio, depth, channels_in, channels_out, laplacian_type, kernel_size, pooling_size,
                 pooling="max"):
        """Initialization.
        Args:
            nodes: forwarded to ``get_equiangular_laplacians2``
            ratio (float): forwarded to the Laplacian builder and to the pooling layers
            depth (int): number of Laplacians to build
            channels_in (int): number of input features
            channels_out (int): number of output features
            laplacian_type: forwarded to ``get_equiangular_laplacians2``
            kernel_size (int): passed to every convolution of encoder and decoder
            pooling_size: pooling kernel size
            pooling (str): mode handed to ``Equiangular`` and to encoder/decoder
        """
        super().__init__()
        self.kernel_size = kernel_size

        self.pooling_class = Equiangular(mode=pooling)
        self.laplacians = get_equiangular_laplacians2(nodes, ratio, depth, laplacian_type, pooling_size)

        # Encoder and decoder receive the same trailing arguments.
        common = (self.kernel_size, ratio, self.laplacians, pooling_size, pooling)
        self.encoder = Encoder2(channels_in, *common)
        self.decoder = Decoder2(channels_out, *common)

    def forward(self, x):
        """Forward Pass.
        Args:
            x (:obj:`torch.Tensor`): input to be forwarded.
        Returns:
            :obj:`torch.Tensor`: output
        """
        return self.decoder(*self.encoder(x))

### Train 

In [13]:
import torch

from torch.utils.data import DataLoader
from torch import nn, optim 

In [14]:
def train_model(model, lr, device, train_generator, val_generator, patience, n_epochs=None):
    """Train ``model`` with Adam on an MSE loss, with early stopping.

    After every epoch the mean training and validation losses are printed.
    Training stops once the validation loss has failed to improve for
    ``patience`` consecutive epochs, or after ``n_epochs`` epochs. The weights
    held by the model when training ends are saved to ``model.pt`` in the
    current working directory.

    Args:
        model (torch.nn.Module): model to optimise, already placed on ``device``.
        lr (float): learning rate for Adam.
        device (torch.device): device the batches are moved to.
        train_generator: iterable of ``(batch, labels)`` supporting ``len()``.
        val_generator: iterable of ``(batch, labels)`` supporting ``len()``.
        patience (int): epochs without validation improvement before stopping.
        n_epochs (int, optional): maximum number of epochs. When omitted, the
            module-level ``n_epochs`` is used if one exists (the behaviour
            before this parameter was added), otherwise 100.
    """
    if n_epochs is None:
        # Backward compatibility: earlier versions read the notebook global.
        n_epochs = globals().get('n_epochs', 100)

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    min_val_loss = float("inf")
    epoch_no_improve = 0

    for epoch in range(n_epochs):
        time1 = time.time()

        train_loss = 0.0
        val_loss = 0.0

        model.train()
        for batch, labels in train_generator:
            # Transfer to GPU
            batch, labels = batch.to(device), labels.to(device)

            output = model(batch)
            loss = criterion(output, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # .item() detaches the value; summing the tensors themselves
            # would keep autograd history of every batch alive.
            train_loss += loss.item()

        # max(..., 1) avoids a ZeroDivisionError on an empty loader.
        train_loss = train_loss / max(len(train_generator), 1)

        model.eval()
        with torch.no_grad():
            for batch, labels in val_generator:
                # Transfer to GPU
                batch, labels = batch.to(device), labels.to(device)

                output = model(batch)
                val_loss += criterion(output, labels).item()

        # Average over the number of *validation* batches.
        val_loss = val_loss / max(len(val_generator), 1)

        time2 = time.time()
        print('Epoch: {e:3d}/{n_e:3d}  - loss: {l:.3f}  - val_loss: {v_l:.3f}  - time: {t:2f}'
              .format(e=epoch+1, n_e=n_epochs, l=train_loss, v_l=val_loss, t=time2-time1))

        # Check for early stopping
        if val_loss < min_val_loss:
            epoch_no_improve = 0
            min_val_loss = val_loss
        else:
            epoch_no_improve += 1

        if epoch_no_improve == patience:
            print('Epoch {e:3d}: early stopping'.format(e=epoch+1))
            break

    # Single save point, reached both after early stopping and after the
    # final epoch.
    torch.save(model.state_dict(), 'model.pt')

In [15]:
def init_device(model, ids=None):
    """Place ``model`` on the GPU(s) when CUDA is available, else on the CPU.

    Args:
        model (torch.nn.Module): the model to place on the device(s)
        ids (list of int or str, optional): GPU indices to use. ``None`` wraps
            the model in ``nn.DataParallel`` with its default devices; a single
            id places the model on that GPU without wrapping; several ids wrap
            it in ``nn.DataParallel`` over exactly those devices. Ignored when
            CUDA is not available.
    Returns:
        torch.nn.Module, torch.device: the model placed on device, the device
    """
    if not torch.cuda.is_available():
        cpu = torch.device("cpu")
        return model.to(cpu), cpu

    if ids is None:
        device = torch.device("cuda")
        return nn.DataParallel(model.to(device)), device

    single_gpu = len(ids) == 1
    # The first id is the primary device in both remaining cases.
    device = torch.device("cuda:{}".format(ids[0]))
    model = model.to(device)
    if not single_gpu:
        model = nn.DataParallel(model, device_ids=[int(i) for i in ids])

    return model, device

In [16]:
# Per-split directories. These follow the same DATA_DIR + "<split>/" layout
# that preprocess_data writes to.
train_path = DATA_DIR + "train/"
val_path = DATA_DIR + "val/"
test_path = DATA_DIR + "test/"

In [17]:
# Offset between an input and its label, counted in saved samples.
delta_t = 3*24  # 3 days (assumes one sample per hour -- TODO confirm)
ratio = 64/32   # lon/lat

batch_size = 32
learning_rate = 1e-4  # we doubled the learning rate as we doubled the batch size
n_epochs = 100

# Data
# The validation set reuses the statistics loaded by the training set, so both
# are normalised identically.
training_set = WeatherBenchDataset(train_path, delta_t)
validation_set = WeatherBenchDataset(val_path, delta_t, training_set.mean, training_set.std)
# NOTE(review): the training loader is created with shuffle=False -- confirm
# that iterating the samples in file order during training is intended.
dataloader_train = DataLoader(training_set, batch_size=batch_size, shuffle=False, num_workers=8)
dataloader_validation = DataLoader(validation_set, batch_size=batch_size, shuffle=False, num_workers=8)

# Size of the first axis of one input sample (the vertex axis according to the
# WeatherBenchDataset.__getitem__ docstring).
N = len(training_set[0][0])

In [18]:
# Model - CONVNET
# Two input and two output features. `depth` is required by the constructor
# signature but SphericalConvNet does not use it.
convnet = SphericalConvNet(nodes=N, ratio=ratio, depth=4, channels_in=2, channels_out=2, laplacian_type="combinatorial", 
                           kernel_size=5)

# With CUDA available, ids=[3] places the model on "cuda:3" without
# DataParallel; otherwise init_device falls back to the CPU.
convnet, device = init_device(model=convnet, ids=[3])

In [None]:
# Stop once the validation loss has not improved for 2 consecutive epochs.
train_model(convnet, learning_rate, device, dataloader_train, dataloader_validation, patience=2)

Epoch:   1/100  - loss: 0.663  - val_loss: 0.743  - time: 34.713446
Epoch:   2/100  - loss: 0.659  - val_loss: 0.744  - time: 34.597876
Epoch:   3/100  - loss: 0.657  - val_loss: 0.741  - time: 34.538268
Epoch:   4/100  - loss: 0.655  - val_loss: 0.738  - time: 34.575342
Epoch:   5/100  - loss: 0.653  - val_loss: 0.734  - time: 34.546397
Epoch:   6/100  - loss: 0.651  - val_loss: 0.731  - time: 34.661316


### Test

In [None]:
# Disabled cell: the code below sits inside a bare string literal, so it is
# never executed. If enabled, it would rebuild the ConvNet and restore the
# weights saved to 'model.pt'.
'''trained_convnet = SphericalConvNet(nodes=N, ratio=ratio, depth=4, channels_in=2, channels_out=2, laplacian_type="combinatorial", kernel_size=5)
trained_convnet, device = init_device(model=trained_convnet, ids=[3])

trained_convnet.load_state_dict(torch.load('model.pt'))'''

In [None]:
def predict(model, dataset, mean, std, num_workers=8):
    """Run ``model`` over ``dataset`` and return un-normalised predictions and labels.

    Args:
        model (torch.nn.Module): trained model. Inputs are moved to the device
            of its first parameter (CPU if the model has no parameters).
        dataset: indexable dataset of ``(sample, label)`` tensor pairs that
            supports ``len()``; must contain at least one item.
        mean: value added back after scaling; must broadcast against an array
            of shape ``(len(dataset), *sample.shape)``.
        std: scale applied before adding ``mean``; same broadcasting rule.
        num_workers (int): worker processes used by the ``DataLoader``.

    Returns:
        tuple of np.ndarray: ``(predictions, labels)``, both of shape
        ``(len(dataset), *sample.shape)``.
    """
    # Derive the device from the model instead of relying on a global.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    out_shape = (len(dataset), *tuple(dataset[0][0].size()))
    predictions = np.empty(out_shape)
    labels = np.empty(out_shape)

    loader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=num_workers)

    model.eval()

    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        for i, (sample, label) in enumerate(loader):
            output = model(sample.to(device))

            # batch_size is 1, so drop the batch axis explicitly.
            predictions[i] = output[0].cpu().numpy()
            labels[i] = label[0].cpu().numpy()

    predictions = predictions * std + mean
    labels = labels * std + mean

    return predictions, labels

def to_xarray(array, N, ratio):
    """Wrap a flat (time, vertex, feature) array into an ``xr.Dataset`` on a lat/lon grid.

    Args:
        array (np.ndarray): array whose axis 1 can be reshaped into the two
            grid dimensions; only features 0 and 1 of the last axis are used.
        N: forwarded with ``ratio`` to ``equiangular_dimension_unpack`` to
            obtain the two grid dimensions.
        ratio: see ``N``.

    Returns:
        xr.Dataset: variables ``z`` (feature 0) and ``t`` (feature 1) with
        dimensions ``(time, lat, lon)``. ``time`` is a plain 0-based index;
        ``lat``/``lon`` are taken from a SOFT-sampled ``SphereEquiangular``
        graph and multiplied by 180/pi.
    """
    n_rows, n_cols = equiangular_dimension_unpack(N, ratio)
    graph = SphereEquiangular(bandwidth=[int(n_rows/2), int(n_cols/2)], sampling="SOFT")

    grid = array.reshape((array.shape[0], n_rows, n_cols, array.shape[2]))
    dims = ('time', 'lat', 'lon')

    return xr.Dataset(
        data_vars={'z': (dims, grid[:, :, :, 0]),
                   't': (dims, grid[:, :, :, 1])},
        coords={'time': list(range(grid.shape[0])),
                'lat': graph.lat[:, 0] * (180/np.pi),
                'lon': graph.lon[0] * (180/np.pi)},
    )

def compute_weighted_rmse(da_fc, da_true, mean_dims=None):
    """
    Compute the RMSE with latitude weighting from two xarray objects.
    Args:
        da_fc (xr.DataArray or xr.Dataset): Forecast. Time coordinate must be validation time.
        da_true (xr.DataArray or xr.Dataset): Truth.
        mean_dims: Dimension(s) to average over. ``None`` (the default)
            averages over all dimensions; this replaces the former
            ``xr.ALL_DIMS`` default, which is evaluated at definition time and
            is not available in recent xarray releases. Passing
            ``xr.ALL_DIMS`` explicitly still works where it exists.
    Returns:
        rmse: Latitude weighted root mean squared error. Variables of a
        Dataset result, or the name of a DataArray result, get a ``_rmse``
        suffix; an unnamed DataArray result is named ``rmse``.
    """
    error = da_fc - da_true

    # Weight by cos(latitude), normalised so that the weights average to 1.
    weights_lat = np.cos(np.deg2rad(error.lat))
    weights_lat = weights_lat / weights_lat.mean()

    rmse = np.sqrt((error**2 * weights_lat).mean(mean_dims))
    if isinstance(rmse, xr.Dataset):
        rmse = rmse.rename({v: v + '_rmse' for v in rmse})
    else:  # DataArray
        rmse.name = error.name + '_rmse' if error.name is not None else 'rmse'
    return rmse

Reference: RMSE of the WeatherBench CNN baseline at a lead time of **3 days**:

  Z500: slightly above ~600  
  T850: below 3


In [168]:
# Normalise the test set with the training statistics, exactly like the
# validation set, so that the un-normalisation in predict() (which also uses
# the training statistics) is the exact inverse of the normalisation.
testing_set = WeatherBenchDataset(test_path, delta_t, training_set.mean, training_set.std)

predictions, labels = predict(convnet, testing_set, training_set.mean, training_set.std)
pred = to_xarray(predictions, N, ratio)
valid = to_xarray(labels, N, ratio)

# Latitude-weighted RMSE over all dimensions, one value per variable.
print(compute_weighted_rmse(pred, valid).load())

<xarray.Dataset>
Dimensions:  ()
Data variables:
    z_rmse   float64 934.3
    t_rmse   float64 4.03


In [None]:
def create_predictions(model, dg):
    """Create un-normalised predictions for a non-iterative model.

    Args:
        model: object exposing ``predict_generator(dg)``.
        dg: data generator exposing ``mean``, ``std``, ``var_dict``,
            ``valid_time`` and ``ds`` (with ``lat`` / ``lon``).

    Returns:
        The ``xr.merge`` of one ``xr.DataArray`` per entry of ``dg.var_dict``.
        Variables whose levels are ``None`` take a single slice of the last
        prediction axis; the others take one slice per level and get an extra
        ``level`` dimension.
    """
    raw = model.predict_generator(dg)
    # Unnormalize
    raw = raw * dg.std.values + dg.mean.values

    forecasts = []
    offset = 0  # position of the current variable along the last axis
    for var, levels in dg.var_dict.items():
        coords = {'time': dg.valid_time, 'lat': dg.ds.lat, 'lon': dg.ds.lon}
        dims = ['time', 'lat', 'lon']

        if levels is None:
            values = raw[:, :, :, offset]
            width = 1
        else:
            width = len(levels)
            values = raw[:, :, :, offset:offset+width]
            dims.append('level')
            coords['level'] = levels

        forecasts.append(xr.DataArray(values, dims=dims, coords=coords, name=var))
        offset += width

    return xr.merge(forecasts)