# Table of Contents
 <p>

In [1]:
from __future__ import print_function
from __future__ import absolute_import
import os
import numpy as np
import pandas as pd 
import xarray as xr
import itertools
from glob import glob
from pyspark.sql import SparkSession


def ncread(sc, paths, mode='single', **kwargs):
    """Dispatch to the netCDF reader selected by ``mode``.

    ============== ==============================
    Mode           Reading function
    ============== ==============================
    'single'       read_nc_single
    'multi'        read_nc_multi
    anything else  NotImplementedError is raised
    ============== ==============================

    Parameters
    ----------
    sc       : sparkContext object

    paths    : str or sequence
               Either a string glob in the form "path/to/my/files/*.nc" or an
               explicit list of files to open

    mode     : str
               'single' for a single file
               'multi' for multiple files

    **kwargs : dict
               Partitioning options forwarded to the selected reader.
               'partitions' defaults to None and 'partition_on' defaults to
               ['time'] when the caller does not supply them.

    Returns
    -------
    Whatever the selected reader returns.

    Raises
    ------
    NotImplementedError
        If ``mode`` is neither 'single' nor 'multi'.
    """
    # Fill in the partitioning defaults the readers look up by name.
    kwargs.setdefault('partitions', None)
    kwargs.setdefault('partition_on', ['time'])

    if mode == 'single':
        return read_nc_single(sc, paths, **kwargs)

    if mode == 'multi':
        return read_nc_multi(sc, paths, **kwargs)

    raise NotImplementedError("You specified a mode that is not implemented.")

        
def read_nc_single(sc, paths, **kwargs):
    """Read a single netCDF file into an RDD of one-index-wide slices.

    The file is opened with ``xr.open_dataset``. Every combination of
    positional indices along the ``partition_on`` dimensions becomes one RDD
    element, produced by ``readone_slice``.

    Parameters
    -----------
    sc       :  sparkContext object

    paths    :  str
                an explicit filename to open

    **kwargs : dict
               Additional arguments for partitioning:

               partition_on : sequence of str, or a single str
                   Names of the dimensions to split on. Defaults to
                   ['time'] when missing or None.
               partitions   : int, optional
                   Number of RDD partitions. When missing, None or 0, one
                   partition per 50 slices is used. The value is always kept
                   between 1 and the number of slices.

    Returns
    -------
    rdd      : the RDD obtained by mapping ``readone_slice`` over the
               parallelized positional indices
    """
    partition_on = kwargs.get('partition_on')
    partitions = kwargs.get('partitions')

    # Same default as ncread, so a direct call without partition_on works.
    if partition_on is None:
        partition_on = ['time']
    elif isinstance(partition_on, str):
        # A bare name would otherwise be iterated character by character.
        partition_on = [partition_on]
    else:
        partition_on = list(partition_on)

    dset = xr.open_dataset(paths)

    # Collect one index range per dimension *in partition_on order*. Going
    # through an intermediate dict and reading back .values() does not
    # guarantee this order on interpreters without insertion-ordered dicts
    # (and collapses repeated names), which would pair a range with the wrong
    # dimension name in the zip below.
    dim_ranges = [range(dset[dimension].size) for dimension in partition_on]

    # One dict of positional indices per point of the cartesian product,
    # e.g. {'lat': 0, 'lon': 3}
    positional_indices = [dict(zip(partition_on, combo))
                          for combo in itertools.product(*dim_ranges)]
    n_slices = len(positional_indices)

    if not partitions:
        # Floor division keeps this an integer on Python 2 and 3 alike.
        partitions = n_slices // 50

    # Never more partitions than elements, and never fewer than one: a
    # partition count of 0 (fewer than 50 slices, or an empty dimension)
    # is not usable for parallelize.
    partitions = max(1, min(int(partitions), n_slices))

    # Create an RDD
    rdd = sc.parallelize(positional_indices, partitions).map(
        lambda indices: readone_slice(dset, indices))

    return rdd


def readone_slice(dset, positional_indices):
    """Extract the one-index-wide slice of ``dset`` at the given positions.

    Parameters
    ----------

    dset                : file_object
                         xarray.Dataset object
    positional_indices  : dict
                          maps each dimension name to an integer position,
                          e.g. {'lat': 0, 'lon': 0}

    Returns
    ---------
    chunk               : xarray.Dataset
                         the result of indexing ``dset`` with the slices
                         built from ``positional_indices``

    """
    # Widen each integer position i into slice(i, i + 1):
    # {'lat': 0, 'lon': 0} ---> {'lat': slice(0, 1, None), 'lon': slice(0, 1, None)}
    selection = {}
    for dim, start in positional_indices.items():
        selection[dim] = slice(start, start + 1)

    # Index the dataset with the per-dimension slices
    return dset[selection]


def read_nc_multi(sc, paths, **kwargs):
    """Read multiple netCDF files into an RDD of one-index-wide slices.

    The files are opened as one dataset with ``xr.open_mfdataset``. Every
    combination of positional indices along the ``partition_on`` dimensions
    becomes one RDD element, produced by ``readone_slice``.

    Parameters
    -----------
    sc       :  sparkContext object

    paths    :  str or sequence
                Either a string glob in the form "path/to/my/files/*.nc" or an explicit
                list of files to open

    **kwargs : dict
               Additional arguments for partitioning:

               partition_on : sequence of str, or a single str
                   Names of the dimensions to split on. Defaults to
                   ['time'] when missing or None.
               partitions   : int, optional
                   Number of RDD partitions. When missing, None or 0, one
                   partition per 50 slices is used. The value is always kept
                   between 1 and the number of slices.

    Returns
    -------
    rdd      : the RDD obtained by mapping ``readone_slice`` over the
               parallelized positional indices
    """
    partition_on = kwargs.get('partition_on')
    partitions = kwargs.get('partitions')

    # Same default as ncread, so a direct call without partition_on works.
    if partition_on is None:
        partition_on = ['time']
    elif isinstance(partition_on, str):
        # A bare name would otherwise be iterated character by character.
        partition_on = [partition_on]
    else:
        partition_on = list(partition_on)

    # NOTE(review): `autoclose` is believed to be deprecated in later xarray
    # releases -- confirm against the installed version before changing it.
    dset = xr.open_mfdataset(paths, autoclose=True)

    # Collect one index range per dimension *in partition_on order*. Going
    # through an intermediate dict and reading back .values() does not
    # guarantee this order on interpreters without insertion-ordered dicts
    # (and collapses repeated names), which would pair a range with the wrong
    # dimension name in the zip below.
    dim_ranges = [range(dset[dimension].size) for dimension in partition_on]

    # One dict of positional indices per point of the cartesian product,
    # e.g. {'lat': 0, 'lon': 3}
    positional_indices = [dict(zip(partition_on, combo))
                          for combo in itertools.product(*dim_ranges)]
    n_slices = len(positional_indices)

    if not partitions:
        # Floor division keeps this an integer on Python 2 and 3 alike.
        partitions = n_slices // 50

    # Never more partitions than elements, and never fewer than one: a
    # partition count of 0 (fewer than 50 slices, or an empty dimension)
    # is not usable for parallelize.
    partitions = max(1, min(int(partitions), n_slices))

    # Create an RDD
    rdd = sc.parallelize(positional_indices, partitions).map(
        lambda indices: readone_slice(dset, indices))

    return rdd



In [2]:
# Get or create a Spark session with app name 'hi' and take its SparkContext,
# which is the `sc` argument the reader functions expect.
spark = SparkSession.builder.appName('hi').getOrCreate()
sc = spark.sparkContext

In [3]:
 filename = '/home/abanihi/Documents/climate-data/ERM/t85.an.sfc/e4moda.an.sfc.t85.sst.1957-2002.nc'

def ncread(sc, filename, mode='single', **kwargs):
    """Notebook variant of ncread that announces the single-file read.

    Parameters
    ----------
    sc       : sparkContext object

    filename : file name (or glob / list for 'multi') handed to the reader

    mode     : str
               'single' calls read_nc_single, 'multi' calls read_nc_multi

    **kwargs : dict
               Partitioning options forwarded to the reader. 'partitions'
               defaults to None and 'partition_on' to ['time'].

    Raises
    ------
    NotImplementedError
        If ``mode`` is neither 'single' nor 'multi'.
    """
    # Fill in the partitioning defaults the readers look up by name.
    kwargs.setdefault('partitions', None)
    kwargs.setdefault('partition_on', ['time'])

    if mode == 'single':
        # Progress banner shown before the single-file read starts
        print('Calling ... read_nc_single(sc, filename, **kwargs)\n')
        print('*******************************')
        return read_nc_single(sc, filename, **kwargs)

    if mode == 'multi':
        return read_nc_multi(sc, filename, **kwargs)

    raise NotImplementedError("You specified a mode that is not implemented.")

In [4]:
%time rdd = ncread(sc, filename, mode='single', partition_on=['lat', 'lon'], partitions=700)

CPU times: user 136 ms, sys: 24 ms, total: 160 ms
Wall time: 391 ms


In [5]:
%time rdd.count()

CPU times: user 112 ms, sys: 24 ms, total: 136 ms
Wall time: 37.8 s


32768

In [6]:
rdd.first()

<xarray.Dataset>
Dimensions:     (lat: 1, lon: 1, time: 540)
Coordinates:
  * time        (time) datetime64[ns] 1957-09-01 1957-10-01 1957-11-01 ...
  * lat         (lat) float32 -88.9277
  * lon         (lon) float32 0.0
Data variables:
    gw          (lat) float32 0.000449381
    date        (time) int32 19570901 19571001 19571101 19571201 19580101 ...
    datesec     (time) int32 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 ...
    yyyymmddhh  (time) int32 1957090100 1957100100 1957110100 1957120100 ...
    SST         (time, lat, lon) float64 nan nan nan nan nan nan nan nan nan ...
Attributes:
    title:                     \nERA40 T85 Surface Analysis: created at NCAR
    temporal_span:             \nThe entire ERA40 archive spans 45 years: Sep...
    source_original:           \nEuropean Center for Medium-Range Weather For...
    story:                     \nThis dataset is a netCDF version of ds126.0 ...
    source_NCAR:               \nData Support Section                  

In [7]:
paths = "/home/abanihi/Documents/climate-data/sparkxarray-tests/*.nc"

In [17]:
rddmulti = ncread(sc, paths, mode='multi',partition_on=['lat', 'lon', 'nv'], partitions=1000)

In [18]:
%time rddmulti.count()

CPU times: user 136 ms, sys: 68 ms, total: 204 ms
Wall time: 38.6 s


32040

In [19]:
rddmulti.first()

<xarray.Dataset>
Dimensions:   (lat: 1, lon: 1, nv: 1, time: 4, zlev: 1)
Coordinates:
  * zlev      (zlev) float32 0.0
  * lat       (lat) float32 -88.0
  * lon       (lon) float32 0.0
  * time      (time) datetime64[ns] 1854-01-15 1854-02-15 1854-03-15 1854-04-15
Dimensions without coordinates: nv
Data variables:
    lat_bnds  (time, lat, nv) float32 -89.0 -89.0 -89.0 -89.0
    lon_bnds  (time, lon, nv) float32 -1.0 -1.0 -1.0 -1.0
    sst       (time, zlev, lat, lon) float64 nan nan nan nan
    anom      (time, zlev, lat, lon) float64 nan nan nan nan
Attributes:
    Conventions:                CF-1.6
    Metadata_Conventions:       CF-1.6, Unidata Dataset Discovery v1.0
    metadata_link:              C00884
    id:                         ersst.v4.185401
    naming_authority:           gov.noaa.ncdc
    title:                      NOAA Extended Reconstructed Sea Surface Tempe...
    summary:                    ERSST.v4 is developped based on v3b after rev...
    institution:         

In [12]:
dset = xr.open_mfdataset(paths)

In [20]:
dset.nbytes * (2**-30)

0.0009639188647270203

In [22]:
ds.nbytes * (2**-30)

0.13184790313243866

In [21]:
ds = xr.open_dataset(filename)

In [37]:
ds.loc[{'lat': slice(0, 1, None), 'lon': slice(0, 1, None)}]

<xarray.Dataset>
Dimensions:     (lat: 1, lon: 1, time: 540)
Coordinates:
  * time        (time) datetime64[ns] 1957-09-01 1957-10-01 1957-11-01 ...
  * lat         (lat) float32 0.700384
  * lon         (lon) float32 0.0
Data variables:
    gw          (lat) float32 0.0244462
    date        (time) int32 19570901 19571001 19571101 19571201 19580101 ...
    datesec     (time) int32 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 ...
    yyyymmddhh  (time) int32 1957090100 1957100100 1957110100 1957120100 ...
    SST         (time, lat, lon) float64 298.0 298.9 300.0 300.8 301.2 301.8 ...
Attributes:
    title:                     \nERA40 T85 Surface Analysis: created at NCAR
    temporal_span:             \nThe entire ERA40 archive spans 45 years: Sep...
    source_original:           \nEuropean Center for Medium-Range Weather For...
    story:                     \nThis dataset is a netCDF version of ds126.0 ...
    source_NCAR:               \nData Support Section                    

In [65]:
%time rdd.count()

CPU times: user 84 ms, sys: 32 ms, total: 116 ms
Wall time: 5.66 s


17694720

In [66]:
rdd.first()

(0, 0, 0)

In [38]:
D = {'time': 540, 'lat': 128, 'lon': 256}

In [79]:
partition_on=['lat', 'lon']

In [91]:
dim_ranges=(range(5), range(5))
c = [element for element in itertools.product(*dim_ranges)]

In [92]:
c

[(0, 0),
 (0, 1),
 (0, 2),
 (0, 3),
 (0, 4),
 (1, 0),
 (1, 1),
 (1, 2),
 (1, 3),
 (1, 4),
 (2, 0),
 (2, 1),
 (2, 2),
 (2, 3),
 (2, 4),
 (3, 0),
 (3, 1),
 (3, 2),
 (3, 3),
 (3, 4),
 (4, 0),
 (4, 1),
 (4, 2),
 (4, 3),
 (4, 4)]

In [154]:
m = [dict(zip(partition_on, ij)) for ij in c ]
m

[{'lat': 0, 'lon': 0},
 {'lat': 0, 'lon': 1},
 {'lat': 0, 'lon': 2},
 {'lat': 0, 'lon': 3},
 {'lat': 0, 'lon': 4},
 {'lat': 1, 'lon': 0},
 {'lat': 1, 'lon': 1},
 {'lat': 1, 'lon': 2},
 {'lat': 1, 'lon': 3},
 {'lat': 1, 'lon': 4},
 {'lat': 2, 'lon': 0},
 {'lat': 2, 'lon': 1},
 {'lat': 2, 'lon': 2},
 {'lat': 2, 'lon': 3},
 {'lat': 2, 'lon': 4},
 {'lat': 3, 'lon': 0},
 {'lat': 3, 'lon': 1},
 {'lat': 3, 'lon': 2},
 {'lat': 3, 'lon': 3},
 {'lat': 3, 'lon': 4},
 {'lat': 4, 'lon': 0},
 {'lat': 4, 'lon': 1},
 {'lat': 4, 'lon': 2},
 {'lat': 4, 'lon': 3},
 {'lat': 4, 'lon': 4}]

In [170]:
[{dim:  slice(element[dim], element[dim]+1)} for element in m for dim in element.keys()]

[{'lat': slice(0, 1, None)},
 {'lon': slice(0, 1, None)},
 {'lat': slice(0, 1, None)},
 {'lon': slice(1, 2, None)},
 {'lat': slice(0, 1, None)},
 {'lon': slice(2, 3, None)},
 {'lat': slice(0, 1, None)},
 {'lon': slice(3, 4, None)},
 {'lat': slice(0, 1, None)},
 {'lon': slice(4, 5, None)},
 {'lat': slice(1, 2, None)},
 {'lon': slice(0, 1, None)},
 {'lat': slice(1, 2, None)},
 {'lon': slice(1, 2, None)},
 {'lat': slice(1, 2, None)},
 {'lon': slice(2, 3, None)},
 {'lat': slice(1, 2, None)},
 {'lon': slice(3, 4, None)},
 {'lat': slice(1, 2, None)},
 {'lon': slice(4, 5, None)},
 {'lat': slice(2, 3, None)},
 {'lon': slice(0, 1, None)},
 {'lat': slice(2, 3, None)},
 {'lon': slice(1, 2, None)},
 {'lat': slice(2, 3, None)},
 {'lon': slice(2, 3, None)},
 {'lat': slice(2, 3, None)},
 {'lon': slice(3, 4, None)},
 {'lat': slice(2, 3, None)},
 {'lon': slice(4, 5, None)},
 {'lat': slice(3, 4, None)},
 {'lon': slice(0, 1, None)},
 {'lat': slice(3, 4, None)},
 {'lon': slice(1, 2, None)},
 {'lat': slice

In [164]:
for i in m:
    for item in i.keys():
        print(item, slice(i[item], i[item]+1))

lat slice(0, 1, None)
lon slice(0, 1, None)
lat slice(0, 1, None)
lon slice(1, 2, None)
lat slice(0, 1, None)
lon slice(2, 3, None)
lat slice(0, 1, None)
lon slice(3, 4, None)
lat slice(0, 1, None)
lon slice(4, 5, None)
lat slice(1, 2, None)
lon slice(0, 1, None)
lat slice(1, 2, None)
lon slice(1, 2, None)
lat slice(1, 2, None)
lon slice(2, 3, None)
lat slice(1, 2, None)
lon slice(3, 4, None)
lat slice(1, 2, None)
lon slice(4, 5, None)
lat slice(2, 3, None)
lon slice(0, 1, None)
lat slice(2, 3, None)
lon slice(1, 2, None)
lat slice(2, 3, None)
lon slice(2, 3, None)
lat slice(2, 3, None)
lon slice(3, 4, None)
lat slice(2, 3, None)
lon slice(4, 5, None)
lat slice(3, 4, None)
lon slice(0, 1, None)
lat slice(3, 4, None)
lon slice(1, 2, None)
lat slice(3, 4, None)
lon slice(2, 3, None)
lat slice(3, 4, None)
lon slice(3, 4, None)
lat slice(3, 4, None)
lon slice(4, 5, None)
lat slice(4, 5, None)
lon slice(0, 1, None)
lat slice(4, 5, None)
lon slice(1, 2, None)
lat slice(4, 5, None)
lon slice(

In [None]:
for i, age in enumerate(d['age'] for d in myList):

In [136]:
[{element:slice(value, value+1)} for element, value in d.items() for d in n]

NameError: name 'd' is not defined

In [153]:
[item for item in enumerate(dictionary for dictionary in n)]

[{'lat': 2, 'lon': 4}, {'lat': 3, 'lon': 6}]

In [131]:
myList = [{'age':x} for x in range(1,10)]
myList

[{'age': 1},
 {'age': 2},
 {'age': 3},
 {'age': 4},
 {'age': 5},
 {'age': 6},
 {'age': 7},
 {'age': 8},
 {'age': 9}]

In [132]:
# Enumerate ages
for i, age in enumerate(d['age'] for d in myList): 
    print (i,age)

0 1
1 2
2 3
3 4
4 5
5 6
6 7
7 8
8 9


In [96]:
rdd1.count()

2

In [97]:
rdd1.first()

{'dfd': 10}

In [125]:
n =  [{'lat': 2, 'lon': 4}, {'lat': 3, 'lon': 6}]

In [127]:
[{d:slice(D[d])} for D in n]

NameError: name 'd' is not defined

In [129]:
b = {'lat': 2, 'lon': 4}

In [130]:
b.keys()

dict_keys(['lat', 'lon'])