# Python Dask

If you are familiar with Pandas, Dask is a framework that wraps libraries like Pandas so that they scale. A Dask DataFrame is a large parallel DataFrame composed of many smaller Pandas DataFrames, split along the index. These Pandas DataFrames may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster. One Dask DataFrame operation triggers many operations on the constituent Pandas DataFrames. In this lecture, we're going to take a deep dive into using Dask and apply what we've learned in previous lectures to understand why the code works the way that it does.

## Getting Started
First, we are going to import the Dask DataFrame module. We start in much the same way as we do with Pandas.

In [57]:
import dask.dataframe as dd

Let's now download some data for us to analyze. We will download New York City's taxi trip data (the high-volume for-hire vehicle files) for the first six months of 2020.

```
https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2020-01.csv
https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2020-02.csv
https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2020-03.csv
https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2020-04.csv
https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2020-05.csv
https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2020-06.csv
```
Here's a little trick that you should learn: you can download files directly from the command line using the `wget` command.

In [5]:
! wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2020-01.csv

--2022-05-11 09:33:29--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2020-01.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.192.73
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.192.73|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1303901601 (1.2G) [text/csv]
Saving to: ‘fhvhv_tripdata_2020-01.csv’


2022-05-11 09:34:48 (15.8 MB/s) - ‘fhvhv_tripdata_2020-01.csv’ saved [1303901601/1303901601]



In [6]:
! wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2020-02.csv
! wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2020-03.csv
! wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2020-04.csv
! wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2020-05.csv
! wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2020-06.csv

--2022-05-11 09:37:27--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2020-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.216.245.12
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.216.245.12|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1377036387 (1.3G) [text/csv]
Saving to: ‘fhvhv_tripdata_2020-02.csv’


2022-05-11 09:38:54 (15.0 MB/s) - ‘fhvhv_tripdata_2020-02.csv’ saved [1377036387/1377036387]

--2022-05-11 09:38:55--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2020-03.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.216.9.195
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.216.9.195|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 847836327 (809M) [text/csv]
Saving to: ‘fhvhv_tripdata_2020-03.csv’


2022-05-11 09:39:45 (16.0 MB/s) - ‘fhvhv_tripdata_2020-03.csv’ saved [847836327/847836327]

--2022-05-11 09:39:
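
The same downloads can be scripted from Python instead of the shell. The sketch below builds the six URLs listed above and defines a helper that fetches them with the standard library. The helper name `download_all` and the destination directory `tripdata` are assumptions for illustration, and the sketch assumes the URLs are still being served. Nothing is downloaded until the helper is called.

```python
from pathlib import Path
from urllib.request import urlretrieve

BASE_URL = "https://nyc-tlc.s3.amazonaws.com/trip+data"

# File names for January through June 2020, matching the list above.
FILE_NAMES = [f"fhvhv_tripdata_2020-{month:02d}.csv" for month in range(1, 7)]
URLS = [f"{BASE_URL}/{name}" for name in FILE_NAMES]


def download_all(urls, dest_dir="tripdata"):
    """Download each URL into dest_dir, skipping files that already exist."""
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    saved = []
    for url in urls:
        target = dest / url.rsplit("/", 1)[-1]
        if not target.exists():
            urlretrieve(url, target)  # these files can exceed a gigabyte each
        saved.append(target)
    return saved
```

Calling `download_all(URLS)` would place the files under `tripdata/`, which is the directory the loading code below reads from.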

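Now, let's load some data into this notebook. The paths below assume the CSV files sit in a `tripdata/` directory rather than the directory `wget` saved them to.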

In [58]:
df = dd.read_csv('tripdata/fhvhv_tripdata_2020-01.csv')
df.head()

Unnamed: 0,hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag
0,HV0003,B02864,2020-01-01 00:45:34,2020-01-01 01:02:20,148,90,
1,HV0003,B02682,2020-01-01 00:47:50,2020-01-01 00:53:23,114,79,
2,HV0003,B02764,2020-01-01 00:04:37,2020-01-01 00:21:49,4,125,
3,HV0003,B02764,2020-01-01 00:26:36,2020-01-01 00:33:00,231,113,
4,HV0003,B02764,2020-01-01 00:37:49,2020-01-01 00:46:59,114,144,


Let's do some basic analysis. How many rows are there in this dataset?

In [38]:
%time len(df)  # why does this take longer than loading?

CPU times: user 30.9 s, sys: 5.32 s, total: 36.3 s
Wall time: 23 s


20569325

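Why does this take longer than loading? `dd.read_csv` is lazy: it reads only a small sample of the file to infer the column names and types, and `df.head()` reads only the first partition. `len(df)`, by contrast, has to scan every row of the file. This design is what lets systems like Dask work with very large amounts of data spread across different files. For example, we can run code that looks like this: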

In [131]:
df = dd.read_csv('tripdata/fhvhv_tripdata_2020-*.csv')
df.head()

Unnamed: 0,hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag
0,HV0003,B02864,2020-01-01 00:45:34,2020-01-01 01:02:20,148,90,
1,HV0003,B02682,2020-01-01 00:47:50,2020-01-01 00:53:23,114,79,
2,HV0003,B02764,2020-01-01 00:04:37,2020-01-01 00:21:49,4,125,
3,HV0003,B02764,2020-01-01 00:26:36,2020-01-01 00:33:00,231,113,
4,HV0003,B02764,2020-01-01 00:37:49,2020-01-01 00:46:59,114,144,


Running code on these "lazy" DataFrames is a little more nuanced than simply using Pandas. Let's see what happens with a simple example. Suppose I ran:

In [41]:
df[df['hvfhs_license_num'] == 'HV0003'].count()

Dask Series Structure:
npartitions=1
DOLocationID       int64
pickup_datetime      ...
dtype: int64
Dask Name: dataframe-count-agg, 386 tasks

It returns almost immediately but doesn't give me the result. `df.count()` is a description of what computation to do; it doesn't actually do the computation yet. To realize the answer, we need to explicitly call `.compute()`. Here we do so for the non-null counts of every column in the unfiltered DataFrame:

In [42]:
%time df.count().compute()

CPU times: user 2min 17s, sys: 20.8 s, total: 2min 38s
Wall time: 1min 44s


hvfhs_license_num       73640833
dispatching_base_num    73640833
pickup_datetime         73640833
dropoff_datetime        73640833
PULocationID            73640833
DOLocationID            73640833
SR_Flag                  6896323
dtype: int64

We can also do some more complicated analyses. Let's try to figure out the average duration of a trip.

In [140]:
df['dropoff_datetime_py'] = dd.to_datetime(df['dropoff_datetime'])
df.head()

Unnamed: 0,hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag,pickup_datetime_py,dropoff_datetime_py
0,HV0003,B02864,2020-01-01 00:45:34,2020-01-01 01:02:20,148,90,,2020-01-01 00:45:34,2020-01-01 01:02:20
1,HV0003,B02682,2020-01-01 00:47:50,2020-01-01 00:53:23,114,79,,2020-01-01 00:47:50,2020-01-01 00:53:23
2,HV0003,B02764,2020-01-01 00:04:37,2020-01-01 00:21:49,4,125,,2020-01-01 00:04:37,2020-01-01 00:21:49
3,HV0003,B02764,2020-01-01 00:26:36,2020-01-01 00:33:00,231,113,,2020-01-01 00:26:36,2020-01-01 00:33:00
4,HV0003,B02764,2020-01-01 00:37:49,2020-01-01 00:46:59,114,144,,2020-01-01 00:37:49,2020-01-01 00:46:59


In [139]:
df['pickup_datetime_py'] = dd.to_datetime(df['pickup_datetime'])

In [141]:
df['difference'] = (df['dropoff_datetime_py'] - df['pickup_datetime_py'])

In [51]:
df.head(30)

Unnamed: 0,hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag,dropoff_datetime_py,pickup_datetime_py,difference,difference_seconds
0,HV0003,B02864,2020-01-01 00:45:34,2020-01-01 01:02:20,148,90,,2020-01-01 01:02:20,2020-01-01 00:45:34,0 days 00:16:46,1006.0
1,HV0003,B02682,2020-01-01 00:47:50,2020-01-01 00:53:23,114,79,,2020-01-01 00:53:23,2020-01-01 00:47:50,0 days 00:05:33,333.0
2,HV0003,B02764,2020-01-01 00:04:37,2020-01-01 00:21:49,4,125,,2020-01-01 00:21:49,2020-01-01 00:04:37,0 days 00:17:12,1032.0
3,HV0003,B02764,2020-01-01 00:26:36,2020-01-01 00:33:00,231,113,,2020-01-01 00:33:00,2020-01-01 00:26:36,0 days 00:06:24,384.0
4,HV0003,B02764,2020-01-01 00:37:49,2020-01-01 00:46:59,114,144,,2020-01-01 00:46:59,2020-01-01 00:37:49,0 days 00:09:10,550.0
5,HV0003,B02764,2020-01-01 00:49:23,2020-01-01 01:07:26,144,137,,2020-01-01 01:07:26,2020-01-01 00:49:23,0 days 00:18:03,1083.0
6,HV0003,B02870,2020-01-01 00:21:11,2020-01-01 00:36:58,249,148,,2020-01-01 00:36:58,2020-01-01 00:21:11,0 days 00:15:47,947.0
7,HV0003,B02870,2020-01-01 00:38:28,2020-01-01 00:42:38,148,4,,2020-01-01 00:42:38,2020-01-01 00:38:28,0 days 00:04:10,250.0
8,HV0003,B02870,2020-01-01 00:46:26,2020-01-01 01:09:55,79,7,,2020-01-01 01:09:55,2020-01-01 00:46:26,0 days 00:23:29,1409.0
9,HV0003,B02836,2020-01-01 00:15:35,2020-01-01 00:23:21,140,236,,2020-01-01 00:23:21,2020-01-01 00:15:35,0 days 00:07:46,466.0


How do you think the above code works with lazy evaluation?
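
One way to reason about the question: each assignment only adds steps to a task graph, and work happens when a result is requested (as `head()` does for the first partition). The sketch below illustrates the idea with `dask.delayed` rather than a DataFrame. The functions `double` and `subtract` and the `calls` list are hypothetical stand-ins used to observe when work actually runs.

```python
import dask

calls = []  # records every time real work is performed


@dask.delayed
def double(x):
    calls.append(x)
    return 2 * x


@dask.delayed
def subtract(a, b):
    return a - b


# Composing delayed calls builds a task graph; no function body runs yet.
lazy_result = subtract(double(10), double(3))
calls_before = list(calls)

# The synchronous scheduler runs the graph in this process, in one thread.
result = lazy_result.compute(scheduler="synchronous")

print(calls_before)   # nothing has been recorded before compute()
print(result)
print(sorted(calls))
```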

In [142]:
df['difference_seconds'] = df['difference'].dt.total_seconds()
df.head()

Unnamed: 0,hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag,pickup_datetime_py,dropoff_datetime_py,difference,difference_seconds
0,HV0003,B02864,2020-01-01 00:45:34,2020-01-01 01:02:20,148,90,,2020-01-01 00:45:34,2020-01-01 01:02:20,0 days 00:16:46,1006.0
1,HV0003,B02682,2020-01-01 00:47:50,2020-01-01 00:53:23,114,79,,2020-01-01 00:47:50,2020-01-01 00:53:23,0 days 00:05:33,333.0
2,HV0003,B02764,2020-01-01 00:04:37,2020-01-01 00:21:49,4,125,,2020-01-01 00:04:37,2020-01-01 00:21:49,0 days 00:17:12,1032.0
3,HV0003,B02764,2020-01-01 00:26:36,2020-01-01 00:33:00,231,113,,2020-01-01 00:26:36,2020-01-01 00:33:00,0 days 00:06:24,384.0
4,HV0003,B02764,2020-01-01 00:37:49,2020-01-01 00:46:59,114,144,,2020-01-01 00:37:49,2020-01-01 00:46:59,0 days 00:09:10,550.0
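
As a sanity check on the arithmetic, the first two rows of the output can be reproduced with the standard library alone. The timestamps are copied from the table above; the helper name `trip_seconds` is hypothetical.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def trip_seconds(pickup, dropoff):
    """Return the trip duration in seconds between two timestamp strings."""
    delta = datetime.strptime(dropoff, FMT) - datetime.strptime(pickup, FMT)
    return delta.total_seconds()


# Rows 0 and 1 of the output above.
row0 = trip_seconds("2020-01-01 00:45:34", "2020-01-01 01:02:20")
row1 = trip_seconds("2020-01-01 00:47:50", "2020-01-01 00:53:23")

print(row0)  # 1006.0, i.e. 16 minutes 46 seconds
print(row1)  # 333.0, i.e. 5 minutes 33 seconds
```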


In [35]:
df['difference_seconds'].mean().compute(), df['difference_seconds'].std().compute()

(1013.1593988487339, 691.1400486498965)