# Pandas to Dask to Coiled

We scale a simple ETL workflow from a single thread to the cloud in four steps:

1.  Pandas on a sample of data
2.  Pandas on a complete file
3.  Dask + Pandas on a set of files in parallel
4.  Coiled + Dask + Pandas to run on the cloud

## Pandas: Convert CSV to Parquet

### Download the data from Amazon
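
One way to fetch the files is sketched below. It assumes the `s3fs` package is installed, that the monthly CSVs are still published under the `nyc-tlc/trip data/` prefix used later in this notebook, and that the bucket allows anonymous reads. The helper names `monthly_keys` and `download_year` are hypothetical, chosen only for this illustration.

```python
import os


def monthly_keys(year, prefix="nyc-tlc/trip data"):
    """Return the S3 keys of the twelve monthly yellow-taxi CSVs for `year`."""
    return [f"{prefix}/yellow_tripdata_{year}-{month:02d}.csv" for month in range(1, 13)]


def download_year(year, target_dir="data_taxi"):
    """Copy each monthly CSV into `target_dir`, skipping files already present."""
    import s3fs  # imported lazily so the key helper works without s3fs

    s3 = s3fs.S3FileSystem(anon=True)  # assumption: the bucket allows anonymous reads
    os.makedirs(target_dir, exist_ok=True)
    for key in monthly_keys(year):
        local_path = os.path.join(target_dir, os.path.basename(key))
        if not os.path.exists(local_path):
            s3.get(key, local_path)


print(monthly_keys(2019)[0])
```

Calling `download_year(2019)` would then populate `data_taxi/` with the files that the later cells read.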

### Investigate data locally with Pandas


In [2]:
import pandas as pd
df = pd.read_csv(
    "data_taxi/yellow_tripdata_2019-10.csv", 
    nrows=10000,
)
df

,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2019-10-01 00:19:55,2019-10-01 00:23:57,1,0.40,1,N,48,163,2,4.5,3.0,0.5,0.00,0.00,0.3,8.30,2.5
1,1,2019-10-01 00:40:19,2019-10-01 00:55:17,2,4.30,1,N,144,141,1,14.5,3.0,0.5,2.00,0.00,0.3,20.30,2.5
2,1,2019-10-01 00:06:52,2019-10-01 00:21:23,1,5.00,1,N,137,80,1,17.0,3.0,0.5,5.20,0.00,0.3,26.00,2.5
3,2,2019-10-01 00:36:08,2019-10-01 00:36:15,1,0.00,1,N,25,25,4,-2.5,-0.5,-0.5,0.00,0.00,-0.3,-3.80,0.0
4,2,2019-10-01 00:36:08,2019-10-01 00:36:15,1,0.00,1,N,25,25,2,2.5,0.5,0.5,0.00,0.00,0.3,3.80,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,2,2019-10-01 06:52:13,2019-10-01 07:03:36,2,3.21,1,N,161,231,2,11.0,0.0,0.5,0.00,0.00,0.3,14.30,2.5
9996,1,2019-10-01 06:51:25,2019-10-01 07:15:12,1,0.00,1,N,42,146,1,24.2,0.0,0.5,0.00,6.12,0.3,31.12,0.0
9997,2,2019-10-01 06:23:53,2019-10-01 06:26:30,1,0.52,1,N,263,263,2,4.0,0.0,0.5,0.00,0.00,0.3,7.30,2.5
9998,2,2019-10-01 06:41:37,2019-10-01 06:52:00,1,2.20,1,N,236,163,1,9.5,0.0,0.5,1.54,0.00,0.3,14.34,2.5


In [3]:
!head data_taxi/yellow_tripdata_2019-01.csv

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.50,1,N,151,239,1,7,0.5,0.5,1.65,0,0.3,9.95,
1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.60,1,N,239,246,1,14,0.5,0.5,1,0,0.3,16.3,
2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,.00,1,N,236,236,1,4.5,0.5,0.5,0,0,0.3,5.8,
2,2018-11-28 15:52:25,2018-11-28 15:55:45,5,.00,1,N,193,193,2,3.5,0.5,0.5,0,0,0.3,7.55,
2,2018-11-28 15:56:57,2018-11-28 15:58:33,5,.00,2,N,193,193,2,52,0,0.5,0,0,0.3,55.55,
2,2018-11-28 16:25:49,2018-11-28 16:28:26,5,.00,1,N,193,193,2,3.5,0.5,0.5,0,5.76,0.3,13.31,
2,2018-11-28 16:29:37,2018-11-28 16:33:43,5,.00,2,N,193,193,2,52,0,0.5,0,0,0.3,55.55,
1,2019-01-01 00:21:28,2019-01-01 00:28:37,1,1.30,1,N,163,229,1,6.5,0.5,0.5,1.25,0,0.3,9.05,
1,2019-01-01 00:32:01,2019-01-01 0

### Massage data types 

Before we convert to Parquet format, let's clean up our types a little.

In [4]:
%%time

#import pandas as pd

df = pd.read_csv(
    "data_taxi/yellow_tripdata_2019-01.csv", 
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
)
df

CPU times: user 14.4 s, sys: 3.76 s, total: 18.2 s
Wall time: 18.5 s


,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.50,1,N,151,239,1,7.0,0.5,0.5,1.65,0.0,0.3,9.95,
1,1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.60,1,N,239,246,1,14.0,0.5,0.5,1.00,0.0,0.3,16.30,
2,2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,0.00,1,N,236,236,1,4.5,0.5,0.5,0.00,0.0,0.3,5.80,
3,2,2018-11-28 15:52:25,2018-11-28 15:55:45,5,0.00,1,N,193,193,2,3.5,0.5,0.5,0.00,0.0,0.3,7.55,
4,2,2018-11-28 15:56:57,2018-11-28 15:58:33,5,0.00,2,N,193,193,2,52.0,0.0,0.5,0.00,0.0,0.3,55.55,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7667787,2,2019-01-31 23:57:36,2019-02-01 00:18:39,1,4.79,1,N,263,4,1,18.0,0.5,0.5,3.86,0.0,0.3,23.16,0.0
7667788,2,2019-01-31 23:32:03,2019-01-31 23:33:11,1,0.00,1,N,193,193,1,0.0,0.0,0.0,0.00,0.0,0.0,0.00,0.0
7667789,2,2019-01-31 23:36:36,2019-01-31 23:36:40,1,0.00,1,N,264,264,1,0.0,0.0,0.0,0.00,0.0,0.0,0.00,0.0
7667790,2,2019-01-31 23:14:53,2019-01-31 23:15:20,1,0.00,1,N,264,7,1,0.0,0.0,0.0,0.00,0.0,0.0,0.00,0.0


In [5]:
df = df.astype({
    "VendorID": "uint8",
    "passenger_count": "uint8",
    "RatecodeID": "uint8",
    "store_and_fwd_flag": "category",
    "PULocationID": "uint16",
    "DOLocationID": "uint16",    
})

df["tip_amount"] = df.total_amount / df.tip_amount
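
To see why the narrower types are worth the trouble, the sketch below compares memory use before and after the same kind of `astype` call on a small synthetic frame. The values are invented for illustration; only the column names come from the taxi data.

```python
import numpy as np
import pandas as pd

# Synthetic stand-in for the taxi data (values are invented for illustration)
n = 100_000
rng = np.random.default_rng(0)
sample = pd.DataFrame({
    "VendorID": rng.integers(1, 3, size=n),                # 64-bit integers by default
    "PULocationID": rng.integers(1, 266, size=n),          # 64-bit integers by default
    "store_and_fwd_flag": rng.choice(["N", "Y"], size=n),  # one string per row
})

# Same style of conversion as the cell above
compact = sample.astype({
    "VendorID": "uint8",
    "PULocationID": "uint16",
    "store_and_fwd_flag": "category",
})

before = sample.memory_usage(deep=True).sum()
after = compact.memory_usage(deep=True).sum()
print(f"before: {before / 1e6:.1f} MB, after: {after / 1e6:.1f} MB")
```

The smaller in-memory types also carry over to the Parquet file, which stores the dtype of each column.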


## Convert to Parquet

In [6]:
%%time
df.to_parquet("data_taxi/yellow_tripdata_2019-01.parq")

CPU times: user 5.17 s, sys: 1.96 s, total: 7.13 s
Wall time: 5.43 s


In [10]:
%%time
df = pd.read_parquet("data_taxi/yellow_tripdata_2019-01.parq", columns=["passenger_count"])

CPU times: user 54.7 ms, sys: 139 ms, total: 194 ms
Wall time: 245 ms


## Operate on many files in a for loop?

We could loop over the files with Pandas, but it's unpleasant: each file is processed one after another on a single core.

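```python
import os
from glob import glob

# glob does not expand "~"; the source files are CSVs, not Parquet
for filename in glob(os.path.expanduser("~/data/nyctaxi/yellow_tripdata_2019-*.csv")):
    df = pd.read_csv(filename)
    ...
    df.to_parquet(...)
```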

## Use Dask locally to process the full dataset

In [11]:
from dask.distributed import LocalCluster, Client
cluster = LocalCluster()
client = Client(cluster)
client

Client
  Scheduler: tcp://127.0.0.1:50643
  Dashboard: http://127.0.0.1:8787/status
Cluster
  Workers: 4
  Cores: 8
  Memory: 8.59 GB


In [12]:
%%time

import dask.dataframe as dd

df = dd.read_csv(
    "data_taxi/yellow_tripdata_2019-*.csv",
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    dtype={
        'RatecodeID': 'float64',
        'VendorID': 'float64',
        'passenger_count': 'float64',
        'payment_type': 'float64',
    },
)
df

CPU times: user 234 ms, sys: 63.3 ms, total: 297 ms
Wall time: 395 ms


Dask DataFrame Structure (npartitions=117):

column,dtype
VendorID,float64
tpep_pickup_datetime,datetime64[ns]
tpep_dropoff_datetime,datetime64[ns]
passenger_count,float64
trip_distance,float64
RatecodeID,float64
store_and_fwd_flag,object
PULocationID,int64
DOLocationID,int64
payment_type,float64
fare_amount,float64
extra,float64
mta_tax,float64
tip_amount,float64
tolls_amount,float64
improvement_surcharge,float64
total_amount,float64
congestion_surcharge,float64


In [13]:
df = df.astype({
    "VendorID": "UInt8",
    "passenger_count": "UInt8",
    "RatecodeID": "UInt8",
    "store_and_fwd_flag": "category",
    "PULocationID": "UInt16",
    "DOLocationID": "UInt16",    
})

df["tip_amount"] = df.total_amount / df.tip_amount
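
Note the capitalised `UInt8` and `UInt16` here, where the Pandas-only cell used `uint8` and `uint16`. The columns were read as `float64`, presumably because some of the monthly files contain missing values. NumPy's unsigned integers cannot represent a missing value, while Pandas' nullable extension types can. A small sketch of the difference on an invented Series:

```python
import numpy as np
import pandas as pd

counts = pd.Series([1.0, 2.0, np.nan])  # float64 with one missing value

# The nullable extension dtype keeps the gap as <NA>
nullable = counts.astype("UInt8")
print(nullable.dtype, int(nullable.isna().sum()))

# The plain NumPy dtype has no way to store the missing value, so the cast fails
try:
    counts.astype("uint8")
    numpy_cast_failed = False
except ValueError:
    numpy_cast_failed = True
print(numpy_cast_failed)
```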

In [14]:
%%time
df.to_parquet("data_taxi/yellow_tripdata_2019.parq")

CPU times: user 28.8 s, sys: 5.7 s, total: 34.5 s
Wall time: 4min 40s


## We could target the data on S3 directly

This data lives on Amazon S3, in the `nyc-tlc` bucket published by the NYC Taxi and Limousine Commission (TLC).

In [None]:
import s3fs

s3 = s3fs.S3FileSystem()

In [None]:
s3.ls("nyc-tlc/trip data")

In [None]:
with s3.open("nyc-tlc/trip data/yellow_tripdata_2019-12.csv") as f:
    print(f.read(1000))

## Work directly from the cloud with Coiled 

It will take about a minute to provision our resources on the cloud.

In [17]:
! pip install coiled

Collecting coiled
  Downloading coiled-0.0.23-py3-none-any.whl (44 kB)
Collecting aiobotocore>=1.0.7
  Downloading aiobotocore-1.1.1-py3-none-any.whl (45 kB)
Collecting s3fs
  Using cached s3fs-0.5.0-py3-none-any.whl (21 kB)
Collecting aioitertools>=0.5.1
  Using cached aioitertools-0.7.0-py3-none-any.whl (20 kB)
Collecting botocore<1.17.45,>=1.17.44
  Downloading botocore-1.17.44-py2.py3-none-any.whl (6.5 MB)
Processing /Users/hugobowne-anderson/Library/Caches/pip/wheels/5f/fd/9e/b6cf5890494cb8ef0b5eaff72e5d55a70fb56316007d6dfe73/wrapt-1.12.1-cp38-cp38-macosx_10_9_x86_64.whl
Collecting docutils<0.16,>=0.10
  Using cached docutils-0.15.2-py3-none-any.whl (547 kB)
Collecting jmespath<1.0.0,>=0.7.1
  Using cached jmespath-0.10.0-py2.py3-none-any.whl (24 kB)
Installing collec

In [2]:
! coiled login --token YOUR_COILED_TOKEN

Credentials have been saved at /Users/hugobowne-anderson/.config/dask/coiled.yaml


In [5]:
from dask.distributed import LocalCluster, Client

In [None]:
import coiled

cluster = coiled.Cluster(n_workers=10, configuration="parquet")


In [6]:
client = Client(cluster)
client


+-------------+--------+-----------+---------+
| Package     | client | scheduler | workers |
+-------------+--------+-----------+---------+
| blosc       | None   | 1.9.1     | 1.9.1   |
| cloudpickle | 1.6.0  | 1.5.0     | 1.5.0   |
| dask        | 2.25.0 | 2.23.0    | 2.23.0  |
| distributed | 2.25.0 | 2.23.0    | 2.23.0  |
| lz4         | None   | 3.1.0     | 3.1.0   |
+-------------+--------+-----------+---------+


Client
  Scheduler: tls://ec2-18-218-120-88.us-east-2.compute.amazonaws.com:8786
  Dashboard: http://ec2-18-218-120-88.us-east-2.compute.amazonaws.com:8787/status
Cluster
  Workers: 10
  Cores: 40
  Memory: 171.80 GB


In [None]:
import dask.dataframe as dd
df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv", 
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    dtype={
        'RatecodeID': 'float64',
        'VendorID': 'float64',
        'passenger_count': 'float64',
        'payment_type': 'float64',
    },
    storage_options={"anon": True},
)
df

In [None]:
df = df.astype({
    "VendorID": "UInt8",
    "passenger_count": "UInt8",
    "RatecodeID": "UInt8",
    "store_and_fwd_flag": "category",
    "PULocationID": "UInt16",
    "DOLocationID": "UInt16",    
})

df["tip_amount"] = df.total_amount / df.tip_amount

In [None]:
%%time
df.to_parquet("s3://coiled-data/nyctaxi-2019.parq")

In [None]:
cluster.close()