In [2]:
import numpy as np
import pandas as pd
from dask.distributed import Client
import dask.dataframe as dd
import math


def roundup(x, base: int = 5):
    """Round `x` up to nearest `base`"""
    return int(math.ceil(x / float(base))) * base


def round_series_up(s: dd.Series) -> dd.Series:
    """Apply roundup function to all elements of `s`"""
    return s.apply(roundup, meta=pd.Series(data=[], dtype=np.float32))


def transform_dask_dataframe(df: dd.DataFrame) -> dd.DataFrame:
    """Process NYC taxi data"""
    return (
        df[[
            'tpep_pickup_datetime', 'tpep_dropoff_datetime',
            'trip_distance', 'total_amount'
        ]]
        .astype({
            'tpep_pickup_datetime': 'datetime64[ms]',
            'tpep_dropoff_datetime': 'datetime64[ms]'
        })
        .assign(drive_time=(lambda df: (
            df.tpep_dropoff_datetime - df.tpep_pickup_datetime).dt.seconds
            // 300))
        .assign(drive_time=lambda df: round_series_up(df.drive_time))
        .assign(trip_distance=lambda df: round_series_up(df.trip_distance))
        .query('drive_time <= 120 & trip_distance <= 50')
        .drop(['tpep_pickup_datetime', 'tpep_dropoff_datetime'], axis=1)
        .round({'trip_distance': 0})
        .groupby(['drive_time', 'trip_distance'])
        .mean()
        .rename(columns={'total_amount': 'avg_amount'})
    )


def compute_final_dataframe(df: dd.DataFrame) -> pd.DataFrame:
    """Execute dask task graph and compute final results"""
    return (
        df
        .compute()
        .reset_index()
        .pivot(
             index='drive_time',
             columns='trip_distance',
             values='avg_amount'
        )
        .fillna(0)
    )

In [4]:
client = Client()

taxi_data = dd.read_csv(
    's3://nyc-tlc/trip data/yellow_tripdata_2018-04.csv',
    storage_options={'anon': True, 'use_ssl': False}
)

In [5]:
transformed_data = transform_dask_dataframe(taxi_data)
fare_distribution = compute_final_dataframe(transformed_data)

print(fare_distribution.to_string())

trip_distance          0          5          10         15         20          25          30          35          40          45          50
drive_time                                                                                                                                   
0               34.106532   6.777906  29.322293  41.465332  48.328954   53.812857   62.784265  114.779697  137.180000  188.779091  237.975000
5               13.162592  12.659089  28.653480  43.249538  64.541436   71.395503   91.944865   80.038333   52.800000    0.000000    0.000000
10              28.810443  25.936256  36.371690  49.040409  65.759698   70.834030   88.876753  131.893420  155.772594  182.284359  245.113750
15              35.985702  36.814331  49.385895  57.912027  66.867544   69.883872   85.950816  117.336853  153.899789  180.471179  220.812613
20              55.574459  30.579348  60.521722  68.039804  68.516193   72.595018   90.693380  108.568716  157.129796  161.684426  196.237400
25    