In [1]:
import os
import dask
import xarray as xr
from tqdm import tqdm_notebook as tqdm

import xarray as xr
import numpy as np
from pathlib import Path
from glob import glob
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

In [2]:
dask.config.set({'distributed.dashboard.link':'http://localhost:8777/proxy/{port}/status'})

<dask.config.set at 0x2b8513639d30>

In [3]:
PROJECT = os.environ["DAV_PROJECT"]
USER = os.environ['USER']


In [4]:
cluster = SLURMCluster(project=PROJECT, processes=6, cores=24, memory="40GB",
                           env_extra=['export LANG="en_US.utf8"',
                                      'export LANGUAGE="en_US.utf8"',
                                      'export LC_ALL="en_US.utf8"',
                                      'export LD_LIBRARY_PATH=""',],)

In [5]:
cluster

VBox(children=(HTML(value='<h2>SLURMCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    …

In [6]:
!squeue -u $USER

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           2151119       dav     srun  abanihi  R    1:30:19      1 casper24
           2151445       dav dask-wor  abanihi  R       0:07      1 casper10
           2151446       dav dask-wor  abanihi  R       0:07      1 casper11
           2151447       dav dask-wor  abanihi  R       0:07      1 casper11


In [7]:
client = Client(cluster)

In [8]:
root_dir = Path("/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS")

## Case 1: `b.e11.BRCP85C5CNBDRD`

For this case, there are two time ranges for each ensemble:
- `2006-01 -> 2080-12`
- `2080-01 -> 2100-12`

In [9]:
CASE1 = 'b.e11.BRCP85C5CNBDRD.f09_g16'

In [10]:
list_1 = sorted(root_dir.glob("b.e11.BRCP85C5CNBDRD.f09_g16.???.cam.h0.*.200601-208012*"))
list_2 = sorted(root_dir.glob("b.e11.BRCP85C5CNBDRD.f09_g16.???.cam.h0.*.208101-210012*"))

We are going to read these files in three steps:

- Step 1: Map files in `list_1` and `list_2` for each ensemble in a list of tuples where each tuple contains files for each ensemble for the two time ranges.
- Step 2: Loop through the resulting list from step 1, and read those files into a list of datasets. Under the hood, xarray concatenates files for each ensemble in one dataset.
- Step 3: Concatenate list of datasets from step 2 into one xarray dataset. We concatenate these datasets along the `ensemble` dimension.

**Step 1**

Map files in `list_1` and `list_2` for each ensemble in a list of tuples where each tuple contains files for each ensemble for the two time ranges.

In [11]:
case_1 = list(zip(list_1, list_2))
case_1[0]

(PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS/b.e11.BRCP85C5CNBDRD.f09_g16.001.cam.h0.TS.200601-208012.nc'),
 PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS/b.e11.BRCP85C5CNBDRD.f09_g16.001.cam.h0.TS.208101-210012.nc'))

In [12]:
len(case_1)

33

**Step 2**

Loop through the resulting list from step 1, and read those files into a list of datasets. Under the hood, xarray concatenates files for each ensemble in one dataset.

In [13]:
ds_list = [xr.open_mfdataset(item) for item in case_1]
ds_list[:2]

[<xarray.Dataset>
 Dimensions:       (ilev: 31, lat: 192, lev: 30, lon: 288, nbnd: 2, slat: 191, slon: 288, time: 1140)
 Coordinates:
   * ilev          (ilev) float64 2.255 5.032 10.16 18.56 ... 967.5 985.1 1e+03
   * lat           (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 88.12 89.06 90.0
   * lev           (lev) float64 3.643 7.595 14.36 24.61 ... 957.5 976.3 992.6
   * lon           (lon) float64 0.0 1.25 2.5 3.75 ... 355.0 356.2 357.5 358.8
   * slat          (slat) float64 -89.53 -88.59 -87.64 ... 87.64 88.59 89.53
   * slon          (slon) float64 -0.625 0.625 1.875 3.125 ... 355.6 356.9 358.1
   * time          (time) object 2006-02-01 00:00:00 ... 2101-01-01 00:00:00
 Dimensions without coordinates: nbnd
 Data variables:
     P0            (time) float64 1e+05 1e+05 1e+05 1e+05 ... 1e+05 1e+05 1e+05
     TS            (time, lat, lon) float32 dask.array<shape=(1140, 192, 288), chunksize=(900, 192, 288)>
     ch4vmr        (time) float64 dask.array<shape=(1140,), chunksize=(

**Step 3**

Concatenate list of datasets from step 2 into one xarray dataset. We concatenate these datasets along the `ensemble` dimension.

In [14]:
dset_case1 = xr.concat(ds_list, dim='ensemble')
dset_case1.attrs['case'] = CASE1
dset_case1

<xarray.Dataset>
Dimensions:       (ensemble: 33, ilev: 31, lat: 192, lev: 30, lon: 288, nbnd: 2, slat: 191, slon: 288, time: 1140)
Coordinates:
  * ilev          (ilev) float64 2.255 5.032 10.16 18.56 ... 967.5 985.1 1e+03
  * lat           (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 88.12 89.06 90.0
  * lev           (lev) float64 3.643 7.595 14.36 24.61 ... 957.5 976.3 992.6
  * lon           (lon) float64 0.0 1.25 2.5 3.75 ... 355.0 356.2 357.5 358.8
  * slat          (slat) float64 -89.53 -88.59 -87.64 ... 87.64 88.59 89.53
  * slon          (slon) float64 -0.625 0.625 1.875 3.125 ... 355.6 356.9 358.1
  * time          (time) object 2006-02-01 00:00:00 ... 2101-01-01 00:00:00
Dimensions without coordinates: ensemble, nbnd
Data variables:
    P0            (ensemble, time) float64 1e+05 1e+05 1e+05 ... 1e+05 1e+05
    TS            (ensemble, time, lat, lon) float32 dask.array<shape=(33, 1140, 192, 288), chunksize=(1, 900, 192, 288)>
    ch4vmr        (ensemble, time) float64 das

## Case 2: `b.e11.B20TRC5CNBDRD`

In [15]:
CASE2 = 'b.e11.B20TRC5CNBDRD.f09_g16'

In [16]:
list_1 = sorted(root_dir.glob("b.e11.B20TRC5CNBDRD.f09_g16.???.cam.h0.*"))
list_1

[PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS/b.e11.B20TRC5CNBDRD.f09_g16.001.cam.h0.TS.185001-200512.nc'),
 PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS/b.e11.B20TRC5CNBDRD.f09_g16.002.cam.h0.TS.192001-200512.nc'),
 PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS/b.e11.B20TRC5CNBDRD.f09_g16.003.cam.h0.TS.192001-200512.nc'),
 PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS/b.e11.B20TRC5CNBDRD.f09_g16.004.cam.h0.TS.192001-200512.nc'),
 PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS/b.e11.B20TRC5CNBDRD.f09_g16.005.cam.h0.TS.192001-200512.nc'),
 PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS/b.e11.B20TRC5CNBDRD.f09_g16.006.cam.h0.TS.192001-200512.nc'),
 PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tse

In [17]:
len(list_1)

42

In [18]:
indices = 0, 33, 34 # indices of special runs to remove for the original list. These runs' output have additional ouput, and/or have special time ranges

In [19]:
updated_list = [item for index, item in enumerate(list_1) if index not in indices]
updated_list

[PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS/b.e11.B20TRC5CNBDRD.f09_g16.002.cam.h0.TS.192001-200512.nc'),
 PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS/b.e11.B20TRC5CNBDRD.f09_g16.003.cam.h0.TS.192001-200512.nc'),
 PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS/b.e11.B20TRC5CNBDRD.f09_g16.004.cam.h0.TS.192001-200512.nc'),
 PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS/b.e11.B20TRC5CNBDRD.f09_g16.005.cam.h0.TS.192001-200512.nc'),
 PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS/b.e11.B20TRC5CNBDRD.f09_g16.006.cam.h0.TS.192001-200512.nc'),
 PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS/b.e11.B20TRC5CNBDRD.f09_g16.007.cam.h0.TS.192001-200512.nc'),
 PosixPath('/glade/collections/cdg/data/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tse

In [20]:
dset_case2 = xr.open_mfdataset(updated_list, concat_dim='ensemble')
dset_case2

<xarray.Dataset>
Dimensions:       (ensemble: 39, ilev: 31, lat: 289, lev: 30, lon: 288, nbnd: 2, slat: 256, slon: 288, time: 1032)
Coordinates:
  * lat           (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 88.12 89.06 90.0
  * slat          (slat) float64 -89.53 -88.59 -87.64 ... 87.64 88.59 89.53
  * ilev          (ilev) float64 2.255 5.032 10.16 18.56 ... 967.5 985.1 1e+03
  * lev           (lev) float64 3.643 7.595 14.36 24.61 ... 957.5 976.3 992.6
  * lon           (lon) float64 0.0 1.25 2.5 3.75 ... 355.0 356.2 357.5 358.8
  * slon          (slon) float64 -0.625 0.625 1.875 3.125 ... 355.6 356.9 358.1
  * time          (time) object 1920-02-01 00:00:00 ... 2006-01-01 00:00:00
Dimensions without coordinates: ensemble, nbnd
Data variables:
    P0            (ensemble) float64 1e+05 1e+05 1e+05 ... 1e+05 1e+05 1e+05
    TS            (ensemble, time, lat, lon) float32 dask.array<shape=(39, 1032, 289, 288), chunksize=(1, 1032, 289, 288)>
    ch4vmr        (ensemble, time) float64 da

In [21]:
dset_case2.attrs['case'] = CASE2

## Persist the data

In [22]:
def save_to_dist(ds, chunks, output_dir):
    zarr_store = ''
    for key, val in sorted(chunks.items()):
        zarr_store += str(key)+str(val) + '-'
        
    zarr_store += '.zarr'
    CASE = ds.attrs['case']
    case_dir = f'{output_dir}/{CASE}'
    os.makedirs(case_dir, exist_ok=True)
    output = f'{case_dir}/{zarr_store}'
    print(f"Writing {output} with chunks = {chunks}")
    ds = ds.chunk(chunks)
    ds.to_zarr(output, mode='w')

In [23]:
chunks = [{"ensemble": 1, "time": 1032},
          {"ensemble": 1, "time": 516},
          {"ensemble": 1, "time": 258},
          {"ensemble": 1, "time": 129},
          {"ensemble": 1, "time": 64},
          {"ensemble": 1, "time": 32}]
output_dir = "/glade/scratch/abanihi/data/AWS/lens"

In [24]:
for chunks_i in chunks:
    save_to_dist(dset_case1, chunks_i, output_dir)

Writing /glade/scratch/abanihi/data/AWS/lens/b.e11.BRCP85C5CNBDRD.f09_g16/ensemble1-time1032-.zarr with chunks = {'ensemble': 1, 'time': 1032}
Writing /glade/scratch/abanihi/data/AWS/lens/b.e11.BRCP85C5CNBDRD.f09_g16/ensemble1-time516-.zarr with chunks = {'ensemble': 1, 'time': 516}
Writing /glade/scratch/abanihi/data/AWS/lens/b.e11.BRCP85C5CNBDRD.f09_g16/ensemble1-time258-.zarr with chunks = {'ensemble': 1, 'time': 258}
Writing /glade/scratch/abanihi/data/AWS/lens/b.e11.BRCP85C5CNBDRD.f09_g16/ensemble1-time129-.zarr with chunks = {'ensemble': 1, 'time': 129}
Writing /glade/scratch/abanihi/data/AWS/lens/b.e11.BRCP85C5CNBDRD.f09_g16/ensemble1-time64-.zarr with chunks = {'ensemble': 1, 'time': 64}
Writing /glade/scratch/abanihi/data/AWS/lens/b.e11.BRCP85C5CNBDRD.f09_g16/ensemble1-time32-.zarr with chunks = {'ensemble': 1, 'time': 32}




In [25]:
for chunks_i in chunks:
    save_to_dist(dset_case2, chunks_i, output_dir)

Writing /glade/scratch/abanihi/data/AWS/lens/b.e11.B20TRC5CNBDRD.f09_g16/ensemble1-time1032-.zarr with chunks = {'ensemble': 1, 'time': 1032}


  dump_to_store(dataset, zstore, writer, encoding=encoding)


Writing /glade/scratch/abanihi/data/AWS/lens/b.e11.B20TRC5CNBDRD.f09_g16/ensemble1-time516-.zarr with chunks = {'ensemble': 1, 'time': 516}




Writing /glade/scratch/abanihi/data/AWS/lens/b.e11.B20TRC5CNBDRD.f09_g16/ensemble1-time258-.zarr with chunks = {'ensemble': 1, 'time': 258}




Writing /glade/scratch/abanihi/data/AWS/lens/b.e11.B20TRC5CNBDRD.f09_g16/ensemble1-time129-.zarr with chunks = {'ensemble': 1, 'time': 129}




Writing /glade/scratch/abanihi/data/AWS/lens/b.e11.B20TRC5CNBDRD.f09_g16/ensemble1-time64-.zarr with chunks = {'ensemble': 1, 'time': 64}




Writing /glade/scratch/abanihi/data/AWS/lens/b.e11.B20TRC5CNBDRD.f09_g16/ensemble1-time32-.zarr with chunks = {'ensemble': 1, 'time': 32}




In [26]:
ls /glade/scratch/abanihi/data/AWS/lens/

[0m[38;5;27mb.e11.B20TRC5CNBDRD.f09_g16[0m/  [38;5;27mb.e11.BRCP85C5CNBDRD.f09_g16[0m/


In [27]:
%load_ext watermark
%watermark -u -n -t -iv -g -m

xarray 0.11.3
dask   1.1.1
numpy  1.16.1
last updated: Mon Feb 04 2019 13:59:59 

compiler   : GCC 7.3.0
system     : Linux
release    : 3.10.0-693.21.1.el7.x86_64
machine    : x86_64
processor  : x86_64
CPU cores  : 72
interpreter: 64bit
Git hash   : 7c5a9bc82aa1fe7cca674ce00df9e9d0477ffb28
