# Download NYC Yellow Taxi Cab Dataset for 2019

A typical data science workflow starts with data that needs to be understood. The first step is usually data cleaning and exploratory analysis to find interesting details and patterns.

In this notebook, we will be working with the New York City Yellow Taxi Trips Dataset for 2019.

#### We can read the file directly into a pandas DataFrame

In [1]:
%%time

import pandas as pd

df = pd.read_csv("https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-01.csv")

df

Wall time: 3min 49s


,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.50,1,N,151,239,1,7.0,0.5,0.5,1.65,0.0,0.3,9.95,
1,1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.60,1,N,239,246,1,14.0,0.5,0.5,1.00,0.0,0.3,16.30,
2,2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,0.00,1,N,236,236,1,4.5,0.5,0.5,0.00,0.0,0.3,5.80,
3,2,2018-11-28 15:52:25,2018-11-28 15:55:45,5,0.00,1,N,193,193,2,3.5,0.5,0.5,0.00,0.0,0.3,7.55,
4,2,2018-11-28 15:56:57,2018-11-28 15:58:33,5,0.00,2,N,193,193,2,52.0,0.0,0.5,0.00,0.0,0.3,55.55,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7667787,2,2019-01-31 23:57:36,2019-02-01 00:18:39,1,4.79,1,N,263,4,1,18.0,0.5,0.5,3.86,0.0,0.3,23.16,0.0
7667788,2,2019-01-31 23:32:03,2019-01-31 23:33:11,1,0.00,1,N,193,193,1,0.0,0.0,0.0,0.00,0.0,0.0,0.00,0.0
7667789,2,2019-01-31 23:36:36,2019-01-31 23:36:40,1,0.00,1,N,264,264,1,0.0,0.0,0.0,0.00,0.0,0.0,0.00,0.0
7667790,2,2019-01-31 23:14:53,2019-01-31 23:15:20,1,0.00,1,N,264,7,1,0.0,0.0,0.0,0.00,0.0,0.0,0.00,0.0


In [3]:
%%time

# let's measure the time it takes to do a calculation
df.groupby("passenger_count").tip_amount.mean()

Wall time: 150 ms


passenger_count
0    1.786901
1    1.828308
2    1.833877
3    1.795579
4    1.702710
5    1.869868
6    1.856830
7    6.542632
8    6.480690
9    3.116667
Name: tip_amount, dtype: float64
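
To make explicit what the groupby above computes, here is a minimal sketch on a tiny hand-made DataFrame. The toy values are hypothetical and only stand in for the taxi data; the column names match the dataset.

```python
import pandas as pd

# Hypothetical toy data standing in for the taxi trips.
trips = pd.DataFrame(
    {
        "passenger_count": [1, 1, 2, 2, 2],
        "tip_amount": [1.0, 3.0, 0.0, 3.0, 6.0],
    }
)

# Split the rows by passenger_count, then average tip_amount within each group.
mean_tip = trips.groupby("passenger_count").tip_amount.mean()
print(mean_tip)
```

The result is a Series indexed by `passenger_count`, with one mean tip per group.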

## Reading and working with tabular data using Dask DataFrame

### Reading data

Dask can be used to scale pandas to larger datasets. A Dask DataFrame is composed of many pandas DataFrames, so its API mirrors a large part of the pandas API. This makes Dask code familiar and easy to use.

First, spin up a cluster!

In [4]:
from dask.distributed import Client

client = Client(n_workers=10)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 59630 instead


Client: Scheduler: tcp://127.0.0.1:59631, Dashboard: http://127.0.0.1:59630/status
Cluster: Workers: 10, Cores: 20, Memory: 15.74 GiB


Open the Dask Dashboard in JupyterLab: Cluster Map, Task Stream, and Dask Workers

* __Cluster map__ (also called the pew-pew map) visualizes interactions between the scheduler and the workers.
* __Task stream__ shows tasks performed by each worker in real-time.
* __Dask workers__ displays CPU and memory being used by each worker.

Next, the same read operation with Dask. The cell below reads the same January 2019 file as the pandas example, not the data for all the years.

In [9]:
%%time

# let's time reading the same file using Dask
import aiohttp
import requests
import dask.dataframe as dd

df = dd.read_csv(r"https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-01.csv")

df



Wall time: 8.92 s


,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
npartitions=11,,,,,,,,,,,,,,,,,,
,int64,object,object,int64,float64,int64,object,int64,int64,int64,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...




### Working with the data

The same computation (the mean tip amount as a function of passenger count) is now performed on the same January 2019 data using Dask DataFrame.

Note that Dask code is similar to pandas code.

In [12]:
%%time

mean_tip_amount = df.groupby("passenger_count").tip_amount.mean()
mean_tip_amount

Wall time: 165 ms


Dask Series Structure:
npartitions=1
    float64
        ...
Name: tip_amount, dtype: float64
Dask Name: truediv, 40 tasks

Like the Delayed API we saw in the previous notebook, Dask DataFrame evaluates lazily: the cell above only builds a task graph (40 tasks here) and returns immediately.

#### You can use compute() to get the output.

In [13]:
%%time

mean_tip_amount.compute()



Wall time: 2min 52s


passenger_count
0    1.786901
1    1.828308
2    1.833877
3    1.795579
4    1.702710
5    1.869868
6    1.856830
7    6.542632
8    6.480690
9    3.116667
Name: tip_amount, dtype: float64

### Sharing intermediate outputs

Sometimes individual computations are related to each other and can benefit from sharing intermediate results, for example computing the minimum and maximum values of the same column.

In pandas (and therefore in Dask DataFrame), you can use min() and max() to compute minimum and maximum respectively.

In [14]:
max_tip_amount = df.tip_amount.max()
min_tip_amount = df.tip_amount.min()

### Without Sharing

In [17]:
%%time
max_tip = max_tip_amount.compute()
min_tip = min_tip_amount.compute()

Wall time: 4min 4s


### With Sharing

In [19]:
%%time
import dask

max_tip, min_tip = dask.compute(max_tip_amount, min_tip_amount)

Wall time: 1min 37s
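
The speed-up comes from running the shared steps (here, reading and parsing the CSV) only once. The sketch below illustrates the same effect with `dask.delayed` instead of a DataFrame, so that the number of times the shared step runs can be counted. The `load_tips` function and its values are hypothetical, and the synchronous scheduler is chosen only to keep the example deterministic.

```python
import dask

calls = []  # records every execution of the shared loading step


@dask.delayed
def load_tips():
    # Hypothetical stand-in for the expensive read of the CSV file.
    calls.append(1)
    return [0.0, 1.65, 3.86]


tips = load_tips()
max_tip_amount = dask.delayed(max)(tips)
min_tip_amount = dask.delayed(min)(tips)

# Without sharing: each compute() call runs the loading step again.
max_tip_amount.compute(scheduler="synchronous")
min_tip_amount.compute(scheduler="synchronous")
calls_without_sharing = len(calls)

# With sharing: one dask.compute() call merges both graphs,
# so the loading step runs only once.
calls.clear()
max_tip, min_tip = dask.compute(
    max_tip_amount, min_tip_amount, scheduler="synchronous"
)
calls_with_sharing = len(calls)

print(calls_without_sharing, calls_with_sharing)
```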


## Limitations of Dask DataFrame
The Dask DataFrame API does not implement the complete pandas interface because some pandas operations are not suited to a parallel and distributed environment.

### Data Shuffling
A Dask DataFrame consists of multiple pandas DataFrames. When the data is read with `read_csv`, as here, each of them has its own index starting from zero. Operations such as `set_index` need the data to be sorted by the new index, which requires time-consuming shuffling of data between partitions, so they are slower in Dask than in pandas. `reset_index`, in turn, restarts the index from zero within each partition instead of producing one global range. Hence, presorting the index and choosing logical partitions are good practices.


## References
* [Dask DataFrame documentation](https://docs.dask.org/en/latest/dataframe.html)
* [Dask DataFrame API](https://docs.dask.org/en/latest/dataframe-api.html)
* [Dask DataFrame examples](https://examples.dask.org/dataframe.html)
* [Dask Tutorial - DataFrames](https://github.com/pavithraes/dask-tutorial/blob/master/04_dataframe.ipynb)