In [12]:
from dask.dot import dot_graph
import itertools
import logging
import netCDF4
import numpy
import dask.array as da
from dask import delayed
import time
from dask.distributed import Client
from urllib import request


# client = Client('scheduler:8786')
client = Client(processes=False)
client.ncores()

download_location = '/temp'
data_url = 'http://172.22.0.1:8234'
max_download_attempts = 5

all_models = ['ACCESS1-0', 'BNU-ESM', 'CCSM4', 'CESM1-BGC', 'CNRM-CM5', 'CSIRO-Mk3-6-0', 'CanESM2', 'GFDL-CM3', 'GFDL-ESM2G', 'GFDL-ESM2M', 'IPSL-CM5A-LR', 'IPSL-CM5A-MR', 'MIROC-ESM-CHEM', 'MIROC-ESM', 'MIROC5', 'MPI-ESM-LR', 'MPI-ESM-MR', 'MRI-CGCM3', 'NorESM1-M', 'bcc-csm1-1', 'inmcm4']
all_models = ['ACCESS1-0'] 
all_vars = ['tasmax', 'pr']
all_years = {
     'historical': list(range(1971, 1973))
}

def get_dataset_url(variable, scenario, model, year, prefix = data_url):
    prefix_filename = '/'.join([prefix, 'NEX-GDDP', 'BCSD', scenario, 'day', 'atmos', variable, 'r1i1p1', 'v1.0'])
    # prefix_filename = data_url + '/data'
    filename = '_'.join([variable, 'day', 'BCSD', scenario, 'r1i1p1', model, str(year) + '.nc'])
    url = '/'.join([prefix_filename, filename])
    return url

def get_context(year, **kwargs):
    variables = [kwargs.get('variable')] if kwargs.get('variable') else all_vars
    scenarios = ['historical']
    models = [kwargs.get('model')] if kwargs.get('model') else all_models
    outlist = []
    combinations = list(itertools.product(variables, scenarios, models))
    result = list(map(lambda comb: [ *comb, year ], combinations))
    return result

def get_year_ensemble(year, variable = 'tasmax'):
    context = get_context(year, variable = variable)
    datasets = list(map(lambda x: str(get_dataset_url(*x)), context))
    return datasets

def to_dataset(filename):
    return netCDF4.Dataset(filename)

def download_file(url):
    print("url: " + url)
    attempts = 0
    success = False
    filename = ""
    while attempts < max_download_attempts and not success:
        time.sleep(2 ** attempts)
        filename = '/'.join([download_location, str(url.split('/')[-1])])
        print("Downloading file at " + filename)
        u = request.urlopen(url)
        f = open(filename, 'wb')
        f.write(u.read())
        f.close()
        success = True
        break
    return filename

In [13]:
datasets_tasmax = list(map(get_year_ensemble, all_years['historical']))
datasets_pr = list(map(lambda x : get_year_ensemble(x, variable = 'pr'), all_years['historical']))

In [14]:
client.ncores()

{'inproc://172.22.0.2/23/2': 8}

In [15]:
dsets = []
da_dsets = []
for row in datasets_tasmax:
    print("Processing year")
    fnames = list(map(download_file, row))
    datasets_year = list(map(lambda dset: netCDF4.Dataset(dset), fnames))
    dsets.append(datasets_year)
    
    dask_dsets_year = map(lambda dset: da.from_array(dset['tasmax'], chunks=(366, 120, 120)), datasets_year)
    da_dsets.append(list(dask_dsets_year))


Processing year
url: http://172.22.0.1:8234/NEX-GDDP/BCSD/historical/day/atmos/tasmax/r1i1p1/v1.0/tasmax_day_BCSD_historical_r1i1p1_ACCESS1-0_1971.nc
Downloading file at /temp/tasmax_day_BCSD_historical_r1i1p1_ACCESS1-0_1971.nc
Processing year
url: http://172.22.0.1:8234/NEX-GDDP/BCSD/historical/day/atmos/tasmax/r1i1p1/v1.0/tasmax_day_BCSD_historical_r1i1p1_ACCESS1-0_1972.nc
Downloading file at /temp/tasmax_day_BCSD_historical_r1i1p1_ACCESS1-0_1972.nc


In [16]:
da_dsets = [row[0] for row in da_dsets]
da_dsets

[dask.array<array, shape=(365, 720, 1440), dtype=float32, chunksize=(365, 120, 120)>,
 dask.array<array, shape=(366, 720, 1440), dtype=float32, chunksize=(366, 120, 120)>]

In [17]:
# To concatenate dask arrays
stack = da.concatenate(da_dsets, axis = 0)
stack_rechunked = stack.rechunk(731, 120, 120)


In [23]:
@delayed
def custom_percentile(a, q=99):
    b = numpy.sum(a, q)
    c = numpy.array(b, ndmin=a.ndim)
    return c

In [24]:
custom_percentile

Delayed('custom_percentile-24ea20c8-2ec8-46d3-bbf4-87ac298f2278')

In [25]:
op = da.apply_along_axis(custom_percentile, 0, stack_rechunked)
op

dask.array<apply_along_axis, shape=(720, 1440), dtype=object, chunksize=(720, 731)>

In [21]:
dict(op.dask)

{('apply_along_axis-ce49171bd9c18297afdf870e03d65baa',
  0,
  0): (<function dask.compatibility.apply>, <function dask.array.routines._inner_apply_along_axis>, [('rechunk-merge-577367bb52ce20dbbf721294eae29f37',
    0,
    0,
    0)], (dict,
   [['func1d_axis', 0],
    ['func1d_args', (tuple, [])],
    ['func1d', 'custom_percentile-5a46e6fc-4af2-429a-b886-864dfcdc49c8'],
    ['func1d_kwargs', (dict, [])]])),
 ('apply_along_axis-ce49171bd9c18297afdf870e03d65baa',
  0,
  1): (<function dask.compatibility.apply>, <function dask.array.routines._inner_apply_along_axis>, [('rechunk-merge-577367bb52ce20dbbf721294eae29f37',
    0,
    0,
    1)], (dict,
   [['func1d_axis', 0],
    ['func1d_args', (tuple, [])],
    ['func1d', 'custom_percentile-5a46e6fc-4af2-429a-b886-864dfcdc49c8'],
    ['func1d_kwargs', (dict, [])]])),
 ('array-714cf854e5d29eb9ad3ac039fa52ec74',
  0,
  0,
  0): (<function dask.array.core.getter>, 'array-original-714cf854e5d29eb9ad3ac039fa52ec74', (slice(0, 366, None),
   slic

In [None]:
res = op.compute()
res

In [None]:
x = da.ones(1000, chunks=(2,)).sum()
x.compute()

In [None]:
def load_tasmax(fn):
    with netCDF4.Dataset(fn) as f:
        return f.variables['tasmax']

filename = 'tasmax_day_BCSD_historical_r1i1p1_ACCESS1-0_1972.nc'
load_tasmax(filename)

In [None]:
from distributed import Client, progress
client = Client('scheduler:8786')


