# Dask Bag - CMIP6 Indexer Analysis

We demonstrate how to use Dask.bag to read and process semi-structured data such as JSON.

* Load JSON files
* Parse JSON objects as Python dictionaries
* Apply `map`, `filter`, and grouping operations
---

- Authors: NCI Virtual Research Environment Team
- Keywords: Dask bag, JSON 
- Create Date: 2020-Sep
- Lineage/Reference: This tutorial is based on the [Dask tutorial](https://github.com/dask/dask-tutorial).
---

## About Dask.bag

Dask.bag excels at processing data that can be represented as a sequence of arbitrary inputs. We'll refer to this as "messy" data, because it can contain complex nested structures, missing fields, mixtures of data types, etc. The *functional* programming style fits very nicely with standard Python iteration, such as can be found in the `itertools` module.

Messy data is often encountered at the beginning of data processing pipelines when large volumes of raw data are first consumed. The initial set of data might be JSON, CSV, XML, or any other format that does not enforce strict structure and datatypes.
For this reason, the initial data massaging and processing is often done with Python `list`s, `dict`s, and `set`s.

These core data structures are optimized for general-purpose storage and processing. Adding streaming computation with iterators/generator expressions or libraries like `itertools` or [`toolz`](https://toolz.readthedocs.io/en/latest/) lets us process large volumes with a small memory footprint. If we combine this with parallel processing, we can churn through a fair amount of data.

Dask.bag is a high-level Dask collection that automates common workloads of this form. In a nutshell:

    dask.bag = map, filter, toolz + parallel execution
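As a rough illustration of that equation, the sketch below runs the same filter-and-map logic twice: once as a plain Python generator expression and once as a Dask bag split into two partitions. The `records` list is hypothetical toy data shaped like the CMIP6 attributes used later, and `scheduler="synchronous"` runs everything in one thread so no cluster is needed.

```python
import dask.bag as db

# Hypothetical toy records shaped like the CMIP6 global attributes used later
records = [
    {"institution_id": "CSIRO", "variable_id": "tas"},
    {"institution_id": "IPSL", "variable_id": "pr"},
    {"institution_id": "CSIRO", "variable_id": "cl"},
    {"institution_id": "CSIRO", "variable_id": "ua"},
]

# Plain Python: a lazy generator pipeline that runs serially
serial = list(r["variable_id"] for r in records if r["institution_id"] == "CSIRO")

# Dask bag: the same filter/map logic, split into partitions that could run in parallel
bag = db.from_sequence(records, npartitions=2)
parallel = (
    bag.filter(lambda r: r["institution_id"] == "CSIRO")
    .map(lambda r: r["variable_id"])
    .compute(scheduler="synchronous")  # single-threaded, no cluster required
)

print(serial)    # ['tas', 'cl', 'ua']
print(parallel)  # ['tas', 'cl', 'ua']
```

The code inside the lambdas is ordinary Python; the bag only decides how the sequence is partitioned and where each partition is processed.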

## Setup

Choose from the following two options to create a client:

In [164]:
# If you run this notebook on your local computer or NCI's VDI instance, you can create a local cluster
from dask.distributed import Client
client = Client()
print(client)

<Client: 'tcp://127.0.0.1:43867' processes=8 threads=48, memory=161.06 GB>


In [None]:
# If you run this notebook on Gadi under the pangeo environment, connect to the existing cluster using its scheduler.json file
from dask.distributed import Client
client = Client(scheduler_file='../scheduler.json')
print(client)

<div class="alert alert-info">
<b>Warning: Please make sure you specify the correct path to the scheduler.json file within your environment.</b>  
</div>

Starting the Dask Client will provide a dashboard which is useful to gain insight into the computation. The link to the dashboard will become visible when you create the Client. We recommend having the Client open on one side of your screen and your notebook open on the other side, which will be useful for learning purposes.

## Creation

You can create a `Bag` from a Python sequence, from files, from data at a remote URL, etc. We use `.take()` to show elements of the data (`.take(1)` returns a tuple with one element).

Note that the data are partitioned into blocks, and there are many items per block.

In the first example, the two partitions contain five elements each.

In [3]:
# each element is an integer
import dask.bag as db
b1 = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], npartitions=2)
b1.take(3)

(1, 2, 3)
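One way to confirm how the elements were split is to materialise each partition separately. This sketch uses `Bag.to_delayed()`, which returns one `Delayed` object per partition, and computes them with the single-threaded scheduler.

```python
import dask
import dask.bag as db

b1 = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], npartitions=2)
print(b1.npartitions)  # 2

# One Delayed object per partition; computing them returns each partition's contents
partitions = dask.compute(*b1.to_delayed(), scheduler="synchronous")
print([list(p) for p in partitions])  # [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
```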

In the second example, the bag is built from a list whose elements have different data types:

In [4]:
nested_containers = [[0, 1, 2, 3], {}, [6.5, 3.14], 'Python', {'version': 3}, '']
b2 = db.from_sequence(nested_containers)
b2.count().compute()

6
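Because a bag accepts any Python object, a quick way to profile messy input is to map each element to its type name and count the results with `Bag.frequencies()`. A small sketch on the same `nested_containers` list:

```python
import dask.bag as db

nested_containers = [[0, 1, 2, 3], {}, [6.5, 3.14], 'Python', {'version': 3}, '']
b2 = db.from_sequence(nested_containers)

# Map every element to the name of its type, then count each type name
type_counts = dict(
    b2.map(lambda x: type(x).__name__).frequencies().compute(scheduler="synchronous")
)
print(sorted(type_counts.items()))  # [('dict', 2), ('list', 2), ('str', 2)]
```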

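In the third example, the bag is built from text files. Each file is read into one or more partitions (by default, one partition per file), and each line of text becomes one element.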

In [6]:
# each element is one line of text; here each line holds a JSON object
import os
b3 = db.read_text(os.path.join('/g/data/dk92/notebooks/demo_data/cmip6_json','*.json'))
b3.take(1)

('{"Conventions": ["CF-1.7 CMIP-6.2"], "activity_id": ["CMIP"], "branch_method": ["standard"], "creation_date": ["2019-11-15T02:37:21Z"], "data_specs_version": ["01.00.30"], "experiment": ["1 percent per year increase in CO2"], "experiment_id": ["1pctCO2"], "external_variables": ["areacella"], "frequency": ["mon"], "further_info_url": ["https://furtherinfo.es-doc.org/CMIP6.CSIRO.ACCESS-ESM1-5.1pctCO2.none.r1i1p1f1"], "grid": ["native atmosphere N96 grid (145x192 latxlon)"], "grid_label": ["gn"], "history": ["2019-11-15T02:37:21Z ; CMOR rewrote data to be consistent with CMIP6, CF-1.7 CMIP-6.2 and CF standards."], "institution": ["Commonwealth Scientific and Industrial Research Organisation, Aspendale, Victoria 3195, Australia"], "institution_id": ["CSIRO"], "mip_era": ["CMIP6"], "nominal_resolution": ["250 km"], "notes": ["Exp: ESM-1pctCO2; Local ID: PI-1pct-01; Variable: tasmax ([\'fld_s03i236_max\'])"], "parent_activity_id": ["CMIP"], "parent_experiment_id": ["piControl"], "parent_mi
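The behaviour of `db.read_text` can be checked on a couple of small files. In this sketch the file names `a.json` and `b.json` and their contents are hypothetical stand-ins for the CMIP6 JSON files, written to a temporary directory. With the default `blocksize=None`, each file becomes one partition and each line becomes one string element, which `json.loads` then turns into a dictionary.

```python
import json
import os
import tempfile

import dask.bag as db

tmpdir = tempfile.mkdtemp()

# Hypothetical stand-in files: one JSON object per line
samples = {
    "a.json": [{"variable_id": "tas"}, {"variable_id": "pr"}],
    "b.json": [{"variable_id": "cl"}],
}
for name, objs in samples.items():
    with open(os.path.join(tmpdir, name), "w") as f:
        for obj in objs:
            f.write(json.dumps(obj) + "\n")

lines = db.read_text(os.path.join(tmpdir, "*.json"))
print(lines.npartitions)  # 2: one partition per file

# Each element is a raw line of text; json.loads parses it into a dict
records = lines.map(json.loads).compute(scheduler="synchronous")
print(sorted(r["variable_id"] for r in records))  # ['cl', 'pr', 'tas']
```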

## Manipulation

`Bag` objects provide the standard functional API found in projects like the Python standard library, `toolz`, or `pyspark`, including `map`, `filter`, `groupby`, etc.

Operations on `Bag` objects are lazy and create new bags.

In [10]:
# keep the even numbers, then square each one
def is_even(n):
    return n % 2 == 0

b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
c = b.filter(is_even).map(lambda x: x ** 2)
c

dask.bag<lambda, npartitions=10>

To trigger execution, either convert the bag into a list with `list()` or call its `.compute()` method, as we saw for `Delayed` objects.

In [11]:
list(c)

[4, 16, 36, 64, 100]

In [12]:
d=c.compute()
d

[4, 16, 36, 64, 100]

### CMIP6 Indexer data example

The example data are global attributes extracted from a set of CMIP6 files. Each file holds a JSON-encoded dictionary whose keys are CMIP6 vocabulary terms, such as:

*  Conventions: CF-1.7 CMIP-6.2
*  activity_id: CMIP
*  creation_date: 2019-11-15T02:37:21Z
*  experiment: 1 percent per year increase in CO2
*  ...

JavaScript Object Notation (JSON) data files have the following useful features:

- stored as plain text
- common web format
- direct mapping to Python lists and dictionaries



### Data preprocessing 

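JSON requires strings, including dictionary keys, to be enclosed in double quotes, so `json.loads`, which we map over the bag below, cannot parse single-quoted text. If the source files use single quotes as string delimiters, they must be converted to double quotes before parsing.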
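A blind text replacement of `'` with `"` can corrupt values that themselves contain single quotes, and the `notes` attribute in this dataset does (e.g. `(['fld_s03i236'])`). The sketch below shows that failure and one alternative: a hypothetical helper, `parse_record`, that tries strict JSON first and falls back to `ast.literal_eval` for single-quoted, Python-style dictionaries. This is a swapped-in technique for illustration, not the preprocessing the original notebook used.

```python
import ast
import json

def parse_record(text):
    """Parse one record, accepting strict JSON or a single-quoted Python dict.

    Hypothetical helper for illustration; not part of Dask or the original notebook.
    """
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # ast.literal_eval reads Python literal syntax (either quote style)
        # without executing code, and leaves quotes inside values intact.
        return ast.literal_eval(text)

double_quoted = '{"mip_era": "CMIP6"}'
single_quoted = "{'mip_era': 'CMIP6', 'notes': \"Variable: tas (['fld_s03i236'])\"}"

# A blind replace also rewrites the quotes inside the 'notes' value, so it breaks
naive = single_quoted.replace("'", '"')
try:
    json.loads(naive)
except json.JSONDecodeError:
    print("blind replace produced invalid JSON")

print(parse_record(double_quoted))           # {'mip_era': 'CMIP6'}
print(parse_record(single_quoted)["notes"])  # Variable: tas (['fld_s03i236'])
```

With real files, such a helper could be mapped in place of `json.loads`, e.g. `db.read_text(...).map(parse_record)`.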

Use Python's `glob` module to find all the JSON files in the demo data directory.

In [161]:
import glob
files = glob.glob('/g/data/dk92/notebooks/demo_data/cmip6_json/*.json')

In [162]:
len(files)

185

Use the `json` module to read a single file into a Python dictionary.

In [127]:
import json
with open(files[0]) as f:
    items = json.load(f)
type(items)

dict

In [128]:
items['mip_era']

'CMIP6'

Now read the same JSON file into a Dask bag:

In [129]:
import dask.bag as db
items = db.read_text(files[0]) 
items.take(1)  # Note: a tuple containing a raw string, not a dictionary

('{"Conventions": "CF-1.7 CMIP-6.2", "activity_id": "CMIP", "branch_method": "standard", "creation_date": "2019-11-15T02:38:12Z", "data_specs_version": "01.00.30", "experiment": "1 percent per year increase in CO2", "experiment_id": "1pctCO2", "external_variables": "areacella", "frequency": "mon", "further_info_url": "https://furtherinfo.es-doc.org/CMIP6.CSIRO.ACCESS-ESM1-5.1pctCO2.none.r1i1p1f1", "grid": "native atmosphere N96 grid (145x192 latxlon)", "grid_label": "gn", "history": "2019-11-15T02:38:12Z ; CMOR rewrote data to be consistent with CMIP6, CF-1.7 CMIP-6.2 and CF standards.", "institution": "Commonwealth Scientific and Industrial Research Organisation, Aspendale, Victoria 3195, Australia", "institution_id": "CSIRO", "mip_era": "CMIP6", "nominal_resolution": "250 km", "notes": "Exp: ESM-1pctCO2; Local ID: PI-1pct-01; Variable: tas ([\'fld_s03i236\'])", "parent_activity_id": "CMIP", "parent_experiment_id": "piControl", "parent_mip_era": "CMIP6", "parent_source_id": "ACCESS-ES

In [130]:
import dask.bag as db
items = db.read_text(files[0]).map(json.loads)
items.take(1) # Note: tuple containing a dictionary

({'Conventions': 'CF-1.7 CMIP-6.2',
  'activity_id': 'CMIP',
  'branch_method': 'standard',
  'creation_date': '2019-11-15T02:38:12Z',
  'data_specs_version': '01.00.30',
  'experiment': '1 percent per year increase in CO2',
  'experiment_id': '1pctCO2',
  'external_variables': 'areacella',
  'frequency': 'mon',
  'further_info_url': 'https://furtherinfo.es-doc.org/CMIP6.CSIRO.ACCESS-ESM1-5.1pctCO2.none.r1i1p1f1',
  'grid': 'native atmosphere N96 grid (145x192 latxlon)',
  'grid_label': 'gn',
  'history': '2019-11-15T02:38:12Z ; CMOR rewrote data to be consistent with CMIP6, CF-1.7 CMIP-6.2 and CF standards.',
  'institution': 'Commonwealth Scientific and Industrial Research Organisation, Aspendale, Victoria 3195, Australia',
  'institution_id': 'CSIRO',
  'mip_era': 'CMIP6',
  'nominal_resolution': '250 km',
  'notes': "Exp: ESM-1pctCO2; Local ID: PI-1pct-01; Variable: tas (['fld_s03i236'])",
  'parent_activity_id': 'CMIP',
  'parent_experiment_id': 'piControl',
  'parent_mip_era': 'C

### Basic Queries

Once we parse our JSON data into proper Python objects (`dict`s, `list`s, etc.) we can perform more interesting queries by creating small Python functions to run on our data.

In [145]:
js = db.read_text('/g/data/dk92/notebooks/demo_data/cmip6_json/*.json').map(json.loads)
js

dask.bag<loads, npartitions=185>

In [146]:
# .take(n) only searches the first partition by default, and each partition here holds one file,
# so merge everything into one partition so that .take(3) can return three records.
js1 = js.repartition(1)
js1.take(3)

({'Conventions': 'CF-1.7 CMIP-6.2',
  'activity_id': 'CMIP',
  'branch_method': 'standard',
  'creation_date': '2019-11-15T02:37:21Z',
  'data_specs_version': '01.00.30',
  'experiment': '1 percent per year increase in CO2',
  'experiment_id': '1pctCO2',
  'external_variables': 'areacella',
  'frequency': 'mon',
  'further_info_url': 'https://furtherinfo.es-doc.org/CMIP6.CSIRO.ACCESS-ESM1-5.1pctCO2.none.r1i1p1f1',
  'grid': 'native atmosphere N96 grid (145x192 latxlon)',
  'grid_label': 'gn',
  'history': '2019-11-15T02:37:21Z ; CMOR rewrote data to be consistent with CMIP6, CF-1.7 CMIP-6.2 and CF standards.',
  'institution': 'Commonwealth Scientific and Industrial Research Organisation, Aspendale, Victoria 3195, Australia',
  'institution_id': 'CSIRO',
  'mip_era': 'CMIP6',
  'nominal_resolution': '250 km',
  'notes': "Exp: ESM-1pctCO2; Local ID: PI-1pct-01; Variable: tasmax (['fld_s03i236_max'])",
  'parent_activity_id': 'CMIP',
  'parent_experiment_id': 'piControl',
  'parent_mip_e
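Repartitioning to a single partition is one way to make `.take(3)` return three records. `Bag.take` also accepts an `npartitions` argument; passing `-1` searches every partition without changing the bag's layout. A small sketch on hypothetical records, one per partition like the one-file-per-partition bag above:

```python
import dask
import dask.bag as db

# Hypothetical toy records, one per partition (like one JSON file per partition)
records = [{"variable_id": v} for v in ["tasmax", "pr", "ua", "huss", "cl"]]
bag = db.from_sequence(records, npartitions=5)

with dask.config.set(scheduler="synchronous"):
    # By default take() only looks in the first partition, which holds one record
    first_only = bag.take(3, warn=False)
    # npartitions=-1 lets take() search across all partitions
    across_all = bag.take(3, npartitions=-1)

print(len(first_only))                          # 1
print([r["variable_id"] for r in across_all])  # ['tasmax', 'pr', 'ua']
```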

List all the variables using `.map()`:

In [147]:
js.map(lambda record: record['variable_id']).compute()

['tasmax',
 'pr',
 'ua',
 'ua',
 'ua',
 'huss',
 'huss',
 'cl',
 'cl',
 'cl',
 'cl',
 'cl',
 'pfull',
 'cl',
 'cl',
 'cl',
 'cl',
 'cl',
 'cl',
 'cl',
 'cl',
 'cl',
 'cl',
 'pfull',
 'cl',
 'rsus',
 'rsus',
 'rlus',
 'rlus',
 'vas',
 'vas',
 'sci',
 'sci',
 'prsn',
 'rsutcs',
 'prsn',
 'tasmin',
 'tasmin',
 'tauv',
 'tauv',
 'evspsbl',
 'evspsbl',
 'hurs',
 'hurs',
 'rlutcs',
 'rsutcs',
 'rlutcs',
 'prw',
 'prw',
 'prc',
 'prc',
 'ta',
 'ta',
 'ta',
 'ta',
 'rsut',
 'zg',
 'rsut',
 'rlut',
 'rlut',
 'rsdscs',
 'rsdscs',
 'vas',
 'ua',
 'tasmax',
 'ps',
 'uas',
 'zg',
 'cl',
 'evspsbl',
 'wap',
 'tas',
 'hfls',
 'hfss',
 'zg',
 'pr',
 'cct',
 'rsus',
 'zg',
 'rlut',
 'rsut',
 'psl',
 'tauv',
 'hurs',
 'clt',
 'tasmin',
 'hus',
 'sfcWind',
 'rsdt',
 'zg',
 'va',
 'rlds',
 'rsds',
 'huss',
 'tauu',
 'ts',
 'cli',
 'tasmax',
 'cli',
 'cli',
 'cli',
 'cli',
 'cli',
 'cli',
 'cli',
 'cli',
 'cli',
 'cli',
 'rldscs',
 'cli',
 'cli',
 'cli',
 'cli',
 'cli',
 'hfls',
 'hfls',
 'clw',
 'clw',
 '
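The list above repeats many variable names. `Bag.pluck` is shorthand for extracting one key from each record, and `distinct`, `frequencies`, and `topk` can summarise the values instead of collecting all of them. A sketch on hypothetical toy records:

```python
import dask.bag as db

# Hypothetical toy records standing in for the parsed CMIP6 attributes
records = [{"variable_id": v} for v in ["cl", "ua", "cl", "pr", "cl", "ua"]]
js_demo = db.from_sequence(records, npartitions=3)

# pluck(key) is shorthand for map(lambda record: record[key])
variables = js_demo.pluck("variable_id")

unique_vars = sorted(variables.distinct().compute(scheduler="synchronous"))
# frequencies() yields (value, count) pairs; topk keeps the 2 largest by count
top_two = variables.frequencies().topk(2, key=lambda kv: kv[1]).compute(scheduler="synchronous")

print(unique_vars)  # ['cl', 'pr', 'ua']
print(top_two)      # [('cl', 3), ('ua', 2)]
```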

Use `.filter()` to count how many files contain the variable `cli`:

In [165]:
specific_item = js.filter(lambda element: element['variable_id'] == 'cli')
specific_item.count().compute()

16

Count how many data files have a nominal resolution of 250 km:

In [149]:
d_250km = js.filter(lambda element: element['nominal_resolution'] == '250 km').count().compute()
d_250km

185

It is common to do many of these steps in one pipeline, only calling `compute` or `take` at the end.

In [150]:
result = (js1.filter(lambda element: element['variable_id'] == 'cli')
           .map(lambda element: element['tracking_id']))
result

dask.bag<lambda, npartitions=1>

In [151]:
result.take(5)

('hdl:21.14100/bf8714fa-75e9-4991-8a78-65ff3c9d0c09',
 'hdl:21.14100/07afabcd-2671-43b2-be80-8898b500c380',
 'hdl:21.14100/bc0186a4-5032-4f5f-bb8f-f9ea1b1fb840',
 'hdl:21.14100/03314b2e-bccd-43f2-9971-4ea064dd4168',
 'hdl:21.14100/484c86d8-495d-4b19-aec9-9b64c9bb9c3e')
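When several results come from the same pipeline, `dask.compute` can evaluate them in one call, so shared steps (here, the filter) are not repeated. A sketch with hypothetical records; the `tracking_id` values are made up for illustration:

```python
import dask
import dask.bag as db

# Hypothetical toy records shaped like the CMIP6 attributes
records = [
    {"variable_id": "cli", "tracking_id": "hdl:demo-1"},
    {"variable_id": "cl", "tracking_id": "hdl:demo-2"},
    {"variable_id": "cli", "tracking_id": "hdl:demo-3"},
]
bag = db.from_sequence(records, npartitions=2)

cli_records = bag.filter(lambda r: r["variable_id"] == "cli")
n_cli = cli_records.count()                 # lazy Item
cli_ids = cli_records.pluck("tracking_id")  # lazy Bag

# One call evaluates both results and shares the common filter step
n, ids = dask.compute(n_cli, cli_ids, scheduler="synchronous")
print(n, ids)  # 2 ['hdl:demo-1', 'hdl:demo-3']
```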

### Groupby and Foldby

Often we want to group data by some function or key. We can do this either with the `.groupby` method, which is straightforward but forces a full shuffle of the data (expensive), or with the harder-to-use but faster `.foldby` method, which does a streaming combined groupby and reduction.

*  `groupby`:  Shuffles data so that all items with the same key are in the same key-value pair
*  `foldby`:  Walks through the data accumulating a result per key

*Note: a full groupby is particularly expensive. In real workloads you would do well to use `foldby`, or to switch to `DataFrame`s if possible.*

### `groupby`

Groupby collects items in your collection so that all items with the same value under some function are collected together into a key-value pair.
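A small sketch on hypothetical records shows the shape of the result: each output element is a `(key, [items])` pair. For a plain count per key, `pluck` plus `frequencies` reduces within each partition and avoids the shuffle altogether.

```python
import dask.bag as db

# Hypothetical toy records
records = [
    {"institution_id": "CSIRO", "variable_id": "tas"},
    {"institution_id": "IPSL", "variable_id": "pr"},
    {"institution_id": "CSIRO", "variable_id": "cl"},
    {"institution_id": "CSIRO", "variable_id": "ua"},
    {"institution_id": "IPSL", "variable_id": "cl"},
]
bag = db.from_sequence(records, npartitions=2)

# groupby shuffles the data and yields (key, [all records with that key]) pairs
groups = dict(
    bag.groupby(lambda r: r["institution_id"]).compute(scheduler="synchronous")
)
sizes = {key: len(members) for key, members in groups.items()}
print(sizes)

# For a plain count, frequencies() reduces per partition and avoids the shuffle
counts = dict(bag.pluck("institution_id").frequencies().compute(scheduler="synchronous"))
print(counts)
```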

In [152]:
result = js.groupby(lambda item: item['institution_id'])
result

dask.bag<shuffle, npartitions=196>

In [153]:
%%time
result = js.groupby(lambda item: item['institution_id']).starmap(lambda k, v: (k, len(v))).compute()
print(sorted(result))

[('CSIRO', 18), ('CSIRO', 19), ('CSIRO', 19), ('CSIRO', 19), ('CSIRO', 19), ('CSIRO', 20), ('CSIRO', 20), ('CSIRO', 20), ('IPSL', 3), ('IPSL', 4), ('IPSL', 4), ('IPSL', 4), ('IPSL', 4), ('IPSL', 4), ('IPSL', 8)]
CPU times: user 2.05 s, sys: 76.2 ms, total: 2.12 s
Wall time: 2.12 s


### `foldby`

Foldby can be quite odd at first.  It is similar to the following functions from other libraries:

*  [`toolz.reduceby`](http://toolz.readthedocs.io/en/latest/streaming-analytics.html#streaming-split-apply-combine)
*  [`pyspark.RDD.combineByKey`](http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/)

When using `foldby` you provide:

1.  A key function on which to group elements
2.  A binary operator, such as you would pass to `reduce`, that performs the reduction within each group
3.  A combine binary operator that can combine the results of two `reduce` calls on different parts of your dataset.

Your reduction must be associative.  It will happen in parallel in each of the partitions of your dataset.  Then all of these intermediate results will be combined by the `combine` binary operator.
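To make those three pieces concrete, the sketch below counts hypothetical records per institution twice: once by hand, reducing each partition with `binop` and then merging partitions with `combine`, and once with `Bag.foldby` using the same `incr`/`add` operators as the cell below. The hand-written loop is only a model of the semantics, not Dask's actual implementation.

```python
from operator import add

import dask.bag as db

# Hypothetical toy records
records = [
    {"institution_id": "CSIRO"},
    {"institution_id": "IPSL"},
    {"institution_id": "CSIRO"},
    {"institution_id": "CSIRO"},
    {"institution_id": "IPSL"},
]

def incr(total, _record):
    # binop: fold one more record into the running count for its key
    return total + 1

# By hand: reduce each partition separately, starting from initial=0
per_partition = []
for part in (records[:3], records[3:]):
    acc = {}
    for r in part:
        key = r["institution_id"]
        acc[key] = incr(acc.get(key, 0), r)
    per_partition.append(acc)

# Then merge the partial results with combine, starting from combine_initial=0
by_hand = {}
for acc in per_partition:
    for key, value in acc.items():
        by_hand[key] = add(by_hand.get(key, 0), value)

# The same with Dask: from_sequence puts records[:3] and records[3:] in two partitions
bag = db.from_sequence(records, npartitions=2)
with_dask = dict(
    bag.foldby(key="institution_id", binop=incr, initial=0,
               combine=add, combine_initial=0).compute(scheduler="synchronous")
)
print(by_hand)  # {'CSIRO': 3, 'IPSL': 2}
```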

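Here we count the number of files from each institution. Note that the `groupby` output above lists several partial groups per institution; their counts sum to the same totals (154 for CSIRO and 31 for IPSL) that `foldby` returns as a single entry per key. A plausible cause is that Python randomises string hashes per process, so different worker processes can route the same key to different groups.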

In [155]:
%%time
# foldby avoids a full shuffle, so it is comparatively fast and returns one count per institution.
from operator import add
def incr(tot, _):
    return tot+1

result = js.foldby(key='institution_id', 
                   binop=incr, 
                   initial=0, 
                   combine=add, 
                   combine_initial=0).compute()
print(sorted(result))

[('CSIRO', 154), ('IPSL', 31)]
CPU times: user 514 ms, sys: 24 ms, total: 538 ms
Wall time: 553 ms


## DataFrames

For the same reasons that Pandas is often faster than pure Python, `dask.dataframe` can be faster than `dask.bag`. DataFrames are frequently the end-point of the "messy" part of data ingestion: once the data can be made into a dataframe, complex split-apply-combine logic becomes much more straightforward and efficient.

You can transform a bag with a simple tuple or flat dictionary structure into a `dask.dataframe` with the `to_dataframe` method.

In [156]:
df = js.to_dataframe()
df

Dask DataFrame Structure: npartitions=185, values not yet computed (shown as ...)
41 columns, all of dtype object: Conventions, activity_id, branch_method, creation_date, data_specs_version, experiment, experiment_id, external_variables, frequency, further_info_url, grid, grid_label, history, institution, institution_id, mip_era, nominal_resolution, notes, parent_activity_id, parent_experiment_id, parent_mip_era, parent_source_id, parent_time_units, parent_variant_label, product, realm, run_variant, source, source_id, source_type, sub_experiment, sub_experiment_id, table_id, table_info, title, variable_id, variant_label, version, cmor_version, tracking_id, license


This now looks like a well-defined DataFrame, and we can apply Pandas-like computations to it efficiently.

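Using a Dask DataFrame, how long does it take to repeat our earlier count of files per institution? It turns out that `dask.dataframe.groupby()` is faster than `dask.bag.groupby()`, but it still cannot match `dask.bag.foldby()` in this case. Note that `count()` reports the number of non-missing values in each column, so the zeros in the IPSL row indicate attributes that are missing from the IPSL files.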

In [157]:
df.groupby('institution_id')

<dask.dataframe.groupby.DataFrameGroupBy at 0x14b6fc3cbc10>

In [159]:
%time df.groupby('institution_id').count().compute()

CPU times: user 536 ms, sys: 38.8 ms, total: 575 ms
Wall time: 1.24 s


| institution_id | Conventions | activity_id | branch_method | creation_date | data_specs_version | experiment | experiment_id | external_variables | frequency | further_info_url | ... | sub_experiment_id | table_id | table_info | title | variable_id | variant_label | version | cmor_version | tracking_id | license |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| CSIRO | 154 | 154 | 154 | 154 | 154 | 154 | 154 | 154 | 154 | 154 | ... | 154 | 154 | 154 | 154 | 154 | 154 | 154 | 154 | 154 | 154 |
| IPSL | 31 | 31 | 31 | 31 | 31 | 31 | 31 | 31 | 31 | 31 | ... | 31 | 31 | 0 | 31 | 31 | 31 | 0 | 0 | 31 | 31 |


## Limitations

Bags provide very general computation (any Python function). This generality
comes at a cost. Bags have the following known limitations:

1.  Bag operations tend to be slower than array/dataframe computations, in the
    same way that Python tends to be slower than NumPy/Pandas.
2.  ``Bag.groupby`` is slow. You should try to use ``Bag.foldby`` if possible.
    Using ``Bag.foldby`` requires more thought. Even better, consider creating
    a normalised dataframe.

## Shutdown

In [None]:
client.shutdown()