# Table of Contents
* [Large data](#Large-data)
	* [Datashader](#Datashader)
	* [Aggregators](#Aggregators)
* [Streaming](#Streaming)
	* [DataFrames](#DataFrames)
	* [Plotting](#Plotting)


# Large data

[Dask](https://dask.org) enables computation on datasets that are larger than memory, an approach known as *out-of-core* processing.
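
As a rough illustration of the out-of-core idea (not of how Dask is implemented), the sketch below computes a mean while holding only a small chunk of rows in memory at a time. It uses plain pandas; the in-memory CSV text and the chunk size are made up for the example.

```python
import io

import pandas as pd

# Hypothetical stand-in for a file that is too large to load at once.
csv_text = "fare_amount\n" + "\n".join(
    str(v) for v in [5.0, 7.5, 10.0, 12.5, 15.0, 20.0]
)

total = 0.0
count = 0
# With chunksize set, read_csv yields DataFrames of at most 2 rows each,
# so only one small chunk is held in memory at any time.
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=2):
    total += chunk["fare_amount"].sum()
    count += len(chunk)

# Combine the per-chunk partial results into the overall mean.
mean_fare = total / count
print(mean_fare)
```

Dask automates this pattern: it splits the data into partitions, computes partial results, and combines them, without the loop having to be written by hand.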

In [None]:
import warnings
warnings.filterwarnings("ignore")

In [None]:
import dask.dataframe as dd

taxi = dd.read_parquet('data/nyc_taxi_50k.parq', engine='fastparquet')
taxi.info()

In [None]:
taxi.head()

## Datashader

<img src='img/datashader_logo.png' width=10% align='right'>

<img src='img/datashader_pipeline.png' width=70% align='left'>

* Tools like Bokeh map Data (left) directly into an HTML/JavaScript Plot (right)
* Datashader instead renders Data into a plot-sized Aggregate array, from which an Image is constructed and then embedded into a Bokeh Plot
* Only the fixed-size Image needs to be sent to the browser, so millions or billions of datapoints can be plotted
* Every step automatically adjusts to the data, but can be customized
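
The aggregate-then-image idea in the bullets above can be sketched with NumPy alone. This is only a conceptual stand-in under stated assumptions, not Datashader's implementation: the random data, the 400x300 grid, and the log scaling are all chosen for illustration.

```python
import numpy as np

# Synthetic data standing in for a large point cloud (assumed for the example).
rng = np.random.default_rng(0)
n_points = 200_000
x = rng.normal(size=n_points)
y = rng.normal(size=n_points)

# Aggregate: count the points that fall into each cell of a plot-sized grid.
width, height = 400, 300
counts, x_edges, y_edges = np.histogram2d(x, y, bins=(width, height))

# Image: compress the dynamic range with a log, then scale to 8-bit intensities.
scaled = np.log1p(counts)
image = (scaled / scaled.max() * 255).astype(np.uint8)

print(counts.shape, image.dtype)
```

However many points go in, the array sent on for display always has `width * height` entries, which is why the approach scales to very large datasets.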

*When not to use datashader*

* Plotting fewer than 1e5 or 1e6 data points
* When every datapoint must be resolvable individually; standard Bokeh will render all of them
* For full interactivity (hover tools) with every datapoint

*When to use datashader*

* Actual big data; when Bokeh/Matplotlib have trouble
* When the distribution matters more than individual points
* When you find yourself sampling, decimating, or binning to better understand the distribution

In [None]:
import hvplot.dask

taxi.hvplot.scatter(x='pickup_x', y='pickup_y', datashade=True)

## Aggregators

In [None]:
import datashader as ds

clean = taxi.loc[taxi['fare_amount'] > 0]
clean['tip_fraction'] = clean['tip_amount'] / clean['fare_amount']

clean.hvplot.scatter(x='dropoff_x', y='dropoff_y', c='tip_fraction',
                     datashade=True,
                     aggregator=ds.mean('tip_fraction'),
                     cmap='bmy')
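
To make the role of a mean aggregator concrete, here is a minimal NumPy sketch of a per-cell mean on a tiny 2x2 grid. It is an illustration of the concept only, not Datashader's code, and the coordinates and `tip_fraction` values are invented.

```python
import numpy as np

# Five made-up points with a value attached to each.
x = np.array([0.1, 0.2, 0.8, 0.9, 0.85])
y = np.array([0.1, 0.2, 0.9, 0.8, 0.85])
tip_fraction = np.array([0.10, 0.20, 0.15, 0.25, 0.20])

bins = 2
extent = [[0, 1], [0, 1]]

# Per-cell sum of the values and per-cell count of the points.
sums, _, _ = np.histogram2d(x, y, bins=bins, range=extent, weights=tip_fraction)
counts, _, _ = np.histogram2d(x, y, bins=bins, range=extent)

# Per-cell mean; cells with no points stay NaN instead of dividing by zero.
mean = np.full(counts.shape, np.nan)
np.divide(sums, counts, out=mean, where=counts > 0)

print(mean)
```

Swapping the sum-and-divide step for a count, a maximum, or another reduction gives the other kinds of aggregate that can be colormapped in the same way.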

# Streaming

<a href='./data/stream-ae5.ipynb' class='btn btn-primary btn-lg'>Run the notebook at this link and leave it running</a>

<img src='img/sequence.png'>

The [Streamz](https://streamz.readthedocs.io/en/latest/index.html) library enables continuous streams of data from a variety of sources to be built into processing pipelines.

Optionally, Streamz can also work with Pandas dataframes to provide sensible streaming operations on continuous tabular data.

Data is being streamed into the `tmp/taxi` directory as a new file every few seconds.

In [None]:
!ls tmp/taxi

The `filenames()` method polls the directory once per second (`poll_interval=1`) for new files that match the glob pattern.

In [None]:
from streamz import Stream

stream = Stream()
f = stream.filenames('tmp/taxi/*.csv', poll_interval=1, start=True)
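
The polling behaviour can be pictured with a small standard-library sketch. This is an assumption-laden illustration, not Streamz's implementation: the helper `poll_new_files` is hypothetical, and a real poller would sleep for the poll interval between calls rather than being called by hand.

```python
import glob
import os
import tempfile


def poll_new_files(pattern, seen):
    """Return files matching `pattern` that are not yet in `seen`, and record them."""
    new_files = sorted(set(glob.glob(pattern)) - seen)
    seen.update(new_files)
    return new_files


with tempfile.TemporaryDirectory() as tmp_dir:
    pattern = os.path.join(tmp_dir, "*.csv")
    seen = set()

    # A first file arrives, and the next poll reports it.
    open(os.path.join(tmp_dir, "0.csv"), "w").close()
    first_poll = [os.path.basename(p) for p in poll_new_files(pattern, seen)]

    # A second file arrives; only the new one is reported.
    open(os.path.join(tmp_dir, "1.csv"), "w").close()
    second_poll = [os.path.basename(p) for p in poll_new_files(pattern, seen)]

    # Nothing new has arrived, so the poll comes back empty.
    third_poll = [os.path.basename(p) for p in poll_new_files(pattern, seen)]

print(first_poll, second_poll, third_poll)
```

Keeping a set of already-seen names is what lets each file be passed downstream only once.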

## DataFrames

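Each streamed filename is read into a Pandas DataFrame with `map()`, and the result is then converted to a Streamz DataFrame called `sdf` with `to_dataframe()`. The `example` DataFrame tells Streamz which columns and dtypes to expect.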

In [None]:
import pandas as pd
example = pd.read_csv('tmp/taxi/0.csv', parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

sdf = (f
       .map(pd.read_csv, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
       .to_dataframe(example)
      )

Streamz DataFrames support many of the same operations as regular DataFrames. Through ipywidgets, the displayed value is updated continuously as new data arrives from the stream.

Note that only the most recent file is being processed and displayed.

In [None]:
sdf['fare_amount'].mean()
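
The difference between a per-file result and a result accumulated over the whole stream can be sketched in plain pandas. The three small batches below are invented, and the loop stands in for files arriving one after another; it does not use Streamz.

```python
import pandas as pd

# Three made-up batches, standing in for three files arriving in sequence.
batches = [
    pd.DataFrame({"fare_amount": [10.0, 20.0]}),
    pd.DataFrame({"fare_amount": [30.0]}),
    pd.DataFrame({"fare_amount": [5.0, 15.0, 25.0]}),
]

per_batch_means = []  # mean of the most recent batch only
running_means = []    # mean over everything seen so far
total, count = 0.0, 0
for batch in batches:
    per_batch_means.append(batch["fare_amount"].mean())
    total += batch["fare_amount"].sum()
    count += len(batch)
    running_means.append(total / count)

print(per_batch_means)
print(running_means)
```

A value computed from the most recent file alone corresponds to `per_batch_means`; it can jump around from file to file, whereas the running mean settles as more data arrives.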

## Plotting

hvPlot can plot directly from a Streamz DataFrame. Here the values are cached in the plot, so more points appear every few seconds.

In [None]:
import hvplot.streamz
from bokeh.palettes import Category10_7

scatter = sdf.hvplot.scatter(x='fare_amount', y='trip_distance', c='passenger_count',
                             hover_cols=['passenger_count', 'tip_amount'],
                             cmap=Category10_7, padding=0.1)
scatter

Here we'll mix Pandas processing and plotting.

In [None]:
sdf['diff'] = (sdf['tpep_dropoff_datetime'] - sdf['tpep_pickup_datetime'])
sdf['duration'] = sdf['diff'].map(lambda x: x.total_seconds() / 60)

bar = (sdf
       .groupby('passenger_count')['duration'].mean()
       .hvplot.bar(padding=0.1)
      )

scatter.opts(width=450) + bar.opts(width=450)
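
To see what the duration and group-by steps compute, here is the same calculation on a single static batch using plain pandas. The three trips are made up, and the vectorized `.dt.total_seconds()` accessor is used here in place of the per-element `map`; whether that accessor is available on a Streamz DataFrame is not assumed.

```python
import pandas as pd

# A made-up batch of three trips with the same column names as the stream.
batch = pd.DataFrame({
    "tpep_pickup_datetime": pd.to_datetime(
        ["2019-01-01 00:00:00", "2019-01-01 00:10:00", "2019-01-01 00:20:00"]
    ),
    "tpep_dropoff_datetime": pd.to_datetime(
        ["2019-01-01 00:15:00", "2019-01-01 00:40:00", "2019-01-01 00:30:00"]
    ),
    "passenger_count": [1, 2, 1],
})

# Trip duration in minutes, from the difference of the two timestamps.
diff = batch["tpep_dropoff_datetime"] - batch["tpep_pickup_datetime"]
batch["duration"] = diff.dt.total_seconds() / 60

# Mean duration per passenger count, the quantity shown in the bar chart.
mean_duration = batch.groupby("passenger_count")["duration"].mean()
print(mean_duration)
```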

To stop listening for new data, call `.stop()` on the original stream.

In [None]:
f.stop()

<font color='grey'><i>Copyright Anaconda 2012-2019 All Rights Reserved.</i></font>