# Pandas to Dask to Coiled

We scale a simple ETL workflow from a single thread to the cloud in four steps:

1.  Pandas on a sample of data
2.  Pandas on a complete file
3.  Dask + Pandas on a set of files in parallel
4.  Coiled + Dask + Pandas to run on the cloud

## Pandas: Convert CSV to Parquet

### Download the data from Amazon

In [None]:
!wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-{01..12}.csv

In [None]:
!ls ~/data/nyctaxi/

### Investigate data locally with Pandas


In [None]:
import pandas as pd

df = pd.read_csv(
    "~/data/nyctaxi/yellow_tripdata_2019-01.csv", 
    nrows=10000,
)
df

In [None]:
!head ~/data/nyctaxi/yellow_tripdata_2019-01.csv

### Massage data types 

Before we convert to Parquet format, let's clean up our types a little.

In [None]:
%%time

import pandas as pd

df = pd.read_csv(
    "~/data/nyctaxi/yellow_tripdata_2019-01.csv", 
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
)
df

In [None]:
df = df.astype({
    "VendorID": "uint8",
    "passenger_count": "uint8",
    "RatecodeID": "uint8",
    "store_and_fwd_flag": "category",
    "PULocationID": "uint16",
    "DOLocationID": "uint16",    
})

# Tip as a fraction of the total fare, stored in a new column
df["tip_fraction"] = df.tip_amount / df.total_amount
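
To see why the narrower types are worth the effort, here is a minimal sketch on synthetic data (made-up values, not the taxi file). It compares memory use before and after a cast like the one above; the column names are borrowed from the dataset, everything else is an assumption for illustration.

```python
import numpy as np
import pandas as pd

# Synthetic stand-in for a few taxi columns; the values are made up.
rng = np.random.default_rng(0)
n = 100_000
sample = pd.DataFrame({
    "VendorID": rng.integers(1, 3, size=n),
    "passenger_count": rng.integers(0, 7, size=n),
    "PULocationID": rng.integers(1, 266, size=n),
    "store_and_fwd_flag": rng.choice(["N", "Y"], size=n),
})

# Same style of cast as in the notebook: small unsigned ints and a category.
compact = sample.astype({
    "VendorID": "uint8",
    "passenger_count": "uint8",
    "PULocationID": "uint16",
    "store_and_fwd_flag": "category",
})

# deep=True counts the Python string objects as well as the array buffers.
before = sample.memory_usage(deep=True).sum()
after = compact.memory_usage(deep=True).sum()
print(f"before: {before:,} bytes, after: {after:,} bytes")
```

The cast is only safe when every value fits in the target type, so it is worth checking the column ranges on the real data first.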


### Convert to Parquet

In [None]:
%%time
df.to_parquet("~/data/nyctaxi/yellow_tripdata_2019-01.parq")

In [None]:
%%time
df = pd.read_parquet("~/data/nyctaxi/yellow_tripdata_2019-01.parq", columns=["passenger_count"])

## Operate on many files in a for loop?

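We could do this, but it's unpleasant.

In [None]:
import os
from glob import glob

# glob does not expand "~", so expand it first; the raw inputs are the CSV files
for filename in glob(os.path.expanduser("~/data/nyctaxi/yellow_tripdata_2019-*.csv")):
    df = pd.read_csv(filename)
    ...
    df.to_parquet(...)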
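
Part of what makes the loop unpleasant is the bookkeeping around file names. The sketch below uses only the standard library to pair each input CSV with an output path. `parquet_path_for` and `plan_conversions` are hypothetical helpers, and writing each Parquet file next to its CSV with a `.parq` suffix is an assumption; the loop above does not say where the output goes.

```python
from pathlib import Path


def parquet_path_for(csv_path):
    """Hypothetical helper: same directory and stem, with a .parq suffix."""
    return Path(csv_path).expanduser().with_suffix(".parq")


def plan_conversions(directory, pattern="yellow_tripdata_2019-*.csv"):
    """Pair every matching CSV file with the Parquet path it would be written to."""
    directory = Path(directory).expanduser()
    return [(path, parquet_path_for(path)) for path in sorted(directory.glob(pattern))]


# Show the planned input -> output pairs without reading or writing any data.
for source, target in plan_conversions("~/data/nyctaxi"):
    print(source.name, "->", target.name)
```

Each pair could then feed a `pd.read_csv` / `to_parquet` step, which is the part Dask takes over in the next section.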

## Use Dask locally to process the full dataset

In [None]:
from dask.distributed import LocalCluster, Client
cluster = LocalCluster()
client = Client(cluster)
client

In [None]:
cluster.scale(3)

In [None]:
%%time

import dask.dataframe as dd

df = dd.read_csv(
    "~/data/nyctaxi/yellow_tripdata_2019-*.csv", 
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    dtype={
        'RatecodeID': 'float64',
        'VendorID': 'float64',
        'passenger_count': 'float64',
        'payment_type': 'float64',
    },
)
df

In [None]:
df = df.astype({
    "VendorID": "UInt8",
    "passenger_count": "UInt8",
    "RatecodeID": "UInt8",
    "store_and_fwd_flag": "category",
    "PULocationID": "UInt16",
    "DOLocationID": "UInt16",    
})

# Tip as a fraction of the total fare, stored in a new column
df["tip_fraction"] = df.tip_amount / df.total_amount
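
Note the capitalized `"UInt8"` and `"UInt16"` here, compared with the lowercase NumPy types in the pandas section. These are pandas nullable integer types. The columns were read as `float64`, presumably because some monthly files have missing values in them, and a NumPy integer column cannot hold a missing value. A minimal sketch with made-up values:

```python
import pandas as pd

# Made-up values; the None stands in for a missing passenger count.
counts = pd.Series([1.0, 2.0, None], name="passenger_count")

# The pandas extension dtype keeps the gap as <NA>.
nullable = counts.astype("UInt8")

try:
    # The NumPy dtype has no way to represent a missing value, so this raises.
    counts.astype("uint8")
    numpy_cast_failed = False
except ValueError:
    numpy_cast_failed = True

print(nullable.dtype, nullable.isna().sum(), numpy_cast_failed)
```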

In [None]:
%%time
df.to_parquet("~/data/nyctaxi/yellow_tripdata_2019.parq")

## We could target the data on S3 directly

This data lives on Amazon S3, in the `nyc-tlc` bucket run by the NYC Taxi and Limousine Commission.

In [None]:
import s3fs

s3 = s3fs.S3FileSystem()

In [None]:
s3.ls("nyc-tlc/trip data")

In [None]:
with s3.open("nyc-tlc/trip data/yellow_tripdata_2019-12.csv") as f:
    print(f.read(1000))

## Work directly from the cloud with Coiled 

### Create software environment

We'll need Dask, PyArrow, and s3fs.

In [None]:
import coiled

In [None]:
coiled.create_software_environment(
    name="parquet", 
    conda=["dask", "pyarrow", "s3fs"],
)

coiled.create_cluster_configuration(
    name="parquet", 
    software="parquet", 
    worker_cpu=4, 
    worker_memory="16 GiB"
)

### Create Cluster

It will take about a minute to provision our resources on the cloud.

In [None]:
cluster = coiled.Cluster(n_workers=10, configuration="parquet")
client = Client(cluster)

In [None]:
import dask.dataframe as dd
df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv", 
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    dtype={
        'RatecodeID': 'float64',
        'VendorID': 'float64',
        'passenger_count': 'float64',
        'payment_type': 'float64',
    },
)
df

In [None]:
df = df.astype({
    "VendorID": "UInt8",
    "passenger_count": "UInt8",
    "RatecodeID": "UInt8",
    "store_and_fwd_flag": "category",
    "PULocationID": "UInt16",
    "DOLocationID": "UInt16",    
})

In [None]:
%%time
df.to_parquet("s3://coiled-data/nyctaxi-2019.parq")

In [None]:
cluster.close()