<h1 align="center">PySpark4Climate I/O primer</h1>

This notebook introduces some of the functionality of the PySpark4Climate ```read``` module and shows how to use it in general.

In [1]:
# __future__ imports belong at the top, before any other statement
from __future__ import print_function
# Hide warnings if there are any
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
from pyspark import SparkConf
import read
import numpy as np
import os
import sys

In [2]:
spark = SparkSession.builder.appName("spark-read-test").getOrCreate()
sc = spark.sparkContext

Let's direct Spark to make the **pyspark4climate read module** available to all executors by calling ```sc.addPyFile()```.

In [9]:
sc.addPyFile("/glade/p/work/abanihi/pyspark4climate/read.py")

In [10]:
help(read)

Help on module read:

NAME
    read

FILE
    /glade/p/work/abanihi/pyspark4climate/read.py

DESCRIPTION
    This module ingests netCDF file formats into Spark as:
        - a resilient distributed dataset(RDD)
        - a distributed dataframe
    
    Attributes:
        PARTITIONS (int): default number of partitions to be used by Spark.
    
    TODO:
        * Support multiple files reading
        * Convert time_indices from numbers to dates

CLASSES
    __builtin__.object
        dataset
    
    class dataset(__builtin__.object)
     |  Defines and initializes netCDF file attributes needed by Spark.
     |  Attributes:
     |      filepath                 (str)   :  path for the file to be read
     |      variable_name            (str)   :  variable name
     |      dims                     (tuple) :  dimensions (excluding time dimension) of the variable of interest
     |      ndims                    (int)   :  size of dims tuple
     |      partitions               (int)   :

In [3]:
# Print some information about Spark's configuration
print(SparkConf().toDebugString())

spark.Kryoserializer.buffer.max.mb=4096
spark.app.name=spark-read-test
spark.driver.maxResultSize=10g
spark.driver.memory=20g
spark.executor.memory=15g
spark.master=spark://r2i6n14.ib0.cheyenne.ucar.edu:7077
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.speculation=True
spark.submit.deployMode=client


# Dataset
For this tutorial we will be using the following dataset.
- ```/glade/p/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/hourly1/PRECC/b.e11.B20TRC5CNBDRD.f09_g16.034.cam.h3.PRECC.192001010000Z-200512312300Z.nc```

In [6]:
!ncdump -h /glade/p/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/hourly1/PRECC/b.e11.B20TRC5CNBDRD.f09_g16.034.cam.h3.PRECC.192001010000Z-200512312300Z.nc

netcdf b.e11.B20TRC5CNBDRD.f09_g16.034.cam.h3.PRECC.192001010000Z-200512312300Z {
dimensions:
	lat = 192 ;
	lon = 288 ;
	slat = 191 ;
	slon = 288 ;
	time = UNLIMITED ; // (753360 currently)
	nbnd = 2 ;
	chars = 8 ;
	lev = 30 ;
	ilev = 31 ;
	cosp_prs = 7 ;
	cosp_tau = 7 ;
	cosp_scol = 10 ;
	cosp_ht = 40 ;
	cosp_sr = 15 ;
	cosp_sza = 5 ;
	cosp_htmisr = 16 ;
	cosp_tau_modis = 6 ;
variables:
	double lev(lev) ;
		lev:long_name = "hybrid level at midpoints (1000*(A+B))" ;
		lev:units = "level" ;
		lev:positive = "down" ;
		lev:standard_name = "atmosphere_hybrid_sigma_pressure_coordinate" ;
		lev:formula_terms = "a: hyam b: hybm p0: P0 ps: PS" ;
	double hyam(lev) ;
		hyam:long_name = "hybrid A coefficient at layer midpoints" ;
	double hybm(lev) ;
		hybm:long_name = "hybrid B coefficient at layer midpoints" ;
	double ilev(ilev) ;
		ilev:long_name = "hybrid level at interfaces (1000*(A+B))" ;
		ilev:units = "level" ;
		ilev:positive = "down" ;
		ilev:standard_n

In [8]:
!du -lh /glade/p/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/hourly1/PRECC/b.e11.B20TRC5CNBDRD.f09_g16.034.cam.h3.PRECC.192001010000Z-200512312300Z.nc

156G	/glade/p/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/hourly1/PRECC/b.e11.B20TRC5CNBDRD.f09_g16.034.cam.h3.PRECC.192001010000Z-200512312300Z.nc
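
As a rough sanity check on that size, most of the file should be the ```PRECC``` values themselves. The sketch below assumes ```PRECC``` is stored as 4-byte floats on the ```(time, lat, lon)``` grid from the header (the ```float32``` dtype is taken from the RDD output later in this notebook); the other variables and the metadata are ignored.

```python
# Dimension sizes copied from the ncdump header above.
n_time, n_lat, n_lon = 753360, 192, 288

# Assumption: PRECC is a float32 variable, i.e. 4 bytes per value.
bytes_per_value = 4

n_values = n_time * n_lat * n_lon
size_gib = n_values * bytes_per_value / float(2 ** 30)

print("values:", n_values)
print("approx. size: {:.1f} GiB".format(size_gib))
```

The estimate comes out slightly below the figure reported by ```du```, which is expected since the file also holds coordinate variables and metadata.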


In [4]:
filepath = '/glade/p/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/hourly1/PRECC/b.e11.B20TRC5CNBDRD.f09_g16.034.cam.h3.PRECC.192001010000Z-200512312300Z.nc'

# Step 1: Initialize the ```dataset``` class available in the ```read module```

To initialize this class, we pass a single argument: a tuple of the form ```(filepath, variable)```. In this case we are interested in the ```PRECC``` variable.

In [5]:
dset = read.dataset((filepath, 'PRECC'))

In [6]:
print(dset.dims)
print(dset.filepath)
print(dset.variable_name)
print(dset.partitions)
print(dset.other_dims_values_tuple[:5])

(u'lat', u'lon')
/glade/p/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/hourly1/PRECC/b.e11.B20TRC5CNBDRD.f09_g16.034.cam.h3.PRECC.192001010000Z-200512312300Z.nc
PRECC
94170
[(-90.0, 0.0), (-90.0, 1.25), (-90.0, 2.5), (-90.0, 3.75), (-90.0, 5.0)]
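
The last line of that output is the beginning of the Cartesian product of the non-time coordinate values. The sketch below reproduces the idea with ```itertools.product```, assuming a regular grid that matches the header (192 evenly spaced latitudes from -90 to 90 and 288 longitudes in steps of 1.25). The ```read module``` reads the coordinate values from the file rather than generating them.

```python
import itertools

# Assumption: a regular grid matching the header (192 latitudes, 288 longitudes).
# The real module reads these coordinate values from the netCDF file instead.
lat = [-90.0 + i * 180.0 / 191 for i in range(192)]
lon = [i * 1.25 for i in range(288)]

# Row-major order: the last dimension (lon) varies fastest, which matches
# the order obtained when a (lat, lon) array is flattened.
coords = list(itertools.product(lat, lon))

print(len(coords))
print(coords[:5])
```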


# Step 2: Use Spark to broadcast the following dataset attributes to all the workers

In [7]:
other_dims_values_tuple = sc.broadcast(dset.other_dims_values_tuple) 
variable_name = sc.broadcast(dset.variable_name)
dims = sc.broadcast(dset.dims)
ndims = sc.broadcast(dset.ndims)
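
The ```flatten_data``` and ```row_transform``` functions listed in Step 4 read these broadcast variables through their ```.value``` attribute. The sketch below imitates that lookup without Spark. ```FakeBroadcast``` is a hypothetical stand-in that only provides ```.value```, and the 2 x 2 grid is a toy example rather than the real 192 x 288 one.

```python
class FakeBroadcast(object):
    """Hypothetical stand-in for a Spark broadcast variable (only `.value`)."""
    def __init__(self, value):
        self.value = value


# Toy 2 x 2 grid instead of the real 192 x 288 one.
other_dims_values_tuple = FakeBroadcast([(-90.0, 0.0), (-90.0, 1.25),
                                         (-89.0, 0.0), (-89.0, 1.25)])
dims = FakeBroadcast(("lat", "lon"))
variable_name = FakeBroadcast("PRECC")


def flatten(values, time_idx):
    """Pair each flattened value with its time index and coordinates."""
    return [(time_idx,) + coord + (value,)
            for coord, value in zip(other_dims_values_tuple.value, values)]


def column_names():
    """Column order used for the resulting rows."""
    return ("time",) + tuple(dims.value) + (variable_name.value,)


# One time step (index 3) whose 2 x 2 field has already been flattened.
records = flatten([0.0, 0.1, 0.2, 0.3], 3)
rows = [dict(zip(column_names(), record)) for record in records]

print(column_names())
print(records[1])
```

Broadcasting sends the coordinate list to each executor once instead of shipping it with every task, which matters here because the list has one entry per grid point.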

# Step 3: Create an RDD using ```read.create_rdd()```

In [18]:
precc_rdd = read.create_rdd(sc, (filepath, 'PRECC'), mode='single')

In [20]:
precc_rdd.count()

753360
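
The count equals the length of the time dimension, so the RDD holds one element per time step. The chunking arithmetic that ```read_nc_single_chunked``` (listed in Step 4) performs with the default number of partitions can be sketched in plain Python, without Spark:

```python
rows = 753360            # length of the time dimension
partitions = rows // 8   # default chosen by the dataset class (94170)

# Each task reads `step` consecutive time steps starting at one of these offsets.
step = rows // partitions
starts = range(0, rows, step)

# Sizes of the slices [start, min(start + step, rows)) read by each task.
chunk_sizes = [min(start + step, rows) - start for start in starts]

print(step, len(chunk_sizes), sum(chunk_sizes))
```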

In [21]:
precc_rdd.take(10)

[(array([[ 0.,  0.,  0., ...,  0.,  0.,  0.],
         [ 0.,  0.,  0., ...,  0.,  0.,  0.],
         [ 0.,  0.,  0., ...,  0.,  0.,  0.],
         ..., 
         [ 0.,  0.,  0., ...,  0.,  0.,  0.],
         [ 0.,  0.,  0., ...,  0.,  0.,  0.],
         [ 0.,  0.,  0., ...,  0.,  0.,  0.]], dtype=float32), 0),
 (array([[ 0.,  0.,  0., ...,  0.,  0.,  0.],
         [ 0.,  0.,  0., ...,  0.,  0.,  0.],
         [ 0.,  0.,  0., ...,  0.,  0.,  0.],
         ..., 
         [ 0.,  0.,  0., ...,  0.,  0.,  0.],
         [ 0.,  0.,  0., ...,  0.,  0.,  0.],
         [ 0.,  0.,  0., ...,  0.,  0.,  0.]], dtype=float32), 1),
 (array([[ 0.,  0.,  0., ...,  0.,  0.,  0.],
         [ 0.,  0.,  0., ...,  0.,  0.,  0.],
         [ 0.,  0.,  0., ...,  0.,  0.,  0.],
         ..., 
         [ 0.,  0.,  0., ...,  0.,  0.,  0.],
         [ 0.,  0.,  0., ...,  0.,  0.,  0.],
         [ 0.,  0.,  0., ...,  0.,  0.,  0.]], dtype=float32), 2),
 (array([[ 0.,  0.,  0., ...,  0.,  0.,  0.],
         [ 0.,  0.

![](https://i.imgur.com/EWd3Zdq.jpg)

# Step 4: Create a DataFrame using ```read.dataframe()```

In [11]:
# %load /glade/p/work/abanihi/pyspark4climate/read.py
"""
This module ingests netCDF file formats into Spark as:
    - a resilient distributed dataset(RDD)
    - a distributed dataframe

Attributes:
    PARTITIONS (int): default number of partitions to be used by Spark.

TODO:
    * Support multiple files reading
    * Convert time_indices from numbers to dates
"""

from __future__ import print_function
from netCDF4 import Dataset
from netCDF4 import MFDataset
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import Row
import itertools
import os

# Module-level default; it is set when a dataset object is initialized.
PARTITIONS = None


class dataset(object):
    """Defines and initializes netCDF file attributes needed by Spark.
    Attributes:
        filepath                 (str)   :  path for the file to be read
        variable_name            (str)   :  variable name
        dims                     (tuple) :  dimensions (excluding time dimension) of the variable of interest
        ndims                    (int)   :  size of dims tuple
        partitions               (int)   :  number of partitions to be used by spark
        other_dims_values_tuple  (list)  :  list of tuples containing cartesian product of all dims values


    Examples:
        >>> dset = dataset(('ta_Amon_CCSM4_historical_r1i1p1_185001-189912.nc', 'ta'))
        >>> print(dset.partitions)
        75
        >>> print(dset.variable_name)
        ta
        >>> print(dset.ndims)
        3
        >>> print(dset.dims)
        (u'plev', u'lat', u'lon')
        >>> print(dset.other_dims_values_tuple[:2])
        [(100000.0, -90.0, 0.0), (100000.0, -90.0, 1.25)]

    """

    def __init__(self, filepath_variable_tuple=None):
        """

        Args:
            filepath_variable_tuple (tuple): tuple containing (filepath, 'variable')
        """

        if filepath_variable_tuple is not None:
            self.filepath = filepath_variable_tuple[0]
            self.variable_name = filepath_variable_tuple[1]
            self.dims = None
            self.ndims = None
            self.partitions = None
            self.other_dims_values_tuple = self.generate_cartesian_product()

    def generate_cartesian_product(self):
        f = Dataset(self.filepath, 'r')
        dset = f.variables[self.variable_name]
        # integer division keeps the partition count an int on Python 2 and 3
        self.partitions = dset.shape[0] // 8
        global PARTITIONS
        PARTITIONS = self.partitions
        self.dims = dset.dimensions[1:]
        self.ndims = len(self.dims)
        values = [f.variables[dim][:].tolist() for dim in self.dims]
        f.close()
        return [element for element in itertools.product(*values)]


def create_rdd(sc, file_list_or_txt_file, mode='multi', partitions=None):
    """Create an RDD from a file_list or tuple of (filepath, variable) and return the RDD.

    Args:
        sc                     (object)       : sparkContext Object
        file_list_or_txt_file  (list or tuple): list of tuples or a tuple of the format (filepath, variable)
        mode                   (str)          : 'multi' if reading multiple files, otherwise 'single'
        partitions             (int)          : number of partitions

    """
    if mode == 'multi':
        return read_nc_multi(sc, file_list_or_txt_file, partitions=partitions)

    elif mode == 'single':
        return read_nc_single_chunked(sc, file_list_or_txt_file, partitions=partitions)

    else:
        raise NotImplementedError("You specified a mode that is not implemented.")


def read_nc_single_chunked(sc, filepath_variable_tuple, partitions=None):

    """ Generates an RDD using the information passed by create_rdd function.
     Args:
        sc                     (object)       : sparkContext Object
        filepath_variable_tuple (tuple)       : a tuple of the format (filepath, variable)
        partitions             (int)          : number of partitions

    Returns:
        rdd_                   (rdd)          : Spark's resilient distributed dataset
    """
    assert isinstance(filepath_variable_tuple, tuple), "For single file mode, you must input a tuple"
    dset = dataset(filepath_variable_tuple)
    filepath_ = dset.filepath
    variable_ = dset.variable_name
    rows = Dataset(filepath_, 'r').variables[variable_].shape[0]

    if not partitions:
        partitions = PARTITIONS

    if partitions > rows:
        partitions = rows

    # integer division: sc.range() needs an integer step
    step = rows // partitions

    rdd_ = sc.range(0, rows, step)\
             .sortBy(lambda x: x, numPartitions=partitions)\
             .flatMap(lambda x: readonep(filepath_, variable_, x, step)).zipWithIndex()

    return rdd_


def readonep(filepath_, variable_, start_idx, chunk_size):
    """Read a slice from one file.

    Args:
        filepath_    (str): string containing the file path
        variable_    (str): variable name
        start_idx    (int): starting index
        chunk_size   (int): the chunk size to be read at a time.

    Returns:
        list:   list of the chunk read
    """
    f = None
    try:
        f = Dataset(filepath_, 'r')
        dset = f.variables[variable_]

        # get the number of dimensions of the variable
        dims = dset.dimensions
        ndims = len(dims)
        # clamp the end index so the last chunk does not run past the data
        end_idx = min(start_idx + chunk_size, dset.shape[0])

        chunk = dset[tuple([slice(start_idx, end_idx)] + [slice(None)]*(ndims-1))]

        return list(chunk[:])

    except Exception as e:
        print("IOError: {} {}".format(e, filepath_))

    finally:
        # close the file only if it was opened successfully
        if f is not None:
            f.close()


def dataframe(sc, file_list_or_txt_file, mode='multi', partitions=None):
    """Creates a distributed dataframe from a netCDF file.

    Args:
        sc                     (object)       : sparkContext Object
        file_list_or_txt_file  (tuple)        : a tuple of the format (filepath, variable)
        partitions             (int)          : number of partitions
        mode                   (str)          : (multi) if reading multiple files, otherwise(single)
    Returns:
        df                  (dataframe)          : Spark's distributed data frame
    """

    # fall back to the module default when no partition count is given
    df = create_rdd(sc, file_list_or_txt_file, mode=mode, partitions=partitions)\
        .map(flatten_data)\
        .flatMap(lambda x: x).repartition((partitions or PARTITIONS)*10)\
        .map(row_transform)\
        .toDF()

    return df


def rdd_to_df(rdd):
    """Function that converts an RDD into a Spark data frame.
    Arguments:
        - rdd: (rdd)

    Returns:
        - df: Spark dataframe
    """
    df = rdd.map(flatten_data)\
            .flatMap(lambda x: x).repartition(PARTITIONS*10)\
            .map(row_transform)\
            .toDF()

    return df


def flatten_data(line):
    """Flattens numpy array and return a tuple of each value
    and its corresponding lat_lon coordinates together with other dimensions.

    Args:
        line (tuple) :  an rdd element in the form of a tuple (data, idx) where data is
                        a numpy array and idx correspond to time index.

    Returns:
         results (tuple): a transformed rdd element in the form
                           of a tuple (idx, dim1_value, dim_value2, ..., data_value)
    """
    data = line[0].ravel().tolist()
    idx = line[1]
    results = map(lambda x: (idx, ) + (x[0]) + (x[1], ), zip(other_dims_values_tuple.value, data))
    return results


def row_transform(line):
    """Transforms a tuple (idx, dim1_value, dim_value2, ..., data_value) into a Spark sql
       Row object.

    Args:
        line (tuple): a tuple of the form (idx, dim1_value, dim_value2, ..., data_value)

    Returns:
        row(*line) : Spark Row object with an arbitrary number of items depending on the size of
                     the tuple in line.

    Examples:
        >>> print(line)
        (0, 100000.0, -90.0, 0.0, 257.8)
        >>> row(*line)
        Row(time=0, plev=100000.0, lat=-90.0, lon=0.0, ta=257.8)

    """
    dims_ = dims.value
    ndims_ = len(dims_)
    variable_ = variable_name.value
    columns = ("time",)+tuple(dims_[:])+(variable_,)
    row = Row(*columns)
    return row(*line)


#if __name__ == '__main__':
    #pass 

In [None]:
df = rdd_to_df(precc_rdd)

In [14]:
dset.partitions

94170

In [12]:
precc_df = dataframe(sc, (filepath, 'PRECC'), mode='single', partitions=dset.partitions)

In [13]:
precc_df.show()

+----+------------------+------+--------------------+
|time|               lat|   lon|               PRECC|
+----+------------------+------+--------------------+
| 221|57.958115183246065| 317.5|1.915180458667009...|
| 221|57.958115183246065|318.75|1.754952072552384...|
| 221|57.958115183246065| 320.0|4.623490923449935...|
| 221|57.958115183246065|321.25|2.340043536719349...|
| 221|57.958115183246065| 322.5|3.059930264726063E-8|
| 221|57.958115183246065|323.75|2.528910059140798...|
| 221|57.958115183246065| 325.0|1.764903601042533...|
| 221|57.958115183246065|326.25|1.104392222117667...|
| 221|57.958115183246065| 327.5|7.332674911708636...|
| 221|57.958115183246065|328.75|2.120446353226501...|
| 315| 66.43979057591622|  17.5|                 0.0|
| 315| 66.43979057591622| 18.75|                 0.0|
| 315| 66.43979057591622|  20.0|                 0.0|
| 315| 66.43979057591622| 21.25|                 0.0|
| 315| 66.43979057591622|  22.5|                 0.0|
| 315| 66.43979057591622| 23