# Create NYC TLC Parquet files

The New York City Taxi and Limousine Commission (TLC) Trip Record Data is stored in S3; see the [AWS Open Data registry entry](https://registry.opendata.aws/nyc-tlc-trip-records-pds/).

This notebook reads the CSV data and writes it out as Parquet files, which are easier and faster to work with.

In [59]:
import coiled
import dask
import dask.dataframe as dd
import dask.distributed  # imported explicitly so dask.distributed.Client resolves
import pandas as pd

In [60]:
cluster = coiled.Cluster(name="powers-demo", n_workers=10)



Found software environment build
Created fw rule: inbound [8786-8787] [0.0.0.0/0] []
Created FW rules: coiled-dask-matthew24-97964-firewall
Created fw rule: cluster [0-65535] [None] [coiled-dask-matthew24-97964-firewall -> coiled-dask-matthew24-97964-firewall]
Created FW rules: coiled-dask-matthew24-97964-cluster-firewall
Created fw rule: cluster [0-65535] [None] [coiled-dask-matthew24-97964-cluster-firewall -> coiled-dask-matthew24-97964-cluster-firewall]
Created scheduler VM: coiled-dask-matthew24-97964-scheduler (type: t3a.medium, ip: ['3.226.238.125'])


In [61]:
client = dask.distributed.Client(cluster)


+-------------+---------------+---------------+---------------+
| Package     | client        | scheduler     | workers       |
+-------------+---------------+---------------+---------------+
| dask        | 2021.11.2     | 2021.12.0     | 2021.12.0     |
| distributed | 2021.11.2     | 2021.12.0     | 2021.12.0     |
| numpy       | 1.21.4        | 1.22.0        | 1.22.0        |
| pandas      | 1.3.4         | 1.3.5         | 1.3.5         |
| python      | 3.9.7.final.0 | 3.9.9.final.0 | 3.9.9.final.0 |
+-------------+---------------+---------------+---------------+
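
The table above appears to be the report distributed prints when the client, scheduler, and workers run different package versions. As a rough sketch of what that comparison amounts to, the snippet below hard-codes the versions from the table and lists the packages that disagree. `find_mismatches` is a hypothetical helper written for illustration, not part of dask, distributed, or coiled.

```python
# Versions copied from the table above: package -> (client, scheduler, workers).
VERSIONS = {
    "dask": ("2021.11.2", "2021.12.0", "2021.12.0"),
    "distributed": ("2021.11.2", "2021.12.0", "2021.12.0"),
    "numpy": ("1.21.4", "1.22.0", "1.22.0"),
    "pandas": ("1.3.4", "1.3.5", "1.3.5"),
    "python": ("3.9.7.final.0", "3.9.9.final.0", "3.9.9.final.0"),
}


def find_mismatches(versions):
    """Return the packages whose versions differ across the three roles."""
    return [pkg for pkg, found in versions.items() if len(set(found)) > 1]


mismatched = find_mismatches(VERSIONS)
print(mismatched)
```

Every package in the table differs between the local client and the cluster, so aligning the local environment with the cluster's software environment is probably worth doing before debugging anything else.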


## 2009 data create

In [23]:
ddf = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2009-*.csv",
    parse_dates=["Trip_Pickup_DateTime", "Trip_Dropoff_DateTime"],
    dtype={
        "Tolls_Amt": "float64",
        "vendor_name": "string[pyarrow]",
        "Payment_Type": "string[pyarrow]",
    },
)

In [24]:
ddf.dtypes

vendor_name                      string
Trip_Pickup_DateTime     datetime64[ns]
Trip_Dropoff_DateTime    datetime64[ns]
Passenger_Count                   int64
Trip_Distance                   float64
Start_Lon                       float64
Start_Lat                       float64
Rate_Code                       float64
store_and_forward               float64
End_Lon                         float64
End_Lat                         float64
Payment_Type                     string
Fare_Amt                        float64
surcharge                       float64
mta_tax                         float64
Tip_Amt                         float64
Tolls_Amt                       float64
Total_Amt                       float64
dtype: object

In [25]:
ddf.head()

| | vendor_name | Trip_Pickup_DateTime | Trip_Dropoff_DateTime | Passenger_Count | Trip_Distance | Start_Lon | Start_Lat | Rate_Code | store_and_forward | End_Lon | End_Lat | Payment_Type | Fare_Amt | surcharge | mta_tax | Tip_Amt | Tolls_Amt | Total_Amt |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | VTS | 2009-01-04 02:52:00 | 2009-01-04 03:02:00 | 1 | 2.63 | -73.991957 | 40.721567 | | | -73.993803 | 40.695922 | CASH | 8.9 | 0.5 | | 0.0 | 0.0 | 9.4 |
| 1 | VTS | 2009-01-04 03:31:00 | 2009-01-04 03:38:00 | 3 | 4.55 | -73.982102 | 40.73629 | | | -73.95585 | 40.76803 | Credit | 12.1 | 0.5 | | 2.0 | 0.0 | 14.6 |
| 2 | VTS | 2009-01-03 15:43:00 | 2009-01-03 15:57:00 | 5 | 10.35 | -74.002587 | 40.739748 | | | -73.869983 | 40.770225 | Credit | 23.7 | 0.0 | | 4.74 | 0.0 | 28.44 |
| 3 | DDS | 2009-01-01 20:52:58 | 2009-01-01 21:14:00 | 1 | 5.0 | -73.974267 | 40.790955 | | | -73.996558 | 40.731849 | CREDIT | 14.9 | 0.5 | | 3.05 | 0.0 | 18.45 |
| 4 | DDS | 2009-01-24 16:18:23 | 2009-01-24 16:24:56 | 1 | 0.4 | -74.00158 | 40.719382 | | | -74.008378 | 40.72035 | CASH | 3.7 | 0.0 | | 0.0 | 0.0 | 3.7 |


In [26]:
ddf.describe().compute()

| | Passenger_Count | Trip_Distance | Start_Lon | Start_Lat | Rate_Code | store_and_forward | End_Lon | End_Lat | Fare_Amt | surcharge | mta_tax | Tip_Amt | Tolls_Amt | Total_Amt |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 170896100.0 | 170896100.0 | 170896100.0 | 170896100.0 | 0.0 | 67300370.0 | 170896100.0 | 170896100.0 | 170896100.0 | 170896100.0 | 29855490.0 | 170896100.0 | 170896100.0 | 170896100.0 |
| mean | 1.691026 | 2.708163 | -73.04755 | 40.24202 | | 0.0179029 | -73.06975 | 40.25457 | 9.905162 | 0.2124957 | 0.4483104 | 0.5733786 | 0.1531628 | 10.93008 |
| std | 1.317733 | 3.119908 | 8.33958 | 4.634872 | | 0.1325986 | 8.217777 | 4.633053 | 7.686219 | 0.3366762 | 0.1522268 | 1.428889 | 0.8581441 | 8.855952 |
| min | 0.0 | 0.0 | -2828.434 | -3084.3 | | 0.0 | -3509.015 | -3579.139 | 2.5 | 0.0 | 0.0 | 0.0 | 0.0 | 2.5 |
| 25% | 1.0 | 1.02 | -73.99142 | 40.73689 | | 0.0 | -73.99087 | 40.73613 | 5.7 | 0.0 | 0.5 | 0.0 | 0.0 | 6.7 |
| 50% | 1.0 | 1.76 | -73.98118 | 40.75452 | | 0.0 | -73.97961 | 40.75498 | 7.7 | 0.5 | 0.5 | 0.0 | 0.0 | 9.0 |
| 75% | 2.0 | 3.12 | -73.96657 | 40.76876 | | 0.0 | -73.96326 | 40.76987 | 11.3 | 0.5 | 0.5 | 1.0 | 0.0 | 13.0 |
| max | 255.0 | 50.0 | 3570.224 | 3210.379 | | 1.0 | 1565.33 | 3172.506 | 200.0 | 12.0 | 0.5 | 100.0 | 20.0 | 235.5 |


In [27]:
ddf.npartitions

490

In [29]:
ddf.memory_usage(deep=True).compute().sum()

24530855549
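
As a back-of-the-envelope check, the in-memory size and partition count reported above give the current average partition size and a rough partition count for a 100 MB target. This is only arithmetic on the numbers printed in this notebook. It assumes "100MB" means 100 * 10**6 bytes, and dask's actual repartitioning may produce a different count.

```python
import math

total_bytes = 24_530_855_549  # ddf.memory_usage(deep=True).compute().sum()
n_partitions = 490            # ddf.npartitions
target_bytes = 100 * 10**6    # assumption: "100MB" read as 100 * 10**6 bytes

# Average in-memory size of one of the current 490 partitions, in MB.
avg_partition_mb = total_bytes / n_partitions / 10**6

# Rough number of partitions needed if each holds about 100 MB.
estimated_partitions = math.ceil(total_bytes / target_bytes)

print(f"average partition: {avg_partition_mb:.1f} MB")                # -> 50.1 MB
print(f"estimated partitions at 100 MB: {estimated_partitions}")      # -> 246
```

So the CSV-derived partitions average about 50 MB in memory, and a 100 MB target would roughly halve the partition count.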

In [31]:
ddf.known_divisions

False

In [32]:
ddf.repartition(partition_size="100MB").to_parquet(
    "s3://coiled-datasets/nyc-tlc/2009",
    engine="pyarrow",
    compression="snappy",
    write_metadata_file=False,
)

CancelledError: ('metadata-to-parquet-c600b36a787d07f36b9e017b07137bbc', 0)

## 2009 data query

In [45]:
ddf = dd.read_parquet(
    "s3://coiled-datasets/nyc-tlc/2009",
    engine="pyarrow",
)

In [46]:
dtypes_2009 = ddf.dtypes

In [49]:
dtypes_2009

vendor_name                      string
Trip_Pickup_DateTime     datetime64[ns]
Trip_Dropoff_DateTime    datetime64[ns]
Passenger_Count                   int64
Trip_Distance                   float64
Start_Lon                       float64
Start_Lat                       float64
Rate_Code                       float64
store_and_forward               float64
End_Lon                         float64
End_Lat                         float64
Payment_Type                     string
Fare_Amt                        float64
surcharge                       float64
mta_tax                         float64
Tip_Amt                         float64
Tolls_Amt                       float64
Total_Amt                       float64
dtype: object

In [38]:
len(ddf)

170896055

In [42]:
ddf = dd.read_parquet(
    "s3://coiled-datasets/nyc-tlc/2009",
    engine="pyarrow",
    columns=["Fare_Amt"]
)

In [44]:
%%time
ddf.Fare_Amt.mean().compute()

CPU times: user 124 ms, sys: 7.65 ms, total: 131 ms
Wall time: 4.79 s


9.905162372589585

In [None]:
ddf = dd.read_parquet(
    "s3://coiled-datasets/nyc-tlc/2009",
    engine="pyarrow",
)

In [None]:
ddf.dtypes

## 2010 data create

In [74]:
ddf = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2010-*.csv",
    # blocksize=None,
    # parse_dates=["pickup_datetime", "dropoff_datetime"],
    # dtype={
    #     "tolls_amount": "float64",
    #     "vendor_id": "string[pyarrow]",
    #     "payment_type": "string[pyarrow]",
    # },
)

In [75]:
ddf.dtypes

vendor_id              object
pickup_datetime        object
dropoff_datetime       object
passenger_count         int64
trip_distance         float64
pickup_longitude      float64
pickup_latitude       float64
rate_code               int64
store_and_fwd_flag    float64
dropoff_longitude     float64
dropoff_latitude      float64
payment_type           object
fare_amount           float64
surcharge             float64
mta_tax               float64
tip_amount            float64
tolls_amount            int64
total_amount          float64
dtype: object

In [72]:
ddf.head()

KilledWorker: ("('read-csv-3668b804361760cdad62ea5b1c934331', 0)", <WorkerState 'tls://10.4.15.201:34737', name: coiled-dask-matthew24-97964-worker-eeb8e3676a, status: closed, memory: 0, processing: 1>)

In [73]:
ddf.vendor_id.nunique().compute()

KeyboardInterrupt: 

In [91]:
ddf = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2010-*.csv",
    parse_dates=["pickup_datetime", "dropoff_datetime"],
    blocksize=None,
    dtype={
        "tolls_amount": "float64",
        "vendor_id": "string[pyarrow]",
        "payment_type": "string[pyarrow]",
        "store_and_fwd_flag": "string[pyarrow]",
    },
)

In [92]:
dtypes2010 = ddf.dtypes

In [93]:
dtypes2010

vendor_id                     string
pickup_datetime       datetime64[ns]
dropoff_datetime      datetime64[ns]
passenger_count                int64
trip_distance                float64
pickup_longitude             float64
pickup_latitude              float64
rate_code                      int64
store_and_fwd_flag            string
dropoff_longitude            float64
dropoff_latitude             float64
payment_type                  string
fare_amount                  float64
surcharge                    float64
mta_tax                      float64
tip_amount                   float64
tolls_amount                 float64
total_amount                 float64
dtype: object

In [89]:
ddf.head()

| | vendor_id | pickup_datetime | dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | rate_code | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | surcharge | mta_tax | tip_amount | tolls_amount | total_amount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | VTS | 2010-01-26 07:41:00 | 2010-01-26 07:45:00 | 1 | 0.75 | -73.956778 | 40.76775 | 1 | | -73.965957 | 40.765232 | CAS | 4.5 | 0.0 | 0.5 | 0.0 | 0.0 | 5.0 |
| 1 | DDS | 2010-01-30 23:31:00 | 2010-01-30 23:46:12 | 1 | 5.9 | -73.996118 | 40.763932 | 1 | | -73.981512 | 40.741193 | CAS | 15.3 | 0.5 | 0.5 | 0.0 | 0.0 | 16.3 |
| 2 | DDS | 2010-01-18 20:22:20 | 2010-01-18 20:38:12 | 1 | 4.0 | -73.979673 | 40.78379 | 1 | | -73.917852 | 40.87856 | CAS | 11.7 | 0.5 | 0.5 | 0.0 | 0.0 | 12.7 |
| 3 | VTS | 2010-01-09 01:18:00 | 2010-01-09 01:35:00 | 2 | 4.7 | -73.977922 | 40.763997 | 1 | | -73.923908 | 40.759725 | CAS | 13.3 | 0.5 | 0.5 | 0.0 | 0.0 | 14.3 |
| 4 | CMT | 2010-01-18 19:10:14 | 2010-01-18 19:17:07 | 1 | 0.6 | -73.990924 | 40.734682 | 1 | 0.0 | -73.995511 | 40.739088 | Cre | 5.3 | 0.0 | 0.5 | 0.87 | 0.0 | 6.67 |


In [95]:
ddf.to_parquet(
    "s3://coiled-datasets/nyc-tlc/2010",
    engine="pyarrow",
    compression="snappy",
    write_metadata_file=False,
)

ParserError: Error tokenizing data. C error: Expected 18 fields in line 2958, saw 19
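
The `ParserError` above means at least one row has 19 fields where the header declares 18. A minimal sketch for locating such rows, using only the standard library, is shown below. `find_malformed_rows` is a hypothetical helper and the sample data is made up; running it against a real TLC file would require downloading that file first. The line number pandas reports may also be relative to the block it was handed rather than to the whole file.

```python
import csv
import io


def find_malformed_rows(lines, expected_fields=None):
    """Yield (line_number, field_count) for rows with an unexpected width.

    `lines` is any iterable of text lines. Line numbers are 1-based and
    include the header. If `expected_fields` is None, the width of the
    first non-blank row (the header) is used.
    """
    reader = csv.reader(lines)
    for row in reader:
        if not row:
            continue  # skip blank lines
        if expected_fields is None:
            expected_fields = len(row)
        if len(row) != expected_fields:
            yield reader.line_num, len(row)


# Made-up sample: three columns, with one extra field on line 3.
sample = io.StringIO("a,b,c\n1,2,3\n4,5,6,7\n8,9,10\n")
bad_rows = list(find_malformed_rows(sample))
print(bad_rows)  # -> [(3, 4)]
```

Once the offending rows are known, they can be inspected and repaired. Alternatively, pandas 1.3 and later accept `on_bad_lines="skip"` in `read_csv`, which drops such rows silently; whether losing them is acceptable depends on the analysis.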


## 2011 data create

In [53]:
ddf = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2011-*.csv",
    # parse_dates=["Trip_Pickup_DateTime", "Trip_Dropoff_DateTime"],
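    # NOTE: these dtype keys are the 2009 column names. The 2011 files use
    # tolls_amount, vendor_id and payment_type, so the overrides match nothing.
    dtype={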
        "Tolls_Amt": "float64",
        "vendor_name": "string[pyarrow]",
        "Payment_Type": "string[pyarrow]",
    },
)

In [54]:
ddf.dtypes

vendor_id              object
pickup_datetime        object
dropoff_datetime       object
passenger_count         int64
trip_distance         float64
pickup_longitude      float64
pickup_latitude       float64
rate_code               int64
store_and_fwd_flag     object
dropoff_longitude     float64
dropoff_latitude      float64
payment_type           object
fare_amount           float64
surcharge             float64
mta_tax               float64
tip_amount              int64
tolls_amount            int64
total_amount          float64
dtype: object
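
The listing above still shows `object` and `int64` for `vendor_id`, `payment_type`, and `tolls_amount`, which suggests the `dtype` overrides were not applied: their keys use the 2009 column names. One way to reuse the 2009 overrides for later years is to translate the keys first. The sketch below assumes a 2009 to 2010/2011 column mapping obtained by pairing the two `ddf.dtypes` listings in this notebook by position. It is not taken from TLC documentation, and `translate_dtypes` is a hypothetical helper.

```python
# Assumed mapping from 2009 column names to the 2010/2011 names, paired by
# position from the dtypes listings in this notebook.
RENAMES_2009 = {
    "vendor_name": "vendor_id",
    "Trip_Pickup_DateTime": "pickup_datetime",
    "Trip_Dropoff_DateTime": "dropoff_datetime",
    "Passenger_Count": "passenger_count",
    "Trip_Distance": "trip_distance",
    "Start_Lon": "pickup_longitude",
    "Start_Lat": "pickup_latitude",
    "Rate_Code": "rate_code",
    "store_and_forward": "store_and_fwd_flag",
    "End_Lon": "dropoff_longitude",
    "End_Lat": "dropoff_latitude",
    "Payment_Type": "payment_type",
    "Fare_Amt": "fare_amount",
    "surcharge": "surcharge",
    "mta_tax": "mta_tax",
    "Tip_Amt": "tip_amount",
    "Tolls_Amt": "tolls_amount",
    "Total_Amt": "total_amount",
}


def translate_dtypes(dtypes, renames):
    """Rewrite the keys of a dtype mapping; keys without a rename are kept."""
    return {renames.get(col, col): dtype for col, dtype in dtypes.items()}


# The overrides used for the 2009 files earlier in this notebook.
dtype_2009 = {
    "Tolls_Amt": "float64",
    "vendor_name": "string[pyarrow]",
    "Payment_Type": "string[pyarrow]",
}

dtype_2011 = translate_dtypes(dtype_2009, RENAMES_2009)
print(dtype_2011)
```

The resulting dictionary could be passed as `dtype=` to `dd.read_csv` for the 2011 files. `tip_amount` is also inferred as `int64` here, so adding `"tip_amount": "float64"` may be needed as well, as was done for `tolls_amount` in 2010.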

distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
