## Get better at dask dataframes

In this lesson you will learn some good practices for dask dataframes and dealing with data in general.

## Parquet is where it's at!!

You will learn the advantages of working with the parquet data format, and using the Uber/Lyft dataset you will learn to troubleshoot the nuances of working with real data. 


### Work close to your data

When you work with data stored in the cloud, it is always better to work close to your data, to minimize the impact of network IO.

In this lesson, we will use Coiled clusters created in the same region where our datasets are stored (the region is `"us-east-2"`).

**NOTE:**
If you do not have access to a Coiled cluster, you can still follow along; just make sure you use the smaller datasets (the `"0.5GB-"` ones).

## Parquet vs CSV

Most people are familiar with csv files, but when it comes to working with data, parquet can make a big difference. The Parquet file format is column-oriented and is designed to store and retrieve data efficiently.
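To make "column-oriented" concrete, here is a toy sketch in plain Python. It is only an illustration of the layout idea, with made-up values: real Parquet files add encoding, compression, and metadata on top of it.

```python
# Toy illustration only; the records below are made up.
rows = [
    {"id1": "id001", "v1": 5, "v2": 1.5},
    {"id1": "id002", "v1": 3, "v2": 2.5},
    {"id1": "id001", "v1": 4, "v2": 3.5},
]

# Row-oriented (CSV-like): summing v1 means visiting every full record.
total_from_rows = sum(record["v1"] for record in rows)

# Column-oriented (Parquet-like): each column is stored together,
# so summing v1 only touches the v1 values.
columns = {name: [record[name] for record in rows] for name in rows[0]}
total_from_columns = sum(columns["v1"])

print(total_from_rows, total_from_columns)  # 12 12
```

Both layouts give the same answer; the difference is how much data has to be read to get it.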

### Small motivation example: 
Let's compare reading the same data stored in two ways: as `csv` files and as `parquet` files.

In [None]:
data ={"0.5GB-csv": "s3://coiled-datasets/h2o-benchmark/N_1e7_K_1e2/*.csv",
       "0.5GB-pq": "s3://coiled-datasets/h2o-benchmark/N_1e7_K_1e2_parquet/*.parquet",
       "5GB-csv": "s3://coiled-datasets/h2o-benchmark/N_1e8_K_1e2/*.csv",
       "5GB-pq": "s3://coiled-datasets/h2o-benchmark/N_1e8_K_1e2_parquet/*.parquet",}

In [1]:
import coiled
from dask.distributed import Client
import dask.dataframe as dd
import uuid #to create unique cluster names

In [2]:
import dask
dask.__version__

'2022.12.0'

In [3]:
id_cluster = uuid.uuid4().hex[:4]

In [None]:
%%time
cluster = coiled.Cluster(name=f"dask-tutorial-{id_cluster}",
                        n_workers=10,
                        package_sync=True,
                        backend_options={"region_name": "us-east-2"},
                        );

## Note: consider m6i instances instead, the default instance types are slower.

In [None]:
client = Client(cluster)
client

In [None]:
ddf_csv = dd.read_csv(data["5GB-csv"], storage_options={"anon": True})
ddf_pq = dd.read_parquet(data["5GB-pq"], storage_options={"anon": True})

In [None]:
ddf_csv

In [None]:
ddf_pq

In [None]:
%%time
ddf_csv.groupby("id1").agg({"v1": "sum"}).compute()

In [None]:
%%time
ddf_pq.groupby("id1").agg({"v1": "sum"}).compute()

Notice that, without any extra effort, the `parquet` version is already ~5X faster.

Let's take a look at the memory usage as well as the `dtypes` in both cases.

In [None]:
## memory usage for 1 partition
ddf_csv.partitions[0].memory_usage(deep=True).compute()

In [None]:
ddf_pq.partitions[0].memory_usage(deep=True).compute()

In [None]:
client.shutdown()

### Uber/Lyft data transformation

In the example above we quickly saw that the format in which the data is saved already makes a big difference. But there is much more to exploit in the parquet file format.

Let's work with the data from [High-Volume For-Hire Services](https://www.nyc.gov/site/tlc/businesses/high-volume-for-hire-services.page)

Data dictionary: https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_hvfhs.pdf

In [None]:
import s3fs

s3 = s3fs.S3FileSystem()
files = s3.glob("nyc-tlc/trip data/fhvhv_tripdata_*.parquet")
files[:3]

In [None]:
len(files)

In [None]:
# Not sure which region the source data is in, but we will write to a bucket in us-east-2
cluster = coiled.Cluster(
    n_workers=10,
    name=f"nyc-uber-lyft-{id_cluster}",
    package_sync=True,
    backend_options={"region": "us-east-2"}, 
    worker_memory="64 GiB", #we know we need a lot of memory from experience
)

In [None]:
client = Client(cluster)
client

## Inspect the data

In [None]:
client.restart()

In [None]:
import dask

In [None]:
ddf = dd.read_parquet(
    "s3://nyc-tlc/trip data/fhvhv_tripdata_*.parquet",
)
ddf

In [None]:
#inspect memory usage of 1 partition
ddf.partitions[0].memory_usage(deep=True).compute().apply(dask.utils.format_bytes)

In [None]:
#inspect dtypes
ddf.dtypes

## Challenges

As you can see, the partitions are very big, and the data types are inefficient.

## Recommendations and best practices:
**Partition size**

In general we aim for ~100MB (in memory) per partition. 
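As a rough sketch of what this guideline implies, the number of partitions follows from dividing the in-memory size of the dataset by the target partition size. The helper name `estimate_npartitions` and the 75 GB figure are illustrative assumptions, not part of the Dask API.

```python
import math


def estimate_npartitions(total_bytes: int, target_partition_bytes: int = 100 * 10**6) -> int:
    """Number of partitions needed so each one holds roughly the target size."""
    return max(1, math.ceil(total_bytes / target_partition_bytes))


# Hypothetical example: a dataset that takes 75 GB (75 * 10**9 bytes) in memory.
print(estimate_npartitions(75 * 10**9))  # 750
```

In practice `repartition(partition_size=...)` does this calculation for you; the sketch only shows the order of magnitude to expect.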

**dtypes**

- Avoid object types for strings: use `"string[pyarrow]"`
- Reduce int/float representation if possible
- Use categorical dtypes when possible.
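The effect of these recommendations can be checked on a small pandas DataFrame before applying them to the full Dask DataFrame. The sketch below uses made-up sample data with column names borrowed from the dataset. Note that downcasting to `int32` or `float32` is only safe when the values fit in the smaller type.

```python
import numpy as np
import pandas as pd

# Hypothetical sample data standing in for one partition.
n = 10_000
sample = pd.DataFrame({
    "trip_miles": np.linspace(0.1, 20.0, n),       # float64 by default
    "trip_time": np.arange(n, dtype="int64"),      # int64
    "shared_request_flag": ["Y", "N"] * (n // 2),  # strings
})

before = sample.memory_usage(deep=True).sum()

# Smaller numeric types, and a categorical for the two-valued flag column.
optimized = sample.astype({
    "trip_miles": "float32",
    "trip_time": "int32",
    "shared_request_flag": pd.CategoricalDtype(categories=["Y", "N"]),
})

after = optimized.memory_usage(deep=True).sum()
print(f"before: {before} bytes, after: {after} bytes")
```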

### Create conversions dictionary

In [27]:
import pandas as pd

In [None]:
conversions = {}
for column, dtype in ddf.dtypes.items():
    if dtype == "object":
        conversions[column] = "string[pyarrow]"
    if dtype == "float64":
        conversions[column] = "float32"
    if dtype == "int64": 
        conversions[column] = "int32"
    if "flag" in column:
        conversions[column] = pd.CategoricalDtype(categories=["Y", "N"])
    if column == "airport_fee":
        conversions[column] = "float32"  #noticed that this has floats and the <NA> is making it an object
conversions

In [None]:
ddf = ddf.astype(conversions)
ddf = ddf.persist()

In [None]:
ddf.partitions[0].memory_usage(deep=True).compute().apply(dask.utils.format_bytes)

In [None]:
dask.utils.format_bytes(
    ddf.partitions[0].memory_usage(deep=True).compute().sum()
)

### Repartition

In [None]:
ddf = ddf.repartition(partition_size="128MB").persist()

In [None]:
dask.utils.format_bytes(
    ddf.memory_usage(deep=True).compute().sum()
)

In [None]:
ddf.npartitions

## Sort and one-day partitioning

In [None]:
ddf = ddf.set_index("request_datetime").persist()

In [None]:
ddf.divisions[:5]

It looks like they are a bit longer than a day, so we might as well repartition them with a 1-day frequency.

In [None]:
ddf = ddf.repartition(freq="1d")

In [None]:
ddf.divisions[:5]

In [None]:
ddf.npartitions

In [None]:
# Name each output file after the first date in its partition when calling to_parquet
divisions = ddf.divisions

def name_file(index: int) -> str:
    return str(divisions[index].date()) + ".parquet"

name_file(0)

In [None]:
ddf.to_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/", 
    name_function=name_file,
)

## Read data back

We read the data back with `use_nullable_dtypes=True`.

In [None]:
#client.restart()

In [None]:
df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/", 
    use_nullable_dtypes=True
).astype({"hvfhs_license_num": "string[pyarrow]", 
         "dispatching_base_num": "string[pyarrow]",
         "originating_base_num": "string[pyarrow]",
         }).persist()
#df.dtypes

In [None]:
df.dtypes

In [None]:
df.hvfhs_license_num.dtype

In [None]:
dask.utils.format_bytes(
    df.memory_usage(deep=True).sum().compute()
)

In [None]:
Note: without pyarrow strings, the memory usage is ~200GB.

In [None]:
client.shutdown()

# On to a smaller cluster - let's do data analysis

Now our whole dataset takes ~75GB in memory, which is something we can work with on a smaller cluster. Also, when exploring data we do not necessarily need the whole dataset.

One of the beauties of the parquet file format is:

- Column pruning: read only the columns you need.

In [36]:
cluster = coiled.Cluster(name=f"uber-lyft-small-{id_cluster}", 
                         n_workers=10, 
                         package_sync=True,
                         backend_options={"region_name": "us-east-2"},
                         worker_memory="32GB",
)


In [39]:
client = Client(cluster)

In [40]:
client

Connection method: Cluster object
Cluster type: coiled.ClusterBeta
Dashboard: http://52.14.236.252:8787
Workers: 10
Total threads: 80
Total memory: 303.47 GiB

Scheduler comm: tls://10.0.30.209:8786
Scheduler dashboard: http://10.0.30.209:8787/status
Started: Just now

Each of the 10 workers has 8 threads and between 30.34 and 30.36 GiB of memory.


In [41]:
df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/", 
    #use_nullable_dtypes=True              #this is working on 2022.12.0 update dask
).astype({"hvfhs_license_num": "string[pyarrow]", 
         "dispatching_base_num": "string[pyarrow]",
         "originating_base_num": "string[pyarrow]",
         })

In [14]:
import dask
dask.__version__

'2022.12.0'

In [15]:
df.dtypes

hvfhs_license_num               string
dispatching_base_num            string
originating_base_num            string
on_scene_datetime       datetime64[ns]
pickup_datetime         datetime64[ns]
dropoff_datetime        datetime64[ns]
PULocationID                     int32
DOLocationID                     int32
trip_miles                     float32
trip_time                        int32
base_passenger_fare            float32
tolls                          float32
bcf                            float32
sales_tax                      float32
congestion_surcharge           float32
airport_fee                    float32
tips                           float32
driver_pay                     float32
shared_request_flag           category
shared_match_flag             category
access_a_ride_flag            category
wav_request_flag              category
wav_match_flag                category
dtype: object

```python
dask.utils.format_bytes(
    df.memory_usage(deep=True).sum().compute()
)
```
'82.73 GiB' #with nullable dtypes

In [16]:
# no nullable dtypes
dask.utils.format_bytes(
    df.memory_usage(deep=True).sum().compute()
)

'74.82 GiB'

In [11]:
df.hvfhs_license_num.dtype

string[pyarrow]

In [19]:
df.columns

Index(['hvfhs_license_num', 'dispatching_base_num', 'originating_base_num',
       'on_scene_datetime', 'pickup_datetime', 'dropoff_datetime',
       'PULocationID', 'DOLocationID', 'trip_miles', 'trip_time',
       'base_passenger_fare', 'tolls', 'bcf', 'sales_tax',
       'congestion_surcharge', 'airport_fee', 'tips', 'driver_pay',
       'shared_request_flag', 'shared_match_flag', 'access_a_ride_flag',
       'wav_request_flag', 'wav_match_flag'],
      dtype='object')

In [None]:
# Select only 2 columns; only these columns need to be read from the parquet files.
df_small = df[["base_passenger_fare", "driver_pay"]]


In [None]:
#df_small = df_small.persist()

In [None]:
# dask.utils.format_bytes(
#     df_small.memory_usage(deep=True).sum().compute()
# )
# #'10.55 GiB'

In [None]:
#df.head()

## From the data dictionary we know:

`hvfhs_license_num` is the TLC license number of the HVFHS base or business. As of September 2019, the HVFHS licensees are the following:

- HV0002: Juno
- HV0003: Uber
- HV0004: Via
- HV0005: Lyft

In [20]:
# df =  df[['hvfhs_license_num', 'dispatching_base_num', 'originating_base_num',
#      'pickup_datetime', 'dropoff_datetime','bcf',
#      'PULocationID', 'DOLocationID', 'trip_miles', 'trip_time','base_passenger_fare',
#      'congestion_surcharge', 'airport_fee', 'tips', 'driver_pay',]].persist()

In [43]:
df["tip_flag"] = df.tips > 0

In [44]:
df_small = df[["hvfhs_license_num", "tips", 'base_passenger_fare', 
               'driver_pay', "tip_flag", "trip_miles", "trip_time", "pickup_datetime",
               'dropoff_datetime']].persist()

In [45]:
df_small.base_passenger_fare.sum().compute() / 1e9

14.04137472

In [46]:
df_small.driver_pay.sum().compute() / 1e9

11.404737536

In [47]:
df_small.tips.sum().compute() / 1e6

525.279552

Note: on efficiently creating columns in pandas, see
https://stackoverflow.com/questions/52289488/efficient-way-to-add-new-column-to-pandas-dataframe

In [48]:
tip_counts = df_small.groupby(['hvfhs_license_num']).tip_flag.value_counts().compute()

In [49]:
tip_counts

hvfhs_license_num  tip_flag
HV0002             False         5849705
                   True           539229
HV0003             False       434076699
                   True         72850568
HV0004             False        12497572
                   True          1278427
HV0005             False       146269645
                   True         34424234
Name: tip_flag, dtype: int64

In [50]:
tip_counts.unstack(level="tip_flag") / 1e6

tip_flag                False       True
hvfhs_license_num
HV0002               5.849705   0.539229
HV0003             434.076699  72.850568
HV0004              12.497572   1.278427
HV0005             146.269645  34.424234


In [51]:
tips_total = df_small.loc[lambda x: x.tip_flag].groupby('hvfhs_license_num').tips.agg(["sum", "mean"]).compute()
tips_total

                           sum      mean
hvfhs_license_num
HV0003             351948992.0  4.831108
HV0002               2162384.5  4.010141
HV0005             168172736.0  4.885301
HV0004               2995445.5  2.343071


In [52]:
provider = {"HV0002":"Juno", 
            "HV0005":"Lyft" ,
            "HV0003":"Uber" ,
           "HV0004":"Via"}

In [53]:
tips_total = tips_total.assign(provider=lambda df: df.index.map(provider)).set_index("provider")
tips_total

                  sum      mean
provider
Uber      351948992.0  4.831108
Juno        2162384.5  4.010141
Lyft      168172736.0  4.885301
Via         2995445.5  2.343071


In [54]:
tip_percentage = df_small.tips / df_small.base_passenger_fare 
df_small["tip_percentage"] = tip_percentage

In [55]:
df_small = df_small.persist()

In [65]:
tips_perc_mean = df_small.loc[lambda x: x.tip_flag].groupby('hvfhs_license_num').tip_percentage.mean().compute()
tips_perc_mean

hvfhs_license_num
HV0002    0.230597
HV0003    0.039490
HV0005    0.224473
HV0004    0.165701
Name: tip_percentage, dtype: float64

In [66]:
tips_perc_mean.to_frame().set_index(tips_perc_mean.index.map(provider))

                   tip_percentage
hvfhs_license_num
Juno                     0.230597
Uber                     0.039490
Lyft                     0.224473
Via                      0.165701


# Base fare per mile, by provider

In [67]:
dolars_per_mile = df_small.base_passenger_fare / df_small.trip_miles
df_small["dolars_per_mile"] = dolars_per_mile
df_small = df_small.persist()

In [68]:
df_small.groupby('hvfhs_license_num').dolars_per_mile.agg(["min", "max", "mean", "std"]).compute()

                    min  max      mean       std
hvfhs_license_num
HV0003             -inf  inf   5.3e-05  0.047217
HV0002             -inf  inf  1.045045  4.245099
HV0005             -inf  inf  0.066468  4.882487
HV0004              0.0  inf   6.4e-05   0.12928


In [69]:
df_small.loc[lambda x: x.tip_flag].groupby('hvfhs_license_num').dolars_per_mile.agg(["min", "max", "mean", "std"]).compute()

                         min  max      mean        std
hvfhs_license_num
HV0003                  -inf  inf  0.244394   2.259485
HV0002             -0.018182  inf   4.43955   7.055813
HV0005                 -92.5  inf  1.817102  72.354233
HV0004                   0.0  inf  0.364527   2.902641
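The `-inf` and `inf` entries are what floating-point division produces when the denominator is zero, which would be consistent with some rows having `trip_miles` equal to 0. A minimal pandas sketch with made-up values shows the effect:

```python
import numpy as np
import pandas as pd

# Hypothetical rows: the second and third trips have zero miles recorded.
trips = pd.DataFrame({
    "base_passenger_fare": [10.0, 7.5, -3.0],
    "trip_miles": [2.0, 0.0, 0.0],
})

# Dividing a non-zero float by zero yields inf or -inf, depending on the sign.
dollars_per_mile = trips["base_passenger_fare"] / trips["trip_miles"]
print(dollars_per_mile.tolist())              # [5.0, inf, -inf]
print(int(np.isinf(dollars_per_mile).sum()))  # 2
```

A single infinite value is enough to make `min` or `max` infinite for a whole group, so it is worth checking the ranges of the input columns.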


In [75]:
df_small.trip_miles.min().compute()

0.0

In [76]:
df_small.trip_miles.max().compute()

1310.51

In [77]:
df_small.base_passenger_fare.min().compute()

-1969.59

In [78]:
df_small.base_passenger_fare.max().compute()

8157.74

In [81]:
df_small.trip_miles.quantile(0.95).compute()

21.796000289917

In [82]:
df_small.base_passenger_fare.quantile(0.95).compute()

86.375

## Filter

Based on the 95th percentiles above, we keep only the trips that satisfy:

- 0 < trip_miles < 22
- 0 < base_passenger_fare < 87

In [87]:
df_small.index

Dask Index Structure:
npartitions=1340
    datetime64[ns]
               ...
         ...      
               ...
               ...
Name: request_datetime, dtype: datetime64[ns]
Dask Name: assign-index, 2 graph layers

In [85]:
# Filter rows with a boolean mask. Pass the mask directly: wrapping it in a list,
# as in df_small.loc[[mask]], raises
# KeyError: 'Cannot index with list against unknown division. Try setting divisions using ``ddf.set_index``'
mask = (0 < df_small["trip_miles"]) & (df_small["trip_miles"] < 22)
df_small = df_small[mask]
df_small = df_small.persist()

In [72]:

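# Chained comparisons such as 0 <= s < 100 are not element-wise and raise
# ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()
# Combine two element-wise comparisons with & instead.
in_range = df_small.loc[lambda x: (0 <= x.dolars_per_mile) & (x.dolars_per_mile < 100)].dolars_per_mile
dask.compute(in_range.min(), in_range.max(), in_range.mean(), in_range.std())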
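Element-wise filtering follows the same pattern in pandas and in Dask DataFrames: build each comparison separately, combine them with `&`, and index with the resulting boolean mask. The sketch below uses pandas and made-up values so it can run without a cluster; the thresholds are the ones chosen in the filter section.

```python
import pandas as pd

# Hypothetical miniature dataset; the values are made up.
trips = pd.DataFrame({
    "trip_miles": [0.0, 3.2, 25.0, 10.5],
    "base_passenger_fare": [5.0, 12.0, 90.0, -1.0],
})

# Build each comparison separately and combine them with & (element-wise AND).
# Parentheses are required because & binds more tightly than < and >.
mask = (
    (trips["trip_miles"] > 0) & (trips["trip_miles"] < 22)
    & (trips["base_passenger_fare"] > 0) & (trips["base_passenger_fare"] < 87)
)

# Pass the mask itself, not a list containing the mask.
filtered = trips[mask]
print(filtered)
```

Only the second row survives: the others fail the distance bound, the fare bound, or both.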

NOTES to include in text:

Repartition [docs](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.repartition.html):

- Exactly one of divisions, npartitions, partition_size, or freq should be specified.
- "You should aim for partitions that have around 100MB of data each." https://docs.dask.org/en/stable/dataframe-best-practices.html  but Why?

https://docs.dask.org/en/stable/dataframe-best-practices.html#repartition-to-reduce-overhead
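
One likely answer to the "why" is task overhead: every operation on every partition is a task the scheduler has to track, so very small partitions multiply that cost, while very large ones risk exhausting worker memory. The sketch below assumes a cost of about 200 microseconds per task, which is an illustrative figure and not a measurement; `scheduler_overhead_seconds` is a hypothetical helper, not a Dask function.

```python
def scheduler_overhead_seconds(
    total_bytes: int,
    partition_bytes: int,
    seconds_per_task: float = 200e-6,  # assumed, illustrative value
) -> float:
    """Rough scheduler cost of one operation applied to every partition."""
    n_partitions = -(-total_bytes // partition_bytes)  # ceiling division
    return n_partitions * seconds_per_task


total = 75 * 10**9  # hypothetical 75 GB dataset
for size in (1 * 10**6, 100 * 10**6):
    overhead = scheduler_overhead_seconds(total, size)
    print(f"{size // 10**6} MB partitions: {overhead:.2f} s per operation")
# 1 MB partitions: 15.00 s per operation
# 100 MB partitions: 0.15 s per operation
```

Under these assumptions, going from 1 MB to 100 MB partitions cuts the per-operation overhead by a factor of 100.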