In [None]:
from collections import deque
from functools import partial
from itertools import cycle

import numpy as np
import pandas as pd
import xarray as xr

import datashader as ds
import datashader.transfer_functions as tf
from datashader.colors import viridis

from streamz import Stream

In [None]:
def taxi_trips_stream(source='data/nyc_taxi.csv', frequency='T'):
    """Yield taxi-trip dataframes grouped by pickup time, cycling forever.

    The CSV at ``source`` is read once, indexed and sorted by
    ``tpep_pickup_datetime`` and resampled at ``frequency``. One dataframe
    is yielded per resample bin in chronological order; after the last bin
    the sequence starts over from the first.

    Parameters
    ----------
    source : str
        Path of the CSV file. It must contain a ``tpep_pickup_datetime``
        column.
    frequency : str
        Offset alias handed to ``DataFrame.resample``.

    Yields
    ------
    pandas.DataFrame
        The rows of one bin, still indexed by pickup time. A bin for which
        ``get_group`` raises ``KeyError`` is yielded as an empty dataframe.
    """
    def get_group(resampler, key):
        # Keep the pickup-time index on the group: aggregate_df reads the
        # covered date range from ``df.index``.
        try:
            return resampler.get_group(key)
        except KeyError:
            return pd.DataFrame()

    df = pd.read_csv(source, parse_dates=['tpep_pickup_datetime'])
    df = df.set_index('tpep_pickup_datetime', drop=True)
    df = df.sort_index()
    r = df.resample(frequency)
    chunks = [get_group(r, g) for g in sorted(r.groups)]
    for chunk in cycle(chunks):
        yield chunk

### Create streaming pipeline

Given a stream of dataframes representing NYC taxi data, we create a pipeline with four streams: two streams are sliding window aggregations over some time period, while two other streams track the cumulative average for a particular value. The pipeline visualization below shows each step that makes up each stream.

For each aggregation stream, the general steps are 1) aggregate each dataframe using a Datashader reduction, 2) keep a sliding window of those aggregations, and 3) combine the aggregations in the window into a single image. The first stream creates a 2-day sliding window aggregation, while the second stream creates a 1-week sliding window aggregation.

For each cumulative average stream, we track the cumulative sum of the value along with the cumulative number of data points.

We use the primitives given in the `streamz` library to accomplish this. `aggregated_sliding_window_image_queue` creates each sliding-window pipeline and `cumulative_mean_queue` creates each cumulative-average pipeline, but these will likely be replaced by a native `streamz.StreamingDataFrame` container when ready. Each stream places its final result into a double-ended queue, which is used to keep a history of previous results. By default, we only keep the most recent one.

In [None]:
def aggregate_df(df, x, y, plot_width=800, plot_height=600, agg=None):
    """Rasterize the points of ``df`` and report the dates it spans.

    Returns a ``(first_date, last_date, aggregate)`` tuple. The two dates
    come from the minimum and maximum of ``df.index``; the aggregate is the
    result of ``Canvas.points`` for columns ``x`` and ``y`` with reduction
    ``agg`` on a ``plot_width`` by ``plot_height`` canvas.
    """
    index = df.index
    first_date = index.min().date()
    last_date = index.max().date()
    canvas = ds.Canvas(plot_width=plot_width, plot_height=plot_height)
    aggregate = canvas.points(df, x, y, agg)
    return first_date, last_date, aggregate

In [None]:
def aggregate_images(iterable, cmap):
    """Sum a window of aggregates and shade the total as a single image.

    ``iterable`` is an indexable sequence of ``(start, end, aggregate)``
    entries. The aggregates are concatenated along a new ``'cols'``
    dimension and summed over it; the image is named after the start of the
    first entry and the end of the last one.
    """
    window_start = iterable[0][0]
    window_end = iterable[-1][1]
    stacked = xr.concat([entry[2] for entry in iterable], dim='cols')
    return tf.shade(stacked.sum(dim='cols'),
                    cmap=cmap,
                    name="{} - {}".format(window_start, window_end))

In [None]:
def aggregated_sliding_window_image_queue(source, agg1, agg2, window=1, history=1):
    """Attach a sliding-window pipeline to ``source`` and return its output queue.

    Every element of ``source`` is mapped through ``agg1``, the mapped
    values are grouped with ``sliding_window(window)``, each window is
    mapped through ``agg2`` and the result is appended to a deque that
    retains at most ``history`` entries.
    """
    results = deque(maxlen=history)
    windows = source.map(agg1).sliding_window(window)
    reduced = windows.map(agg2)
    reduced.sink(results.append)
    return results

In [None]:
def cumulative_mean_queue(source, column, history=1):
    """Attach a cumulative-mean pipeline to ``source`` and return its output queue.

    For every dataframe emitted by ``source`` the running sum of ``column``
    and the running count of its non-null values are updated, and their
    ratio is appended to the returned deque.

    Parameters
    ----------
    source : stream
        Object providing ``accumulate``, ``map`` and ``sink``.
    column : str
        Name of the dataframe column to average.
    history : int
        Maximum number of means retained in the deque.

    Returns
    -------
    collections.deque
        The most recent cumulative means, oldest first. An entry is NaN
        while no value has been seen yet.
    """
    def accumulator(acc, df):
        n, total = acc
        if len(df) == 0:
            # An empty chunk may not even have the column; nothing to add.
            return n, total
        values = df[column]
        # Count data points rather than dataframes so that total / n is the
        # mean per value; count() skips NaN exactly like sum() does.
        return n + values.count(), total + values.sum()

    def merge(value):
        n, total = value
        # No values seen yet: report NaN instead of dividing by zero.
        return total / n if n else float('nan')

    q = deque(maxlen=history)
    source.accumulate(accumulator, start=(0, 0)).map(merge).sink(q.append)
    return q

In [None]:
def show_queue(q, column):
    """Return the contents of ``q`` as a one-column dataframe indexed by position.

    The values are stored under ``column`` and the index, named ``'time'``,
    numbers them from 0 in queue order. As a side effect, pandas' global
    float display format is set to two decimal places.
    """
    pd.options.display.float_format = '{:.2f}'.format
    values = list(q)
    positions = np.arange(len(q))
    frame = pd.DataFrame({'time': positions, column: values})
    frame = frame.set_index('time')
    return frame

In [None]:
# Ready-made aggregations: each one rasterizes the pickup coordinates of a
# dataframe with a different Datashader reduction.
_pickup_points = partial(aggregate_df, x='pickup_x', y='pickup_y')

min_amount = partial(_pickup_points, agg=ds.min('total_amount'))
max_amount = partial(_pickup_points, agg=ds.max('total_amount'))
mean_amount = partial(_pickup_points, agg=ds.mean('total_amount'))
sum_amount = partial(_pickup_points, agg=ds.sum('total_amount'))
max_passengers = partial(_pickup_points, agg=ds.max('passenger_count'))
sum_passengers = partial(_pickup_points, agg=ds.sum('passenger_count'))
sum_pickups = partial(_pickup_points, agg=ds.count())

# Window reducer that shades the combined aggregates with the viridis colormap.
reduce_viridis = partial(aggregate_images, cmap=viridis)

In [None]:
# A single root stream feeds all four pipelines.
source = Stream()

# Sliding-window images: windows of 2 dataframes (last 6 images kept) and
# windows of 7 dataframes (only the latest image kept).
q_days = aggregated_sliding_window_image_queue(
    source, agg1=max_amount, agg2=reduce_viridis, window=2, history=6)
q_week = aggregated_sliding_window_image_queue(
    source, agg1=max_amount, agg2=reduce_viridis, window=7)

# Cumulative means, each keeping its 7 most recent values.
q_avg_passengers = cumulative_mean_queue(source, column='passenger_count', history=7)
q_avg_amount = cumulative_mean_queue(source, column='total_amount', history=7)

In [None]:
# Draw the graph of pipeline steps attached to the root stream.
source.visualize()

### Push data through pipeline

We initially push 7 days' worth of dataframes through the pipeline, since a sliding window must be full before it emits a window's worth of data.

In [None]:
# One dataframe per day; push the first seven so the 7-wide window fills up.
trips_per_day = taxi_trips_stream(frequency='D')
for _ in range(7):
    day = next(trips_per_day)
    source.emit(day)

#### Cumulative average of passengers (ordered by oldest first)

In [None]:
# Tabulate the retained passenger means, oldest first.
show_queue(q=q_avg_passengers, column='cumulative average passengers')

#### Cumulative average of total fare (ordered by oldest first)

In [None]:
# Tabulate the retained fare means, oldest first.
show_queue(q=q_avg_amount, column='cumulative average total fare')

#### History of 2-day aggregations (ordered by oldest first)

In [None]:
# Show every retained 2-day image side by side.
tf.Images(*q_days)

#### Current 1-week aggregation

In [None]:
# Show the retained 1-week image.
tf.Images(*q_week)

Now we get the next day's worth of data and see how the streams have updated.

In [None]:
# Advance the generator by one dataframe and feed it to every pipeline.
next_day = next(trips_per_day)
source.emit(next_day)

#### Cumulative average of passengers (ordered by oldest first)

In [None]:
# Tabulate the retained passenger means after the extra emit, oldest first.
show_queue(q=q_avg_passengers, column='cumulative average passengers')

#### Cumulative average of total fare (ordered by oldest first)

In [None]:
# Tabulate the retained fare means after the extra emit, oldest first.
show_queue(q=q_avg_amount, column='cumulative average total fare')

#### History of 2-day aggregations (ordered by oldest first)

In [None]:
# Show every retained 2-day image after the extra emit.
tf.Images(*q_days)

#### Current 1-week aggregation

In [None]:
# Show the retained 1-week image after the extra emit.
tf.Images(*q_week)