In [142]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [193]:
import os

import ipyparallel as ipp

rc = ipp.Client()
v = rc.load_balanced_view()
v.block = False
dv = rc[:]
dv.block = False

Experiment into threading writes to NetCDF files

See: 
* https://github.com/Unidata/netcdf4-python/issues/369
* https://github.com/Unidata/netcdf4-python/issues/18

In [194]:
%%writefile process.py
import logging
import sys
import threading
import time

FORMAT = '%(asctime)-15s %(clientip)s %(user)-8s %(message)s'
logging.basicConfig(format=FORMAT, stream=sys.stderr)

logger = logging.getLogger('test')

_GLOBAL_LOCK = threading.Lock()
print('Global Lock: {}'.format(_GLOBAL_LOCK))


class Process(object):
    
    def __init__(self, a):
        self.a = a
        
    def add(self, value):
        with _GLOBAL_LOCK:
            logger.info("Adding {} to {}".format(value, self.a))
            print("{}: Adding {} to {}".format(time.asctime(), value, self.a))
            self.a += value
            time.sleep(2)
            
    def subtract(self, value):
        
        with _GLOBAL_LOCK:
            logger.info("Subtracting {} from {}".format(value, self.a))
            print("{}: Subtracting {} from {}".format(time.asctime(), value, self.a))
            self.a -= value
            time.sleep(2)

Overwriting process.py


In [139]:
import process

Lock: <unlocked _thread.lock object at 0x7f31b49fe198>


In [79]:
pp = process.Process(0)
pp.add(5)
pp.subtract(5)

In [195]:
def test(to_add, to_subtract):
    import process
    pp = process.Process(0)
    pp.add(to_add)
    pp.subtract(to_subtract / 2.)
    return pp.a


def test2(pp, to_add, to_subtract):
    pp.add(to_add)
    pp.subtract(to_subtract / 2.)
    return pp.a


to_add = range(1, 51, 10)
to_subtract = range(1, 10, 2)

Global Lock: <unlocked _thread.lock object at 0x7f31b49c4b20>


In [196]:
ar = rc[:].map(test, to_add, to_subtract)

In [197]:
ar.get()

[0.5, 9.5, 18.5, 27.5, 36.5]

In [199]:
[md['stdout'].split('\n') for md in ar.metadata]

[['Global Lock: <unlocked _thread.lock object at 0x7f55682c5328>',
  'Fri Feb 26 10:10:44 2016: Adding 1 to 0',
  'Fri Feb 26 10:10:46 2016: Subtracting 0.5 from 1',
  'Fri Feb 26 10:10:48 2016: Adding 11 to 0',
  'Fri Feb 26 10:10:50 2016: Subtracting 1.5 from 11',
  ''],
 ['Global Lock: <unlocked _thread.lock object at 0x7f8f38dc6328>',
  'Fri Feb 26 10:10:44 2016: Adding 21 to 0',
  'Fri Feb 26 10:10:46 2016: Subtracting 2.5 from 21',
  ''],
 ['Global Lock: <unlocked _thread.lock object at 0x7fee4419a328>',
  'Fri Feb 26 10:10:44 2016: Adding 31 to 0',
  'Fri Feb 26 10:10:46 2016: Subtracting 3.5 from 31',
  ''],
 ['Global Lock: <unlocked _thread.lock object at 0x7fdb2dbce328>',
  'Fri Feb 26 10:10:44 2016: Adding 41 to 0',
  'Fri Feb 26 10:10:46 2016: Subtracting 4.5 from 41',
  '']]

In [200]:
pps = [process.Process(0) for i in range(4)]

ar = rc[:].map(test2, pps, to_add, to_subtract)

In [201]:
ar.get()

[0.5, 9.5, 18.5, 27.5]

In [202]:
[md['stdout'].split('\n') for md in ar.metadata]

[['Fri Feb 26 10:11:53 2016: Adding 1 to 0',
  'Fri Feb 26 10:11:55 2016: Subtracting 0.5 from 1',
  'Fri Feb 26 10:11:57 2016: Adding 11 to 0',
  'Fri Feb 26 10:11:59 2016: Subtracting 1.5 from 11',
  ''],
 ['Fri Feb 26 10:11:53 2016: Adding 21 to 0',
  'Fri Feb 26 10:11:55 2016: Subtracting 2.5 from 21',
  ''],
 ['Fri Feb 26 10:11:53 2016: Adding 31 to 0',
  'Fri Feb 26 10:11:55 2016: Subtracting 3.5 from 31',
  ''],
 ['']]

In [185]:
import time
time.asctime()

'Fri Feb 26 10:09:59 2016'

In [160]:
[md['stdout'] for md in ar.metadata]

['Global Lock: <unlocked _thread.lock object at 0x7f963c07d328>\n',
 'Global Lock: <unlocked _thread.lock object at 0x7f27b8315328>\n',
 'Global Lock: <unlocked _thread.lock object at 0x7f5460690328>\n',
 '']

In [227]:
def nc_stats(files):
    from netCDF4 import Dataset
    import numpy as np
    import time
    means = {}
    for f in files:
        with Dataset(f, 'as') as ds:
            data = ds.variables['grid'][:]
            
            time.sleep(1)
            means[f] = (time.asctime(), data.mean())
    return means    

In [221]:
files = ['test%d.nc' % i for i in range(4)]
files

['test0.nc', 'test1.nc', 'test2.nc', 'test3.nc']

In [228]:
ar = rc[:].apply_async(nc_stats, files)

In [229]:
ar.get()

[{'test0.nc': ('Fri Feb 26 10:23:16 2016', -0.0001619481),
  'test1.nc': ('Fri Feb 26 10:23:18 2016', -6.604439e-06),
  'test2.nc': ('Fri Feb 26 10:23:19 2016', 8.5475462e-05),
  'test3.nc': ('Fri Feb 26 10:23:21 2016', -9.1590366e-05)},
 {'test0.nc': ('Fri Feb 26 10:23:16 2016', -0.0001619481),
  'test1.nc': ('Fri Feb 26 10:23:18 2016', -6.604439e-06),
  'test2.nc': ('Fri Feb 26 10:23:19 2016', 8.5475462e-05),
  'test3.nc': ('Fri Feb 26 10:23:20 2016', -9.1590366e-05)},
 {'test0.nc': ('Fri Feb 26 10:23:16 2016', -0.0001619481),
  'test1.nc': ('Fri Feb 26 10:23:18 2016', -6.604439e-06),
  'test2.nc': ('Fri Feb 26 10:23:19 2016', 8.5475462e-05),
  'test3.nc': ('Fri Feb 26 10:23:21 2016', -9.1590366e-05)},
 {'test0.nc': ('Fri Feb 26 10:23:16 2016', -0.0001619481),
  'test1.nc': ('Fri Feb 26 10:23:18 2016', -6.604439e-06),
  'test2.nc': ('Fri Feb 26 10:23:19 2016', 8.5475462e-05),
  'test3.nc': ('Fri Feb 26 10:23:21 2016', -9.1590366e-05)}]

In [100]:
from tilezilla import _util, tilespec, products, stores

In [None]:
def save(product, spec, desired_bands):
    from tilezilla import _util, tilespec, products, stores
    tiles = spec.bounds_to_tile(product.bounding_box(spec.crs))
    for b in product.bands:
        for ti in tiles:
            store = stores.GeoTIFFStore('tests/data/stores/GeoTIFF', ti, product)
    
            with 

In [102]:
weld_conus = tilespec.TILESPECS['WELD_CONUS']
collection = [
    products.ESPALandsat('../tests/data/LT50120312002300-SC20151009172149/'),
    products.ESPALandsat('../tests/data/LT50120312002300-SC20151009172149_EPSG5070')
]


tile = list(weld_conus.bounds_to_tile(product.bounding_box(weld_conus.crs)))[0]

store = stores.GeoTIFFStore('tests/data/stores/GeoTIFF', tile, product)

include_filter = {
    'long_name': ('.*surface reflectance.*', '.*brightness temperature.*', '^cfmask_band$',)
}
to_store = _util.multiple_filter(
    [b.long_name for b in product.bands],
    ('.*surface reflectance.*', '.*brightness temperature.*', '^cfmask_band$',),
    True)