<a href="https://www.nvidia.com/dli"> <img src="images/DLI_Header.png" alt="Header" style="width: 400px;"/> </a>

# Data Formats

Data can exist in all shapes and sizes. We will start by looking at NOAA's [Tides & Currents](https://tidesandcurrents.noaa.gov/) dataset as presented in different data formats. Specifically, we will be looking at water levels as used to represent local tide information.

<div style="text-align:center"><img src='images/water_dash.png' width=500></div>

## Objectives
* Learn the differences between the following file types and when best to use them:
 * [JSON](https://www.json.org/json-en.html)
 * [CSV](https://datahub.io/docs/data-packages/csv/)
 * [Parquet](https://parquet.apache.org/documentation/latest/)
* Learn the differences between the following libraries/frameworks and when best to use them:
 * [pandas](https://pandas.pydata.org/)
 * [cuDF](https://docs.rapids.ai/api/cudf/stable/)
 * [Dask](https://dask.org/)
 
First, let's get these libraries loaded.

In [2]:
import cudf
import dask.dataframe as dd
import dask_cudf
import json
import pandas as pd
import time
import urllib.parse
import urllib.request

class Timer:
    def __enter__(self):
        self.start = time.perf_counter()
        return self
    
    def __exit__(self, *args):
        self.end = time.perf_counter()
        self.interval = self.end - self.start
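
As a quick illustration of how the `Timer` context manager is meant to be used, here is a minimal sketch. The class is repeated so the snippet runs on its own, and the summation workload is only a placeholder chosen for this example.

```python
import time


class Timer:
    """Same context manager as above, repeated so this sketch is self-contained."""

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, *args):
        self.end = time.perf_counter()
        self.interval = self.end - self.start


# Placeholder workload (an assumption for this example, not part of the lab).
with Timer() as t:
    total = sum(range(1_000_000))

# `interval` is only set once the `with` block has exited.
print("{:10s} took {:>8.5f}s".format("sum", t.interval))
```

The elapsed time is read from `t.interval` after the block ends, which is the pattern the timing cells in this notebook rely on.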

## Data Formats

### JSON

To begin, let us pull our data directly from NOAA's [CO-OPS API](https://api.tidesandcurrents.noaa.gov/api/prod/). In particular, we will be looking at the water level of a body of water of your choosing.

Because this is a web API, we will need to craft a [Uniform Resource Locator](https://developer.mozilla.org/en-US/docs/Learn/Common_questions/What_is_a_URL) (URL) in order to retrieve data from it. This API is flexible and uses several parameters:

* `station`: A numerical ID of the station doing the surveying.
* `range`: How many hours back to retrieve the data from the current time.
* `units`: Can be `english` or `metric`.
* `datum`: The [reference elevation](https://tidesandcurrents.noaa.gov/datum_options.html) to use.
  * Different stations have a different set of datums available.
* `format`: The type of file to return. Can be `json`, `xml`, or `csv`.
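
To see how a dictionary of parameters like these turns into a query string, here is a small sketch using only the standard library. It builds the URL but does not contact the API, and it covers only the parameters listed above. The station ID and datum are simply one valid-looking combination used for illustration.

```python
from urllib.parse import parse_qs, urlencode, urlsplit

url_base = "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?"

# Illustrative values only; swap in your own station and datum.
params = {
    "station": "9063009",
    "range": 2,
    "units": "english",
    "datum": "IGLD",
    "format": "json",
}

# urlencode joins key=value pairs with "&" and escapes special characters.
query = urlencode(params)
url = url_base + query
print(url)

# Going the other way: split the URL and decode the query string.
# parse_qs returns every value as a list of strings.
decoded = parse_qs(urlsplit(url).query)
print(decoded["station"], decoded["range"])
```

Note that the integer `2` becomes the string `"2"` once encoded, since everything in a URL is text.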

**TODO**: Pick a station from NOAA's [Water Level Website](https://tidesandcurrents.noaa.gov/stations.html?type=Water+Levels) and update the `station` parameter below with its 7-digit ID number as a string. Many stations use the `MLLW` datum, but not all of them do. In order to find out which datums are available, use the dropdown at the bottom of the station's dashboard:

<div style="text-align:center"><img src='images/datum.png' width=500></div>

**Hint**: If you receive a `400 Bad Request` error, click on the generated link to view the error message. Click the `...` for one possible solution.

It should also be noted that this is a live data API and there is a chance that it may be experiencing downtime. In that case, we have saved a sample in [data/sample_json.txt](data/sample_json.txt).

In [3]:
url_base = "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?"

url_parameters = {
    "station": "FIXME", # Add a station ID of your choosing. Please keep the quotes.
    "range": 2,  # Please keep this in the single digits for now.
    "product": "water_level",
    "units": "english",
    "datum": "MLLW",
    "time_zone": "gmt",
    "application": "nvidia_dli",
    "format": "json"
}

url = url_base + urllib.parse.urlencode(url_parameters)
print(url)

with urllib.request.urlopen(url) as url:
    data = json.loads(url.read().decode())
    print(json.dumps(data, indent=4))

https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?station=FIXME&range=2&product=water_level&units=english&datum=MLLW&time_zone=gmt&application=nvidia_dli&format=json


HTTPError: HTTP Error 400: Bad Request

In [4]:
url_base = "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?"

url_parameters = {
    "station": "9063009",
    "range": 2,  # Please keep this in the single digits for now.
    "product": "water_level",
    "units": "english",
    "datum": "IGLD",
    "time_zone": "gmt",
    "application": "ports_screen",
    "format": "json"
}

url = url_base + urllib.parse.urlencode(url_parameters)
print(url)

with urllib.request.urlopen(url) as url:
    data = json.loads(url.read().decode())
    print(json.dumps(data, indent=4))

https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?station=9063009&range=2&product=water_level&units=english&datum=IGLD&time_zone=gmt&application=ports_screen&format=json
{
    "metadata": {
        "id": "9063009",
        "name": "American Falls",
        "lat": "43.0811",
        "lon": "-79.0614"
    },
    "data": [
        {
            "t": "2022-02-12 03:42",
            "v": "559.029",
            "s": "0.020",
            "f": "0,0,0,0",
            "q": "p"
        },
        {
            "t": "2022-02-12 03:48",
            "v": "559.032",
            "s": "0.016",
            "f": "0,0,0,0",
            "q": "p"
        },
        {
            "t": "2022-02-12 03:54",
            "v": "559.045",
            "s": "0.030",
            "f": "0,0,0,0",
            "q": "p"
        },
        {
            "t": "2022-02-12 04:00",
            "v": "559.068",
            "s": "0.016",
            "f": "0,0,0,0",
            "q": "p"
        },
        {
            "t":

Now that we have our data, what does it mean? The result has been split into two main categories:
* `metadata`: Overall information about the result.
 * `id`: Our chosen station ID.
 * `name`: The name of the station.
 * `lat`: The latitude of the station.
 * `lon`: The longitude of the station.
* `data`: The water level readings within the past `2` hours.
 * `t`: The timestamp of the reading.
 * `v`: The value of the reading.
 * `s`: The standard deviation (Sigma) of the reading.
   * If there are multiple values within a one-second window, a standard deviation is given.
 * `f`: Four different quality flags.
   * See the [documentation](https://coastwatch.pfeg.noaa.gov/erddap/info/nosCoopsWLR6/index.html) for more information.
 * `q`: Whether the data is (P)reliminary or (V)erified.
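
The structure described above can be walked with plain Python. The following is a minimal sketch that parses a shortened copy of the response shown earlier. Converting `v` to a float and splitting `f` into integers are choices made for this example, since the API returns every field as a string.

```python
import json

# Shortened copy of the response printed above (two readings only).
sample = """
{
    "metadata": {"id": "9063009", "name": "American Falls",
                 "lat": "43.0811", "lon": "-79.0614"},
    "data": [
        {"t": "2022-02-12 03:42", "v": "559.029", "s": "0.020", "f": "0,0,0,0", "q": "p"},
        {"t": "2022-02-12 03:48", "v": "559.032", "s": "0.016", "f": "0,0,0,0", "q": "p"}
    ]
}
"""

result = json.loads(sample)

station = result["metadata"]["name"]
# Every field arrives as a string, so numeric values need converting.
levels = [float(reading["v"]) for reading in result["data"]]
# The four quality flags are packed into one comma-separated string.
flags = [[int(flag) for flag in reading["f"].split(",")] for reading in result["data"]]
# "p" marks preliminary data, "v" marks verified data.
preliminary = [reading["q"] == "p" for reading in result["data"]]

print(station, levels, flags, preliminary)
```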
 
While this format is useful for building a web page, we can see that it is not compact. The field names for the data are repeated for each reading. (This is likely why NOAA uses one-letter field names: to save space.) In order to see how long it takes to load the data locally, let's grab more data and save it to a `.txt` file.
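
To make the cost of the repeated field names concrete, here is a rough sketch that serializes the same few readings as indented JSON and as CSV, then compares the character counts. It uses only the standard library, and the three readings are copied from the output above. Exact sizes depend on indentation and line endings, so treat the numbers as indicative only.

```python
import csv
import io
import json

records = [
    {"t": "2022-02-12 03:42", "v": "559.029", "s": "0.020", "f": "0,0,0,0", "q": "p"},
    {"t": "2022-02-12 03:48", "v": "559.032", "s": "0.016", "f": "0,0,0,0", "q": "p"},
    {"t": "2022-02-12 03:54", "v": "559.045", "s": "0.030", "f": "0,0,0,0", "q": "p"},
]

# JSON repeats every field name once per record.
json_text = json.dumps(records, indent=4)

# CSV states the field names once, in a header row.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(records[0]))
writer.writeheader()
writer.writerows(records)
csv_text = buffer.getvalue()

print("json characters:", len(json_text))
print("csv characters: ", len(csv_text))
print('occurrences of "t" key in json:', json_text.count('"t":'))
```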

**TODO**: The CO-OPS API supports pulling data for the past year. Replace the FIXMEs below to pull a year's worth of data and save it. The [json](https://docs.python.org/3/library/json.html) documentation may be useful.

In [5]:
url_parameters["range"] = 365 * 24
url = url_base + urllib.parse.urlencode(url_parameters)

with urllib.request.urlopen(url) as url:
    data = json.FIXME(url.read().decode())

with open("data/" + url_parameters["station"] + ".txt", 'w') as outfile:
    FIXME.dump(data["data"], outfile, indent=4)

AttributeError: module 'json' has no attribute 'FIXME'

In [6]:
url_parameters["range"] = 365 * 24
url = url_base + urllib.parse.urlencode(url_parameters)

with urllib.request.urlopen(url) as url:
    data = json.loads(url.read().decode())

with open("data/" + url_parameters["station"] + ".txt", 'w') as outfile:
    json.dump(data["data"], outfile, indent=4)

To check the data file, we can use the [head](https://linux.die.net/man/1/head) command to avoid displaying the entire, potentially large, file. We can also use the [cut](https://linux.die.net/man/1/cut) command to cut each line down to 80 characters. Feel free to replace `sample_json` with your chosen station ID.

In [7]:
!head data/sample_json.txt | cut -c -80

[
    {
        "t": "2020-04-28 06:06",
        "v": "558.937",
        "s": "0.000",
        "f": "0,0,0,0",
        "q": "v"
    },
    {
        "t": "2020-04-28 06:12",


Terminal tools are handy, but if we want to do analysis on these files, it's best to load them into a [DataFrame](https://pandas.pydata.org/pandas-docs/stable/user_guide/dsintro.html#dataframe) which is like a spreadsheet in a programming object.

[pandas](https://pandas.pydata.org/) and [cuDF](https://docs.rapids.ai/api/cudf/stable/) are similar libraries for reading and manipulating DataFrames, with one key difference: `pandas` loads data and performs computations on the CPU whereas `cuDF` does so on the GPU. Let's compare how quickly each of these libraries reads JSON.

**TODO**: Replace `data/sample_json.txt` with the path to your downloaded station data. Run the cell below **twice**. The read time for the GPU should drop dramatically on the second run, once cuDF has been initialized.

In [10]:
file_path = "data/sample_json.txt"  # Replace with your file path

with Timer() as t_pd:
    df_cpu = pd.read_json(file_path, orient="records")
with Timer() as t_cudf:
    df_gpu = cudf.read_json(file_path)

"{:10s} pd: {:>8.5f}s cudf: {:>8.5f}s".format("json", t_pd.interval, t_cudf.interval)

  "Using CPU via Pandas to read JSON dataset, this may "


'json       pd:  0.16162s cudf:  0.19901s'

Reading is useful, but so is the ability to write. Let's convert our data into CSV, parquet, and ZIP: three other file formats that we will be exploring.

First, [to_csv](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html).

In [11]:
file_path = "data/sample_data.csv"

with Timer() as t_pd:
    df_cpu.to_csv(file_path, index=False)
with Timer() as t_cudf:
    df_gpu.to_csv(file_path, index=False)
"{:10s} pd: {:>8.5f}s cudf: {:>8.5f}s".format("csv", t_pd.interval, t_cudf.interval)

'csv        pd:  0.13340s cudf:  0.01050s'

Next, [to_parquet](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html).

In [12]:
file_path = "data/sample_data.parquet"

with Timer() as t_pd:
    df_cpu.to_parquet(file_path, index=False)
with Timer() as t_cudf:
    df_gpu.to_parquet(file_path, index=False)
"{:10s} pd: {:>8.5f}s cudf: {:>8.5f}s".format("parquet", t_pd.interval, t_cudf.interval)

'parquet    pd:  0.05598s cudf:  0.11603s'

Finally, to zip, using the `compression` parameter of [to_csv](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html).

In [13]:
file_path = "data/sample_data.csv.zip"

with Timer() as t_pd:
    df_cpu.to_csv(file_path, compression="zip", index=False)
# cuDF does not yet support compression
"{:10s} pd: {:>8.5f}s cudf: ---".format("csv", t_pd.interval)

'csv        pd:  0.26495s cudf: ---'

### CSV

The next file format we will look at is the [CSV](https://datahub.io/docs/data-packages/csv) file type. CSV stands for Comma Separated Values. Most people are familiar with this format from spreadsheet editing tools like Microsoft [Excel](https://www.microsoft.com/en-us/microsoft-365/excel) and Google [Sheets](https://www.google.com/sheets/about/). CSVs are human readable and are stored in a text file. Let's take a look at the top few rows of our `sample_data.csv` using the [head](https://linux.die.net/man/1/head) command.

In [14]:
!head data/sample_data.csv | cut -c -80

t,v,s,f,q
2020-04-28 06:06,558.937,0.000,"0,0,0,0",v
2020-04-28 06:12,558.927,0.000,"0,0,0,0",v
2020-04-28 06:18,558.921,0.000,"0,0,0,0",v
2020-04-28 06:24,558.914,0.000,"0,0,0,0",v
2020-04-28 06:30,558.911,0.000,"0,0,0,0",v
2020-04-28 06:36,558.911,0.000,"0,0,0,0",v
2020-04-28 06:42,558.911,0.000,"0,1,0,0",v
2020-04-28 06:48,558.911,0.000,"0,1,0,0",v
2020-04-28 06:54,558.911,0.000,"0,1,0,0",v


CSVs may or may not have a `header` row at the top like we do here. This row contains the column names.
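
The sketch below shows, with the standard library's `csv` module, how a reader handles both cases: a file with a header row and one without. It also shows why the `f` column is quoted in the file: the quotes keep its embedded commas from being treated as separators. The two data rows are copied from the output above.

```python
import csv
import io

text = (
    "t,v,s,f,q\n"
    '2020-04-28 06:06,558.937,0.000,"0,0,0,0",v\n'
    '2020-04-28 06:12,558.927,0.000,"0,0,0,0",v\n'
)

# With a header row, DictReader takes the column names from the first line.
with_header = list(csv.DictReader(io.StringIO(text)))

# Without a header row, the column names have to be supplied by the caller.
body_only = text.split("\n", 1)[1]
without_header = list(
    csv.DictReader(io.StringIO(body_only), fieldnames=["t", "v", "s", "f", "q"])
)

# The quoted field survives as a single value despite its commas.
print(with_header[0]["f"])
print(with_header == without_header)
```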

Let's compare how long it takes `pandas` and `cuDF` to read a CSV file.

In [15]:
file_path = "data/sample_data.csv"

with Timer() as t_pd:
    df_cpu = pd.read_csv(file_path)
with Timer() as t_cudf:
    df_gpu = cudf.read_csv(file_path)
"{:10s} pd: {:>8.5f}s cudf: {:>8.5f}s".format("csv", t_pd.interval, t_cudf.interval)

'csv        pd:  0.07787s cudf:  0.00997s'

Like the terminal `head` command, our DataFrames have a `head` method so we can see the first few rows. Let's verify that the CPU and GPU versions of our DataFrame look the same.

In [16]:
df_cpu.head()

Unnamed: 0,t,v,s,f,q
0,2020-04-28 06:06,558.937,0.0,0,v
1,2020-04-28 06:12,558.927,0.0,0,v
2,2020-04-28 06:18,558.921,0.0,0,v
3,2020-04-28 06:24,558.914,0.0,0,v
4,2020-04-28 06:30,558.911,0.0,0,v


In [17]:
df_gpu.head()

Unnamed: 0,t,v,s,f,q
0,2020-04-28 06:06,558.937,0.0,0,v
1,2020-04-28 06:12,558.927,0.0,0,v
2,2020-04-28 06:18,558.921,0.0,0,v
3,2020-04-28 06:24,558.914,0.0,0,v
4,2020-04-28 06:30,558.911,0.0,0,v


We can also check the number of rows in the DataFrame using Python's `len` function.

In [18]:
len(df_cpu)

87599

In [19]:
len(df_gpu)

87599

### Parquet

The next file format we will look at is Apache's [parquet](https://parquet.apache.org/) format. Whereas CSV files store data row by row, `parquet` stores it column by column. We can look at the raw data file, but it's not human readable...

In [20]:
!head data/sample_data.parquet | cut -c -80

,І   ��L   2020-04-28 06:06F 12J  8F 24F 30J Jd 
9 0B�
F� B�
(F� B�
(F� B�
(F� B�
9 0B�
	B B�
(F  5>�( 3B�B�
(FXB�


`parquet` files have a header and footer describing the schema of the file. The columns are broken down into chunks. A program that reads a `parquet` file consults the metadata to find where the columns requested by the user are located in the file. Read more about it [here](https://parquet.apache.org/documentation/latest/).
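
The idea of "look up the column in the metadata, then read only that chunk" can be imitated with a toy format. The sketch below is not the real Parquet layout: the JSON-encoded chunks, the footer dictionary, and the helper names `write_columnar` and `read_column` are all inventions for this example. It only illustrates why a columnar file lets a reader skip the columns it does not need.

```python
import json

rows = [
    {"t": "2020-04-28 06:06", "v": 558.937, "q": "v"},
    {"t": "2020-04-28 06:12", "v": 558.927, "q": "v"},
    {"t": "2020-04-28 06:18", "v": 558.921, "q": "v"},
]


def write_columnar(rows):
    """Pack each column into its own chunk, then append a footer of offsets."""
    body = b""
    footer = {}
    for name in rows[0]:
        chunk = json.dumps([row[name] for row in rows]).encode()
        footer[name] = (len(body), len(chunk))  # (offset, length) of the chunk
        body += chunk
    footer_bytes = json.dumps(footer).encode()
    # The final 4 bytes store the footer length so a reader can locate it.
    return body + footer_bytes + len(footer_bytes).to_bytes(4, "little")


def read_column(blob, name):
    """Read one column by consulting the footer, ignoring all other chunks."""
    footer_len = int.from_bytes(blob[-4:], "little")
    footer = json.loads(blob[-4 - footer_len:-4])
    offset, length = footer[name]
    return json.loads(blob[offset:offset + length])


blob = write_columnar(rows)
print(read_column(blob, "v"))
```

Only the bytes for the `v` chunk are decoded here. In a row-oriented file such as CSV, every line would have to be scanned to extract the same column.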

Like before, let's see how long it takes `pandas` to read a `parquet` versus `cuDF`.

In [21]:
file_path = "data/sample_data.parquet"

with Timer() as t_pd:
    df_cpu = pd.read_parquet(file_path)
with Timer() as t_cudf:
    df_gpu = cudf.io.parquet.read_parquet(file_path)
"{:10s} pd: {:>8.5f}s cudf: {:>8.5f}s".format("parquet", t_pd.interval, t_cudf.interval)

'parquet    pd:  0.04959s cudf:  0.03753s'

### Zip

Oftentimes, data is compressed to save space. [ZIP](https://experience.dropbox.com/resources/what-is-a-zip-file) works by encoding redundant data into fewer bytes. In many cases, data needs to be uncompressed before it is usable by other applications, but both `pandas` and `cuDF` can read this file type.
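
To get a feel for how much redundancy ZIP can remove, here is a small standard-library sketch that compresses some CSV-like text in memory and compares sizes. The rows are synthetic and deliberately repetitive (an assumption of this example), so the ratio will be better than for real readings.

```python
import io
import zipfile

# Synthetic, highly repetitive CSV text made up for this example.
csv_text = "t,v,s,f,q\n" + "".join(
    '2020-04-28 06:{:02d},558.911,0.000,"0,0,0,0",v\n'.format(minute % 60)
    for minute in range(1000)
)
raw = csv_text.encode()

# Write a single-member ZIP archive into an in-memory buffer.
archive = io.BytesIO()
with zipfile.ZipFile(archive, "w", compression=zipfile.ZIP_DEFLATED) as zf:
    zf.writestr("sample_data.csv", raw)

# Reopen the archive, inspect the sizes, and decompress the member.
with zipfile.ZipFile(archive) as zf:
    info = zf.getinfo("sample_data.csv")
    restored = zf.read("sample_data.csv")

print("original bytes:  ", info.file_size)
print("compressed bytes:", info.compress_size)
print("round trip ok:   ", restored == raw)
```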

In [22]:
file_path = "data/sample_data.csv.zip"

with Timer() as t_pd:
    df_cpu = pd.read_csv(file_path, compression="zip")
with Timer() as t_cudf:
    df_gpu = cudf.read_csv(file_path, compression="zip")
"{:10s} pd: {:>8.5f}s cudf: {:>8.5f}s".format(
    "csv(zip)", t_pd.interval, t_cudf.interval
)

'csv(zip)   pd:  0.07050s cudf:  0.01369s'

## Parallel Reading

[Dask](https://dask.org/) is an open-source library designed to natively scale Python code. Dask is a task-based library for parallel scheduling and execution. Although it is certainly possible to use the task-scheduling machinery directly to implement customized parallel workflows, most users only interact with Dask through a *Dask Collection API*.  The most popular "collection" APIs include:

- [Dask DataFrame](https://docs.dask.org/en/latest/dataframe.html): Dask-based version of the [Pandas](https://pandas.pydata.org/) DataFrame/Series API.  Note that `dask_cudf` is just a wrapper around this collection module (`dask.dataframe`).
- [Dask Array](https://docs.dask.org/en/latest/array.html): Dask-based version of the [NumPy](https://numpy.org/) array API
- [Dask Bag](https://docs.dask.org/en/latest/bag.html): *Similar to* a Dask-based version of PyToolz or a Pythonic version of PySpark RDD

For example, Dask DataFrame provides a convenient API for decomposing large pandas (or cuDF) DataFrame/Series objects into a collection of DataFrame *partitions*.

<div style="text-align:center"><img src="images/dask-dataframe.svg" width="350px"></div>

It is useful to understand (on a basic level) how Dask works. When an application or library uses a Dask collection API (like Dask DataFrame), it is typically using that API to construct a directed acyclic graph ([DAG](https://mathworld.wolfram.com/AcyclicDigraph.html)) of tasks.  Once a DAG is constructed, the **core** Dask API can be used (either directly or implicitly through the collection API) to schedule and execute the DAG on one or more threads/processes.

In other words, Dask provides various APIs to:

1. Construct a DAG of "tasks"
2. Schedule/execute those DAGs
3. (Optionally) Spin up dedicated worker and scheduler processes to enable distributed execution
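
The first two steps can be imitated in a few lines of plain Python. The sketch below is a toy, not Dask itself: the dictionary layout is loosely modeled on the way task graphs are often written (a key mapped to either a value or a tuple of a function and its arguments), and the `execute` helper is a hypothetical name introduced here. It runs everything in one thread, so it shows the construct-then-execute split but none of the parallelism.

```python
from operator import add, mul

# Step 1: construct a DAG of tasks. Nothing is computed yet.
# A task is (function, *arguments); string arguments that match
# another key are treated as dependencies on that key.
graph = {
    "a": 2,
    "b": 3,
    "sum": (add, "a", "b"),
    "result": (mul, "sum", 10),
}


def execute(graph, key):
    """Step 2: execute the graph by resolving dependencies recursively."""
    task = graph[key]
    if isinstance(task, tuple):
        func, *args = task
        resolved = [
            execute(graph, arg) if isinstance(arg, str) and arg in graph else arg
            for arg in args
        ]
        return func(*resolved)
    return task  # a plain value, not a task


print(execute(graph, "result"))  # (2 + 3) * 10 = 50
```

A real scheduler would also notice which tasks do not depend on each other and run them at the same time, which is where the benefit of the graph representation comes from.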

<div style="text-align:center"><img src='images/dask_dag_cartoon.png' width=500></div>

Let's take a look at all of this in action.

In [23]:
file_path = "data/sample_data.csv"

with Timer() as t_dd_cpu:
    ddf_cpu = dd.read_csv(file_path)
with Timer() as t_dd_gpu:
    ddf_gpu = dask_cudf.read_csv(file_path)
"{:10s} CPU: {:>8.5f}s GPU: {:>8.5f}s".format(
    "csv(dask)", t_dd_cpu.interval, t_dd_gpu.interval
)

'csv(dask)  CPU:  0.01050s GPU:  0.00938s'

So fast!

Well, not really. Let's take a look at the DataFrames to see what we mean.

In [24]:
ddf_cpu

Unnamed: 0_level_0,t,v,s,f,q
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,object,float64,float64,object,object
,...,...,...,...,...


In [25]:
ddf_gpu

Unnamed: 0_level_0,t,v,s,f,q
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,object,float64,float64,object,object
,...,...,...,...,...


Neither of the DataFrames has numbers in it, but the structure is set up. This is the result of the [DAG](https://mathworld.wolfram.com/AcyclicDigraph.html). The structure is created to quickly produce a result, but there is no result yet. When we ran the `read_csv` functions, the data was not actually read.
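
This deferred behavior can be mimicked with a tiny class. The sketch below is plain Python rather than Dask: `LazyCSV` is a hypothetical name, and its `compute` method merely borrows Dask's terminology. Creating the object records which file to read, and the file is only opened when `compute` is called.

```python
import csv
import os
import tempfile


class LazyCSV:
    """Remembers which file to read but defers the read until compute()."""

    def __init__(self, path):
        self.path = path
        self.read_count = 0  # how many times the file has actually been read

    def compute(self):
        self.read_count += 1
        with open(self.path, newline="") as handle:
            return list(csv.DictReader(handle))


with tempfile.TemporaryDirectory() as tmp:
    # Write a two-row sample file to read back.
    path = os.path.join(tmp, "sample.csv")
    with open(path, "w", newline="") as handle:
        handle.write("t,v\n2020-04-28 06:06,558.937\n2020-04-28 06:12,558.927\n")

    lazy = LazyCSV(path)             # fast: nothing is read here
    reads_before = lazy.read_count
    rows = lazy.compute()            # the file is read only now
    reads_after = lazy.read_count

print(reads_before, reads_after, len(rows))
```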

We can force a result with `compute`, causing the CSV files to be read.

In [26]:
%%time
ddf_cpu.compute()

CPU times: user 56 ms, sys: 12 ms, total: 68 ms
Wall time: 64.8 ms


Unnamed: 0,t,v,s,f,q
0,2020-04-28 06:06,558.937,0.000,0000,v
1,2020-04-28 06:12,558.927,0.000,0000,v
2,2020-04-28 06:18,558.921,0.000,0000,v
3,2020-04-28 06:24,558.914,0.000,0000,v
4,2020-04-28 06:30,558.911,0.000,0000,v
...,...,...,...,...,...
87594,2021-04-28 05:30,559.436,0.000,0000,p
87595,2021-04-28 05:36,559.436,0.000,0000,p
87596,2021-04-28 05:42,559.442,0.000,1000,p
87597,2021-04-28 05:48,559.383,0.007,0000,p


In [27]:
%%time
ddf_gpu.compute()

CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 9.16 ms


Unnamed: 0,t,v,s,f,q
0,2020-04-28 06:06,558.937,0.000,0000,v
1,2020-04-28 06:12,558.927,0.000,0000,v
2,2020-04-28 06:18,558.921,0.000,0000,v
3,2020-04-28 06:24,558.914,0.000,0000,v
4,2020-04-28 06:30,558.911,0.000,0000,v
...,...,...,...,...,...
87594,2021-04-28 05:30,559.436,0.000,0000,p
87595,2021-04-28 05:36,559.436,0.000,0000,p
87596,2021-04-28 05:42,559.442,0.000,1000,p
87597,2021-04-28 05:48,559.383,0.007,0000,p


We may notice that the time it takes to use `Dask`, regardless of CPU or GPU, is not as fast as regular `pandas` and `cuDF` in this case. That is because there is some overhead in setting up the DAG for parallelization. Because of this, reading data with Dask works best with a large amount of data split across multiple files.
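
To illustrate what "data split across multiple files" looks like in practice, here is a rough standard-library sketch that writes four small CSV partitions and reads them concurrently with a thread pool. The thread pool is only a stand-in for Dask's workers, and the file names, column names, and `read_partition` helper are assumptions made for this example. With files this small, the coordination overhead outweighs any gain, which mirrors the observation above.

```python
import csv
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor


def read_partition(path):
    """Read one partition file into a list of row dictionaries."""
    with open(path, newline="") as handle:
        return list(csv.DictReader(handle))


with tempfile.TemporaryDirectory() as tmp:
    # Write four partitions of five rows each.
    paths = []
    for part in range(4):
        path = os.path.join(tmp, "part_{}.csv".format(part))
        with open(path, "w", newline="") as handle:
            writer = csv.writer(handle)
            writer.writerow(["row_num", "value"])
            for i in range(5):
                row_num = part * 5 + i
                writer.writerow([row_num, row_num * 0.5])
        paths.append(path)

    # Each partition is read by its own task; map keeps the input order.
    with ThreadPoolExecutor(max_workers=4) as pool:
        partitions = list(pool.map(read_partition, paths))

# Concatenate the partitions back into a single list of rows.
rows = [row for partition in partitions for row in partition]
print(len(partitions), len(rows))
```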

## Exercise

Speaking of comparisons, which is better? `CSV` or `parquet`? `pandas` or `cuDF`? This depends on the data content and what we intend to use it for. In the `data` directory, `numbers.csv` is a large, randomly generated dataset with `7500` rows and `7500` columns. It has been duplicated and saved in `data/numbers.parquet`.

Being a large file, we do not recommend opening it in a new tab as it may crash the notebook connection. Instead, we can sample the file programmatically.

In [28]:
!head data/numbers.csv | cut -c -80

,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29
0,0.9688122299539527,-1.9057409366225484,-0.5552106626195441,0.10957962030282403
1,2.3359458116268983,0.09562432378412504,0.47552757641913707,-0.1661054487161083
2,-0.09466958432022161,-1.259493954632562,2.12786051166871,0.8233342585224563,-0
3,0.13067335954764697,0.9963187046923956,-1.0503495764572368,0.03327832259014544
4,-1.483028449313044,0.6875889933866639,1.5303177317859817,1.211056732145375,-0.
5,1.3000573026397504,1.2345335286147918,-0.5603539849064821,0.9242411704697056,-
6,-0.8030736831551547,-0.5129375571241115,0.9468218590193478,1.0107628920250133,
7,-0.9328684817957795,1.2011893344498081,-0.310068624700141,0.5911974890670361,-
8,-0.10251583527810282,0.127221434700931,-0.13388987540502392,-0.395017824053683


**TODO**: Play around with the `skip_rows` and `col_indexes` values below to see how they impact the execution time of both `pandas` and `cuDF`. Check out the documentation for [read_csv](https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html) and [read_parquet](https://pandas.pydata.org/docs/reference/api/pandas.read_parquet.html). For `parquet`, we are using the [pyarrow](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html) engine.

**BONUS**: See if you can find a situation where pandas is faster than cuDF. What set of conditions made it possible? What happens when no rows are skipped or most rows are skipped? What happens when no columns are used or most columns are used?

**Note**:
* `skip_rows` works a little differently for [cuDF](https://docs.rapids.ai/api/cudf/stable/api.html#cudf.io.csv.read_csv)
* We've added a `row_num` column so the `parquet` rows can be filtered while read using [filters](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html).

In [29]:
# Change these integers
skip_rows = 1000  # Rows to skip, up to 7500
col_indexes = range(0, 1000)  # Columns to include, up to 7500

filter_rows = [("row_num", "<=", skip_rows)]
use_cols = ["{0}".format(col_idx) for col_idx in col_indexes]

csv_path = "data/numbers.csv"
parq_path = "data/numbers.parquet"

# CSV
with Timer() as t_pd:
    pd.read_csv(csv_path, usecols=use_cols, skiprows=range(1, skip_rows + 1))
with Timer() as t_cudf:
    cudf.read_csv(csv_path, usecols=use_cols, header=0, skiprows=skip_rows)
print(
    "{:10s} pd: {:>8.5f}s cudf: {:>8.5f}s".format("csv", t_pd.interval, t_cudf.interval)
)

# Parquet
with Timer() as t_pd:
    pd.read_parquet(parq_path, columns=use_cols, filters=filter_rows)
with Timer() as t_cudf:
    cudf.io.parquet.read_parquet(parq_path, columns=use_cols, filters=filter_rows)

print(
    "{:10s} pd: {:>8.5f}s cudf: {:>8.5f}s".format(
        "parquet", t_pd.interval, t_cudf.interval
    )
)

csv        pd:  5.15952s cudf:  0.55392s
parquet    pd:  0.40531s cudf:  0.35102s


One last thought before moving on: how much space did each of these file types take? We can use the `ls` terminal command to list all the files in the `data` directory. The `s` flag lists the size and the `h` flag makes that size "human readable".

In [None]:
!ls data -sh

Do the sizes match what you would expect? How well do they reflect the speed it takes to read these files?
Have answers to all of these questions? After this set of notebooks, return to the task launcher to take a short quiz. But for now, proceed to the [next notebook](2_Dask_and_MapReduce.ipynb) to learn more about Dask and MapReduce.

In [None]:
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
