# Tutorial demonstrating verification of v1 SOI index against jra55

#### Import pyLatte package

In [2]:
from pylatte import utils
from pylatte import skill
from pylatte import indices

#### Currently, the following packages are required to load the data - this process will be replaced by the CAFE cookbook

In [3]:
import numpy as np
import pandas as pd
import xarray as xr
import glob

#### Import some plotting packages and widgets

In [4]:
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable

import warnings    
warnings.filterwarnings("ignore")

# Jupyter specific -----
from ipywidgets import FloatProgress
%matplotlib inline

# A note about the methodology of pyLatte
The pyLatte package is constructed around the xarray Python package. This is particularly useful for verification computations which require large numbers of samples (different model runs) to converge. 

The approach here is to generate very large xarray objects that reference all data required for the verification, but do not store the data in memory. Operations are performed on these xarray objects out-of-memory. When it is necessary to perform a compute (e.g. to produce a plot), this is distributed over multiple processors using the dask Python package.

# Initialise dask (currently not working on vm31)

In [10]:
# import dask
# import distributed
# client = distributed.Client(local_dir='/tmp/squ027-dask-worker-space', n_workers=4, ip='*')
# client

0,1
Client  Scheduler: tcp://152.83.3.52:59251  Dashboard: http://152.83.3.52:8787/status,Cluster  Workers: 4  Cores: 32  Memory: 271.04 GB


# Construct xarray objects for forecasts and observations
(The CAFE cookbook will replace these code blocks)

In [11]:
# Location of forecast data -----
# Root of the v1 forecast tree. The loading loop appends
# '/yr<year>/mn<month>/OUTPUT.<ensemble>/' to this, so the trailing '/' here
# produces a doubled separator in the final glob pattern.
fcst_folder = '/OSM/CBR/OA_DCFP/data/model_output/CAFE/forecasts/v1/'
# Glob pattern for the forecast files; the loading loop appends '.nc'.
fcst_filename = 'atmos_daily*'
# Variable extracted from every forecast file.
fcst_variable = 'slp'

# Location of observation data -----
obsv_folder = '/OSM/CBR/OA_DCFP/data/observations/jra55/isobaric/002_prmsl/cat/'
# Single file name, joined directly onto obsv_folder when loading.
obsv_filename = 'jra.55.prmsl.000.1958010100_2016123118.nc'
# Variable read from the observation file; it is renamed to fcst_variable on load.
obsv_variable = 'PRMSL_GDS0_MSL'

In [12]:
# Initial dates to include (takes approximately 1 min 30 sec per date) -----
# Month-start dates from January 2003 to March 2003 inclusive.
init_dates = pd.date_range(start='2003-1', end='2003-3', freq='1MS')

# Ensembles to include -----
# Members 1 through 11 (the upper bound of range() is exclusive).
ensembles = range(1, 12)

# Forecast length -----
FCST_LENGTH = 2  # years

In [13]:
# Resampling details -----
# Frequency string passed to .resample(time=...) for both the forecasts and
# the observations before they are averaged over each resampling window.
resample_freq = 'MS'

### Construct forecasts xarray object
Note, dask has a known bug that manifests when trying to concatenate data containing timedelta64 arrays (see https://github.com/pydata/xarray/issues/1952 for further details). For example, try to concatenate the following two Datasets:

`In : path = '/OSM/CBR/OA_DCFP/data/model_output/CAFE/forecasts/v1/yr2002/mn7/'`

`In : ens5 = xr.open_mfdataset(path + 'OUTPUT.5/atmos_daily*.nc', autoclose=True)`

`In : ens6 = xr.open_mfdataset(path + 'OUTPUT.6/atmos_daily*.nc', autoclose=True)`

`In : xr.concat([ens5, ens6],'ensemble')`

`Out : TypeError: invalid type promotion`

The error here is actually caused by the variables `average_DT` and `time_bounds`, which are timedelta64 arrays. However, I still do not fully understand the bug: concatenation of `ens4` and `ens5`, for example, works fine, even though `ens4` also contains the timedelta64 variables `average_DT` and `time_bounds`. Regardless, because of this bug, it is not currently possible to create an xarray Dataset object containing all model variables. Instead, only the variables of interest (i.e. `fcst_variable` and `obsv_variable`) are retained in the concatenated xarray object.

In [17]:
# Build da_fcst: monthly-resampled forecasts of fcst_variable for every
# initial date and ensemble member, indexed by init_date, ensemble and
# lead_time.

# Instantiate progress bar -----
f = FloatProgress(min=0, max=len(init_dates)*len(ensembles), description='Loading...')
display(f)

# Loop over initial dates -----
fcst_list = []
for init_date in init_dates:
    year = init_date.year
    month = init_date.month

    # Loop over ensembles -----
    ens_list = []
    for ensemble in ensembles:
        # Glob pattern for this member's files. Any trailing separator on
        # fcst_folder is stripped so the pattern contains no doubled '/'.
        path = '{}/yr{}/mn{}/OUTPUT.{}/{}.nc'.format(
            fcst_folder.rstrip('/'), year, month, ensemble, fcst_filename)

        # Report the offending pattern when a member is missing on disk,
        # instead of letting xr.concat() fail on an empty list further down.
        files = sorted(glob.glob(path))
        if not files:
            raise FileNotFoundError('No forecast files match: ' + path)

        # xr.open_mfdataset() is slow - manually concatenate in time -----
        # Only fcst_variable is kept from each file.
        datasets = []
        for file in files:
            dataset = xr.open_dataset(file, autoclose=True)[fcst_variable]
            datasets.append(dataset)
        dataset = xr.concat(datasets, dim='time', coords='all').sortby('time')

        # Stack ensembles into a list -----
        ens_list.append(dataset.resample(time=resample_freq) \
                               .mean(dim='time'))

        # Signal to increment the progress bar once the member is processed -----
        f.value += 1

    # Concatenate ensembles -----
    ens_object = xr.concat(ens_list, dim='ensemble')
    ens_object['ensemble'] = list(ensembles)

    # Stack concatenated ensembles into a list for each initial date -----
    fcst_list.append(utils.datetime_to_leadtime(ens_object))

# Keep track of the lead time for each initialization -----
n_lead_time = [len(x.lead_time) for x in fcst_list]

# Concatenate initial dates -----
da_fcst = xr.concat(fcst_list, dim='init_date')

# Rechunk for chunksizes of at least 1,000,000 elements -----
da_fcst = utils.prune(da_fcst.chunk(chunks={'ensemble' : len(da_fcst.ensemble),
                                            'lead_time' : len(da_fcst.lead_time)}).squeeze())

<xarray.DataArray 'slp' (time: 2192, lat: 90, lon: 144)>
array([[[ 1015.948669,  1015.555786, ...,  1016.794373,  1016.361267],
        [ 1014.656128,  1014.874756, ...,  1014.236694,  1014.445068],
        ..., 
        [ 1033.73938 ,  1033.666016, ...,  1033.835327,  1033.795044],
        [ 1033.043457,  1032.938232, ...,  1033.280518,  1033.15979 ]],

       [[ 1017.526428,  1017.133423, ...,  1018.372375,  1017.939087],
        [ 1017.456116,  1017.698364, ...,  1016.969604,  1017.21228 ],
        ..., 
        [ 1034.135132,  1034.05127 , ...,  1034.241333,  1034.19873 ],
        [ 1033.817749,  1033.712402, ...,  1034.054688,  1033.934082]],

       ..., 
       [[ 1017.564087,  1017.184692, ...,  1018.380859,  1017.962585],
        [ 1011.258362,  1011.498657, ...,  1010.799316,  1011.021118],
        ..., 
        [ 1006.279846,  1006.520752, ...,  1005.725708,  1006.014954],
        [ 1006.879211,  1006.787903, ...,  1007.084778,  1006.980103]],

       [[ 1012.47876 ,  1012.0

<xarray.DataArray 'slp' (time: 2192, lat: 90, lon: 144)>
array([[[ 1015.948242,  1015.555359, ...,  1016.793945,  1016.36084 ],
        [ 1014.656006,  1014.874695, ...,  1014.236572,  1014.445007],
        ..., 
        [ 1033.739258,  1033.665894, ...,  1033.835205,  1033.794922],
        [ 1033.043457,  1032.93811 , ...,  1033.280518,  1033.15979 ]],

       [[ 1017.547363,  1017.154358, ...,  1018.393311,  1017.960083],
        [ 1017.479065,  1017.722961, ...,  1016.989075,  1017.233704],
        ..., 
        [ 1034.130493,  1034.046509, ...,  1034.237183,  1034.194336],
        [ 1033.814819,  1033.709473, ...,  1034.051758,  1033.931152]],

       ..., 
       [[ 1028.437012,  1028.052368, ...,  1029.264771,  1028.84082 ],
        [ 1025.712891,  1025.879761, ...,  1025.336548,  1025.53064 ],
        ..., 
        [ 1015.606689,  1015.09375 , ...,  1016.401855,  1016.04895 ],
        [ 1011.928589,  1011.837036, ...,  1012.134521,  1012.029663]],

       [[ 1023.754578,  1023.3

<xarray.DataArray 'slp' (time: 2192, lat: 90, lon: 144)>
array([[[ 1015.948425,  1015.555542, ...,  1016.794128,  1016.361023],
        [ 1014.655579,  1014.874268, ...,  1014.236206,  1014.44458 ],
        ..., 
        [ 1033.73938 ,  1033.666016, ...,  1033.835449,  1033.795166],
        [ 1033.043457,  1032.93811 , ...,  1033.280396,  1033.15979 ]],

       [[ 1017.516907,  1017.123962, ...,  1018.362854,  1017.929626],
        [ 1017.479797,  1017.724243, ...,  1016.988342,  1017.233704],
        ..., 
        [ 1034.134033,  1034.049927, ...,  1034.240479,  1034.197754],
        [ 1033.816895,  1033.711548, ...,  1034.053833,  1033.933228]],

       ..., 
       [[  997.971008,   997.592346, ...,   998.786133,   998.368652],
        [  997.162292,   997.328369, ...,   996.822021,   996.990356],
        ..., 
        [ 1018.607971,  1018.69397 , ...,  1018.43396 ,  1018.520752],
        [ 1017.067932,  1016.977478, ...,  1017.271484,  1017.167847]],

       [[ 1003.228699,  1002.8



KeyboardInterrupt: 

#### Truncate the forecast lead times at 2 years
The January and July forecasts are run for 5 years rather than 2 years. The xarray concatenation above can deal with this, but fills the shorter forecasts with nans for lead times longer than 2 years. Let's get rid of some of these nans by truncating the forecasts at the lead time corresponding to the longest 2 year forecast.

In [None]:
# Largest number of lead-time steps a FCST_LENGTH-year forecast may have
# (12 steps per year).
max_increments = FCST_LENGTH * 12

# Truncate at the longest forecast that is no longer than max_increments.
# If every loaded forecast is longer than that, fall back to max_increments
# itself rather than calling max() on an empty sequence, which raises
# ValueError.
n_trunc = max((n for n in n_lead_time if n <= max_increments),
              default=max_increments)
da_fcst = da_fcst.isel(lead_time=slice(0, n_trunc))

### Construct observations xarray object

In [None]:
# Build da_obsv: the observed field, resampled with resample_freq and stacked
# by initial date so that it lines up with da_fcst.

# Instantiate progress bar -----
f = FloatProgress(min=0, max=1, description='Loading...') 
display(f)

# The JRA field is only saved in a time-concatenated form, so one file is read -----
# The variable is renamed to fcst_variable and its time/lon/lat dimensions are
# renamed to 'time', 'lon' and 'lat' before resampling and averaging in time.
path = obsv_folder + obsv_filename
dataset = xr.open_mfdataset(path, autoclose=True)[obsv_variable]
da_obsv = dataset.rename(fcst_variable) \
                 .rename({'initial_time0_hours' : 'time', 'g0_lon_2' : 'lon', 'g0_lat_1' : 'lat'}) \
                 .resample(time=resample_freq) \
                 .mean(dim='time')

# Stack by initial date to match forecast structure -----
# Uses the forecast's init_date values and the truncated lead-time count n_trunc.
da_obsv = utils.stack_by_init_date(da_obsv,da_fcst.init_date.values,n_trunc)
f.value += 1

# Average over the forecast dimension if it exists -----
if 'forecast_time1' in da_obsv.coords:
    da_obsv = da_obsv.mean(dim='forecast_time1')

# Rechunk for chunksizes of at least 1,000,000 elements -----
da_obsv = utils.prune(da_obsv.chunk(chunks={'init_date' : len(da_obsv.init_date)}).squeeze())

# Compute the SOI index

##### Extract forecast and observation
Note, we `compute()` the xarray objects here to save time later on. Once dask is working, it will probably be most sensible to leave the objects uncomputed.

In [None]:
with utils.timer():
    # Evaluate both objects now and scale the forecast by 100 - presumably a
    # hPa -> Pa conversion to match the observations; TODO confirm units.
    # NOTE(review): da_fcst is rebound from itself, so re-running this cell
    # without re-running the loading cells multiplies by 100 a second time.
    da_fcst = da_fcst.compute() * 100
    da_obsv = da_obsv.compute()

##### Load climatology data
Various climatologies are/will be accessible using `utils.load_mean_climatology()`. Here we use a climatology computed over the full 55 year jra reanalysis.

In [None]:
# Mean 'slp' climatology labelled 'jra_1958-2016', requested at 'MS' frequency.
jra_clim = utils.load_mean_climatology('jra_1958-2016', 'slp', freq='MS')

# Evaluate the climatology once here; the anomaly cells below use da_jra_clim.
da_jra_clim = jra_clim.compute()

##### Compute anomaly data
Recall that the forecast and observation data are saved as functions of lead time and initial date. The function `utils.anomalize()` computes anomalies given data and a climatology which each have a datetime dimension `time`. Thus it is necessary to first convert from the lead time/initial date format to a datetime format, then compute the anomaly, then convert back to the lead time/initial date format.

In [None]:
def anomalize(data, clim):
    """Return the anomalies of `data` about `clim` in lead-time format.

    `data` is converted with utils.leadtime_to_datetime(), passed together
    with `clim` to utils.anomalize(), and the result is converted back with
    utils.datetime_to_leadtime().
    """
    data_in_datetime = utils.leadtime_to_datetime(data)
    anomalies = utils.anomalize(data_in_datetime, clim)
    return utils.datetime_to_leadtime(anomalies)

In [None]:
# NOTE(review): the forecast anomaly calculation is commented out, so
# da_fcst_anom is just another name for the full-field da_fcst - confirm
# whether this is intentional.
da_fcst_anom = da_fcst #da_fcst.groupby('init_date').apply(anomalize, clim=da_jra_clim)

# Observed anomalies: anomalize() is applied to each init_date group with the
# JRA climatology.
da_obsv_anom = da_obsv.groupby('init_date').apply(anomalize, clim=da_jra_clim)

In [None]:
# No cell in this notebook binds a bare `compute_soi`, so the unqualified call
# raises NameError on a fresh kernel. Call it through the `indices` module
# imported at the top of the notebook.
# NOTE(review): assumes pylatte.indices exposes compute_soi - confirm.
indices.compute_soi(da_obsv_anom, std_dim='lead_time')

In [None]:
# Forecast SOI assembled step by step from Tahiti and Darwin sea-level pressure.
# NOTE(review): ds_fcst, lat_Tahiti, lon_Tahiti, lat_Darwin, lon_Darwin,
# da_clim_Tahiti and da_clim_Darwin are not assigned in any cell shown in this
# notebook - confirm where they are defined before running on a fresh kernel.

# Tahiti: slp taken with utils.get_nearest_point() and scaled by 100
# (presumably hPa -> Pa; TODO confirm), anomalized per initial date, then
# divided by its standard deviation over lead time.
da_fcst_Tahiti = utils.get_nearest_point(ds_fcst['slp'], lat_Tahiti, lon_Tahiti) * 100
da_fcst_Tahiti_anom = da_fcst_Tahiti.groupby('init_date').apply(anomalize, clim=da_clim_Tahiti)
da_fcst_Tahiti_std = da_fcst_Tahiti_anom.std(dim='lead_time')
da_fcst_Tahiti_stdzd = da_fcst_Tahiti_anom / da_fcst_Tahiti_std

# Darwin: same sequence of steps as for Tahiti.
da_fcst_Darwin = utils.get_nearest_point(ds_fcst['slp'], lat_Darwin, lon_Darwin) * 100
da_fcst_Darwin_anom = da_fcst_Darwin.groupby('init_date').apply(anomalize, clim=da_clim_Darwin)
da_fcst_Darwin_std = da_fcst_Darwin_anom.std(dim='lead_time')
da_fcst_Darwin_stdzd = da_fcst_Darwin_anom / da_fcst_Darwin_std

# Standard deviation over lead time of the Tahiti-minus-Darwin difference.
MSD_fcst = (da_fcst_Tahiti_stdzd - da_fcst_Darwin_stdzd).std(dim='lead_time')

# SOI: the standardized difference divided by MSD_fcst, evaluated immediately.
da_SOI_fcst = ((da_fcst_Tahiti_stdzd - da_fcst_Darwin_stdzd) / MSD_fcst).compute()

In [None]:
# Observed SOI assembled step by step from Tahiti and Darwin sea-level pressure.
# NOTE(review): ds_obsv, lat_Tahiti, lon_Tahiti, lat_Darwin, lon_Darwin,
# da_clim_Tahiti and da_clim_Darwin are not assigned in any cell shown in this
# notebook - confirm where they are defined before running on a fresh kernel.

# Tahiti: slp taken with utils.get_nearest_point(), anomalized per initial
# date, then divided by its standard deviation over lead time.
da_obsv_Tahiti = utils.get_nearest_point(ds_obsv['slp'], lat_Tahiti, lon_Tahiti)
da_obsv_Tahiti_anom = da_obsv_Tahiti.groupby('init_date').apply(anomalize, clim=da_clim_Tahiti)
da_obsv_Tahiti_std = da_obsv_Tahiti_anom.std(dim='lead_time')
da_obsv_Tahiti_stdzd = da_obsv_Tahiti_anom / da_obsv_Tahiti_std

# Darwin: same sequence of steps as for Tahiti.
da_obsv_Darwin = utils.get_nearest_point(ds_obsv['slp'], lat_Darwin, lon_Darwin)
da_obsv_Darwin_anom = da_obsv_Darwin.groupby('init_date').apply(anomalize, clim=da_clim_Darwin)
da_obsv_Darwin_std = da_obsv_Darwin_anom.std(dim='lead_time')
da_obsv_Darwin_stdzd = da_obsv_Darwin_anom / da_obsv_Darwin_std

# Standard deviation over lead time of the Tahiti-minus-Darwin difference.
MSD_obsv = (da_obsv_Tahiti_stdzd - da_obsv_Darwin_stdzd).std(dim='lead_time')

# SOI: the standardized difference divided by MSD_obsv, evaluated immediately.
da_SOI_obsv = ((da_obsv_Tahiti_stdzd - da_obsv_Darwin_stdzd) / MSD_obsv).compute()

##### Compute persistence data
This requires repeating the data at the first lead time over all lead times. `utils.repeat_data()` allows us to do this

In [None]:
# Persistence reference built from the observed SOI along 'lead_time'.
# Per the accompanying text, utils.repeat_data() repeats the data at the first
# lead time over all lead times - confirm against the helper's implementation.
da_SOI_pers = utils.repeat_data(da_SOI_obsv,'lead_time')

### Plot one initialization date

In [None]:
# Plot the forecast and observed SOI against lead time for the first initial date.
fig1 = plt.figure(figsize=(10,5))

ax = fig1.add_axes([0.1, 0.1, 0.8, 0.8])
ax.grid()
# Forecast SOI (default colour cycle).
ax.plot(da_SOI_fcst['lead_time'],da_SOI_fcst.isel(init_date=[0]).squeeze())
# Observed SOI, drawn as a thick black line on top.
ax.plot(da_SOI_obsv['lead_time'],da_SOI_obsv.isel(init_date=[0]).squeeze(),'k-',linewidth=2)
ax.set_title('SOI at the first initial date: forecast (colour) and observation (black)')
ax.set_xlabel('lead time')
# The SOI is built from anomalies divided by standard deviations, so it is
# dimensionless; the previous '[Pa]' unit in the label was incorrect.
ax.set_ylabel('SOI [-]');

# Compute some skill metrics

## (Continuous) ranked probability score

In [None]:
# Specify bins for computation of cdf -----
bins = np.linspace(0,5,50)

# Compute ranked probability score -----
rps = skill.compute_rps(da_SOI_fcst, da_SOI_obsv, bins=bins, indep_dims=None, ensemble_dim='ensemble') \
           .groupby('init_date.month').mean(dim='init_date')

In [None]:
# Image of the ranked probability score, timed with utils.timer().
with utils.timer():
    fig1 = plt.figure(figsize=(8,4))

    ax = fig1.add_axes([0.1, 0.1, 0.8, 0.8])
    ax.grid()
    # rps is transposed before plotting. extent=[left, right, bottom, top]
    # hard-codes the x axis to 1-24 and the y axis to 1-2.
    # NOTE(review): confirm these limits match the lead times and
    # initialization months actually loaded.
    ax.imshow(rps.transpose(), extent=[1,24,1,2])
    ax.set_xlabel('Lead time [months]')
    ax.set_ylabel('Initialized month');

## Root mean squared error

In [None]:
with utils.timer():
    # RMS error of the forecast SOI against the observed SOI, using 'ensemble'
    # as the ensemble dimension, then averaged over initial dates that share a
    # calendar month.
    rms_error = skill.compute_rms_error(da_SOI_fcst, da_SOI_obsv, 
                                        indep_dims=None, ensemble_dim='ensemble') \
                     .groupby('init_date.month').mean(dim='init_date')

In [None]:
# Image of the current rms_error, timed with utils.timer().
with utils.timer():
    fig1 = plt.figure(figsize=(8,4))

    ax = fig1.add_axes([0.1, 0.1, 0.8, 0.8])
    ax.grid()
    # extent=[left, right, bottom, top] hard-codes the x axis to 1-24 and the
    # y axis to 1-2.
    # NOTE(review): rms_error is plotted without transpose(), unlike rps in
    # the RPS plot - confirm the array orientation matches the axis labels.
    ax.imshow(rms_error, extent=[1,24,1,2])
    ax.set_xlabel('Lead time [months]')
    ax.set_ylabel('Initialized month');

In [None]:
with utils.timer():
    # RMS error of the persistence SOI against the observed SOI (no ensemble
    # dimension), averaged over initial dates that share a calendar month.
    # NOTE(review): this rebinds rms_error, replacing the forecast RMS error
    # computed in the earlier cell.
    rms_error = skill.compute_rms_error(da_SOI_pers, da_SOI_obsv, 
                                        indep_dims=None, ensemble_dim=None) \
                     .groupby('init_date.month').mean(dim='init_date')

In [None]:
# Image of the current rms_error, timed with utils.timer(). In run order this
# follows the persistence RMS error cell, so that is the field shown.
with utils.timer():
    fig1 = plt.figure(figsize=(8,4))

    ax = fig1.add_axes([0.1, 0.1, 0.8, 0.8])
    ax.grid()
    # extent=[left, right, bottom, top] hard-codes the x axis to 1-24 and the
    # y axis to 1-2.
    # NOTE(review): confirm these limits match the data actually loaded.
    ax.imshow(rms_error, extent=[1,24,1,2])
    ax.set_xlabel('Lead time [months]')
    ax.set_ylabel('Initialized month');

# Close dask client

In [18]:
# with utils.timer():
#     client.close()

Future exception was never retrieved
future: <Future finished exception=CommClosedError('in <closed TCP>: Stream is closed',)>
Traceback (most recent call last):
  File "/OSM/CBR/OA_DCFP/apps/squ027/anaconda3/envs/dts3_env/lib/python3.6/site-packages/distributed/comm/tcp.py", line 179, in read
    n_frames = yield stream.read_bytes(8)
  File "/OSM/CBR/OA_DCFP/apps/squ027/anaconda3/envs/dts3_env/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/OSM/CBR/OA_DCFP/apps/squ027/anaconda3/envs/dts3_env/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/OSM/CBR/OA_DCFP/apps/squ027/anaconda3/envs/dts3_env/lib/python3.6/site-packages/distributed/comm/tcp.py", line 200, in read
    convert_stream_closed_error(self, e)
  File "/OSM/CB

   Elapsed: 0.15787982940673828 sec
