In [2]:
import os
import re

import dask
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import patsy
import six
import tables
from toolz import curry
import xarray as xr

from yatsm.config import validate_and_parse_configfile
from yatsm.io._api import get_reader, read_and_preprocess

# Point ROOT at the image directory before the config file is parsed
# (presumably the YAML references $ROOT -- confirm against the config).
# NOTE(review): both paths are absolute and machine-specific.
os.environ['ROOT'] = '/home/ceholden/Documents/CMS/p008r056_subset/images/'
config = validate_and_parse_configfile('/home/ceholden/Documents/yatsm/examples/topic_xarray.yaml')

# One reader per dataset named in the config, built from that dataset's
# 'reader' options
readers = dict({name: get_reader(**cfg['reader']) for name, cfg in six.iteritems(config['data']['datasets'])})

# Read a ((0, 100), (0, 100)) window unless a cached copy exists on disk
if not os.path.exists('cache.nc'):
    dat = read_and_preprocess(config['data']['datasets'], readers, window=((0, 100), (0, 100)))
    # NOTE(review): the cache write below is commented out (and refers to
    # `arr`, which this cell never defines), so 'cache.nc' is never created
    # here and the `else` branch only runs if the file was made elsewhere.
#     arr.to_netcdf('cache.nc')
else:
    print('Loading from cache')
    dat = xr.open_dataset('cache.nc').load()

# Variable names used by later cells
bands = ['blue', 'green', 'red', 'nir', 'swir1', 'swir2', 'ndvi']

In [3]:
# FUNCTIONS
def sub_lhs(d):
    """ Add a 'select' task for each element of every tuple key

    For each key of ``d`` that is a tuple, every element of the tuple
    becomes a key of the returned graph. Its value is the task
    ``(select, tuple_key, position)``, where ``select(x, i)`` returns
    ``x[i]``.

    Args:
        d (dict): Graph to expand; it is copied, not modified

    Returns:
        dict: Shallow copy of ``d`` with the 'select' tasks added
    """
    def _select(x, i):
        return x[i]

    expanded = d.copy()
    tuple_keys = (key for key in d if isinstance(key, tuple))
    for key in tuple_keys:
        for position in range(len(key)):
            expanded[key[position]] = (_select, key, position)
    return expanded


def _parse_arr_notation(s, name):
    """ Parse "array notation" dependency
    
    Example:
    
    >>> s = "dat[b1, b2, ..., bN]"
    >>> _parse_arr_notation(s, 'dat')
      ['dat-b1', 'dat-b2', ..., 'dat-bN']
      
    Args:
        s (str): String to parse
        name (str): Name of array
    """
    bands = []
    matches = re.findall('{name}\[(.*)\]'.format(name=name), s)
    for match in matches:  # iter over matches
        match = [name + '-' + m for m in match.replace(' ', ',').split(',') if m]
        bands.extend(match)
    return bands or s
        


def expand_rhs(d, name):
    """ Expand "array notation" arguments on the right hand side of a graph

    For every value of ``d`` that is a task tuple ``(callable, *args)``,
    string arguments are passed through ``_parse_arr_notation`` for the
    array ``name``. List arguments are expanded element by element. Any
    other argument is kept as-is.

    Args:
        d (dict): Graph to expand; it is copied, not modified
        name (str): Name of array whose notation should be expanded

    Returns:
        dict: Shallow copy of ``d`` with task arguments expanded
    """
    def _expand(arg):
        # six.string_types works on both Python 2 and 3
        if isinstance(arg, six.string_types):
            return _parse_arr_notation(arg, name)
        return arg

    dsk = d.copy()
    # Iterate through graph key:value (left / right)
    for lhs, rhs in six.iteritems(d):
        # RHS has signature of (callable, *args, ); skip empty tuples
        if isinstance(rhs, tuple) and rhs and callable(rhs[0]):
            args = []
            for arg in rhs[1:]:
                # Could be a list of reqs, one of which is 'dat\[.*\]'
                if isinstance(arg, list):
                    args.append([_expand(item) for item in arg])
                else:
                    # Non-string arguments are preserved, not dropped
                    args.append(_expand(arg))
            dsk[lhs] = (rhs[0], ) + tuple(args)
    return dsk


def fulfill_rhs(d):
    """ Creates new targets from 'provides' on RHS

    NOTE(review): this function is unfinished. As written it copies the
    graph, starts a ``_rhs`` list for each task tuple, never uses either,
    and falls off the end, so it always returns None.

    Args:
        d (dict): Graph to inspect
    """
    dsk = d.copy()
    # Iterate through graph key:value (left / right)
    for lhs, rhs in six.iteritems(d):
        # RHS has signature of (callable, *args, )
        if isinstance(rhs, tuple) and callable(rhs[0]):
            # TODO: only the callable is collected so far; nothing is stored
            _rhs = [rhs[0]]



# pprint(dsk)
# expand_rhs(expand_rhs(dsk, 'data'), 'record')

In [104]:
def ND(pipe, requires, provides):
    """ Add a normalized difference, (a - b) / (a + b), to the pipe's data

    Args:
        pipe (dict): Pipe holding the 'data' container to read and update
        requires (dict): 'data' entry names the two input variables (a, b)
        provides (dict): First 'data' entry names the output variable

    Returns:
        dict: The same ``pipe`` object, with the new variable stored
    """
    print('Running ND to provide "{}", requires {}'.format(provides, requires))
    data = pipe['data']
    first, second = requires['data']
    difference = data[first] - data[second]
    total = data[first] + data[second]
    ratio = difference / total
    data[provides['data'][0]] = ratio
    return pipe


def ccdc(pipe, requires, provides):
    """ Placeholder CCDC task: stores a dummy result string

    Args:
        pipe (dict): Pipe holding the 'record' mapping to update
        requires (dict): Unused by this placeholder
        provides (dict): First 'record' entry names the result key

    Returns:
        dict: The same ``pipe`` object
    """
    print('Running CCDC')
    record = pipe['record']
    result_name = provides['record'][0]
    record[result_name] = 'some CCDC results'
    return pipe


def commission(pipe, requires, provides):
    """ Placeholder commission test task: stores a dummy result string

    The result is stored under the first name listed in
    ``provides['record']``, so the key matches what the task configuration
    declares (the same convention ``ccdc`` uses). When no record name is
    provided, the key falls back to 'commission'.

    Args:
        pipe (dict): Pipe holding the 'record' mapping to update
        requires (dict): Unused by this placeholder
        provides (dict): Optional 'record' entry naming the result key

    Returns:
        dict: The same ``pipe`` object
    """
    print('Hey doing a commission test')
    names = (provides or {}).get('record') or ['commission']
    pipe['record'][names[0]] = 'a commission test run result'
    return pipe


def merge_results(pipe, requires, provides):
    """ Placeholder merge task: stores a dummy merged result

    The result is stored under the first name listed in
    ``provides['record']``, so the key matches what the task configuration
    declares. When no record name is provided, the key falls back to
    'merged'.

    Args:
        pipe (dict): Pipe holding the 'record' mapping to update
        requires (dict): Unused by this placeholder
        provides (dict): Optional 'record' entry naming the result key

    Returns:
        dict: The same ``pipe`` object
    """
    # Some sort of results merging logic
    print('Heyo merging some number of results together')
    names = (provides or {}).get('record') or ['merged']
    pipe['record'][names[0]] = 'asdf'
    return pipe


# Task configuration: task name -> spec. Each spec holds:
#   'task'     - the callable to run
#   'requires' - names the task reads, grouped by 'data' / 'record'
#   'provides' - names the task writes, grouped by 'data' / 'record'
exc_config = {
    # Normalized difference of nir and red
    'ndvi': {
        'task': ND,
        'requires': {
            'data': ['nir', 'red'],
        },
        'provides': {
            'data': ['ndvi']
        }
    },
    # Normalized difference of nir and swir1
    'ndmi': {
        'task': ND,
        'requires': {
            'data': ['nir', 'swir1'],
        },
        'provides': {
            'data': ['ndmi']
        }
    },
    # Requires 'ndmi', which is itself provided by the 'ndmi' task above
    'asdf': {
        'task': ND,
        'requires': {
            'data': ['nir', 'ndmi'],
        },
        'provides': {
            'data': ['asdf2']
        }
    },
    # Requires both derived indices; provides a record rather than data
    'ccdc': {
        'task': ccdc,
        'requires': {
            'data': ['red', 'nir', 'swir1', 'ndvi', 'ndmi'],
        },
        'provides': {
            'record': ['ccdc']
        }
    },
    # Requires both a data variable and an existing record
    'ccdc_fix': {
        'task': commission,
        'requires': {
            'data': ['asdf2'],
            'record': ['ccdc']
        },
        'provides': {
            'record': ['ccdc_fix']
        }
    },
    # Requires the two records above; provides record 'fixed'
    'merged': {
        'task': merge_results,
        'requires': {
            'data': ['ndvi'],
            'record': ['ccdc', 'ccdc_fix'],
        },
        'provides': {
            'record': ['fixed']
        }
    }
}

In [5]:
from toolz.curried import curry

def pipe(exc, cfg):
    """ Build one curried task callable per task name in ``exc``

    Each callable is looked up in ``TASKS`` and called through ``curry``
    with the task's 'provides', 'requires' and (optional) 'config' entries
    from ``cfg`` as keyword arguments.

    NOTE(review): ``TASKS`` is not defined in the cells visible here, and a
    later cell rebinds the name ``pipe`` to a dict -- confirm execution
    order before relying on this function.

    Args:
        exc (iterable): Task names, in the order they should run
        cfg (dict): Task name -> spec with 'provides' / 'requires' and an
            optional 'config'

    Returns:
        list: One curried callable per entry of ``exc``, in the same order
    """
    def _bind(task_name):
        func = TASKS[task_name]
        spec = cfg[task_name]
        return curry(func)(
            provides=spec['provides'],
            requires=spec['requires'],
            config=spec.get('config', {})
        )

    return [_bind(task_name) for task_name in exc]

In [105]:
from collections import defaultdict

import toposort
import toolz


def format_deps(d):
    """ Flatten a 'requires'/'provides' mapping into dependency names

    Args:
        d (dict): Type (e.g. 'data', 'record') -> iterable of names

    Returns:
        list: One '<type>-<name>' string per name in ``d``
    """
    return ['{t}-{n}'.format(t=kind, n=entry)
            for kind, entries in six.iteritems(d)
            for entry in entries]


def pipe_deps(pipe):
    """ Describe what an existing pipe already provides as a dependency graph

    The graph has a root node 'pipe' with no dependencies. Every data
    variable and record key already in ``pipe`` becomes a '<type>-<name>'
    node that depends only on 'pipe'.

    Args:
        pipe (dict): Pipe with a 'data' entry exposing ``data_vars`` and a
            'record' mapping

    Returns:
        dict: Node name -> set of node names it depends on
    """
    available = format_deps({
        'data': pipe['data'].data_vars.keys(),
        'record': pipe['record'].keys()
    })
    graph = {'pipe': set()}
    graph.update((dep, set(['pipe'])) for dep in available)
    return graph


def config_to_deps(cfg, dsk=None):
    """ Add tasks and the names they provide to a dependency graph

    Each task becomes a node depending on its formatted 'requires' names.
    Each formatted 'provides' name becomes a node depending on that task.

    Args:
        cfg (dict): Task name -> spec with 'requires' and 'provides'
        dsk (dict): Optional graph to extend. A non-empty dict is updated
            in place; None or an empty dict is replaced by a new dict

    Returns:
        dict: Node name -> set of node names it depends on
    """
    graph = dsk or {}
    for task, spec in six.iteritems(cfg):
        # Task node: depends on everything it requires
        required = set(format_deps(spec['requires']))
        print(task, required)
        graph[task] = required
        # Provided names: each depends on the task that produces it
        provided = format_deps(spec['provides'])
        graph.update((target, set([task])) for target in provided)
    return graph

Having read in the data and any existing results, we now have our 'pipe' ready to be passed around.

In [107]:
# The working state handed from task to task: the loaded dataset plus an
# initially empty mapping of results.
# NOTE(review): this rebinds the name `pipe`, which an earlier cell defines
# as a function; the later `pipe(exc, exc_config)` call cannot work once
# this cell has run.
pipe = {
    'data': dat,
    'record': {}
}

In [None]:
# Select the 'nbagg' matplotlib backend for this notebook
% matplotlib nbagg

# Show the 'red' variable at time index 200 as an image
plt.imshow(dat['red'].isel(time=200))


In [132]:
# Small hand-written graph (node -> set of nodes it depends on) used to try
# out toposort. Nodes 3, 5, 7 and 16 appear only as dependencies.
dsk = {
    2: {11},
    9: {11, 8, 10},
    10: {11, 3},
    11: {7, 5},
    8: {7, 3},
    15: {9, 8, 2, 10, 16}
}

In [138]:
# Flattened topological order, flagging which nodes are keys of `dsk`
[(i, i in dsk) for i in list(toposort.toposort_flatten(dsk))]

[(3, False),
 (5, False),
 (7, False),
 (16, False),
 (8, True),
 (11, True),
 (2, True),
 (10, True),
 (9, True),
 (15, True)]

In [134]:
# Start from what the pipe already holds, then add every configured task
# and the names it provides
topo = pipe_deps(pipe)
topo = config_to_deps(exc_config, dsk=topo)

topo

('ccdc', set(['data-ndmi', 'data-ndvi', 'data-red', 'data-nir', 'data-swir1']))
('asdf', set(['data-ndmi', 'data-nir']))
('ccdc_fix', set(['data-asdf2', 'record-ccdc']))
('ndmi', set(['data-nir', 'data-swir1']))
('ndvi', set(['data-red', 'data-nir']))
('merged', set(['data-ndvi', 'record-ccdc_fix', 'record-ccdc']))


{'asdf': {'data-ndmi', 'data-nir'},
 'ccdc': {'data-ndmi', 'data-ndvi', 'data-nir', 'data-red', 'data-swir1'},
 'ccdc_fix': {'data-asdf2', 'record-ccdc'},
 'data-asdf2': {'asdf'},
 'data-blue': {'pipe'},
 'data-fmask': {'pipe'},
 'data-green': {'pipe'},
 'data-hh': {'pipe'},
 'data-hh_hv_ratio': {'pipe'},
 'data-hv': {'pipe'},
 'data-ndmi': {'ndmi'},
 'data-ndvi': {'ndvi'},
 'data-nir': {'pipe'},
 'data-red': {'pipe'},
 'data-swir1': {'pipe'},
 'data-swir2': {'pipe'},
 'data-temp': {'pipe'},
 'merged': {'data-ndvi', 'record-ccdc', 'record-ccdc_fix'},
 'ndmi': {'data-nir', 'data-swir1'},
 'ndvi': {'data-nir', 'data-red'},
 'pipe': set(),
 'record-ccdc': {'ccdc'},
 'record-ccdc_fix': {'ccdc_fix'},
 'record-fixed': {'merged'}}

In [135]:
# Full dependency-respecting order: existing data, tasks, provided names
toposort.toposort_flatten(topo)

['pipe',
 'data-blue',
 'data-fmask',
 'data-green',
 'data-hh',
 'data-hh_hv_ratio',
 'data-hv',
 'data-nir',
 'data-red',
 'data-swir1',
 'data-swir2',
 'data-temp',
 'ndmi',
 'ndvi',
 'data-ndmi',
 'data-ndvi',
 'asdf',
 'ccdc',
 'data-asdf2',
 'record-ccdc',
 'ccdc_fix',
 'record-ccdc_fix',
 'merged',
 'record-fixed']

In [122]:
# Keep only the nodes that are tasks, i.e. the execution order
[k for k in toposort.toposort_flatten(topo) if k in exc_config]

['ndmi', 'ndvi', 'asdf', 'ccdc', 'ccdc_fix', 'merged']

In [59]:
# Same selection of task nodes, written with toolz.filter
list(toolz.filter(lambda x: x in exc_config.keys(), toposort.toposort_flatten(topo)))

['ndmi', 'ndvi', 'asdf', 'ccdc', 'ccdc_fix', 'merged']

In [11]:
from collections import defaultdict
import operator


def parse_deps(d):
    """ Flatten a 'requires'/'provides' mapping into '<type>-<name>' strings

    Produces the same output as ``format_deps`` defined in another cell of
    this notebook.

    Args:
        d (dict): Type (e.g. 'data', 'record') -> iterable of names

    Returns:
        list: One '<type>-<name>' string per name in ``d``
    """
    template = '{t}-{n}'
    deps = []
    for kind, entries in six.iteritems(d):
        for entry in entries:
            deps.append(template.format(t=kind, n=entry))
    return deps


def config2dask(config, pipe):
    """ Translate a task configuration into a dask-style graph dict

    The graph holds:

    * 'pipe': the ``pipe`` object itself
    * one key per data variable name, each mapped to the task
      ``(operator.getitem, 'pipe', 'data')``
    * one key per formatted 'provides' name, mapped to the list of task
      names that provide it
    * one key per task name, mapped to ``(curried_func, 'pipe', requires)``

    NOTE(review): data variables are keyed by bare name, while 'requires'
    entries are formatted as '<type>-<name>' -- these look mismatched;
    confirm the intended key scheme.
    NOTE(review): ``requires`` is bound as a keyword in the curried callable
    and a list is also passed positionally in the task tuple -- confirm the
    call signature the task functions expect.

    Args:
        config (dict): Task name -> spec with 'task', 'requires', 'provides'
        pipe (dict): Pipe whose 'data' entry exposes ``data_vars``

    Returns:
        dict: Plain dict copy of the assembled graph
    """
    dsk = defaultdict(list)
    dsk['pipe'] = pipe
    dsk.update(
        (var_name, (operator.getitem, 'pipe', 'data'))
        for var_name in pipe['data'].data_vars.keys()
    )

    for task, cfg in six.iteritems(config):
        func = cfg['task']
        requires = parse_deps(cfg['requires'])
        provides = parse_deps(cfg['provides'])
        f_curry = curry(func, provides=cfg['provides'], requires=cfg['requires'])

        print('Task {}: {}'.format(task, requires))

        # Record which task produces each provided name
        for provide in provides:
            print('Task {} provides {}'.format(task, provide))
            dsk[provide].append(task)

        # Register the task itself
        print('Adding task {} that runs {}'.format(task, func))
        dsk[task] = (f_curry, 'pipe', requires)

    return dict(dsk)

## Tasks

# Dask

In [114]:
def norm_diff(wrk, provides, requires, **config):
    """ Add a normalized difference, (one - two) / (one + two), to the data

    Args:
        wrk (dict): Working state holding the 'data' container to update
        provides (dict): 'data' entry names the output variable, either as
            a single name or as a list whose first element is used
        requires (dict): 'data' entry names the two input variables
        **config: Unused; accepted so all tasks share one call signature

    Returns:
        dict: The same ``wrk`` object, with the new variable stored
    """
    one, two = requires['data']
    name = provides['data']
    # The task configuration lists provided names (e.g. ['ndvi']); a list
    # cannot be used as a key, so take its first element as ``ND`` does.
    if isinstance(name, list):
        name = name[0]

    arr = wrk['data']
    arr[name] = ((arr[one] - arr[two]) / (arr[one] + arr[two]))

    return wrk


# def _CCDCesque(data, cfg, design='1 + ordinal'):
def _CCDCesque(wrk, provides, requires, **config):
    """ Fit a CCDCesque model to the required data and store the result

    NOTE(review): ``CCDCesque`` is not imported in the cells visible here.
    NOTE(review): ``provides['record']`` is used directly as a key; if it
    is a list (as in ``exc_config``) that assignment will fail -- confirm.

    Args:
        wrk (dict): Working state with 'data' and 'record' entries
        provides (dict): 'record' entry is the key the fit is stored under
        requires (dict): 'data' entry lists the variables to fit
        **config: Optional 'init' (model keyword arguments) and 'fit'
            (may hold a 'design' formula) sections

    Returns:
        dict: The same ``wrk`` object, with the fit result stored
    """
    bands = requires['data']
    # Keep only the required variables; drop time steps with any NaN
    arr = wrk['data'][bands].dropna('time', how='any')
    
    model = CCDCesque(**config.get('init', {}))
    model.py, model.px = arr.y, arr.x
    
    # The default design formula refers to this local by name through the
    # captured evaluation environment below, so it must stay `ordinal`.
    ordinal = arr.indexes['time'].map(lambda x: x.toordinal())
    design = config.get('fit', {}).get('design', '1 + ordinal')
    X = patsy.dmatrix(design, data=arr, eval_env=patsy.EvalEnvironment.capture())
    
    wrk['record'][provides['record']] = model.fit(X, arr.to_array('band'), ordinal)
    return wrk

In [548]:
def process(y, x):
    """ Run the whole task chain for one (y, x) location

    Builds a linear graph: 'wrk' holds the data selected at this location
    plus an empty record, and each task in ``exc`` takes the previous
    graph entry as its input. The value of the last task is returned.

    NOTE(review): ``exc`` is not defined in the cells visible here, and
    ``pipe`` must still be the task-building function (another cell rebinds
    it to a dict) -- confirm execution order.

    Args:
        y: y coordinate to select (``.values`` is printed)
        x: x coordinate to select (``.values`` is printed)

    Returns:
        Result of ``dask.get`` for the last task name in ``exc``
    """
    print('Working on: {}/{}'.format(y.values, x.values))
    graph = {
        'wrk': {'record': {}, 'data': dat.sel(x=x, y=y)}
    }

    tasks = pipe(exc, exc_config)
    # Each task depends on the one before it; the first depends on 'wrk'
    upstream = ['wrk'] + exc
    graph.update(
        (name, (task, dep)) for name, task, dep in zip(exc, tasks, upstream)
    )
    return dask.get(graph, exc[-1])

# One delayed `process` call per (x, y) coordinate pair of the dataset
out = [dask.delayed(process)(y, x) for x in dat.x for y in dat.y]

In [552]:
# Compute every delayed call using the `get_sync` scheduler function.
# NOTE(review): `async` is a reserved word from Python 3.7 on, so
# `dask.async` only parses on older interpreters; newer dask releases
# relocated this module -- confirm against the installed dask version.
answer = dask.compute(*out, get=dask.async.get_sync)

Working on: 531285.0/704865.0
Working on: 530025.0/704445.0
Working on: 532215.0/704745.0
Working on: 531705.0/703875.0
Working on: 529695.0/704925.0
Working on: 530835.0/705255.0
Working on: 532335.0/706635.0
Working on: 531885.0/704385.0
Working on: 532125.0/704505.0
Working on: 531735.0/706605.0
Working on: 531195.0/706635.0
Working on: 531225.0/706305.0
Working on: 531735.0/704265.0
Working on: 532635.0/706515.0
Working on: 530505.0/706455.0
Working on: 531495.0/704865.0
Working on: 531345.0/706155.0
Working on: 530265.0/704505.0
Working on: 532605.0/703875.0
Working on: 529935.0/704235.0
Working on: 529815.0/706305.0
Working on: 531135.0/705765.0
Working on: 531915.0/704985.0
Working on: 530745.0/704385.0
Working on: 530685.0/704415.0
Working on: 529965.0/703935.0
Working on: 529905.0/705405.0
Working on: 532335.0/705285.0
Working on: 531795.0/706575.0
Working on: 531915.0/705885.0
Working on: 530235.0/704355.0
Working on: 531975.0/704595.0
Working on: 529695.0/704565.0
Working on

In [164]:
def pick(arr, y, x):
    """ Select from ``arr`` by integer position along 'y' and 'x'

    Args:
        arr: Object exposing ``isel`` (e.g. the loaded dataset)
        y: Positional index along 'y'
        x: Positional index along 'x'

    Returns:
        Whatever ``arr.isel(y=y, x=x)`` returns
    """
    indexers = dict(y=y, x=x)
    return arr.isel(**indexers)


def accumulate(rec):
    """ Concatenate an iterable of iterables into one NumPy array

    Args:
        rec (iterable): Iterable whose items are themselves iterable

    Returns:
        np.ndarray: All inner items, in order, as a single array
    """
    flattened = [item for chunk in rec for item in chunk]
    return np.asarray(flattened)

In [226]:
# Debugging callbacks: each prints its own name and then drops into pdb so
# the arguments it receives (graph, scheduler state, key, result) can be
# inspected by hand.
# NOTE(review): the pdb.set_trace() calls block any non-interactive run of
# the notebook; remove them once the exploration is done.
def start(dsk):
    print('start')
    import pdb; pdb.set_trace()
    
def start_state(dsk, state):
    print('start_state')
    import pdb; pdb.set_trace()
    
    
def pretask(key, dsk, state):
    print('pretask')
    import pdb; pdb.set_trace()

def posttask(key, result, dsk, state, worker_id):
    print('posttask')
    import pdb; pdb.set_trace()
    

# Bundle the four hooks into a single dask Callback, used as a context
# manager in a later cell
cb = dask.callbacks.Callback(start=start, start_state=start_state, pretask=pretask, posttask=posttask)

In [585]:
def f(x, y):
    """ Return ``x + y``; a minimal two-argument task for graph experiments
    """
    total = x + y
    return total

# Experimental graph mixing plain values, the dataset, a dict and a tuple
# key.
# NOTE(review): the ('z', 'w') task calls f(d, x), i.e. adds a dict to an
# int, which is what raises the TypeError shown in this cell's output.
d = {'hi': dat}

dsk = {
    'x': 5,
    'y': 10,
    'arr': dat,
    'd': d,
    ('z', 'w'): (f, 'd', 'x'),
    'q': (f, ('z', 'w'), 1)
}

# out = dask.threaded.get(dsk, 'q')

def printkeys(key, dask, state):
    """ Callback-style hook that prints the key about to be computed

    Args:
        key: Graph key; its ``repr`` is printed
        dask: Unused (note: shadows the ``dask`` module inside this function)
        state: Unused
    """
    message = "Computing: {0}!".format(repr(key))
    print(message)

# Run the graph with the pdb-based callbacks active; the commented-out line
# is the print-only alternative.
# with dask.callbacks.Callback(pretask=printkeys):
with cb:
    out = dask.threaded.get(dsk, 'q')

start
--Return--
> <ipython-input-583-662ed486ffb9>(3)start()->None
-> import pdb; pdb.set_trace()
(Pdb) c
start_state
--Return--
> <ipython-input-583-662ed486ffb9>(7)start_state()->None
-> import pdb; pdb.set_trace()
(Pdb) state
{'released': set([]), 'waiting': {'q': set([('z', 'w')])}, 'dependencies': {'q': set([('z', 'w')]), 'x': set([]), 'd': set([]), ('z', 'w'): set(['x', 'd'])}, 'waiting_data': {'x': set([('z', 'w')]), 'd': set([('z', 'w')]), ('z', 'w'): set(['q'])}, 'ready': [('z', 'w')], 'dependents': {'q': set([]), 'x': set([('z', 'w')]), 'd': set([('z', 'w')]), ('z', 'w'): set(['q'])}, 'cache': {'x': 5, 'd': {'hi': <xarray.Dataset>
Dimensions:      (time: 225, x: 100, y: 100)
Coordinates:
  * time         (time) datetime64[ns] 1997-08-30 1997-11-18 1997-12-20 ...
  * y            (y) float64 5.297e+05 5.297e+05 5.298e+05 5.298e+05 ...
  * x            (x) float64 7.038e+05 7.039e+05 7.039e+05 7.039e+05 ...
Data variables:
    hh           (time, y, x) float64 nan nan nan nan 

TypeError: unsupported operand type(s) for +: 'dict' and 'int'

Traceback
---------
  File "/games/conda/conda2/envs/yatsm/lib/python2.7/site-packages/dask/async.py", line 267, in execute_task
    result = _execute_task(task, data)
  File "/games/conda/conda2/envs/yatsm/lib/python2.7/site-packages/dask/async.py", line 249, in _execute_task
    return func(*args2)
  File "<ipython-input-585-cf2b022482da>", line 2, in f
    return x + y


In [175]:
# Second experimental graph.
# NOTE(review): `f` as defined in this notebook takes two arguments, but
# both tasks below pass three -- confirm which version of `f` this cell
# was written against.
dsk = {
    'x': 5,
    'y': 10,
    'd': d,
    ('z', 'w'): (f, 'd', 'x', 'y'),
    'q': (f, ('z', 'w'), 1, 1)
}

10 loops, best of 3: 71.9 ms per loop


In [None]:
# Time the graph-based route to 'record-ccdc'.
# NOTE(review): the `dsk` defined in the visible cells has no
# 'record-ccdc' key -- confirm which graph this was meant to time.
%timeit dask.threaded.get(dsk, 'record-ccdc')

In [176]:
%%timeit

# Time the same steps called directly, without a graph.
# NOTE(review): these calls do not match the signatures defined in this
# notebook (norm_diff and _CCDCesque both take wrk, provides, requires),
# and `select` is not defined in the visible cells -- this cell appears to
# depend on earlier definitions that are no longer present.
_d = norm_diff(dat, 'ndvi', ('nir', 'red'))
_d = pick(_d, 0, 0)
_CCDCesque(select(_d, bands), {})

10 loops, best of 3: 74.4 ms per loop


In [None]:
for y in range(5):
    for x in range(5):
        