In [1]:
'''

This code is part of the SIPN2 project focused on improving sub-seasonal to seasonal predictions of Arctic Sea Ice. 
If you use this code for a publication or presentation, please cite the reference in the README.md on the
main page (https://github.com/NicWayand/ESIO). 

Questions or comments should be addressed to nicway@uw.edu

Copyright (c) 2018 Nic Wayand

GNU General Public License v3.0


'''


'\n\nThis code is part of the SIPN2 project focused on improving sub-seasonal to seasonal predictions of Arctic Sea Ice. \nIf you use this code for a publication or presentation, please cite the reference in the README.md on the\nmain page (https://github.com/NicWayand/ESIO). \n\nQuestions or comments should be addressed to nicway@uw.edu\n\nCopyright (c) 2018 Nic Wayand\n\nGNU General Public License v3.0\n\n\n'

In [2]:
# Standard Imports
%matplotlib inline
%load_ext autoreload
%autoreload
import matplotlib
import scipy
import matplotlib.pyplot as plt
import datetime
import cartopy.crs as ccrs
from cartopy.mpl.gridliner import LONGITUDE_FORMATTER, LATITUDE_FORMATTER
import numpy as np
import pandas as pd
import xarray as xr
import xesmf as xe
import os
import re
import glob
import seaborn as sns
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import dask
# dask.set_options(get=dask.threaded.get)

from dask.distributed import Client
client = Client()

# ESIO Imports
import esio
import esiodata as ed

In [3]:
client

0,1
Client  Scheduler: tcp://127.0.0.1:39151  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 16  Cores: 16  Memory: 67.47 GB


In [4]:
# General plotting settings
sns.set_style('whitegrid')
sns.set_context("talk", font_scale=1.5, rc={"lines.linewidth": 2.5})

In [5]:
E = ed.esiodata.load()
model_dir = E.model_dir
# Directories
# Define models to plot
all_models = list(E.model.keys())
all_models = [x for x in all_models if x!='piomas'] # remove some models
# all_models = ['cma']
runType='forecast'
updateall = False
data_vars = ['sic']

In [6]:
ds_region = xr.open_mfdataset(os.path.join(E.grid_dir, 'sio_2016_mask_Update.nc'))

In [7]:
# output dir
data_out = os.path.join(model_dir, 'MME', runType, 'sipn_nc')
if not os.path.exists(data_out):
    os.makedirs(data_out)

In [8]:
# First loop through all models and lazily open them
mod_l = []
cen = 0 # Current ensemble number
for model in all_models:
    if model=='MME':
        continue
    print(model)
    
    data_dir = E.model[model][runType]['sipn_nc']
    all_files = glob.glob(os.path.join(data_dir, '*.nc'))
    
    # Check for any files
    if not all_files:
        continue
        
    # Remove any "empty" files (sometimes happends with ecmwf downloads)
    all_files_new = []
    for cf in all_files:
        if os.stat(cf).st_size > 0:
            all_files_new.append(cf)
    all_files = sorted(all_files_new) # Replace and sort


    ds = xr.open_mfdataset(all_files , concat_dim='init_time', 
                           chunks={'fore_time':10, 'ensemble': 5, 'init_time': 10, 'nj': 304, 'ni': 448},
                          autoclose=True,
                          parallel=True) 

    # Update ensemble number
    ds.coords['ensemble'] = np.arange(cen,cen+ds.ensemble.size)
    cen = cen + ds.ensemble.size
    
    # Grab stuff
    ds = ds[data_vars]
    
    # Drop stuff
    if 'valid_time' in ds.coords:
        ds = ds.drop(['valid_time'])
        
    print(ds.init_time.values[0], ds.init_time.values[-1])
            
    ds.coords['model'] = model
    mod_l.append(ds)

gfdlsipn
2018-03-01T00:00:00.000000000 2018-05-01T00:00:00.000000000
yopp
2018-03-01T00:00:00.000000000 2018-05-19T12:00:00.000000000
bom
2015-01-01T00:00:00.000000000 2018-04-29T00:00:00.000000000
cma
2015-01-01T00:00:00.000000000 2018-04-30T00:00:00.000000000
ecmwf
2015-01-01T00:00:00.000000000 2018-04-30T00:00:00.000000000
hcmr
2015-01-07T00:00:00.000000000 2018-04-26T00:00:00.000000000
isaccnr
2015-11-09T00:00:00.000000000 2018-04-26T00:00:00.000000000
jma
2015-01-06T12:00:00.000000000 2018-04-25T12:00:00.000000000
metreofr
2015-05-01T00:00:00.000000000 2018-04-26T00:00:00.000000000
ukmo
2015-12-01T00:00:00.000000000 2018-04-30T00:00:00.000000000
eccc
2016-01-07T00:00:00.000000000 2018-04-26T00:00:00.000000000
kma
2016-11-01T00:00:00.000000000 2018-04-30T00:00:00.000000000
ncep
2015-01-01T00:00:00.000000000 2018-04-30T00:00:00.000000000
ukmetofficesipn
2014-01-09T00:00:00.000000000 2018-05-01T00:00:00.000000000
ecmwfsipn
1993-01-01T00:00:00.000000000 2018-05-01T00:00:00.000000000
u



2016-10-28T00:00:00.000000000 2018-05-21T00:00:00.000000000
noaasipn
2018-05-01T00:00:00.000000000 2018-05-01T00:00:00.000000000
noaasipn_ext
MME


In [9]:
# Concat data by model
ds_all = xr.concat(mod_l, dim='ensemble')

  (nparts / max_parts))
  (nparts / max_parts))


In [10]:
ds_all

<xarray.Dataset>
Dimensions:    (ensemble: 86, fore_time: 572, init_time: 1800, ni: 448, nj: 304)
Coordinates:
  * fore_time  (fore_time) timedelta64[ns] 0 days 00:00:00 0 days 03:00:00 ...
  * init_time  (init_time) datetime64[ns] 1993-01-01 1993-02-01 1993-03-01 ...
    lon        (nj, ni) float64 168.3 168.4 168.5 168.7 168.8 168.9 169.0 ...
    lat        (nj, ni) float64 31.1 31.25 31.4 31.55 31.69 31.84 31.99 ...
  * ensemble   (ensemble) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ...
    model      (ensemble) <U15 'gfdlsipn' 'gfdlsipn' 'gfdlsipn' 'gfdlsipn' ...
Dimensions without coordinates: ni, nj
Data variables:
    sic        (ensemble, init_time, fore_time, nj, ni) float64 dask.array<shape=(86, 1800, 572, 304, 448), chunksize=(5, 1, 10, 304, 448)>

In [11]:
# Simple multi-model mean
ds_mme = ds_all.mean(dim='ensemble')

In [12]:
# Expand dims
ds_mme = esio.expand_to_sipn_dims(ds_mme)

In [13]:
# Compute
ds_mme.compute()

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.


KeyboardInterrupt

distributed.scheduler - ERROR - Failed to connect to worker 'tcp://127.0.0.1:39781': Timed out trying to connect to 'tcp://127.0.0.1:39781' after 3 s: connect() didn't finish in time
distributed.scheduler - ERROR - Failed to connect to worker 'tcp://127.0.0.1:32967': Timed out trying to connect to 'tcp://127.0.0.1:32967' after 3 s: connect() didn't finish in time
distributed.scheduler - ERROR - Failed to connect to worker 'tcp://127.0.0.1:40141': Timed out trying to connect to 'tcp://127.0.0.1:40141' after 3 s: connect() didn't finish in time
distributed.scheduler - ERROR - Failed to connect to worker 'tcp://127.0.0.1:41465': Timed out trying to connect to 'tcp://127.0.0.1:41465' after 3 s: connect() didn't finish in time
distributed.scheduler - ERROR - Failed to connect to worker 'tcp://127.0.0.1:34739': Timed out trying to connect to 'tcp://127.0.0.1:34739' af

In [None]:
# Write out
ds_mme.to_netcdf(os.path.join(data_out, 'MME.nc'))

ERROR:asyncio:Future exception was never retrieved
future: <Future finished exception=CommClosedError('in <closed TCP>: Stream is closed',)>
Traceback (most recent call last):
  File "/home/disk/sipn/nicway/anaconda3/envs/esio/lib/python3.6/site-packages/distributed/comm/tcp.py", line 179, in read
    n_frames = yield stream.read_bytes(8)
  File "/home/disk/sipn/nicway/anaconda3/envs/esio/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/disk/sipn/nicway/anaconda3/envs/esio/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/disk/sipn/nicway/anaconda3/envs/esio/lib/python3.6/site-packages/distributed/comm/tcp.py", line 200, in read
    convert_stream_closed_error(self, e)
  File "/home/disk/sipn/nicway/anaconda3/