This is a first pass at an alternative way of loading data into the cluster. Take a look at the load_and_persist_mogreps notebook for more info first.


Our initial approach was to use network storage to avoid having to deal with data locality. Iris relies on file paths existing to minimise up front computations, meaning you can't just transfer cube objects around a cluster. There are three approaches:


1. ensure all data is available on all nodes
    - we thought using s3fs would help us here. In theory each node has an independent connection to s3, meaning we wouldn't suffer from the typical bottlenecks of network storage. In practice it's slooooow. We're probably missing something though, more investigation required
    
2. load data on specific nodes and pin cubes to that node
    - we thought that this would be prohibitively slow and fragile. This is probably still true, but it's actually faster to download an entire blob of data from S3 than to make a precise byte range request over s3fs!
    
3. Do all computations at once (download data + run analysis)
    - a variant of approach 2, this removes the need to pin but means you can't 'cache' results halfway through.
    
The below approach is a hybrid of approaches 2 + 3. Dask tries to minimise movement of objects already, so we can persist a chunk of computation without needing to explicitly pin cubes to nodes. If the cube does get moved Dask is smart and runs the loading step again.

In practice this seems to mean that after the initial load most of your computations will be v. fast as you're working off of local disks. However it seems like there's always a small portion of cubes that get moved and need to be rebuilt from scratch. It's not an ideal at all, but it is a ton faster than the s3fs approach.

This is a very naive approach / proof of concept - no attempt has been made to manage disk usage here. Dask doesn't know about disk space, so in the worst case scenario all data could be downloaded onto one node.

In [None]:
import iris
from distributed import Client

c = Client('172.31.29.182:8786')
c

In [None]:
month = '01'
day = '01'
run = '00'
prefix = 'prods_op_mogreps-g_2016{}{}_{}'.format(month, day, run)
prefix

In [None]:
from boto.s3.connection import S3Connection
import os

os.environ['S3_USE_SIGV4'] = 'True'

def list_keys(bucket, prefix, local_path='/usr/local/share/notebooks/data/mogreps-g/'):
    conn = S3Connection(host='s3.eu-west-2.amazonaws.com')
    bucket = conn.get_bucket(bucket)
    results = []
    keys = list(iter(bucket.list(prefix=prefix)))
    return keys


prefix = 'air_pressure_at_sea_level'
in_keys = list_keys('mogreps-g-sample2', prefix)
print(len(in_keys))
in_keys[:10]

In [None]:
import iris
import os

from iris.cube import CubeList
from dask import delayed
import dask.bag as db
import boto

from tempfile import NamedTemporaryFile

def load_cubes(address):
    def add_realization(cube, field, filename):
        if not cube.coords('realization'):
            realization = int(filename.split('_')[-2])
            realization_coord = iris.coords.AuxCoord(realization, standard_name='realization', var_name='realization')
            cube.add_aux_coord(realization_coord)
        cube.coord('realization').points.dtype = 'int32'
        cube.coord('time').var_name = 'time'
        cube.coord('forecast_period').var_name = 'forecast_period'
    return iris.load(address, callback=add_realization)

@delayed
def download_and_load(key_name, bucket):
    # if we write to the same location as the s3fs mount point on the Jupyter client we can move cubes between freely.
    # but that's not compatible with the s3fs mount on cluster approach
    key = boto.s3.key.Key(bucket=bucket, name=key_name)
    f_name = '/tmp/{}'.format(key.name.replace('/', '-'))
    key.get_contents_to_filename(f_name)
    return load_cubes(f_name)

In [None]:
delayed_cubes = db.from_delayed([download_and_load(k.name, k.bucket) for k in in_keys])
delayed_cubes

In [None]:
delayed_cubes.take(10)

In [None]:
p_cubes = c.persist(delayed_cubes)

In [None]:
#c.unpublish_dataset('remote_mogreps')
c.publish_dataset(remote_mogreps=p_cubes)

In [None]:
p_cubes = c.persist(delayed_cubes)

In [61]:
list(p_cubes)

[<iris 'Cube' of air_pressure_at_sea_level / (Pa) (realization: 12; time: 59; latitude: 600; longitude: 800)>,
 <iris 'Cube' of air_pressure_at_sea_level / (Pa) (realization: 12; time: 59; latitude: 600; longitude: 800)>,
 <iris 'Cube' of air_pressure_at_sea_level / (Pa) (realization: 12; time: 59; latitude: 600; longitude: 800)>,
 <iris 'Cube' of air_pressure_at_sea_level / (Pa) (realization: 12; time: 59; latitude: 600; longitude: 800)>,
 <iris 'Cube' of air_pressure_at_sea_level / (Pa) (realization: 12; time: 59; latitude: 600; longitude: 800)>,
 <iris 'Cube' of air_pressure_at_sea_level / (Pa) (realization: 12; time: 59; latitude: 600; longitude: 800)>,
 <iris 'Cube' of air_pressure_at_sea_level / (Pa) (realization: 12; time: 59; latitude: 600; longitude: 800)>,
 <iris 'Cube' of air_pressure_at_sea_level / (Pa) (realization: 12; time: 59; latitude: 600; longitude: 800)>,
 <iris 'Cube' of air_pressure_at_sea_level / (Pa) (realization: 12; time: 59; latitude: 600; longitude: 800)>,
 

In [62]:
meaned_values = p_cubes \
    .map(lambda x: x.collapsed('time', iris.analysis.MEAN)) \
    .map(lambda x: x.collapsed('realization', iris.analysis.MEAN)) \
    .map(lambda x: x.collapsed(['latitude', 'longitude'], iris.analysis.MEAN)) \
    .map(lambda x: x.data)
meaned_values

dask.bag<map-lam..., npartitions=20>

In [63]:
from distributed.client import as_completed

futures = c.compute(meaned_values.to_delayed())
for future in as_completed(futures):
    print(future.result())


[array(100852.9664)]
[array(100957.6192)]
[array(100940.2112)]
[array(100774.0928)]
[array(100855.12533333333)]
[array(100782.4384)]
[array(100796.56106666666)]
[array(100872.25173333334)]
[array(100910.37013333333)]
[array(100833.86026666667)]
[array(100779.40906666666)]
[array(100899.5328)]
[array(100875.42613333333)]
[array(100820.01066666667)]
[array(100941.11573333334)]
[array(100852.06186666667)]
[array(100818.39786666667)]
[array(100800.4096)]
[array(100842.96533333333)]
[array(100848.9216)]
