<p float="left">
<img src="images/dask_horizontal.svg" width="45%" alt="Dask logo"> <img src="images/numba_horizontal.svg" vspace="50" width="45%" alt="Numba logo">
</p>

<h1><center>Using Dask (and Numba) to Explore Chicago Taxi Trips</center></h1>

## Anirrudh Krishnan
### Software Engineer, Quansight
#### PyLadies Chicago Meetup - August 2019


In [1]:
import os, sys
import numpy as np
import dask
from datetime import datetime 

## What is Dask?

* Dask is an open source library for parallel computing in Python - helping the language scale.

## Why Dask?

* Large Dataset (49.63 GB) - Doesn't fit in memory!
* Scalable on local machine - and clusters
* Familiar API (similar to NumPy, Pandas, SciPy, etc.) that is easy to adopt

<h1 align="center">Dask Components</h1>

<img src="images/dask-components.svg" hspace="50px" width="60%" alt="Dask components">

## Dask Scheduler

Dask has a scheduler that can be deployed in a number of ways and can easily scale to a cluster. To show the
speed of Dask, we will be executing on a single machine, but will use the scheduler's ability to spread work across processes and threads
to speed up the computation.

Furthermore, there is a Dask Web UI we can use to monitor jobs.

In [2]:
from dask.distributed import Client 
client = Client() # Starts a local scheduler and workers; the dashboard (Web UI) is served on port 8787 by default

## Dask Dataframe: Stacked Pandas Dataframes

We should think of a Dask dataframe as a collection of Pandas dataframes. Dask dataframes can be read from a variety of formats and written to a variety of formats.

<img src="images/dask-dataframe.svg" width="400px">

In the sample below, we get data from a `.csv` file as normal. 

In [3]:
import dask.dataframe as dd
df = dd.read_csv('taxi_trips.csv')
df.trip_start_timestamp = df.trip_start_timestamp.astype(dtype='datetime64[ns]')
df = df[(df.fare != 0) & (df.fare != 9999.99)]
labels = ['company', 'payment_type', 'trip_id', 'taxi_id', 'extras', 'pickup_census_tract']
df = df.drop(labels, axis=1)
df = df.set_index('trip_start_timestamp', shuffle='tasks')

Using `df.npartitions`, I can peek at how many Pandas dataframes make up
the entire dataset.

Furthermore, I computed the size of the entire dataset, and recorded that below for fun.

```Python
sz = df.size
print(sz.compute()) # 2708641296
```


In [4]:
df.npartitions

776

These dataframes are lazily evaluated, so we can look at their structure (such as the columns) without computing anything.

In [5]:
print(df.columns)

Index(['dropoff_centroid_longitude', 'tax', 'fare', 'dropoff_community_area',
       'trip_total', 'tips', 'tolls', 'trip_miles', 'pickup_centroid_location',
       'trip_end_timestamp', 'pickup_centroid_latitude',
       'dropoff_centroid_location', 'dropoff_census_tract',
       'pickup_centroid_longitude', 'trip_seconds',
       'dropoff_centroid_latitude', 'pickup_community_area'],
      dtype='object')


### Basic Data Cleansing

So I cheated: I saw some outliers while running the computation. I made the decision to drop these rows because
they make no sense. While the trip might have taken place, the monetary values do not add up when the
max fare is 9999.99 and the min fare is 0.00. We can (probably) assume that nobody paid nothing and that nobody paid
9999.99 for a ride...



In [6]:

# large_future
# future = client.submit(lambda x: x.mean().compute(), df.loc[f'2015-01-01':f'2015-12-31', 'fare'])

In [7]:
df_15 = df.loc['2015-01-01':'2015-12-31']

In [11]:
fare = df['fare'].groupby(df['dropoff_community_area']).sum()

In [12]:
# def gen_futures(df, colname):
#     """
#     Returns a list of futures that require computation.
    
#     Parameters: 
#     arg1 (dask dataframe): Dask Dataframe that has been persisted for computational purposes. 
#     arg2 (string colname): String containing the column name to be processed.
    
#     Returns: 
#     list: A list containing futures.
#     """
#     stats = []
#     stats.extend((df[colname].min(), df[colname].max(), df[colname].mean(),df[colname].sum()))
#     return stats

# df_15.visualize()
# fare = gen_futures(df_15, 'fare')
# fare
# fares = client.gather(fare)
# df_15 = df_15.persist()
# df_15.visualize()
fare

Dask Series Structure:
npartitions=1
    float64
        ...
Name: fare, dtype: float64
Dask Name: series-groupby-sum-agg, 64242 tasks

In [8]:
# future = client.persist(df_15)
# for i in fare:
#     i.compute()
#     i
# df_15.visualize()
# fare = gen_futures(future, 'fare')
# for i in fare: 
#     i.result()

In [9]:
# future

Unnamed: 0_level_0,dropoff_centroid_longitude,tax,fare,taxi_id,payment_type,dropoff_community_area,trip_total,tips,tolls,trip_id,trip_miles,pickup_centroid_location,trip_end_timestamp,extras,pickup_census_tract,pickup_centroid_latitude,dropoff_centroid_location,company,dropoff_census_tract,pickup_centroid_longitude,trip_seconds,dropoff_centroid_latitude,pickup_community_area
npartitions=182,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1
2015-01-01 00:00:00.000000000,float64,float64,float64,object,object,float64,float64,float64,float64,object,float64,object,object,float64,float64,float64,object,object,float64,float64,float64,float64,float64
2015-01-01 13:30:00.000000000,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2015-12-29 15:48:57.740842240,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2015-12-31 23:59:59.999999999,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [16]:
# def gen_statistics(df):
#     stats = []
#     fare = gen_futures(df, 'fare')
#     tips = gen_futures(df, 'tips')
#     tax = gen_futures(df, 'tax')
#     tolls = gen_futures(df, 'tolls')
#     stats.extend((fare, tips, tax, tolls))
    
#     return stats

# stats_2015_list = gen_statistics(future)
# stats_2015_list

# for i in stats_2015_list:
#     results = client.gather(i)

[[dd.Scalar<series-..., dtype=float64>,
  dd.Scalar<series-..., dtype=float64>,
  dd.Scalar<series-..., dtype=float64>,
  dd.Scalar<series-..., dtype=float64>],
 [dd.Scalar<series-..., dtype=float64>,
  dd.Scalar<series-..., dtype=float64>,
  dd.Scalar<series-..., dtype=float64>,
  dd.Scalar<series-..., dtype=float64>],
 [dd.Scalar<series-..., dtype=float64>,
  dd.Scalar<series-..., dtype=float64>,
  dd.Scalar<series-..., dtype=float64>,
  dd.Scalar<series-..., dtype=float64>],
 [dd.Scalar<series-..., dtype=float64>,
  dd.Scalar<series-..., dtype=float64>,
  dd.Scalar<series-..., dtype=float64>,
  dd.Scalar<series-..., dtype=float64>]]

In [None]:
# df_13 = df.loc['2013-01-01':'2013-12-31']

## Dask Array: Collection of NumPy Arrays

Dask Array is a collection of NumPy arrays, arranged as chunks of one large array.


An in-depth explanation of the code for the `arr_*` functions:

* `to_dask_array` transforms the dataframe column into a Dask array to operate on.
* `keepdims=False` allows the array to be reduced to a single numeric object. Otherwise, the reduced dimensions would be kept with size one.
* The `nan`-prefixed operations (`nanmean`, `nanmin`, `nanmax`, `nansum`) ignore NaN values, so the totals do not have to account for them.
* `.compute` tells the operation to actually _compute_ in the backend. Since we submit these functions with `client.submit` and get a Future back, the work runs on the workers rather than in our Python process.
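The points in the list above can be checked on a tiny array. This is a minimal sketch: the four values and the chunk size are assumptions chosen so the result is easy to verify by hand.

```python
import dask.array as da
import numpy as np

values = np.array([4.0, np.nan, 10.0, 6.0])
arr = da.from_array(values, chunks=2)  # two chunks of two values each

lazy_mean = da.nanmean(arr, keepdims=False)  # still lazy, nothing computed yet
mean_fare = float(lazy_mean.compute())       # NaN is ignored: (4 + 10 + 6) / 3

# keepdims=True keeps the reduced axis with length one instead of a scalar.
kept = da.nansum(arr, keepdims=True).compute()
print(mean_fare, kept.shape)
```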

In [39]:
"""
Generate some basic statistics 
"""
import dask.array  # explicit import so the dask.array.nan* functions are available

def _as_array(data, col_name=None):
    # Helper: with a column name, select that dataframe column and convert it
    # to a Dask array; without one, assume `data` already is a Dask array.
    return data if col_name is None else data[col_name].to_dask_array()

def arr_avg(arr, col_name=None):
    return dask.array.nanmean(_as_array(arr, col_name), keepdims=False).compute()

def arr_min(arr, col_name=None):
    return dask.array.nanmin(_as_array(arr, col_name), keepdims=False).compute()

def arr_max(arr, col_name=None):
    return dask.array.nanmax(_as_array(arr, col_name), keepdims=False).compute()

def arr_sum(arr, col_name=None):
    return dask.array.nansum(_as_array(arr, col_name), keepdims=False).compute()

def gen_futures(arr):
    """
    Submit the four statistics for a Dask array to the
    Dask workers and return the list of futures,
    in the order: average, sum, max, min.
    """
    tavg = client.submit(arr_avg, arr)
    tmin = client.submit(arr_min, arr)
    tmax = client.submit(arr_max, arr)
    tsum = client.submit(arr_sum, arr)
    return [tavg, tsum, tmax, tmin]
# tavg = client.submit(lambda x: x., df.fare.to_dask_array())
# tavg

In [19]:
df = df.drop(['pickup_centroid_location', 'dropoff_centroid_location'], axis=1)
df_13 = df.loc['2013-01-01':'2013-12-31']
df_14 = df.loc['2014-01-01':'2014-12-31']
df_15 = df.loc['2015-01-01':'2015-12-31']
df_16 = df.loc['2016-01-01':'2016-12-31']
df_17 = df.loc['2017-01-01':'2017-12-31']
df_18 = df.loc['2018-01-01':'2018-12-31']
df_19 = df.loc['2019-01-01':'2019-12-31']
print(df_13.npartitions,
df_14.npartitions,
df_15.npartitions,
df_16.npartitions,
df_17.npartitions,
df_18.npartitions,
df_19.npartitions)



178 223 182 141 56 1 1


In [23]:
future = client.scatter(df_17)

In [28]:
future_2 = client.submit(arr_avg, future, 'fare')
# for year in range(2013, 2020):    
#     df_15 = df.loc[f'{year}-01-01':f'{year}-12-31']
#     df[f''].fare.min()
#     df[f''].fare.max()
#     df[f''].fare.mean()
#     df[f''].fare.sum()
#     client.persist(df)

In [40]:
# mean = future_2.result()
future_3 = client.submit(arr_max, future, 'fare')

In [8]:
df_13 = df.loc['2013-01-01':'2013-12-31']
df_14 = df.loc['2014-01-01':'2014-12-31']
df_15 = df.loc['2015-01-01':'2015-12-31']
df_16 = df.loc['2016-01-01':'2016-12-31']
df_17 = df.loc['2017-01-01':'2017-12-31']
df_18 = df.loc['2018-01-01':'2018-12-31']

dfs = [df_13, df_14, df_15, df_16, df_17, df_18]

for year_df in dfs:  # avoid reusing the name `df`, which holds the full dataset
    future = client.persist(year_df)
    # run the rest of analysis 

# trips_2015.size.compute() == 630217112

In [15]:
future = client.persist(df_15)
# tavg = client.submit(arr_avg, future, 'fare')
# 630217112/2708641296 * 776

In [16]:
future

Unnamed: 0_level_0,dropoff_centroid_longitude,tax,fare,taxi_id,payment_type,dropoff_community_area,trip_total,tips,tolls,trip_id,trip_miles,pickup_centroid_location,trip_end_timestamp,extras,pickup_census_tract,pickup_centroid_latitude,dropoff_centroid_location,company,dropoff_census_tract,pickup_centroid_longitude,trip_seconds,dropoff_centroid_latitude,pickup_community_area
npartitions=182,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1
2015-01-01 00:00:00.000000000,float64,float64,float64,object,object,float64,float64,float64,float64,object,float64,object,object,float64,float64,float64,object,object,float64,float64,float64,float64,float64
2015-01-01 13:30:00.000000000,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2015-12-29 15:48:57.740842240,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2015-12-31 23:59:59.999999999,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [17]:
del(df_15)

In [18]:
df_15

NameError: name 'df_15' is not defined

In [10]:
tavg = client.submit(arr_avg, future, 'fare')

### A short aside: Compute vs Result. 

In Dask, we have the ability to compute everything lazily by first building a task graph and then submitting it. 
We can use this to compute across multiple cores and to compute certain results only when we need them. Dask's futures extend
the normal futures found in Python (`concurrent.futures`). `client.submit` returns a future right away while the work runs in the background, and `.result()` waits for
that future and returns its value. In the example above, this is what is happening. 

`compute` is what actually sends a lazy computation out to the nodes. Because nothing runs until then, we can also "chain" operations together before they are executed.
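The two styles can be compared side by side in a minimal sketch. It uses `dask.delayed` for the lazy, chained style and a standard-library thread pool as a stand-in for the futures style; the function `add_tip` and the numbers are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

import dask

@dask.delayed
def add_tip(fare, tip):
    # Hypothetical helper: add a tip to a fare.
    return fare + tip

# Lazy style: chaining only builds a task graph, nothing has run yet.
lazy_total = add_tip(add_tip(10.0, 2.0), 0.5)
total = lazy_total.compute()  # the whole chain executes here

# Futures style: submit() returns a Future immediately and the work starts
# in the background; result() blocks until the value is ready.
with ThreadPoolExecutor(max_workers=2) as pool:
    future_sum = pool.submit(sum, [10.0, 2.0, 0.5])
    value = future_sum.result()

print(total, value)
```

Futures returned by `client.submit` follow the same `submit` and `result` pattern, with the work running on Dask workers instead of a local thread pool.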

Also, output for computation was as follows (just believe me here) so I had to go ahead and clean it. 
I mean, how can a fare be 9999.99 and 0.0? Just doesn't make sense.
```
average: 12.800860062723588
total: 1444691984.2
max: 9999.99
min: 0.0
```

In [9]:
fare = gen_futures(df_13.fare.to_dask_array())

  (dask.array<values, shape=(nan,), dtype=float64, chunksize=(nan,)>,)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s)


In [10]:
print(fare)

[<Future: status: pending, key: arr_avg-d1c135ab0c3cf9371de359b4cf4bd4a3>, <Future: status: pending, key: arr_sum-df46c20d56f7466c5d64fe55fddff770>, <Future: status: pending, key: arr_max-557adfe028be676b09654f9856001cea>, <Future: status: pending, key: arr_min-6cd3650d8c3356680559177564b49b5b>]




As the printout shows, the futures are still pending. Once a future has been computed, its result is stored in memory, so however we access the object afterwards it will already be finished.
While these run in the background, and we wait to collect them a tad bit later, we will look at how to pass multiple
parameters to a function with futures.

I will calculate the same statistics for taxes, tips and tolls.

In [11]:
tax = gen_futures(df.tax.to_dask_array())
tips = gen_futures(df.tips.to_dask_array())
tolls = gen_futures(df.tolls.to_dask_array())

Now, all of the objects that have been generated are lists of futures.
Calling `.result()` on a future within the lists blocks the notebook until that one result is ready, so we would wait on the futures
one at a time. Instead, we will use a special method, `client.gather(collection_of_futures)`,
which waits for a whole collection of futures and returns all of their results.
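Here is a minimal sketch of `client.gather` on a list of futures. It assumes an in-process client (`processes=False`, dashboard disabled) so that it runs quickly on a laptop, and it uses four made-up fares with plain Python functions in place of the `arr_*` helpers.

```python
from statistics import mean

from dask.distributed import Client

fares = [8.25, 14.5, 6.0, 11.25]  # made-up values

# Assumption: a small in-process cluster, for illustration only.
client = Client(processes=False, n_workers=1, threads_per_worker=2,
                dashboard_address=None)
try:
    # One future per statistic, in the order: average, sum, max, min.
    futures = [client.submit(func, fares) for func in (mean, sum, max, min)]
    # gather waits for every future and returns their results as a list.
    avg, total, largest, smallest = client.gather(futures)
finally:
    client.close()

print(avg, total, largest, smallest)
```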


In [None]:
fare_summ = client.gather(fare)
tax_summ = client.gather(tax)
tips_summ = client.gather(tips)
tolls_summ = client.gather(tolls)

Now that we have reduced the dataframe, let's use pandas to generate a small in-memory dataframe!

In [None]:
import pandas as pd

# The index labels must follow the same order as the rows: fare, tax, tips, tolls
df_summ = pd.DataFrame([fare_summ, tax_summ, tips_summ, tolls_summ], index =['fare', 'tax', 'tips', 'tolls'], 
                                              columns =['Average', 'Sum', 'Max', 'Min']) 

In [None]:
print(df_summ)

Next, we split up the data in the Dask dataframe by its index so that we can query it and generate some cool things.

In [51]:
print(date_arr.compute())

['2016-12-07T09:30:00.000000000' '2015-03-08T15:45:00.000000000'
 '2014-04-09T19:45:00.000000000' ... '2013-06-23T16:45:00.000000000'
 '2015-11-09T09:45:00.000000000' '2015-06-30T19:15:00.000000000']


In [53]:
import time

In [56]:
trips_2013.npartitions

178

In [57]:
import datashader as ds
from colorcet import fire
from datashader import transfer_functions as tf

In [None]:
agg = ds.Canvas().points(trips_2013, 'dropoff_centroid_longitude', 'dropoff_centroid_latitude')  # x = longitude, y = latitude
tf.set_background(tf.shade(agg, cmap=fire),"black")