Step 1: Download the NYC taxi trip data and convert it from CSV to Parquet

In [1]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

In [2]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
from dask.distributed import Client, LocalCluster
import dask
from dask.distributed import get_task_stream

In [3]:
import time

In [4]:
# Record library versions so benchmark numbers can be compared across environments.
for label, module in (('pandas', pd), ('numpy', np), ('dask', dask)):
    print('%s version: %s' % (label, module.__version__))

pandas version: 1.4.2
numpy version: 1.22.3
dask version: 2022.05.0


Reference benchmark (Databricks Koalas distributed-execution benchmark): https://docs.databricks.com/_static/notebooks/koalas-benchmark-distributed-execution.html

In [5]:
# Text file with one dataset CSV location per line (read in the next cell).
filename = "taxi_dataset.txt"

In [6]:
# Read every listed CSV location (trailing whitespace/newlines stripped) ...
with open(filename) as url_list:
    csv_files = [entry.rstrip() for entry in url_list]
# ... and keep only the yellow-taxi files.
yellow = [url for url in csv_files if "yellow" in url]

In [7]:
# For a quick run, replace the full yellow-taxi list from the previous cell with a
# single month. Comment this out to convert every yellow-taxi file listed in `filename`.
yellow = ['https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2013-11.csv']

In [8]:
# CSV files that the next cell will convert to Parquet.
yellow

['https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2013-11.csv']

In [11]:
import re
# Convert each selected yellow-taxi CSV (read directly from its URL) into a
# gzip-compressed Parquet dataset under ./tmp/.
# Iterating over `yellow` (instead of popping from it) leaves the list intact,
# so this cell is re-runnable and later cells that inspect `yellow` still see it.

# Explicit dtypes for columns dask may mis-infer: dask infers dtypes from a
# sample at the start of the file, and a mismatch later in the file fails the read.
YELLOW_CSV_DTYPES = {
    'tolls_amount': 'float64',
    'tip_amount': 'float64',
    'Tip_Amt': 'float64',
    'Tolls_Amt': 'float64',
    'surcharge': 'float64',
    'store_and_fwd_flag': 'object',
}

for csv_url in yellow:
    print(csv_url)
    try:
        # Keep only the file name (e.g. "yellow_tripdata_2013-11.csv") so the
        # output directories sort by year / month.
        csv_name = re.findall(r"[^\/]*\.csv$", csv_url)[0]
        df = dd.read_csv(csv_url, dtype=YELLOW_CSV_DTYPES)
        df = df.repartition(npartitions=4)
        df.to_parquet(f'./tmp/trip_data_{csv_name}',
                      compression="gzip",
                      engine="pyarrow")
    except Exception as e:
        # Best effort: report the failing file and continue with the rest.
        print(f"failed, {csv_url}")
        print(e)


https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2013-11.csv


Read the Parquet data into Dask

In [7]:
# Number of entries currently in `yellow`. For the full yellow-taxi file count,
# first re-run the cell that reads `filename` (the single-month cell overwrites
# the list), then this cell.
len(yellow)

151

In [16]:
#client.close()

In [8]:
from dask.distributed import get_task_stream

#client = Client()

In [9]:
# Local Dask cluster: 4 workers with one thread each and a 4GB memory limit per worker.
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=1,
    memory_limit='4GB',
)
client = Client(cluster)

# Set up the benchmark harness

In [10]:
def benchmark(f, df, benchmarks, task_name, **kwargs):
    """Time one benchmark task and record its Dask task-stream history.

    Parameters
    ----------
    f : callable
        Task to benchmark, called as ``f(df, **kwargs)``. Its return value is
        discarded; only the elapsed time and the task history are kept.
    df : data frame
        Data frame passed to ``f``.
    benchmarks : dict-like
        Container for results. ``benchmarks[task_name]`` is set to a dict with
        keys ``'raw_duration'`` (seconds) and ``'history'`` (task-stream records).
    task_name : str
        Key under which the results are stored (also used in the printed message).
    **kwargs
        Extra keyword arguments forwarded to ``f``.

    Returns
    -------
    float
        Duration (in seconds) of ``f(df, **kwargs)``.
    """
    # The task stream is also saved as an HTML plot; every call overwrites this same file.
    with get_task_stream(plot='save', filename="task-stream.html") as ts:
        # perf_counter is monotonic and high-resolution, so it suits interval timing.
        start_time = time.perf_counter()
        f(df, **kwargs)
        raw_duration = time.perf_counter() - start_time
    # get_task_stream collects the records when the context manager exits,
    # so read ts.data only after leaving the `with` block.
    benchmarks[task_name] = {'raw_duration': raw_duration, 'history': ts.data}
    print(f"{task_name} took: {raw_duration} seconds")
    return raw_duration

In [11]:
import collections
# Results per task, filled in by benchmark() and by the history-analysis loop:
# {task_name: {"raw_duration": seconds, "history": task-stream records,
#              "time_per_worker_and_action": DataFrame, "nbytes_per_worker": DataFrame}}
dask_benchmarks = collections.defaultdict(dict)

# Define benchmark tasks

In [12]:
# Benchmark task functions, in run order; each is later called as task(df) by benchmark().
all_tasks = []

In [13]:
# sum, then mean = simple map-reduce. Unlike the other tasks, this one also
# times reading the Parquet data: the `df` argument is ignored and the data is re-read.
def read_to_basic_ETL(df=None):
    """Read the Parquet data under ./tmp/ and return mean(fare_amount + tip_amount).

    ``df`` is accepted only so this task has the same call signature as the
    other tasks; it is not used.
    """
    trips = dd.read_parquet("./tmp/",
                            storage_options={"anon": True, 'use_ssl': True})
    fare_plus_tip = trips.fare_amount + trips.tip_amount
    return fare_plus_tip.mean().compute()


all_tasks.append(read_to_basic_ETL)

In [14]:
# counts of values seen = simple map, groupby, reduce
def count_values(df):
    """Return how often each distinct fare_amount occurs, as a computed result."""
    fare_counts = df.fare_amount.value_counts()
    return fare_counts.compute()


all_tasks.append(count_values)

In [15]:
# cpu heavy arithmetic : mapreduce
def complicated_arithmetic_operation(df):
    """Compute a haversine-style central angle (radians) per trip, as a computed result.

    NOTE(review): the sin^2 term uses the longitude difference and the cosines use
    longitudes, whereas the standard haversine formula uses the latitude difference
    and cos(latitude). No Earth radius is applied. The formula is kept as-is
    because this task only serves as CPU-heavy arithmetic load.
    """
    lon_a, lat_a = df.pickup_longitude, df.pickup_latitude
    lon_b, lat_b = df.dropoff_longitude, df.dropoff_latitude
    first_term = np.sin((lon_b - lon_a) / 2 * np.pi / 180) ** 2
    second_term = (np.cos(lon_a * np.pi / 180) * np.cos(lon_b * np.pi / 180)
                   * np.sin((lat_b - lat_a) / 2 * np.pi / 180) ** 2)
    hav = first_term + second_term
    angle = 2 * np.arctan2(np.sqrt(hav), np.sqrt(1 - hav))
    return angle.compute()


all_tasks.append(complicated_arithmetic_operation)

In [16]:
def groupby_statistics(df):
    """Mean and standard deviation of total_amount and tip_amount per passenger_count."""
    aggregations = {
        'total_amount': ['mean', 'std'],
        'tip_amount': ['mean', 'std'],
    }
    per_passenger_count = df.groupby(by='passenger_count')
    return per_passenger_count.agg(aggregations).compute()


all_tasks.append(groupby_statistics)

In [17]:
# join: inner self-join of the dataset on its index
def join_data(df):
    """Join ``df`` with itself on the index and return the computed result.

    NOTE(review): the recorded benchmark run was interrupted while this task was
    running. If the index is not unique (presumably possible when it comes from
    per-partition indexes of the original CSV read — verify), each index value
    matches many rows and the join output can be much larger than ``df``.
    """
    self_joined = dd.merge(df, df, left_index=True, right_index=True)
    return self_joined.compute()


all_tasks.append(join_data)

In [18]:
# Registered tasks, in the order they will be benchmarked.
all_tasks

[<function __main__.read_to_basic_ETL(df=None)>,
 <function __main__.count_values(df)>,
 <function __main__.complicated_arithmetic_operation(df)>,
 <function __main__.groupby_statistics(df)>,
 <function __main__.join_data(df)>]

# run the tasks

In [19]:
# Load the Parquet data written under ./tmp/ by the conversion step.
# NOTE(review): this reads every dataset under ./tmp/, including any left over
# from earlier runs. The storage_options look like S3 (s3fs) settings and
# presumably have no effect on a local path — confirm before removing.
PARQUET_ROOT = "./tmp/"
df = dd.read_parquet(PARQUET_ROOT,
                     storage_options={"anon": True, 'use_ssl': True})

In [20]:
# for task in all_tasks:
#     benchmark(task, df=df, benchmarks = dask_benchmarks, task_name = task.__name__)

In [25]:
# Benchmark every task in order; the cell displays the list of durations (seconds).
durations = [
    benchmark(task, df=df, benchmarks=dask_benchmarks, task_name=task.__name__)
    for task in all_tasks
]
durations

read_to_basic_ETL took: 1.5093305110931396 seconds
count_values took: 0.5255937576293945 seconds
complicated_arithmetic_operation took: 3.2453343868255615 seconds
groupby_statistics took: 13.304431915283203 seconds


KeyboardInterrupt: 

In [23]:
#client.close()

# Analyze the task-stream history

In [26]:
# Add per-worker summaries of each task's recorded task stream to dask_benchmarks.

def summarize_task_stream(history):
    """Aggregate one task-stream history into per-worker summaries.

    Parameters
    ----------
    history : list of dict or None
        Task-stream records as stored by ``benchmark`` under ``'history'``. Each
        record has ``'worker'``, ``'nbytes'`` and a ``'startstops'`` sequence of
        ``{'action', 'start', 'stop'}`` dicts.

    Returns
    -------
    (pandas.DataFrame, pandas.DataFrame)
        ``time_per_worker_and_action``: summed ``stop - start`` per (worker, action);
        ``nbytes_per_worker``: summed ``nbytes`` per worker.
    """
    hx_df = pd.DataFrame(history, columns=['worker', 'status', 'nbytes', 'thread', 'type',
                                           'typename', 'metadata', 'startstops', 'key'])
    hx_ddf = dd.from_pandas(hx_df, npartitions=1)
    # One row per start/stop phase of each task.
    exploded_df = hx_ddf.explode("startstops")
    exploded_df['action'] = exploded_df['startstops'].apply(lambda x: x['action'], meta=("action", str))
    exploded_df['start'] = exploded_df['startstops'].apply(lambda x: x['start'], meta=("start", np.float64))
    exploded_df['end'] = exploded_df['startstops'].apply(lambda x: x['stop'], meta=("end", np.float64))
    exploded_df['action_duration'] = exploded_df['end'] - exploded_df['start']
    time_per_worker_and_action = (exploded_df[['worker', 'action', 'action_duration']]
                                  .groupby(['worker', 'action']).agg("sum"))
    nbytes_per_worker = hx_ddf[['worker', 'nbytes']].groupby(["worker"]).agg("sum")
    return time_per_worker_and_action.compute(), nbytes_per_worker.compute()


for task_name, output_values in dask_benchmarks.items():
    (output_values["time_per_worker_and_action"],
     output_values["nbytes_per_worker"]) = summarize_task_stream(output_values.get("history"))

In [27]:
#client.close()

In [48]:
# Access the analyzed dataframes like so: dask_benchmarks[task_name][summary_name].
# Summed per-(worker, action) time for read_to_basic_ETL.
dask_benchmarks['read_to_basic_ETL']["time_per_worker_and_action"]

Unnamed: 0_level_0,Unnamed: 1_level_0,action_duration
worker,action,Unnamed: 2_level_1
tcp://127.0.0.1:56243,compute,0.603825
tcp://127.0.0.1:56243,deserialize,0.324824
tcp://127.0.0.1:56243,transfer,0.022714
tcp://127.0.0.1:56246,compute,0.604824
tcp://127.0.0.1:56246,deserialize,0.300908
tcp://127.0.0.1:56246,transfer,0.02311
tcp://127.0.0.1:56249,compute,0.611411
tcp://127.0.0.1:56249,deserialize,0.30065
tcp://127.0.0.1:56253,compute,0.671207
tcp://127.0.0.1:56253,deserialize,0.294801


In [49]:
# Summed task-stream nbytes per worker for read_to_basic_ETL.
dask_benchmarks['read_to_basic_ETL']["nbytes_per_worker"]

Unnamed: 0_level_0,nbytes
worker,Unnamed: 1_level_1
tcp://127.0.0.1:56243,55600448
tcp://127.0.0.1:56246,56012688
tcp://127.0.0.1:56249,56012624
tcp://127.0.0.1:56253,62599088


In [50]:
# Summed per-(worker, action) time for count_values.
dask_benchmarks['count_values']["time_per_worker_and_action"]

Unnamed: 0_level_0,Unnamed: 1_level_0,action_duration
worker,action,Unnamed: 2_level_1
tcp://127.0.0.1:56243,compute,0.445477
tcp://127.0.0.1:56243,transfer,0.007983
tcp://127.0.0.1:56246,compute,0.373273
tcp://127.0.0.1:56249,compute,0.396725
tcp://127.0.0.1:56253,compute,0.435246


In [51]:
# Summed task-stream nbytes per worker for count_values.
dask_benchmarks['count_values']["nbytes_per_worker"]

Unnamed: 0_level_0,nbytes
worker,Unnamed: 1_level_1
tcp://127.0.0.1:56243,55176
tcp://127.0.0.1:56246,27280
tcp://127.0.0.1:56249,28432
tcp://127.0.0.1:56253,28608


In [52]:
# Summed per-(worker, action) time for complicated_arithmetic_operation.
dask_benchmarks['complicated_arithmetic_operation']["time_per_worker_and_action"]

Unnamed: 0_level_0,Unnamed: 1_level_0,action_duration
worker,action,Unnamed: 2_level_1
tcp://127.0.0.1:56243,compute,2.195702
tcp://127.0.0.1:56246,compute,2.258475
tcp://127.0.0.1:56249,compute,2.413339
tcp://127.0.0.1:56253,compute,2.591246


In [53]:
# Summed task-stream nbytes per worker for complicated_arithmetic_operation.
dask_benchmarks['complicated_arithmetic_operation']["nbytes_per_worker"]

Unnamed: 0_level_0,nbytes
worker,Unnamed: 1_level_1
tcp://127.0.0.1:56243,55600288
tcp://127.0.0.1:56246,56012496
tcp://127.0.0.1:56249,56012496
tcp://127.0.0.1:56253,62598960


In [54]:
# Summed per-(worker, action) time for groupby_statistics.
dask_benchmarks['groupby_statistics']["time_per_worker_and_action"]

Unnamed: 0_level_0,Unnamed: 1_level_0,action_duration
worker,action,Unnamed: 2_level_1
tcp://127.0.0.1:56243,compute,8.300264
tcp://127.0.0.1:56243,transfer,0.013252
tcp://127.0.0.1:56246,compute,10.45808
tcp://127.0.0.1:56249,compute,13.187732
tcp://127.0.0.1:56253,compute,10.340395


In [55]:
# Summed task-stream nbytes per worker for groupby_statistics.
dask_benchmarks['groupby_statistics']["nbytes_per_worker"]

Unnamed: 0_level_0,nbytes
worker,Unnamed: 1_level_1
tcp://127.0.0.1:56243,7672
tcp://127.0.0.1:56246,5120
tcp://127.0.0.1:56249,5120
tcp://127.0.0.1:56253,5120


# Explore one task's history in more detail, step by step

In [56]:
# Raw task-stream records captured while benchmarking read_to_basic_ETL.
hx = dask_benchmarks.get("read_to_basic_ETL").get("history")

Use Dask to analyze Dask's own task history :D

In [57]:
import pandas as pd
# Show full cell contents (e.g. the nested 'startstops' tuples) instead of truncating them.
pd.set_option('display.max_colwidth', None)

In [58]:
# One row per task-stream record; record fields not listed here are dropped.
hx_columns = ['worker', 'status', 'nbytes', 'thread', 'type',
              'typename', 'metadata', 'startstops', 'key']
hx_df = pd.DataFrame(hx, columns=hx_columns)

In [59]:
# Wrap the pandas frame in a one-partition Dask frame so the analysis below runs through Dask.
hx_ddf = dd.from_pandas(hx_df, npartitions=1)

In [60]:
# 'startstops' holds a tuple of {'action', 'start', 'stop'} dicts per task. It is
# unnested below for the per-action timings; the other columns are used as they are.
hx_ddf.head(20)

Unnamed: 0,worker,status,nbytes,thread,type,typename,metadata,startstops,key
0,tcp://127.0.0.1:56246,OK,27997016,27104,b'\x80\x04\x95!\x00\x00\x00\x00\x00\x00\x00\x8c\x12pandas.core.series\x94\x8c\x06Series\x94\x93\x94.',pandas.core.series.Series,{},"({'action': 'deserialize', 'start': 1652304657.666907, 'stop': 1652304657.967815}, {'action': 'compute', 'start': 1652304657.968235, 'stop': 1652304658.1867692})","('add-d716864db68e7bc4b58dd46d6a77eb1c', 2)"
1,tcp://127.0.0.1:56243,OK,27800144,26052,b'\x80\x04\x95!\x00\x00\x00\x00\x00\x00\x00\x8c\x12pandas.core.series\x94\x8c\x06Series\x94\x93\x94.',pandas.core.series.Series,{},"({'action': 'deserialize', 'start': 1652304657.6671968, 'stop': 1652304657.9920208}, {'action': 'compute', 'start': 1652304657.9918344, 'stop': 1652304658.1957502})","('add-d716864db68e7bc4b58dd46d6a77eb1c', 0)"
2,tcp://127.0.0.1:56249,OK,28015480,26148,b'\x80\x04\x95!\x00\x00\x00\x00\x00\x00\x00\x8c\x12pandas.core.series\x94\x8c\x06Series\x94\x93\x94.',pandas.core.series.Series,{},"({'action': 'deserialize', 'start': 1652304657.6668663, 'stop': 1652304657.9675162}, {'action': 'compute', 'start': 1652304657.9722996, 'stop': 1652304658.218315})","('add-d716864db68e7bc4b58dd46d6a77eb1c', 1)"
3,tcp://127.0.0.1:56253,OK,31299480,6972,b'\x80\x04\x95!\x00\x00\x00\x00\x00\x00\x00\x8c\x12pandas.core.series\x94\x8c\x06Series\x94\x93\x94.',pandas.core.series.Series,{},"({'action': 'deserialize', 'start': 1652304657.6727228, 'stop': 1652304657.9675233}, {'action': 'compute', 'start': 1652304657.9688478, 'stop': 1652304658.2466178})","('add-d716864db68e7bc4b58dd46d6a77eb1c', 3)"
4,tcp://127.0.0.1:56249,OK,27997016,26148,b'\x80\x04\x95!\x00\x00\x00\x00\x00\x00\x00\x8c\x12pandas.core.series\x94\x8c\x06Series\x94\x93\x94.',pandas.core.series.Series,{},"({'action': 'compute', 'start': 1652304658.21627, 'stop': 1652304658.5319226},)","('add-d716864db68e7bc4b58dd46d6a77eb1c', 6)"
5,tcp://127.0.0.1:56246,OK,28015480,27104,b'\x80\x04\x95!\x00\x00\x00\x00\x00\x00\x00\x8c\x12pandas.core.series\x94\x8c\x06Series\x94\x93\x94.',pandas.core.series.Series,{},"({'action': 'compute', 'start': 1652304658.195911, 'stop': 1652304658.5345247},)","('add-d716864db68e7bc4b58dd46d6a77eb1c', 5)"
6,tcp://127.0.0.1:56246,OK,32,27104,b'\x80\x04\x95\x13\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x05int64\x94\x93\x94.',numpy.int64,{},"({'action': 'compute', 'start': 1652304658.5452201, 'stop': 1652304658.5576632},)","('series-count-chunk-0c5ca3d77478f555ae1be3d12e00edc6-838892189725e30078715bd8555c08d1', 2)"
7,tcp://127.0.0.1:56243,OK,27800144,26052,b'\x80\x04\x95!\x00\x00\x00\x00\x00\x00\x00\x8c\x12pandas.core.series\x94\x8c\x06Series\x94\x93\x94.',pandas.core.series.Series,{},"({'action': 'compute', 'start': 1652304658.207799, 'stop': 1652304658.5605364},)","('add-d716864db68e7bc4b58dd46d6a77eb1c', 4)"
8,tcp://127.0.0.1:56243,OK,32,26052,b'\x80\x04\x95\x13\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x05int64\x94\x93\x94.',numpy.int64,{},"({'action': 'compute', 'start': 1652304658.5608912, 'stop': 1652304658.5700924},)","('series-count-chunk-0c5ca3d77478f555ae1be3d12e00edc6-838892189725e30078715bd8555c08d1', 0)"
9,tcp://127.0.0.1:56246,OK,32,27104,b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07float64\x94\x93\x94.',numpy.float64,{},"({'action': 'compute', 'start': 1652304658.5598354, 'stop': 1652304658.570451},)","('series-sum-chunk-25935b77d571448ff443f276a179f31f-59220bbfba86fa5b47b80df5a5587a70', 2)"


In [61]:
# Unnest 'startstops': one row per (task, start/stop phase); other columns are repeated.
exploded_df = hx_ddf.explode("startstops")

In [62]:
# Pull the fields out of each exploded startstops dict. `meta` declares each result's
# name and dtype up front, so Dask does not have to infer it by running the lambda on
# dummy data. Note: the 'end' column is filled from the dict's 'stop' key.
exploded_df['action'] = exploded_df['startstops'].apply(lambda x: x['action'], meta = ("action", str))
exploded_df['start'] = exploded_df['startstops'].apply(lambda x: x['start'], meta = ("start", np.float64))
exploded_df['end'] = exploded_df['startstops'].apply(lambda x: x['stop'], meta = ("stop", np.float64))
exploded_df['action_duration'] = exploded_df['end'] - exploded_df['start']

In [63]:
# Keep only the columns needed for the per-(worker, action) aggregation.
exploded_df_only_agg_fields = exploded_df[['worker', 'action', 'action_duration']]

In [64]:
# Total time each worker spent in each action (e.g. compute, transfer, deserialize).
time_per_worker_and_action = exploded_df_only_agg_fields.groupby(['worker','action']).agg("sum")

This is the final `time_per_worker_and_action` result.

In [65]:
# Preview the aggregated timings.
time_per_worker_and_action.head(20)

Unnamed: 0_level_0,Unnamed: 1_level_0,action_duration
worker,action,Unnamed: 2_level_1
tcp://127.0.0.1:56243,compute,0.603825
tcp://127.0.0.1:56243,deserialize,0.324824
tcp://127.0.0.1:56243,transfer,0.022714
tcp://127.0.0.1:56246,compute,0.604824
tcp://127.0.0.1:56246,deserialize,0.300908
tcp://127.0.0.1:56246,transfer,0.02311
tcp://127.0.0.1:56249,compute,0.611411
tcp://127.0.0.1:56249,deserialize,0.30065
tcp://127.0.0.1:56253,compute,0.671207
tcp://127.0.0.1:56253,deserialize,0.294801


In [66]:
# now get nbytes per worker

In [67]:
# Sum of the task-stream 'nbytes' values (presumably task result sizes) per worker.
nbytes_per_worker = hx_ddf[['worker', 'nbytes']].groupby(["worker"]).agg("sum")

In [68]:
# Preview the per-worker byte totals.
nbytes_per_worker.head(20)

Unnamed: 0_level_0,nbytes
worker,Unnamed: 1_level_1
tcp://127.0.0.1:56243,55600448
tcp://127.0.0.1:56246,56012688
tcp://127.0.0.1:56249,56012624
tcp://127.0.0.1:56253,62599088


In [69]:
# Read back the single month written by the conversion cell. That cell writes to
# f'./tmp/trip_data_{csv_name}', where csv_name is the plain CSV file name
# (no list brackets), so the directory is trip_data_yellow_tripdata_2013-11.csv.
dfx = dd.read_parquet(
    "./tmp/trip_data_yellow_tripdata_2013-11.csv",
    storage_options={"anon": True, 'use_ssl': True})

In [70]:
# First rows of the converted data, to sanity-check column names and dtypes.
dfx.head(10)

Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
0,CMT,2013-11-25 15:53:33,2013-11-25 16:00:51,1,0.6,-73.978104,40.752966,1,N,-73.985756,40.762685,CRD,6.0,1.0,0.5,1.0,0.0,8.5
1,CMT,2013-11-25 15:24:41,2013-11-25 15:30:18,1,0.5,-73.982313,40.764827,1,N,-73.982129,40.758889,CRD,5.5,0.0,0.5,3.0,0.0,9.0
2,CMT,2013-11-25 09:43:42,2013-11-25 10:02:57,1,3.3,-73.982013,40.762507,1,N,-74.006854,40.719582,CRD,15.0,0.0,0.5,2.0,0.0,17.5
3,CMT,2013-11-25 06:49:58,2013-11-25 07:04:22,1,3.8,-73.976005,40.744481,1,N,-74.016063,40.717298,CRD,14.0,0.0,0.5,2.9,0.0,17.4
4,CMT,2013-11-25 10:02:12,2013-11-25 10:17:15,1,2.2,-73.952625,40.780962,1,N,-73.98163,40.777978,CRD,12.0,0.0,0.5,2.0,0.0,14.5
5,CMT,2013-11-25 15:18:07,2013-11-25 15:33:25,1,1.0,-73.992423,40.749517,1,N,-73.98816,40.746557,CRD,10.0,0.0,0.5,2.22,0.0,12.72
6,CMT,2013-11-25 21:20:50,2013-11-25 21:26:22,1,1.1,-73.946371,40.775369,1,N,-73.95309,40.785103,CRD,6.5,0.5,0.5,1.5,0.0,9.0
7,CMT,2013-11-25 07:00:55,2013-11-25 07:04:37,1,1.2,-73.983357,40.767193,1,N,-73.978394,40.75558,CRD,5.5,0.0,0.5,1.0,0.0,7.0
8,CMT,2013-11-25 05:34:37,2013-11-25 05:48:15,1,3.6,-73.971555,40.794548,1,N,-73.975399,40.755404,CRD,14.5,0.5,0.5,1.0,0.0,16.5
9,CMT,2013-11-25 08:31:21,2013-11-25 08:55:05,1,5.9,-73.94764,40.830465,1,N,-73.972323,40.76332,CRD,21.0,0.0,0.5,3.0,0.0,24.5
