# Performance Analysis

### Conducted by:

- Ana Francisca Rocha (202208946)
- Ana Amorim (2022)
- Pedro Rufino (202208600)

## Introduction

This notebook aims to evaluate the performance of several Python libraries for processing large-scale datasets, as part of the Large-Scale Data Science course.

The work is divided into two main phases:

- In the first phase, we reproduce an experiment comparing basic operations between Koalas (PySpark) and Dask.

- In the second phase, we extend the analysis to additional libraries (Modin, Joblib, and RAPIDS) and test different combinations of them on the large dataset.

## Reproducing the NYC Taxi Benchmark

Source: https://www.databricks.com/blog/2021/04/07/benchmark-koalas-pyspark-and-dask.html

In [1]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
import pyspark.pandas as ps
from pyspark.sql import functions as F
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col
import pyspark
import dask
import dask.dataframe as dd
import pandas as pd
import numpy as np
import time
from dask.distributed import Client, LocalCluster

print('pandas version: %s' % pd.__version__)
print('numpy version: %s' % np.__version__)
print('pyspark version: %s' % pyspark.__version__)
print('dask version: %s' % dask.__version__)

pandas version: 2.2.3
numpy version: 2.2.6
pyspark version: 4.0.0
dask version: 2025.5.1


#### Regular Dataset

In [4]:
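# Binding the driver to localhost follows the suggestion in the BindException message
# (spark.driver.bindAddress) and is the usual workaround for this port-binding failure.
spark = SparkSession.builder \
    .appName("Read Parquet Example") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()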


parquet_file = "green_tripdata_2015-01.parquet"
data = spark.read.parquet(parquet_file)


Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.net.BindException: Cannot assign requested address: bind: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.


In [None]:
data.dtypes

[('VendorID', 'bigint'),
 ('lpep_pickup_datetime', 'timestamp_ntz'),
 ('lpep_dropoff_datetime', 'timestamp_ntz'),
 ('store_and_fwd_flag', 'string'),
 ('RatecodeID', 'bigint'),
 ('PULocationID', 'bigint'),
 ('DOLocationID', 'bigint'),
 ('passenger_count', 'bigint'),
 ('trip_distance', 'double'),
 ('fare_amount', 'double'),
 ('extra', 'double'),
 ('mta_tax', 'double'),
 ('tip_amount', 'double'),
 ('tolls_amount', 'double'),
 ('ehail_fee', 'int'),
 ('improvement_surcharge', 'double'),
 ('total_amount', 'double'),
 ('payment_type', 'bigint'),
 ('trip_type', 'double'),
 ('congestion_surcharge', 'int')]

In [None]:
print("Number of rows:", data.count())

Number of rows: 1508493


In [None]:
expr_filter = (data['tip_amount'] >= 1) & (data['tip_amount'] <= 5)

filtered_data = data.filter(expr_filter)
percentage = (filtered_data.count() / data.count()) * 100

print(f'Filtered data is {percentage:.4f}% of total data')

Filtered data is 33.7250% of total data
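As a cross-check of the Spark result, the same percentage can be computed with plain pandas for data that fits in memory. The helper below, `tip_filter_percentage`, is a hypothetical name, not part of the original notebook. `Series.between` is inclusive on both ends by default, matching the `>= 1` and `<= 5` bounds above, and rows with a missing `tip_amount` count in the denominator but not in the numerator, as with Spark's `filter` followed by `count`.

```python
import pandas as pd


def tip_filter_percentage(df, low=1, high=5):
    """Hypothetical helper: percentage of rows with low <= tip_amount <= high."""
    mask = df["tip_amount"].between(low, high)  # inclusive on both ends; NaN -> False
    return mask.mean() * 100                    # mean of a boolean Series = fraction True


# Usage on a real file (assumes it fits in memory):
# tip_filter_percentage(pd.read_parquet("green_tripdata_2015-01.parquet"))
```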


#### Large Dataset

In [None]:
large_df_url = "yellow_tripdata_2011-*.parquet"
large_data = spark.read.parquet(large_df_url)

In [None]:
large_data.dtypes

[('VendorID', 'bigint'),
 ('tpep_pickup_datetime', 'timestamp_ntz'),
 ('tpep_dropoff_datetime', 'timestamp_ntz'),
 ('passenger_count', 'bigint'),
 ('trip_distance', 'double'),
 ('RatecodeID', 'bigint'),
 ('store_and_fwd_flag', 'string'),
 ('PULocationID', 'bigint'),
 ('DOLocationID', 'bigint'),
 ('payment_type', 'bigint'),
 ('fare_amount', 'double'),
 ('extra', 'double'),
 ('mta_tax', 'double'),
 ('tip_amount', 'double'),
 ('tolls_amount', 'double'),
 ('improvement_surcharge', 'double'),
 ('total_amount', 'double'),
 ('congestion_surcharge', 'double'),
 ('airport_fee', 'double')]

In [None]:
print("Number of rows:", large_data.count())

Number of rows: 13464997


In [None]:
filtered_data = large_data.filter((col("tip_amount") >= 1) & (col("tip_amount") <= 5))

total_count = large_data.count()
filtered_count = filtered_data.count()
percentage = (filtered_count / total_count) * 100

print(f"Filtered data is {percentage:.4f}% of total data")

Filtered data is 35.8422% of total data


#### Small Dataset

In [None]:
url = "yellow_tripdata_2012-01.parquet"
data = pd.read_parquet(url)
small_data = data.head(50000)
small_data.to_parquet('yellow_tripdata_2012-01_subset-50000.parquet')

In [None]:
spark = SparkSession.builder \
    .appName("Read Parquet Example") \
    .getOrCreate()


small_df_url = 'yellow_tripdata_2012-01_subset-50000.parquet'
small_data = spark.read.parquet(small_df_url)

In [None]:
small_data.dtypes

[('VendorID', 'bigint'),
 ('tpep_pickup_datetime', 'timestamp_ntz'),
 ('tpep_dropoff_datetime', 'timestamp_ntz'),
 ('passenger_count', 'bigint'),
 ('trip_distance', 'double'),
 ('RatecodeID', 'bigint'),
 ('store_and_fwd_flag', 'string'),
 ('PULocationID', 'bigint'),
 ('DOLocationID', 'bigint'),
 ('payment_type', 'bigint'),
 ('fare_amount', 'double'),
 ('extra', 'double'),
 ('mta_tax', 'double'),
 ('tip_amount', 'double'),
 ('tolls_amount', 'double'),
 ('improvement_surcharge', 'double'),
 ('total_amount', 'double'),
 ('congestion_surcharge', 'int'),
 ('airport_fee', 'int')]

In [None]:
print("Number of rows:", small_data.count())

Number of rows: 50000


In [None]:
expr_filter = (small_data['tip_amount'] >= 1) & (small_data['tip_amount'] <= 5)

filtered_data = small_data.filter(expr_filter)
percentage = (filtered_data.count() / small_data.count()) * 100

print(f'Filtered data is {percentage:.4f}% of total data')

Filtered data is 30.2000% of total data


In [None]:
def benchmark(f, df, benchmarks, name, **kwargs):
    """Time a single call f(df, **kwargs), record it under `name` and print it."""
    start_time = time.monotonic()
    ret = f(df, **kwargs)
    benchmarks['duration'].append(time.monotonic() - start_time)
    benchmarks['task'].append(name)
    print(f"{name} took: {benchmarks['duration'][-1]} seconds")

def get_results(benchmarks):
    """Return a pandas DataFrame containing benchmark results."""
    return pd.DataFrame.from_dict(benchmarks)
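The durations recorded in this notebook appear quantized in steps of roughly 15 to 16 ms, which is consistent with the coarse resolution `time.monotonic()` has on some platforms (about 15.6 ms on Windows before Python 3.13). A minimal sketch of an alternative, assuming repeated runs are acceptable: `benchmark_repeated` (a hypothetical name) uses `time.perf_counter()` and records the median of several runs. For Spark, the timed function should still end in an action such as `count()` or `collect()`, otherwise only plan construction is measured.

```python
import statistics
import time


def benchmark_repeated(f, df, benchmarks, name, repeats=5, **kwargs):
    """Hypothetical variant of `benchmark`: time `repeats` calls of
    f(df, **kwargs) with time.perf_counter() and record the median."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        f(df, **kwargs)
        durations.append(time.perf_counter() - start)
    median = statistics.median(durations)  # less sensitive to a single slow run
    benchmarks['duration'].append(median)
    benchmarks['task'].append(name)
    print(f"{name} took: {median:.6f} seconds (median of {repeats} runs)")
    return median
```

It keeps the same `benchmarks` dictionary layout, so `get_results` works unchanged on its output.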

### Koalas

#### Operations

In [None]:
def count(df=None):
    # Action: triggers a Spark job.
    return df.count()

def mean(df):
    return df.select(F.mean("fare_amount")).collect()[0][0]

def standard_deviation(df):
    return float(df.select(F.stddev("fare_amount")).collect()[0][0])

def mean_of_sum(df):
    return float(df.select((F.col("fare_amount") + F.col("tip_amount")).alias("total")).agg(F.mean("total")).collect()[0][0])

def sum_columns(df):
    # Only builds a lazy Column expression; no Spark job runs here,
    # so the timing reflects expression construction, not computation.
    x = df.fare_amount + df.tip_amount
    return x

def mean_of_product(df):
    return float(df.select((F.col("fare_amount") * F.col("tip_amount")).alias("product")).agg(F.mean("product")).collect()[0][0])

def product_columns(df):
    # Lazy Column expression, as in sum_columns.
    x = df.fare_amount * df.tip_amount
    return x

def value_counts(df):
    # Returns a lazy DataFrame; the aggregation only runs when an action is called.
    return df.groupBy("fare_amount").count().orderBy(F.desc("count"))


# The current NYC TLC Trip Record Data files no longer include longitude and latitude
# (only pickup and drop-off location IDs), so arbitrary coordinates are used instead.
# The goal is to compare the computational cost of the calculation, so the values
# themselves are not relevant. Note, however, that the coordinates are four random
# scalars drawn once per call: the formula never reads `df`, so its cost does not
# grow with the dataset size.
def complicated_arithmetic_operation(df):
    start_lon, end_lon = np.random.randint(-180, 180), np.random.randint(-180, 180)
    start_lat, end_lat = np.random.randint(-90, 90), np.random.randint(-90, 90)

    theta_1 = start_lon  # longitude, in degrees
    phi_1 = start_lat    # latitude, in degrees
    theta_2 = end_lon
    phi_2 = end_lat
    temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
           + np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180) * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
    ret = np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1 - temp)), 2)
    return ret

def mean_of_complicated_arithmetic_operation(df):
    # Same scalar computation as above; .mean() of a NumPy scalar is the scalar itself.
    start_lon, end_lon = np.random.randint(-180, 180), np.random.randint(-180, 180)
    start_lat, end_lat = np.random.randint(-90, 90), np.random.randint(-90, 90)

    theta_1 = start_lon
    phi_1 = start_lat
    theta_2 = end_lon
    phi_2 = end_lat
    temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
           + np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180) * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
    ret = np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1 - temp)), 2)
    return ret.mean()


def groupby_statistics(df):
    # Lazy: returns the aggregated DataFrame without executing it.
    gb = df.groupBy("passenger_count").agg(
        F.mean("fare_amount").alias("fare_amount_mean"),
        F.stddev("fare_amount").alias("fare_amount_std"),
        F.mean("tip_amount").alias("tip_amount_mean"),
        F.stddev("tip_amount").alias("tip_amount_std")
    )
    return gb

def join_count(df, other):
    # count() is an action, so this actually executes the broadcast join.
    return df.join(other.hint("broadcast"), on="passenger_count").count()

def join_data(df, other):
    # Lazy: only defines the join plan.
    return df.join(other.hint("broadcast"), on="passenger_count")
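As written, both arithmetic functions above evaluate the formula on four random scalars and never read `df`. In the original benchmark the formula was applied to the longitude and latitude columns, so its cost scaled with the number of rows. A per-row workload can be sketched with NumPy arrays of arbitrary coordinates, one pair per row. `blog_formula` and `per_row_complicated_arithmetic` are hypothetical names, and the `np.clip` guard is an addition not present in the notebook. In Spark, the same per-row expression could presumably be built from `pyspark.sql.functions` (`sin`, `cos`, `sqrt`, `atan2`) on real columns.

```python
import numpy as np


def blog_formula(theta_1, phi_1, theta_2, phi_2):
    """Same expression as complicated_arithmetic_operation, applied element-wise.

    theta = longitude, phi = latitude, both in degrees (as in the notebook).
    """
    temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
            + np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180)
            * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
    # Guard against tiny floating-point excursions outside [0, 1] before sqrt.
    temp = np.clip(temp, 0.0, 1.0)
    return np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1 - temp)), 2)


def per_row_complicated_arithmetic(n_rows, seed=0):
    """Hypothetical per-row variant: one arbitrary coordinate pair per row."""
    rng = np.random.default_rng(seed)
    start_lon = rng.integers(-180, 180, n_rows)  # same ranges as np.random.randint above
    end_lon = rng.integers(-180, 180, n_rows)
    start_lat = rng.integers(-90, 90, n_rows)
    end_lat = rng.integers(-90, 90, n_rows)
    return blog_formula(start_lon, start_lat, end_lon, end_lat)
```

For example, `per_row_complicated_arithmetic(1_000_000).mean()` does work proportional to the requested number of rows, unlike the scalar version.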


##### Regular dataset

In [None]:
koalas_regular_df_benchmarks = {
    'duration': [],     # in seconds
    'task': []
}

regular_df_url = "green_tripdata_2015-01.parquet"
koalas_data = spark.read.parquet(regular_df_url)

grouped_stats = groupby_statistics(koalas_data)
other_pd = grouped_stats.toPandas()
other = spark.createDataFrame(other_pd)

In [None]:
def read_file_parquet(df=None):
    return spark.read.parquet(regular_df_url)

koalas_data = read_file_parquet()

In [None]:
benchmark(read_file_parquet, df=None, benchmarks=koalas_regular_df_benchmarks, name='read file')
benchmark(count, df=koalas_data, benchmarks=koalas_regular_df_benchmarks, name='count')
benchmark(mean, df=koalas_data, benchmarks=koalas_regular_df_benchmarks, name='mean')
benchmark(standard_deviation, df=koalas_data, benchmarks=koalas_regular_df_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=koalas_data, benchmarks=koalas_regular_df_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=koalas_data, benchmarks=koalas_regular_df_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=koalas_data, benchmarks=koalas_regular_df_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=koalas_data, benchmarks=koalas_regular_df_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=koalas_data, benchmarks=koalas_regular_df_benchmarks, name='value counts')
benchmark(complicated_arithmetic_operation, df=koalas_data, benchmarks=koalas_regular_df_benchmarks, name='complex arithmetic ops')
benchmark(mean_of_complicated_arithmetic_operation, df=koalas_data, benchmarks=koalas_regular_df_benchmarks, name='mean of complex arithmetic ops')
benchmark(groupby_statistics, df=koalas_data, benchmarks=koalas_regular_df_benchmarks, name='groupby statistics')
benchmark(join_count, koalas_data, benchmarks=koalas_regular_df_benchmarks, name='join count', other=other)
benchmark(join_data, koalas_data, benchmarks=koalas_regular_df_benchmarks, name='join', other=other)

read file took: 0.06300000000010186 seconds
count took: 0.125 seconds
mean took: 0.23399999999946886 seconds
standard deviation took: 0.25 seconds
mean of columns addition took: 0.2820000000001528 seconds
addition of columns took: 0.0 seconds
mean of columns multiplication took: 0.31199999999989814 seconds
multiplication of columns took: 0.0 seconds
value counts took: 0.03099999999994907 seconds
complex arithmetic ops took: 0.0 seconds
mean of complex arithmetic ops took: 0.0 seconds
groupby statistics took: 0.04700000000048021 seconds
join count took: 13.234999999999673 seconds
join took: 0.015000000000327418 seconds


##### Small dataset

In [None]:
koalas_small_df_benchmarks = {
    'duration': [],     # in seconds
    'task': []
}

small_df_url = "yellow_tripdata_2012-01_subset-50000.parquet"
koalas_data = spark.read.parquet(small_df_url)

grouped_stats = groupby_statistics(koalas_data)
other_small_pd = grouped_stats.toPandas()
other_small = spark.createDataFrame(other_small_pd)

In [None]:
def read_file_parquet(df=None):
    return spark.read.parquet(small_df_url)

koalas_data = read_file_parquet()

In [None]:
benchmark(read_file_parquet, df=None, benchmarks=koalas_small_df_benchmarks, name='read file')
benchmark(count, df=koalas_data, benchmarks=koalas_small_df_benchmarks, name='count')
benchmark(mean, df=koalas_data, benchmarks=koalas_small_df_benchmarks, name='mean')
benchmark(standard_deviation, df=koalas_data, benchmarks=koalas_small_df_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=koalas_data, benchmarks=koalas_small_df_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=koalas_data, benchmarks=koalas_small_df_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=koalas_data, benchmarks=koalas_small_df_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=koalas_data, benchmarks=koalas_small_df_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=koalas_data, benchmarks=koalas_small_df_benchmarks, name='value counts')
benchmark(complicated_arithmetic_operation, df=koalas_data, benchmarks=koalas_small_df_benchmarks, name='complex arithmetic ops')
benchmark(mean_of_complicated_arithmetic_operation, df=koalas_data, benchmarks=koalas_small_df_benchmarks, name='mean of complex arithmetic ops')
benchmark(groupby_statistics, df=koalas_data, benchmarks=koalas_small_df_benchmarks, name='groupby statistics')
benchmark(join_count, koalas_data, benchmarks=koalas_small_df_benchmarks, name='join count', other=other_small)
benchmark(join_data, koalas_data, benchmarks=koalas_small_df_benchmarks, name='join', other=other_small)

read file took: 0.10900000000037835 seconds
count took: 0.09400000000005093 seconds
mean took: 0.125 seconds
standard deviation took: 0.17199999999957072 seconds
mean of columns addition took: 0.14000000000032742 seconds
addition of columns took: 0.0 seconds
mean of columns multiplication took: 0.14099999999962165 seconds
multiplication of columns took: 0.0 seconds
value counts took: 0.016000000000531145 seconds
complex arithmetic ops took: 0.0 seconds
mean of complex arithmetic ops took: 0.0 seconds
groupby statistics took: 0.03099999999994907 seconds
join count took: 11.140999999999622 seconds
join took: 0.015000000000327418 seconds


##### Large dataset

In [None]:
koalas_large_df_benchmarks = {
    'duration': [],     # in seconds
    'task': []
}

large_df_url = "yellow_tripdata_2011-*.parquet"
koalas_data = spark.read.parquet(large_df_url)

grouped_stats = groupby_statistics(koalas_data)
other_large_pd = grouped_stats.toPandas()
other_large = spark.createDataFrame(other_large_pd)

In [None]:
def read_file_parquet(df=None):
    return spark.read.parquet(large_df_url)

koalas_data = read_file_parquet()

In [None]:
benchmark(read_file_parquet, df=None, benchmarks=koalas_large_df_benchmarks, name='read file')
benchmark(count, df=koalas_data, benchmarks=koalas_large_df_benchmarks, name='count')
benchmark(mean, df=koalas_data, benchmarks=koalas_large_df_benchmarks, name='mean')
benchmark(standard_deviation, df=koalas_data, benchmarks=koalas_large_df_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=koalas_data, benchmarks=koalas_large_df_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=koalas_data, benchmarks=koalas_large_df_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=koalas_data, benchmarks=koalas_large_df_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=koalas_data, benchmarks=koalas_large_df_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=koalas_data, benchmarks=koalas_large_df_benchmarks, name='value counts')
benchmark(complicated_arithmetic_operation, df=koalas_data, benchmarks=koalas_large_df_benchmarks, name='complex arithmetic ops')
benchmark(mean_of_complicated_arithmetic_operation, df=koalas_data, benchmarks=koalas_large_df_benchmarks, name='mean of complex arithmetic ops')
benchmark(groupby_statistics, df=koalas_data, benchmarks=koalas_large_df_benchmarks, name='groupby statistics')
benchmark(join_count, koalas_data, benchmarks=koalas_large_df_benchmarks, name='join count', other=other_large)
benchmark(join_data, koalas_data, benchmarks=koalas_large_df_benchmarks, name='join', other=other_large)

read file took: 0.07800000000042928 seconds
count took: 0.1570000000001528 seconds
mean took: 0.6089999999994689 seconds
standard deviation took: 0.6559999999999491 seconds
mean of columns addition took: 0.9380000000001019 seconds
addition of columns took: 0.0 seconds
mean of columns multiplication took: 0.9369999999998981 seconds
multiplication of columns took: 0.0 seconds
value counts took: 0.016000000000531145 seconds
complex arithmetic ops took: 0.0 seconds
mean of complex arithmetic ops took: 0.0 seconds
groupby statistics took: 0.03099999999994907 seconds
join count took: 10.54699999999957 seconds
join took: 0.016000000000531145 seconds
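At this point each of the three dictionaries holds the unfiltered timings for one dataset (the filtered runs append to the same dictionaries). A minimal sketch for lining them up side by side, built on the same `pd.DataFrame.from_dict` call as `get_results`; `compare_results` and the dataset labels are hypothetical.

```python
import pandas as pd


def compare_results(results_by_dataset):
    """Hypothetical helper: one row per task, one column per dataset."""
    frames = []
    for dataset, benchmarks in results_by_dataset.items():
        df = pd.DataFrame.from_dict(benchmarks)
        df['dataset'] = dataset
        frames.append(df)
    combined = pd.concat(frames, ignore_index=True)
    # aggfunc='first' keeps the first timing if a task name was recorded twice.
    return combined.pivot_table(index='task', columns='dataset',
                                values='duration', aggfunc='first')


# Usage in this notebook:
# compare_results({'regular': koalas_regular_df_benchmarks,
#                  'small': koalas_small_df_benchmarks,
#                  'large': koalas_large_df_benchmarks})
```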


#### Operations with filtering

##### Regular dataset

In [None]:
def read_file_parquet(df=None):
    return spark.read.parquet(regular_df_url)

koalas_data = read_file_parquet()

In [None]:
expr_filter = (koalas_data.tip_amount >= 1) & (koalas_data.tip_amount <= 5)

def filter_data(df):
    return df[expr_filter]

koalas_filtered = filter_data(koalas_data)

In [None]:
benchmark(count, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered count')
benchmark(mean, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered mean')
benchmark(standard_deviation, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered mean of columns addition')
benchmark(sum_columns, df=koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered mean of columns multiplication')
benchmark(product_columns, df=koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered value counts')
benchmark(groupby_statistics, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered groupby statistics')

other_pd = groupby_statistics(koalas_filtered).toPandas()
other_pd.columns = [f"{c[0]}_{c[1]}" if isinstance(c, tuple) else c for c in other_pd.columns]
other = spark.createDataFrame(other_pd)

benchmark(join_data, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered join', other=other)
benchmark(join_count, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered join count', other=other)

filtered count took: 0.21900000000005093 seconds
filtered mean took: 0.34400000000005093 seconds
filtered standard deviation took: 0.375 seconds
filtered mean of columns addition took: 0.31199999999989814 seconds
filtered addition of columns took: 0.0 seconds
filtered mean of columns multiplication took: 0.3279999999995198 seconds
filtered multiplication of columns took: 0.0 seconds
filtered mean of complex arithmetic ops took: 0.0 seconds
filtered complex arithmetic ops took: 0.0 seconds
filtered value counts took: 0.016000000000531145 seconds
filtered groupby statistics took: 0.03099999999994907 seconds
filtered join took: 0.016000000000531145 seconds
filtered join count took: 10.45299999999952 seconds


##### Small dataset

In [None]:
def read_file_parquet(df=None):
    return spark.read.parquet(small_df_url)

koalas_data = read_file_parquet()

In [None]:
expr_filter = (koalas_data.tip_amount >= 1) & (koalas_data.tip_amount <= 5)

def filter_data(df):
    return df[expr_filter]

koalas_filtered = filter_data(koalas_data)

In [None]:
benchmark(count, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered count')
benchmark(mean, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered mean')
benchmark(standard_deviation, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered mean of columns addition')
benchmark(sum_columns, df=koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered mean of columns multiplication')
benchmark(product_columns, df=koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered value counts')
benchmark(groupby_statistics, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered groupby statistics')

other_pd = groupby_statistics(koalas_filtered).toPandas()
other_pd.columns = [f"{c[0]}_{c[1]}" if isinstance(c, tuple) else c for c in other_pd.columns]
other = spark.createDataFrame(other_pd)

benchmark(join_data, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered join', other=other)
benchmark(join_count, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered join count', other=other)

filtered count took: 0.125 seconds
filtered mean took: 0.18800000000010186 seconds
filtered standard deviation took: 0.15599999999994907 seconds
filtered mean of columns addition took: 0.14100000000053114 seconds
filtered addition of columns took: 0.0 seconds
filtered mean of columns multiplication took: 0.15599999999994907 seconds
filtered multiplication of columns took: 0.0 seconds
filtered mean of complex arithmetic ops took: 0.0 seconds
filtered complex arithmetic ops took: 0.0 seconds
filtered value counts took: 0.01599999999962165 seconds
filtered groupby statistics took: 0.046000000000276486 seconds
filtered join took: 0.014999999999417923 seconds
filtered join count took: 10.532000000000153 seconds


##### Large dataset

In [None]:
def read_file_parquet(df=None):
    return spark.read.parquet(large_df_url)

koalas_data = read_file_parquet()

In [None]:
expr_filter = (koalas_data.tip_amount >= 1) & (koalas_data.tip_amount <= 5)

def filter_data(df):
    return df[expr_filter]

koalas_filtered = filter_data(koalas_data)

In [None]:
benchmark(count, koalas_filtered, benchmarks=koalas_large_df_benchmarks, name='filtered count')
benchmark(mean, koalas_filtered, benchmarks=koalas_large_df_benchmarks, name='filtered mean')
benchmark(standard_deviation, koalas_filtered, benchmarks=koalas_large_df_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, koalas_filtered, benchmarks=koalas_large_df_benchmarks, name='filtered mean of columns addition')
benchmark(sum_columns, df=koalas_filtered, benchmarks=koalas_large_df_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, koalas_filtered, benchmarks=koalas_large_df_benchmarks, name='filtered mean of columns multiplication')
benchmark(product_columns, df=koalas_filtered, benchmarks=koalas_large_df_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_large_df_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_large_df_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, koalas_filtered, benchmarks=koalas_large_df_benchmarks, name='filtered value counts')
benchmark(groupby_statistics, koalas_filtered, benchmarks=koalas_large_df_benchmarks, name='filtered groupby statistics')

other_pd = groupby_statistics(koalas_filtered).toPandas()
other_pd.columns = [f"{c[0]}_{c[1]}" if isinstance(c, tuple) else c for c in other_pd.columns]
other = spark.createDataFrame(other_pd)

benchmark(join_data, koalas_filtered, benchmarks=koalas_large_df_benchmarks, name='filtered join', other=other)
benchmark(join_count, koalas_filtered, benchmarks=koalas_large_df_benchmarks, name='filtered join count', other=other)

filtered count took: 0.75 seconds
filtered mean took: 1.2039999999997235 seconds
filtered standard deviation took: 1.0 seconds
filtered mean of columns addition took: 1.0 seconds
filtered addition of columns took: 0.0 seconds
filtered mean of columns multiplication took: 1.0 seconds
filtered multiplication of columns took: 0.0 seconds
filtered mean of complex arithmetic ops took: 0.0 seconds
filtered complex arithmetic ops took: 0.0 seconds
filtered value counts took: 0.015000000000327418 seconds
filtered groupby statistics took: 0.03099999999994907 seconds
filtered join took: 0.0 seconds
filtered join count took: 11.108999999999469 seconds


#### Operations with filtering and caching

##### Regular dataset

In [None]:
def read_file_parquet(df=None):
    return spark.read.parquet(regular_df_url)

koalas_data = read_file_parquet()

In [None]:
expr_filter = (koalas_data.tip_amount >= 1) & (koalas_data.tip_amount <= 5)

def filter_data(df):
    return df[expr_filter]

koalas_filtered = filter_data(koalas_data)

In [None]:
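# cache() is lazy and returns the same DataFrame, so the count() action below
# materializes the cache that the following benchmarks on koalas_filtered reuse.
koalas_filtered_cached = koalas_filtered.cache()
print(f'Enforce caching: {koalas_filtered_cached.count()} rows of filtered data')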

Enforce caching: 508740 rows of filtered data


In [None]:
benchmark(count, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered and cached count')
benchmark(mean, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered and cached mean')
benchmark(standard_deviation, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered and cached standard deviation')
benchmark(mean_of_sum, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered and cached mean of columns addition')
benchmark(sum_columns, df=koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered and cached addition of columns')
benchmark(mean_of_product, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered and cached mean of columns multiplication')
benchmark(product_columns, df=koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered and cached multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered and cached mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered and cached complex arithmetic ops')
benchmark(value_counts, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered and cached value counts')
benchmark(groupby_statistics, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered and cached groupby statistics')

other_pd = groupby_statistics(koalas_filtered).toPandas()
other_pd.columns = [f"{c[0]}_{c[1]}" if isinstance(c, tuple) else c for c in other_pd.columns]
other = spark.createDataFrame(other_pd)

benchmark(join_data, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered and cached join', other=other)
benchmark(join_count, koalas_filtered, benchmarks=koalas_regular_df_benchmarks, name='filtered and cached join count', other=other)

filtered and cached count took: 0.125 seconds
filtered and cached mean took: 0.1570000000001528 seconds
filtered and cached standard deviation took: 0.18699999999989814 seconds
filtered and cached mean of columns addition took: 0.21900000000005093 seconds
filtered and cached addition of columns took: 0.01599999999962165 seconds
filtered and cached mean of columns multiplication took: 0.20300000000042928 seconds
filtered and cached multiplication of columns took: 0.0 seconds
filtered and cached mean of complex arithmetic ops took: 0.0 seconds
filtered and cached complex arithmetic ops took: 0.0 seconds
filtered and cached value counts took: 0.03099999999994907 seconds
filtered and cached groupby statistics took: 0.03099999999994907 seconds
filtered and cached join took: 0.01599999999962165 seconds
filtered and cached join count took: 11.17200000000048 seconds


##### Small dataset

In [None]:
def read_file_parquet(df=None):
    return spark.read.parquet(small_df_url)

koalas_data = read_file_parquet()

In [None]:
expr_filter = (koalas_data.tip_amount >= 1) & (koalas_data.tip_amount <= 5)

def filter_data(df):
    return df[expr_filter]

koalas_filtered = filter_data(koalas_data)

In [None]:
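# cache() is lazy and returns the same DataFrame, so the count() action below
# materializes the cache that the following benchmarks on koalas_filtered reuse.
koalas_filtered_cached = koalas_filtered.cache()
print(f'Enforce caching: {koalas_filtered_cached.count()} rows of filtered data')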

Enforce caching: 15100 rows of filtered data


In [None]:
benchmark(count, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered and cached count')
benchmark(mean, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered and cached mean')
benchmark(standard_deviation, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered and cached standard deviation')
benchmark(mean_of_sum, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered and cached mean of columns addition')
benchmark(sum_columns, df=koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered and cached addition of columns')
benchmark(mean_of_product, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered and cached mean of columns multiplication')
benchmark(product_columns, df=koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered and cached multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered and cached mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered and cached complex arithmetic ops')
benchmark(value_counts, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered and cached value counts')
benchmark(groupby_statistics, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered and cached groupby statistics')

other_pd = groupby_statistics(koalas_filtered).toPandas()
other_pd.columns = [f"{c[0]}_{c[1]}" if isinstance(c, tuple) else c for c in other_pd.columns]
other = spark.createDataFrame(other_pd)

benchmark(join_data, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered and cached join', other=other)
benchmark(join_count, koalas_filtered, benchmarks=koalas_small_df_benchmarks, name='filtered and cached join count', other=other)

filtered and cached count took: 0.09400000000005093 seconds
filtered and cached mean took: 0.07899999999972351 seconds
filtered and cached standard deviation took: 0.125 seconds
filtered and cached mean of columns addition took: 0.1710000000002765 seconds
filtered and cached addition of columns took: 0.0 seconds
filtered and cached mean of columns multiplication took: 0.125 seconds
filtered and cached multiplication of columns took: 0.0 seconds
filtered and cached mean of complex arithmetic ops took: 0.0 seconds
filtered and cached complex arithmetic ops took: 0.0 seconds
filtered and cached value counts took: 0.01599999999962165 seconds
filtered and cached groupby statistics took: 0.03099999999994907 seconds
filtered and cached join took: 0.01599999999962165 seconds
filtered and cached join count took: 12.42200000000048 seconds


##### Large dataset

In [None]:
def read_file_parquet(df=None):
    return spark.read.parquet(large_df_url)

koalas_data = read_file_parquet()

koalas_benchmarks = {
    'duration': [],     # in seconds
    'task': []
}

In [None]:
expr_filter = (koalas_data.tip_amount >= 1) & (koalas_data.tip_amount <= 5)

def filter_data(df):
    return df[expr_filter]

koalas_filtered = filter_data(koalas_data)

In [None]:
koalas_filtered_cached = koalas_filtered.cache()
print(f'Enforce caching: {koalas_filtered_cached.count()} rows of filtered data')

Enforce caching: 4826147 rows of filtered data


In [None]:
benchmark(count, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered and cached count')
benchmark(mean, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered and cached mean')
benchmark(standard_deviation, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered and cached standard deviation')
benchmark(mean_of_sum, koalas_filtered, benchmarks=koalas_benchmarks, name ='filtered and cached mean of columns addition')
benchmark(sum_columns, df=koalas_filtered, benchmarks=koalas_benchmarks, name='filtered and cached addition of columns')
benchmark(mean_of_product, koalas_filtered, benchmarks=koalas_benchmarks, name ='filtered and cached mean of columns multiplication')
benchmark(product_columns, df=koalas_filtered, benchmarks=koalas_benchmarks, name='filtered and cached multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered and cached mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered and cached complex arithmetic ops')
benchmark(value_counts, koalas_filtered, benchmarks=koalas_benchmarks, name ='filtered and cached value counts')
benchmark(groupby_statistics, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered and cached groupby statistics')

other_pd = groupby_statistics(koalas_filtered).toPandas()
other_pd.columns = [f"{c[0]}_{c[1]}" if isinstance(c, tuple) else c for c in other_pd.columns]
other = spark.createDataFrame(other_pd)

benchmark(join_data, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered and cached join', other=other)
benchmark(join_count, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered and cached join count', other=other)

filtered and cached count took: 2.233999999999469 seconds
filtered and cached mean took: 2.860000000000582 seconds
filtered and cached standard deviation took: 3.092999999999847 seconds
filtered and cached mean of columns addition took: 3.032000000000153 seconds
filtered and cached addition of columns took: 0.0 seconds
filtered and cached mean of columns multiplication took: 2.592999999999847 seconds
filtered and cached multiplication of columns took: 0.0 seconds
filtered and cached mean of complex arithmetic ops took: 0.0 seconds
filtered and cached complex arithmetic ops took: 0.0 seconds
filtered and cached value counts took: 0.032000000000152795 seconds
filtered and cached groupby statistics took: 0.03099999999994907 seconds
filtered and cached join took: 0.01599999999962165 seconds
filtered and cached join count took: 13.780999999999949 seconds


### Dask

In [None]:
cluster = LocalCluster(memory_limit='6GB')
client = Client(cluster)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 54798 instead


2025-05-29 17:07:23,117 - distributed.scheduler - ERROR - Task ('read_parquet-880d90cb855161ccab6f8dc3328392d7', 0) marked as failed because 4 workers died while trying to run it
Future exception was never retrieved
future: <Future finished exception=PermissionError(13, 'Acesso negado', None, 5, None)>
Traceback (most recent call last):
  File "C:\Users\PC\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\distributed\process.py", line 55, in _call_and_set_future
    res = func(*args, **kwargs)
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.3056.0_x64__qbz5n2kfra8p0\lib\multiprocessing\process.py", line 140, in kill
    self._popen.kill()
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.3056.0_x64__qbz5n2kfra8p0\lib\multiprocessing\popen_spawn_win32.py", line 123, in terminate
    _winapi.TerminateProcess(int(self._handle), TERMINATE)
PermissionError: [

#### Standard operations

In [None]:
def count(df=None):
    return len(df)

def count_index_length(df=None):
    return len(df.index)

def mean(df):
    return df.fare_amount.mean().compute()

def standard_deviation(df):
    return df.fare_amount.std().compute()

def mean_of_sum(df):
    return (df.fare_amount + df.tip_amount).mean().compute()

def sum_columns(df):
    return (df.fare_amount + df.tip_amount).compute()

def mean_of_product(df):
    return (df.fare_amount * df.tip_amount).mean().compute()

def product_columns(df):
    return (df.fare_amount * df.tip_amount).compute()

def value_counts(df):
    return df.fare_amount.value_counts().compute()

# In the original experiment, the following two functions used the longitude and latitude of the pickup and drop-off locations.
# The current NYC TLC Trip Record Data no longer includes coordinates (only pickup and drop-off location IDs), so we use
# arbitrary longitude and latitude values instead. Since the goal is to compare computational cost, the values themselves
# are not relevant. Note: both functions draw four random scalars and never read `df`, so their near-zero timings below
# do not reflect any work on the dataset.
def mean_of_complicated_arithmetic_operation(df):
    start_lon,end_lon = np.random.randint(-180,180),np.random.randint(-180,180)
    start_lat,end_lat = np.random.randint(-90,90),np.random.randint(-90,90)
    theta_1 = start_lon
    phi_1 = start_lat
    theta_2 = end_lon
    phi_2 = end_lat
    temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
           + np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
    ret = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    return ret.mean()

def complicated_arithmetic_operation(df):
    start_lon,end_lon = np.random.randint(-180,180),np.random.randint(-180,180)
    start_lat,end_lat = np.random.randint(-90,90),np.random.randint(-90,90)
    theta_1 = start_lon
    phi_1 = start_lat
    theta_2 = end_lon
    phi_2 = end_lat
    temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
           + np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
    ret = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    return ret

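def groupby_statistics(df):
    # No .compute() here: this returns a lazy Dask DataFrame, so the 'groupby statistics'
    # timings below only measure building the task graph, not executing it
    return df.groupby(by='passenger_count').agg({
        'fare_amount': ['mean', 'std'],
        'tip_amount': ['mean', 'std']
    })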

dask_data = dd.read_parquet(regular_df_url)
other = groupby_statistics(dask_data)
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

def join_count(df, other):
    return len(dd.merge(df, other, left_index=True, right_index=True))

def join_data(df, other):
    return dd.merge(df, other, left_index=True, right_index=True).compute()
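
For a version whose cost grows with the data, the formula has to be evaluated on columns, as the original experiment did with the pickup and drop-off coordinates. A minimal sketch for pandas-style frames, assuming one arbitrary coordinate pair per row is acceptable; the helper `add_random_coordinates` and the columns `start_lon`, `start_lat`, `end_lon`, `end_lat` are hypothetical and not part of the TLC data:

```python
import numpy as np
import pandas as pd


def add_random_coordinates(df, seed=0):
    """Hypothetical helper: attach one arbitrary coordinate pair (in degrees) per row."""
    rng = np.random.default_rng(seed)
    n = len(df)
    return df.assign(
        start_lon=rng.uniform(-180, 180, n),
        end_lon=rng.uniform(-180, 180, n),
        start_lat=rng.uniform(-90, 90, n),
        end_lat=rng.uniform(-90, 90, n),
    )


def complicated_arithmetic_operation_columns(df):
    """Same expression as complicated_arithmetic_operation, evaluated element-wise on columns."""
    theta_1, phi_1 = df.start_lon, df.start_lat
    theta_2, phi_2 = df.end_lon, df.end_lat
    temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
            + np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180)
            * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
    # With longitudes inside the cosines, temp can leave [0, 1]; those rows become NaN
    with np.errstate(invalid="ignore"):
        return 2 * np.arctan2(np.sqrt(temp), np.sqrt(1 - temp))


def mean_of_complicated_arithmetic_operation_columns(df):
    # pandas' Series.mean() skips NaN values by default
    return complicated_arithmetic_operation_columns(df).mean()
```

The expression is kept identical to the scalar functions so that only the amount of data changes. For a Dask frame, `dask_data.map_partitions(add_random_coordinates)` should attach the columns per partition (each partition then reuses the same seed, which is harmless for a cost benchmark), and the mean would still need `.compute()`.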

##### Regular dataset

In [None]:
dask_regular_df_benchmarks = {
    'duration': [],  # in seconds
    'task': [],
}

In [None]:
def read_file_parquet(df=None):
    return dd.read_parquet(regular_df_url)

dask_data = read_file_parquet()

In [None]:
benchmark(read_file_parquet, df=None, benchmarks=dask_regular_df_benchmarks, name='read file')
benchmark(count, df=dask_data, benchmarks=dask_regular_df_benchmarks, name='count')
benchmark(count_index_length, df=dask_data, benchmarks=dask_regular_df_benchmarks, name='count index length')
benchmark(mean, df=dask_data, benchmarks=dask_regular_df_benchmarks, name='mean')
benchmark(standard_deviation, df=dask_data, benchmarks=dask_regular_df_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=dask_data, benchmarks=dask_regular_df_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=dask_data, benchmarks=dask_regular_df_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=dask_data, benchmarks=dask_regular_df_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=dask_data, benchmarks=dask_regular_df_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=dask_data, benchmarks=dask_regular_df_benchmarks, name='value counts')
benchmark(mean_of_complicated_arithmetic_operation, df=dask_data, benchmarks=dask_regular_df_benchmarks, name='mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, df=dask_data, benchmarks=dask_regular_df_benchmarks, name='complex arithmetic ops')
benchmark(groupby_statistics, df=dask_data, benchmarks=dask_regular_df_benchmarks, name='groupby statistics')
benchmark(join_count, dask_data, benchmarks=dask_regular_df_benchmarks, name='join count', other=other)
benchmark(join_data, dask_data, benchmarks=dask_regular_df_benchmarks, name='join', other=other)

read file took: 0.031000000000858563 seconds
count took: 0.01599999999962165 seconds
count index length took: 1.8119999999998981 seconds
mean took: 0.21899999999914144 seconds
standard deviation took: 0.18800000000010186 seconds
mean of columns addition took: 0.23400000000037835 seconds
addition of columns took: 0.18800000000010186 seconds
mean of columns multiplication took: 0.2029999999995198 seconds
multiplication of columns took: 0.18699999999989814 seconds
value counts took: 1.8130000000001019 seconds
mean of complex arithmetic ops took: 0.0 seconds
complex arithmetic ops took: 0.0 seconds
groupby statistics took: 0.0 seconds
join count took: 0.5930000000007567 seconds
join took: 1.2819999999992433 seconds
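
Many durations in this notebook are exactly 0.0 or sit near multiples of about 0.0156 s, which is consistent with a clock that ticks roughly every 15.6 ms, such as `time.time()` on many Windows setups. A minimal sketch of a higher-resolution helper, assuming the `benchmark(f, df, benchmarks, name, **kwargs)` call shape used throughout this notebook (the notebook's own `benchmark` is defined earlier and may differ); `benchmark_repeated` and its `repeat` parameter are hypothetical:

```python
import statistics
import time


def benchmark_repeated(f, df, benchmarks, name, repeat=5, **kwargs):
    """Time f(df, **kwargs) `repeat` times and record the median duration.

    time.perf_counter() is a high-resolution clock meant for short durations,
    so fast operations are not rounded down to 0.0 seconds.
    """
    durations = []
    for _ in range(repeat):
        start = time.perf_counter()
        f(df, **kwargs)
        durations.append(time.perf_counter() - start)
    median = statistics.median(durations)
    # Same bookkeeping as the notebook's benchmark dictionaries
    benchmarks['duration'].append(median)
    benchmarks['task'].append(name)
    print(f"{name} took: {median:.6f} seconds (median of {repeat} runs)")
    return median
```

Repeated runs measure warm performance (OS page cache, Spark's cache, Dask's persisted data), so the first-run time may still be worth recording separately.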


##### Small dataset

In [None]:
dask_small_df_benchmarks = {
    'duration': [],  # in seconds
    'task': [],
}

In [None]:
def read_file_parquet(df=None):
    return dd.read_parquet(small_df_url)

dask_data = read_file_parquet()

In [None]:
benchmark(read_file_parquet, df=None, benchmarks=dask_small_df_benchmarks, name='read file')
benchmark(count, df=dask_data, benchmarks=dask_small_df_benchmarks, name='count')
benchmark(count_index_length, df=dask_data, benchmarks=dask_small_df_benchmarks, name='count index length')
benchmark(mean, df=dask_data, benchmarks=dask_small_df_benchmarks, name='mean')
benchmark(standard_deviation, df=dask_data, benchmarks=dask_small_df_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=dask_data, benchmarks=dask_small_df_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=dask_data, benchmarks=dask_small_df_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=dask_data, benchmarks=dask_small_df_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=dask_data, benchmarks=dask_small_df_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=dask_data, benchmarks=dask_small_df_benchmarks, name='value counts')
benchmark(mean_of_complicated_arithmetic_operation, df=dask_data, benchmarks=dask_small_df_benchmarks, name='mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, df=dask_data, benchmarks=dask_small_df_benchmarks, name='complex arithmetic ops')
benchmark(groupby_statistics, df=dask_data, benchmarks=dask_small_df_benchmarks, name='groupby statistics')
benchmark(join_count, dask_data, benchmarks=dask_small_df_benchmarks, name='join count', other=other)
benchmark(join_data, dask_data, benchmarks=dask_small_df_benchmarks, name='join', other=other)

read file took: 0.01599999999962165 seconds
count took: 0.01599999999962165 seconds
count index length took: 0.07799999999951979 seconds
mean took: 0.04700000000048021 seconds
standard deviation took: 0.046000000000276486 seconds
mean of columns addition took: 0.06300000000010186 seconds
addition of columns took: 0.06199999999989814 seconds
mean of columns multiplication took: 0.07899999999972351 seconds
multiplication of columns took: 0.046000000000276486 seconds
value counts took: 1.2039999999997235 seconds
mean of complex arithmetic ops took: 0.0 seconds
complex arithmetic ops took: 0.0 seconds
groupby statistics took: 0.031000000000858563 seconds
join count took: 0.43699999999989814 seconds
join took: 0.4069999999992433 seconds


##### Large dataset

In [None]:
dask_large_df_benchmarks = {
    'duration': [],  # in seconds
    'task': [],
}

In [None]:
def read_file_parquet(df=None):
    return dd.read_parquet(large_df_url)

dask_data = read_file_parquet()

In [None]:
benchmark(read_file_parquet, df=None, benchmarks=dask_large_df_benchmarks, name='read file')
benchmark(count, df=dask_data, benchmarks=dask_large_df_benchmarks, name='count')
benchmark(count_index_length, df=dask_data, benchmarks=dask_large_df_benchmarks, name='count index length')
benchmark(mean, df=dask_data, benchmarks=dask_large_df_benchmarks, name='mean')
benchmark(standard_deviation, df=dask_data, benchmarks=dask_large_df_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=dask_data, benchmarks=dask_large_df_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=dask_data, benchmarks=dask_large_df_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=dask_data, benchmarks=dask_large_df_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=dask_data, benchmarks=dask_large_df_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=dask_data, benchmarks=dask_large_df_benchmarks, name='value counts')
benchmark(mean_of_complicated_arithmetic_operation, df=dask_data, benchmarks=dask_large_df_benchmarks, name='mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, df=dask_data, benchmarks=dask_large_df_benchmarks, name='complex arithmetic ops')
benchmark(groupby_statistics, df=dask_data, benchmarks=dask_large_df_benchmarks, name='groupby statistics')
benchmark(join_count, dask_data, benchmarks=dask_large_df_benchmarks, name='join count', other=other)
benchmark(join_data, dask_data, benchmarks=dask_large_df_benchmarks, name='join', other=other)

read file took: 0.015000000001236913 seconds
count took: 0.01599999999962165 seconds
count index length took: 0.030999999999039574 seconds
mean took: 0.6560000000008586 seconds
standard deviation took: 0.9850000000005821 seconds
mean of columns addition took: 1.0939999999991414 seconds
addition of columns took: 1.5619999999998981 seconds
mean of columns multiplication took: 1.0630000000001019 seconds
multiplication of columns took: 1.4369999999998981 seconds
value counts took: 2.561999999999898 seconds
mean of complex arithmetic ops took: 0.0 seconds
complex arithmetic ops took: 0.0 seconds
groupby statistics took: 0.01600000000144064 seconds
join count took: 0.6719999999986612 seconds
join took: 26.968000000000757 seconds


#### Operations with filtering

##### Regular dataset

In [None]:
def read_file_parquet(df=None):
    return dd.read_parquet(regular_df_url)

dask_data = read_file_parquet()

In [None]:
expr_filter = (dask_data.tip_amount >= 1) & (dask_data.tip_amount <= 5)

def filter_data(df):
    return df[expr_filter]

dask_filtered = filter_data(dask_data)
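
`filter_data` ignores the columns of its argument and indexes it with `expr_filter`, a mask built from `dask_data` when the cell runs; it works here only because that same frame is passed in. A minimal sketch of a variant that builds the mask from the frame it receives, shown with pandas (the same boolean-indexing expression is also accepted by the Dask and PySpark frames used in this notebook); `filter_tips` is a hypothetical name:

```python
import pandas as pd


def filter_tips(df, low=1, high=5):
    """Keep rows whose tip_amount lies in [low, high], both ends inclusive."""
    mask = (df.tip_amount >= low) & (df.tip_amount <= high)
    return df[mask]
```

In pandas, `df[df.tip_amount.between(1, 5)]` is an equivalent spelling, since `between` includes both bounds by default.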

In [None]:
benchmark(count, dask_filtered, benchmarks=dask_regular_df_benchmarks, name='filtered count')
benchmark(count_index_length, dask_filtered, benchmarks=dask_regular_df_benchmarks, name='filtered count index length')
benchmark(mean, dask_filtered, benchmarks=dask_regular_df_benchmarks, name='filtered mean')
benchmark(standard_deviation, dask_filtered, benchmarks=dask_regular_df_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, dask_filtered, benchmarks=dask_regular_df_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=dask_filtered, benchmarks=dask_regular_df_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, dask_filtered, benchmarks=dask_regular_df_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=dask_filtered, benchmarks=dask_regular_df_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, dask_filtered, benchmarks=dask_regular_df_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, dask_filtered, benchmarks=dask_regular_df_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, dask_filtered, benchmarks=dask_regular_df_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, dask_filtered, benchmarks=dask_regular_df_benchmarks, name='filtered groupby statistics')

other = groupby_statistics(dask_filtered)
other.columns = pd.Index([e[0] +'_'+ e[1] for e in other.columns.tolist()])

benchmark(join_count, dask_filtered, benchmarks=dask_regular_df_benchmarks, name='filtered join count', other=other)
benchmark(join_data, dask_filtered, benchmarks=dask_regular_df_benchmarks, name='filtered join', other=other)

filtered count took: 0.2039999999997235 seconds
filtered count index length took: 0.1710000000002765 seconds
filtered mean took: 0.21899999999914144 seconds
filtered standard deviation took: 0.26600000000144064 seconds
filtered mean of columns addition took: 0.21899999999914144 seconds
filtered addition of columns took: 0.25 seconds
filtered mean of columns multiplication took: 0.18699999999989814 seconds
filtered multiplication of columns took: 0.25 seconds
filtered mean of complex arithmetic ops took: 0.0 seconds
filtered complex arithmetic ops took: 0.0 seconds
filtered value counts took: 0.2970000000004802 seconds
filtered groupby statistics took: 0.01599999999962165 seconds
filtered join count took: 0.4850000000005821 seconds
filtered join took: 1.3590000000003783 seconds


##### Small dataset

In [None]:
def read_file_parquet(df=None):
    return dd.read_parquet(small_df_url)

dask_data = read_file_parquet()

In [None]:
expr_filter = (dask_data.tip_amount >= 1) & (dask_data.tip_amount <= 5)

def filter_data(df):
    return df[expr_filter]

dask_filtered = filter_data(dask_data)

In [None]:
benchmark(count, dask_filtered, benchmarks=dask_small_df_benchmarks, name='filtered count')
benchmark(count_index_length, dask_filtered, benchmarks=dask_small_df_benchmarks, name='filtered count index length')
benchmark(mean, dask_filtered, benchmarks=dask_small_df_benchmarks, name='filtered mean')
benchmark(standard_deviation, dask_filtered, benchmarks=dask_small_df_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, dask_filtered, benchmarks=dask_small_df_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=dask_filtered, benchmarks=dask_small_df_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, dask_filtered, benchmarks=dask_small_df_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=dask_filtered, benchmarks=dask_small_df_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, dask_filtered, benchmarks=dask_small_df_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, dask_filtered, benchmarks=dask_small_df_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, dask_filtered, benchmarks=dask_small_df_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, dask_filtered, benchmarks=dask_small_df_benchmarks, name='filtered groupby statistics')

other = groupby_statistics(dask_filtered)
other.columns = pd.Index([e[0] +'_'+ e[1] for e in other.columns.tolist()])

benchmark(join_count, dask_filtered, benchmarks=dask_small_df_benchmarks, name='filtered join count', other=other)
benchmark(join_data, dask_filtered, benchmarks=dask_small_df_benchmarks, name='filtered join', other=other)

filtered count took: 0.09399999999914144 seconds
filtered count index length took: 0.0930000000007567 seconds
filtered mean took: 0.07899999999972351 seconds
filtered standard deviation took: 0.0930000000007567 seconds
filtered mean of columns addition took: 0.125 seconds
filtered addition of columns took: 0.10999999999876309 seconds
filtered mean of columns multiplication took: 0.15600000000085856 seconds
filtered multiplication of columns took: 0.07799999999951979 seconds
filtered mean of complex arithmetic ops took: 0.0 seconds
filtered complex arithmetic ops took: 0.0 seconds
filtered value counts took: 0.2970000000004802 seconds
filtered groupby statistics took: 0.01599999999962165 seconds
filtered join count took: 0.23500000000058208 seconds
filtered join took: 0.21899999999914144 seconds


##### Large dataset

In [None]:
def read_file_parquet(df=None):
    return dd.read_parquet(large_df_url)

dask_data = read_file_parquet()

In [None]:
expr_filter = (dask_data.tip_amount >= 1) & (dask_data.tip_amount <= 5)

def filter_data(df):
    return df[expr_filter]

dask_filtered = filter_data(dask_data)

In [None]:
benchmark(count, dask_filtered, benchmarks=dask_large_df_benchmarks, name='filtered count')
benchmark(count_index_length, dask_filtered, benchmarks=dask_large_df_benchmarks, name='filtered count index length')
benchmark(mean, dask_filtered, benchmarks=dask_large_df_benchmarks, name='filtered mean')
benchmark(standard_deviation, dask_filtered, benchmarks=dask_large_df_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, dask_filtered, benchmarks=dask_large_df_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=dask_filtered, benchmarks=dask_large_df_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, dask_filtered, benchmarks=dask_large_df_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=dask_filtered, benchmarks=dask_large_df_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, dask_filtered, benchmarks=dask_large_df_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, dask_filtered, benchmarks=dask_large_df_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, dask_filtered, benchmarks=dask_large_df_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, dask_filtered, benchmarks=dask_large_df_benchmarks, name='filtered groupby statistics')

other = groupby_statistics(dask_filtered)
other.columns = pd.Index([e[0] +'_'+ e[1] for e in other.columns.tolist()])

benchmark(join_count, dask_filtered, benchmarks=dask_large_df_benchmarks, name='filtered join count', other=other)
benchmark(join_data, dask_filtered, benchmarks=dask_large_df_benchmarks, name='filtered join', other=other)

filtered count took: 0.8430000000007567 seconds
filtered count index length took: 0.0319999999992433 seconds
filtered mean took: 1.25 seconds
filtered standard deviation took: 1.5619999999998981 seconds
filtered mean of columns addition took: 1.25 seconds
filtered addition of columns took: 1.7810000000008586 seconds
filtered mean of columns multiplication took: 1.2189999999991414 seconds
filtered multiplication of columns took: 1.735000000000582 seconds
filtered mean of complex arithmetic ops took: 0.0 seconds
filtered complex arithmetic ops took: 0.0 seconds
filtered value counts took: 1.375 seconds
filtered groupby statistics took: 0.01600000000144064 seconds
filtered join count took: 2.3289999999997235 seconds
filtered join took: 28.03100000000086 seconds


#### Operations with filtering and caching

In [None]:
from distributed import wait

# Materialize the filtered data in worker memory
dask_filtered = client.persist(dask_filtered)

print('Waiting until all futures are finished')
wait(dask_filtered)
print('All futures are finished')

dask_benchmarks = {
    'duration': [],  # in seconds
    'task': [],
}

Waiting until all futures are finished
All futures are finished


In [None]:
benchmark(count, dask_filtered, benchmarks=dask_benchmarks, name='filtered and cached count')
benchmark(count_index_length, dask_filtered, benchmarks=dask_benchmarks, name='filtered and cached count index length')
benchmark(mean, dask_filtered, benchmarks=dask_benchmarks, name='filtered and cached mean')
benchmark(standard_deviation, dask_filtered, benchmarks=dask_benchmarks, name='filtered and cached standard deviation')
benchmark(mean_of_sum, dask_filtered, benchmarks=dask_benchmarks, name ='filtered and cached mean of columns addition')
benchmark(sum_columns, df=dask_filtered, benchmarks=dask_benchmarks, name='filtered and cached addition of columns')
benchmark(mean_of_product, dask_filtered, benchmarks=dask_benchmarks, name ='filtered and cached mean of columns multiplication')
benchmark(product_columns, df=dask_filtered, benchmarks=dask_benchmarks, name='filtered and cached multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, dask_filtered, benchmarks=dask_benchmarks, name='filtered and cached mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, dask_filtered, benchmarks=dask_benchmarks, name='filtered and cached complex arithmetic ops')
benchmark(value_counts, dask_filtered, benchmarks=dask_benchmarks, name ='filtered and cached value counts')
benchmark(groupby_statistics, dask_filtered, benchmarks=dask_benchmarks, name='filtered and cached groupby statistics')

other = groupby_statistics(dask_filtered)
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

benchmark(join_count, dask_filtered, benchmarks=dask_benchmarks, name='filtered and cached join count', other=other)
benchmark(join_data, dask_filtered, benchmarks=dask_benchmarks, name='filtered and cached join', other=other)

KilledWorker: Attempted to run task ('read_parquet-880d90cb855161ccab6f8dc3328392d7', 0) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:55467. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

In [None]:
client.restart()
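
`client.persist` keeps the whole filtered frame in worker memory, while `LocalCluster(memory_limit='6GB')` caps each worker at 6 GB. The Dask page linked in the `KilledWorker` message lists running out of memory among the common reasons a worker dies, although the logs above do not show which limit, if any, was hit. A rough sketch for estimating the in-memory size before persisting, assuming a pandas sample of the data is representative; `estimate_persisted_bytes` is a hypothetical helper:

```python
import pandas as pd


def estimate_persisted_bytes(sample, total_rows, columns=None):
    """Extrapolate the in-memory size of `total_rows` rows from a pandas sample.

    memory_usage(deep=True) also counts Python objects held in object columns,
    so string-heavy frames are not underestimated. The index is excluded.
    """
    if columns is not None:
        sample = sample[columns]
    if len(sample) == 0:
        raise ValueError("sample must contain at least one row")
    bytes_per_row = sample.memory_usage(deep=True, index=False).sum() / len(sample)
    return int(bytes_per_row * total_rows)
```

A sample could come from one partition, for example `dask_filtered.get_partition(0).compute()`. Passing only the columns the benchmarks read (`fare_amount`, `tip_amount`, `passenger_count`) shows how much selecting columns before persisting would save.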

## Experiment 2: Running the large dataset with different library combinations

### Dask + JobLib

#### Large Dataset

In [None]:
import dask
from joblib import Parallel, delayed, parallel_backend
from dask.distributed import Client

client = Client()

In [None]:
large_dask_joblib_benchmarks = {
    'duration': [],     # in seconds
    'task': []
}

In [None]:
def read_file_parquet():
    return pd.read_parquet(large_df_url)

joblib_dask_data = read_file_parquet()

def count(df):
    return len(df)

def count_index_length(df):
    return len(df.index)

def mean(df):
    return df.fare_amount.mean()

def standard_deviation(df):
    return df.fare_amount.std()

def mean_of_sum(df):
    return (df.fare_amount + df.tip_amount).mean()

def sum_columns(df):
    return df.fare_amount + df.tip_amount

def mean_of_product(df):
    return (df.fare_amount * df.tip_amount).mean()

def product_columns(df):
    return df.fare_amount * df.tip_amount

def value_counts(df):
    return df.fare_amount.value_counts()

def complicated_arithmetic_operation(df):
    start_lon,end_lon = np.random.randint(-180,180),np.random.randint(-180,180)
    start_lat,end_lat = np.random.randint(-90,90),np.random.randint(-90,90)

    theta_1 = start_lon
    phi_1 = start_lat
    theta_2 = end_lon
    phi_2 = end_lat

    temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2 +
            np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180) *
            np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)

    return np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1 - temp)), 2)

def mean_of_complicated_arithmetic_operation(df):
    values = [complicated_arithmetic_operation(df) for _ in range(1000)]
    return np.mean(values)

def groupby_statistics(df):
    gb = df.groupby(by='passenger_count').agg({
        'fare_amount': ['mean', 'std'],
        'tip_amount': ['mean', 'std']
    })
    return gb

other = groupby_statistics(joblib_dask_data)
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

def join_count(df, other):
    return len(pd.merge(df, other, left_index=True, right_index=True))

def join_data(df, other):
    return pd.merge(df, other, left_index=True, right_index=True)
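
Both join helpers merge on the index: the left side keeps the row labels produced by `read_parquet` (typically a default `RangeIndex` in pandas), while `other` is indexed by `passenger_count`. The join therefore matches row numbers against passenger counts rather than joining on a shared key, so only the few rows whose label equals an existing `passenger_count` value survive; the notebook uses it only to time a join. A minimal pandas sketch with made-up values that shows the effect, next to a key-based alternative for comparison:

```python
import pandas as pd

# Made-up trips: row labels 0..5, passenger_count values 1 or 2
trips = pd.DataFrame({
    "passenger_count": [1, 2, 1, 2, 1, 1],
    "fare_amount": [10.0, 12.0, 8.0, 20.0, 15.0, 9.0],
    "tip_amount": [1.0, 2.0, 1.5, 3.0, 2.0, 1.0],
})

stats = trips.groupby(by="passenger_count").agg({
    "fare_amount": ["mean", "std"],
    "tip_amount": ["mean", "std"],
})
# Flatten the MultiIndex columns as the notebook does: ('fare_amount', 'mean') -> 'fare_amount_mean'
stats.columns = pd.Index([a + "_" + b for a, b in stats.columns.tolist()])

# Index-on-index join: row labels 1 and 2 are matched to passenger_count groups 1 and 2
by_index = pd.merge(trips, stats, left_index=True, right_index=True)

# Key-based join: every trip receives the statistics of its own passenger_count group
by_key = pd.merge(trips, stats, left_on="passenger_count", right_index=True)
```

Here `by_index` keeps two rows, and row 1 (a two-passenger trip) receives the statistics of the one-passenger group, while `by_key` keeps all six trips with matching statistics.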

In [None]:
benchmark(count, df=joblib_dask_data, benchmarks=large_dask_joblib_benchmarks, name='count')
benchmark(count_index_length, df=joblib_dask_data, benchmarks=large_dask_joblib_benchmarks, name='count index length')
benchmark(mean, df=joblib_dask_data, benchmarks=large_dask_joblib_benchmarks, name='mean')
benchmark(standard_deviation, df=joblib_dask_data, benchmarks=large_dask_joblib_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=joblib_dask_data, benchmarks=large_dask_joblib_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=joblib_dask_data, benchmarks=large_dask_joblib_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=joblib_dask_data, benchmarks=large_dask_joblib_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=joblib_dask_data, benchmarks=large_dask_joblib_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=joblib_dask_data, benchmarks=large_dask_joblib_benchmarks, name='value counts')
benchmark(mean_of_complicated_arithmetic_operation, df=joblib_dask_data, benchmarks=large_dask_joblib_benchmarks, name='mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, df=joblib_dask_data, benchmarks=large_dask_joblib_benchmarks, name='complex arithmetic ops')
benchmark(groupby_statistics, df=joblib_dask_data, benchmarks=large_dask_joblib_benchmarks, name='groupby statistics')
benchmark(join_count, joblib_dask_data, benchmarks=large_dask_joblib_benchmarks, name='join count', other=other)
benchmark(join_data, joblib_dask_data, benchmarks=large_dask_joblib_benchmarks, name='join', other=other)


count took: 0.0 seconds
count index length took: 0.0 seconds
mean took: 0.07800000000133878 seconds
standard deviation took: 0.31199999999989814 seconds
mean of columns addition took: 0.1569999999992433 seconds
addition of columns took: 0.07799999999951979 seconds
mean of columns multiplication took: 0.125 seconds
multiplication of columns took: 0.07800000000133878 seconds
value counts took: 0.26599999999962165 seconds
mean of complex arithmetic ops took: 0.031000000000858563 seconds
complex arithmetic ops took: 0.0 seconds
groupby statistics took: 0.625 seconds
join count took: 0.06300000000010186 seconds
join took: 0.0 seconds
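
Several timings above are exactly 0.0 s, and the rest are multiples of about 0.0156 s (1/64 s). That pattern fits a coarse wall-clock timer, such as `time.time()` on Windows. It may also come from lazy Dask results that are never computed inside the timed call. The `benchmark` helper is defined earlier in the notebook and is not shown in this section.

The sketch below is a hypothetical variant, `benchmark_sketch`, and is not the notebook's helper. It does three things differently:

- it uses the high-resolution `time.perf_counter()`;
- it calls `.compute()` when the result has that method;
- it keeps the best of several repeats.

```python
import time


def benchmark_sketch(f, df, benchmarks, name, repeats=3, **kwargs):
    """Time f(df, **kwargs) and record the best of `repeats` runs in `benchmarks`.

    Hypothetical variant of the notebook's `benchmark` helper (not the original).
    """
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()  # monotonic, high-resolution clock
        result = f(df, **kwargs)
        # Lazy collections (e.g. Dask) only build a task graph until computed,
        # so force evaluation inside the timed region when possible.
        if hasattr(result, "compute"):
            result = result.compute()
        best = min(best, time.perf_counter() - start)
    benchmarks['duration'].append(best)
    benchmarks['task'].append(name)
    print(f"{name} took: {best} seconds")
    return best
```

It takes the same arguments as the calls above, for example `benchmark_sketch(mean, df=joblib_dask_data, benchmarks=large_dask_joblib_benchmarks, name='mean')`. Keeping the minimum over repeats reduces noise from other processes, at the cost of running each task several times.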


### Dask + Modin

In [None]:
import os
os.environ["MODIN_ENGINE"] = "dask"  # select the engine before Modin initializes it

import modin.pandas as mpd
from modin.config import Engine

print('modin version: %s' % mpd.__version__)

Engine.put("dask")

modin version: 0.32.0


In [None]:
dask_modin_benchmarks = {
    'duration': [],     # in seconds
    'task': []
}

#### Standard Operations

In [None]:
def read_file_parquet(df=None):
    return mpd.read_parquet(large_df_url)

dask_modin_data = read_file_parquet()

def count(df=None):
    return len(df)

def count_index_length(df=None):
    return len(df.index)

def mean(df):
    return df.fare_amount.mean()

def standard_deviation(df):
    return df.fare_amount.std()

def mean_of_sum(df):
    return (df.fare_amount + df.tip_amount).mean()

def sum_columns(df):
    x = df.fare_amount + df.tip_amount
    return x

def mean_of_product(df):
    return (df.fare_amount * df.tip_amount).mean()

def product_columns(df):
    x = df.fare_amount * df.tip_amount
    return x

def value_counts(df):
    val_counts = df.fare_amount.value_counts()
    return val_counts


# In the original experiment, the following two functions used the longitude and latitude of the pickup and drop-off locations.
# The datasets currently provided by NYC TLC Trip Record Data no longer include coordinates (only pickup and drop-off location IDs),
# so we use arbitrary longitude and latitude values instead. The goal of this experiment is to compare the computational cost
# of the calculations, so the values themselves are not relevant.
# Note: the coordinates are scalars and `df` is never read, so these timings do not scale with the number of rows.
def complicated_arithmetic_operation(df):
    start_lon,end_lon = np.random.randint(-180,180),np.random.randint(-180,180)
    start_lat,end_lat = np.random.randint(-90,90),np.random.randint(-90,90)

    theta_1 = start_lon
    phi_1 = start_lat
    theta_2 = end_lon
    phi_2 = end_lat
    temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
           + np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180) * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
    ret = np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1-temp)),2)
    return ret

def mean_of_complicated_arithmetic_operation(df):
    start_lon,end_lon = np.random.randint(-180,180),np.random.randint(-180,180)
    start_lat,end_lat = np.random.randint(-90,90),np.random.randint(-90,90)

    theta_1 = start_lon
    phi_1 = start_lat
    theta_2 = end_lon
    phi_2 = end_lat
    temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
           + np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180) * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
    ret = np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1-temp)),2)
    return ret.mean()

def groupby_statistics(df):
    gb = df.groupby(by='passenger_count').agg(
      {
        'fare_amount': ['mean', 'std'],
        'tip_amount': ['mean', 'std']
      }
    )
    return gb

other = groupby_statistics(dask_modin_data)
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

def join_count(df, other):
    return len(mpd.merge(df, other, left_index=True, right_index=True))

def join_data(df, other):
    return mpd.merge(df, other, left_index=True, right_index=True)
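
As written, `complicated_arithmetic_operation` and its mean variant never read `df`. The four coordinates are scalars, so the work does not depend on dataset size, which is consistent with the near-zero timings.

One way to keep a per-row cost like the original benchmark is to generate arbitrary coordinates for each row and apply the formula to whole arrays. This is an assumption of ours, not the original method. The names `synthetic_coordinates` and `haversine_columns` are hypothetical.

The sketch also differs from the notebook's expression in one respect. The notebook takes the cosine of the longitudes; the sketch uses the standard haversine ordering, with the cosine of the latitudes. That keeps the intermediate value in [0, 1], so no NaNs appear, and the cost per row is the same.

```python
import numpy as np


def synthetic_coordinates(n_rows, seed=0):
    """Arbitrary per-row coordinates in degrees (hypothetical stand-in data)."""
    rng = np.random.default_rng(seed)
    start_lon = rng.uniform(-180, 180, n_rows)
    end_lon = rng.uniform(-180, 180, n_rows)
    start_lat = rng.uniform(-90, 90, n_rows)
    end_lat = rng.uniform(-90, 90, n_rows)
    return start_lon, start_lat, end_lon, end_lat


def haversine_columns(start_lon, start_lat, end_lon, end_lat):
    """Central angle (radians) between start and end points, element-wise."""
    lon_1, lat_1 = np.radians(start_lon), np.radians(start_lat)
    lon_2, lat_2 = np.radians(end_lon), np.radians(end_lat)
    a = (np.sin((lat_2 - lat_1) / 2) ** 2
         + np.cos(lat_1) * np.cos(lat_2) * np.sin((lon_2 - lon_1) / 2) ** 2)
    a = np.clip(a, 0.0, 1.0)  # guard against tiny floating-point overshoot
    return 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
```

On a pandas dataframe with `n` rows, the four arrays could be stored as columns and passed to `haversine_columns` as Series, since NumPy ufuncs operate on Series element-wise. The timing then grows with the number of rows.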

In [None]:
benchmark(read_file_parquet, df=None, benchmarks=dask_modin_benchmarks, name='read file')
benchmark(count, df=dask_modin_data, benchmarks=dask_modin_benchmarks, name='count')
benchmark(count_index_length, df=dask_modin_data, benchmarks=dask_modin_benchmarks, name='count index length')
benchmark(mean, df=dask_modin_data, benchmarks=dask_modin_benchmarks, name='mean')
benchmark(standard_deviation, df=dask_modin_data, benchmarks=dask_modin_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=dask_modin_data, benchmarks=dask_modin_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=dask_modin_data, benchmarks=dask_modin_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=dask_modin_data, benchmarks=dask_modin_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=dask_modin_data, benchmarks=dask_modin_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=dask_modin_data, benchmarks=dask_modin_benchmarks, name='value counts')
benchmark(mean_of_complicated_arithmetic_operation, df=dask_modin_data, benchmarks=dask_modin_benchmarks, name='mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, df=dask_modin_data, benchmarks=dask_modin_benchmarks, name='complex arithmetic ops')
benchmark(groupby_statistics, df=dask_modin_data, benchmarks=dask_modin_benchmarks, name='groupby statistics')
benchmark(join_count, dask_modin_data, benchmarks=dask_modin_benchmarks, name='join count', other=other)
benchmark(join_data, dask_modin_data, benchmarks=dask_modin_benchmarks, name='join', other=other)

read file took: 1.1719999999913853 seconds
count took: 0.0 seconds
count index length took: 0.0 seconds
mean took: 0.10899999999674037 seconds
standard deviation took: 0.15600000001722947 seconds
mean of columns addition took: 0.1870000000053551 seconds
addition of columns took: 0.03200000000651926 seconds
mean of columns multiplication took: 0.375 seconds
multiplication of columns took: 0.046000000002095476 seconds


the groupby keys will be sorted anyway, although the 'sort=False' was passed. See the following issue for more details: https://github.com/modin-project/modin/issues/3571.


value counts took: 0.42199999999138527 seconds
mean of complex arithmetic ops took: 0.0 seconds
complex arithmetic ops took: 0.0 seconds
groupby statistics took: 3.062999999994645 seconds
join count took: 2.9690000000118744 seconds
join took: 2.812000000005355 seconds


#### Operations with filtering

In [None]:
expr_filter = (dask_modin_data.tip_amount >= 1) & (dask_modin_data.tip_amount <= 5)

def filter_data(df):
    return df[expr_filter]

dask_modin_filtered = filter_data(dask_modin_data)
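
`filter_data` ignores the columns of its `df` argument. Instead it indexes `df` with the global `expr_filter`, which was built from `dask_modin_data`. This works here only because the mask and the dataframe share the same index; passing any other dataframe could fail or misalign.

A self-contained alternative builds the mask from the frame it receives. This is a sketch: `filter_tips` and its `low`/`high` parameters are hypothetical. It uses only element-wise comparisons, which pandas, Modin and Dask all support.

```python
import pandas as pd


def filter_tips(df, low=1, high=5):
    """Keep rows whose tip_amount lies in [low, high], using df's own column."""
    mask = (df.tip_amount >= low) & (df.tip_amount <= high)
    return df[mask]
```

Because the mask is derived inside the function, the same call, for example `filter_tips(dask_modin_data)`, applies unchanged to the Modin frame, the Dask frames or a pandas sample.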

In [None]:
benchmark(count, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered count')
benchmark(count_index_length, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered count index length')
benchmark(mean, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered mean')
benchmark(standard_deviation, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered groupby statistics')

other = groupby_statistics(dask_modin_filtered)
other.columns = pd.Index([e[0] +'_'+ e[1] for e in other.columns.tolist()])

benchmark(join_count, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered join count', other=other)
benchmark(join_data, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered join', other=other)

filtered count took: 0.09399999998277053 seconds
filtered count index length took: 0.0 seconds
filtered mean took: 0.07800000000861473 seconds
filtered standard deviation took: 0.125 seconds
filtered mean of columns addition took: 0.1870000000053551 seconds
filtered addition of columns took: 0.046999999991385266 seconds
filtered mean of columns multiplication took: 0.235000000015134 seconds
filtered multiplication of columns took: 0.030999999988125637 seconds
filtered mean of complex arithmetic ops took: 0.0 seconds
filtered complex arithmetic ops took: 0.0 seconds
filtered value counts took: 0.7179999999934807 seconds
filtered groupby statistics took: 1.1410000000032596 seconds
filtered join count took: 1.0780000000086147 seconds
filtered join took: 1.0780000000086147 seconds


#### Operations with filtering and caching

In [None]:
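from dask.distributed import wait

# Note: this cell persists a separate Dask dataframe with a different filter
# (fare_amount > 10); the benchmarks below still run on `dask_modin_filtered`.
# Modin executes operations eagerly (not lazily like Dask), so there is no
# separate persist step for `dask_modin_filtered`.
df = dd.read_parquet(large_df_url)
filtered = df[df.fare_amount > 10]
filtered = client.persist(filtered)
wait(filtered)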


DoneAndNotDoneFutures(done={<Future: finished, type: pandas.core.frame.DataFrame, key: ('getitem-2951f6bd6cf190ee8e7888115d853521', 0)>}, not_done=set())

In [None]:
benchmark(count, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered count')
benchmark(count_index_length, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered count index length')
benchmark(mean, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered mean')
benchmark(standard_deviation, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered groupby statistics')

other = groupby_statistics(dask_modin_filtered)
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

benchmark(join_count, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered join count', other=other)
benchmark(join_data, dask_modin_filtered, benchmarks=dask_modin_benchmarks, name='filtered join', other=other)

filtered count took: 0.0 seconds
filtered count index length took: 0.0 seconds
filtered mean took: 0.10899999999674037 seconds
filtered standard deviation took: 0.07800000000861473 seconds
filtered mean of columns addition took: 0.15599999998812564 seconds
filtered addition of columns took: 0.0629999999946449 seconds
filtered mean of columns multiplication took: 0.21900000001187436 seconds
filtered multiplication of columns took: 0.030999999988125637 seconds
filtered mean of complex arithmetic ops took: 0.0 seconds
filtered complex arithmetic ops took: 0.0 seconds
filtered value counts took: 0.40600000001722947 seconds
filtered groupby statistics took: 1.1410000000032596 seconds
filtered join count took: 1.125 seconds
filtered join took: 1.062000000005355 seconds
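
To compare configurations task by task, the `duration`/`task` dictionaries collected in each section can be lined up by task name. The sketch below uses a hypothetical helper, `speedup_table`, which pairs two such dictionaries and reports the ratio of their durations.

Two details affect the result:

- Tasks timed at 0.0 s are reported as `None`, because their ratio is undefined at this timer resolution.
- `dask_modin_benchmarks` holds two entries for each filtered task, because the filtered and the filtered-and-cached runs use the same names. The sketch keeps the last occurrence.

```python
def speedup_table(baseline, candidate):
    """Return {task: baseline_duration / candidate_duration} for shared tasks.

    Both arguments use the notebook's {'duration': [...], 'task': [...]} layout.
    Later entries with a repeated task name overwrite earlier ones.
    """
    base = dict(zip(baseline['task'], baseline['duration']))
    cand = dict(zip(candidate['task'], candidate['duration']))
    table = {}
    for task in base:
        if task not in cand:
            continue
        # A 0.0 s measurement is below the timer resolution; no meaningful ratio.
        if base[task] == 0 or cand[task] == 0:
            table[task] = None
        else:
            table[task] = base[task] / cand[task]
    return table
```

For example, `speedup_table(large_dask_joblib_benchmarks, dask_modin_benchmarks)` would give values above 1 for tasks where Dask + Modin was faster.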


### Dask + RAPIDS

In [None]:
import dask
from dask.distributed import Client
import dask.dataframe as dd

dask.config.set({"dataframe.backend": "cudf"})

<dask.config.set at 0x1a23e0f7910>

In [None]:
dask_rapids_benchmarks = {
    'duration': [],     # in seconds
    'task': []
}
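
The `"cudf"` backend selected above for `dask.dataframe` is provided by the RAPIDS `dask_cudf` package and needs a supported NVIDIA GPU. If the notebook may also run on machines without RAPIDS, one option is to check whether the package is importable before selecting the backend.

The sketch uses only the standard library; `choose_dataframe_backend` is a hypothetical helper. It only checks that the package is installed, not that a GPU is actually usable.

```python
import importlib.util


def choose_dataframe_backend(preferred="cudf", fallback="pandas"):
    """Return `preferred` if dask_cudf is installed, otherwise `fallback`."""
    # find_spec returns None when the top-level package cannot be found.
    if preferred == "cudf" and importlib.util.find_spec("dask_cudf") is None:
        return fallback
    return preferred
```

It would be used as `dask.config.set({"dataframe.backend": choose_dataframe_backend()})`. On a machine without RAPIDS this falls back to the default pandas backend, and the CPU timings would no longer reflect GPU execution.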

#### Standard Operations

In [None]:
def read_file_parquet(df=None):
    return dd.read_parquet(large_df_url)

dask_rapids_data = read_file_parquet()

def count(df=None):
    return len(df)

def count_index_length(df=None):
    return len(df.index)

def mean(df):
    return df.fare_amount.mean()

def standard_deviation(df):
    return df.fare_amount.std()

def mean_of_sum(df):
    return (df.fare_amount + df.tip_amount).mean()

def sum_columns(df):
    x = df.fare_amount + df.tip_amount
    return x

def mean_of_product(df):
    return (df.fare_amount * df.tip_amount).mean()

def product_columns(df):
    x = df.fare_amount * df.tip_amount
    return x

def value_counts(df):
    val_counts = df.fare_amount.value_counts()
    return val_counts


# In the original experiment, the following two functions used the longitude and latitude of the pickup and drop-off locations.
# The datasets currently provided by NYC TLC Trip Record Data no longer include coordinates (only pickup and drop-off location IDs),
# so we use arbitrary longitude and latitude values instead. The goal of this experiment is to compare the computational cost
# of the calculations, so the values themselves are not relevant.
# Note: the coordinates are scalars and `df` is never read, so these timings do not scale with the number of rows.
def complicated_arithmetic_operation(df):
    start_lon,end_lon = np.random.randint(-180,180),np.random.randint(-180,180)
    start_lat,end_lat = np.random.randint(-90,90),np.random.randint(-90,90)

    theta_1 = start_lon
    phi_1 = start_lat
    theta_2 = end_lon
    phi_2 = end_lat
    temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
           + np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180) * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
    ret = np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1-temp)),2)
    return ret

def mean_of_complicated_arithmetic_operation(df):
    start_lon,end_lon = np.random.randint(-180,180),np.random.randint(-180,180)
    start_lat,end_lat = np.random.randint(-90,90),np.random.randint(-90,90)

    theta_1 = start_lon
    phi_1 = start_lat
    theta_2 = end_lon
    phi_2 = end_lat
    temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
           + np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180) * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
    ret = np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1-temp)),2)
    return ret.mean()

def groupby_statistics(df):
    gb = df.groupby(by='passenger_count').agg(
      {
        'fare_amount': ['mean', 'std'],
        'tip_amount': ['mean', 'std']
      }
    )
    return gb

other = groupby_statistics(dask_rapids_data)
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

def join_count(df, other):
    return len(dd.merge(df, other, left_index=True, right_index=True))

def join_data(df, other):
    return dd.merge(df, other, left_index=True, right_index=True)
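
After `groupby_statistics`, the aggregated frame has two-level column labels such as `('fare_amount', 'mean')`. The notebook flattens them with `e[0] + '_' + e[1]` before joining. Joining the parts with `'_'.join` gives the same names for two-level labels and also handles other numbers of levels.

A minimal sketch in plain Python follows; `flatten_column_labels` is a hypothetical name.

```python
def flatten_column_labels(labels):
    """Turn tuples like ('fare_amount', 'mean') into 'fare_amount_mean'."""
    return ['_'.join(str(part) for part in label) if isinstance(label, tuple) else str(label)
            for label in labels]
```

It would be used as `other.columns = pd.Index(flatten_column_labels(other.columns.tolist()))`.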

In [None]:
benchmark(read_file_parquet, df=None, benchmarks=dask_rapids_benchmarks, name='read file')
benchmark(count, df=dask_rapids_data, benchmarks=dask_rapids_benchmarks, name='count')
benchmark(count_index_length, df=dask_rapids_data, benchmarks=dask_rapids_benchmarks, name='count index length')
benchmark(mean, df=dask_rapids_data, benchmarks=dask_rapids_benchmarks, name='mean')
benchmark(standard_deviation, df=dask_rapids_data, benchmarks=dask_rapids_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=dask_rapids_data, benchmarks=dask_rapids_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=dask_rapids_data, benchmarks=dask_rapids_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=dask_rapids_data, benchmarks=dask_rapids_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=dask_rapids_data, benchmarks=dask_rapids_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=dask_rapids_data, benchmarks=dask_rapids_benchmarks, name='value counts')
benchmark(mean_of_complicated_arithmetic_operation, df=dask_rapids_data, benchmarks=dask_rapids_benchmarks, name='mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, df=dask_rapids_data, benchmarks=dask_rapids_benchmarks, name='complex arithmetic ops')
benchmark(groupby_statistics, df=dask_rapids_data, benchmarks=dask_rapids_benchmarks, name='groupby statistics')
benchmark(join_count, dask_rapids_data, benchmarks=dask_rapids_benchmarks, name='join count', other=other)
benchmark(join_data, dask_rapids_data, benchmarks=dask_rapids_benchmarks, name='join', other=other)

#### Operations with filtering

In [None]:
expr_filter = (dask_rapids_data.tip_amount >= 1) & (dask_rapids_data.tip_amount <= 5)

def filter_data(df):
    return df[expr_filter]

dask_rapids_filtered = filter_data(dask_rapids_data)

In [None]:
benchmark(count, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered count')
benchmark(count_index_length, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered count index length')
benchmark(mean, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered mean')
benchmark(standard_deviation, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered groupby statistics')

other = groupby_statistics(dask_rapids_filtered)
other.columns = pd.Index([e[0] +'_'+ e[1] for e in other.columns.tolist()])

benchmark(join_count, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered join count', other=other)
benchmark(join_data, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered join', other=other)

#### Operations with filtering and caching

In [None]:
dask_rapids_filtered = client.persist(dask_rapids_filtered)

from distributed import wait
print('Waiting until all futures are finished')
wait(dask_rapids_filtered)
print('All futures are finished')

In [None]:
benchmark(count, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered count')
benchmark(count_index_length, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered count index length')
benchmark(mean, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered mean')
benchmark(standard_deviation, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered groupby statistics')

other = groupby_statistics(dask_rapids_filtered)
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

benchmark(join_count, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered join count', other=other)
benchmark(join_data, dask_rapids_filtered, benchmarks=dask_rapids_benchmarks, name='filtered join', other=other)

### Dask + Modin + RAPIDS

In [None]:
import modin.pandas as mpd
from modin.config import Engine
import dask
from dask.distributed import Client
import dask.dataframe as dd

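# Note: this setting selects the backend of dask.dataframe; it is not expected to affect Modin's Dask engine.
dask.config.set({"dataframe.backend": "cudf"})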
Engine.put("dask")

In [None]:
dask_modin_rapids_data = mpd.read_parquet(large_df_url)

In [None]:
dask_modin_rapids_benchmarks = {
    'duration': [],     # in seconds
    'task': []
}

#### Standard Operations

In [None]:
def read_file_parquet(df=None):
    return mpd.read_parquet(large_df_url)

def count(df=None):
    return len(df)

def count_index_length(df=None):
    return len(df.index)

def mean(df):
    return df.fare_amount.mean()

def standard_deviation(df):
    return df.fare_amount.std()

def mean_of_sum(df):
    return (df.fare_amount + df.tip_amount).mean()

def sum_columns(df):
    x = df.fare_amount + df.tip_amount
    return x

def mean_of_product(df):
    return (df.fare_amount * df.tip_amount).mean()

def product_columns(df):
    x = df.fare_amount * df.tip_amount
    return x

def value_counts(df):
    val_counts = df.fare_amount.value_counts()
    return val_counts


# In the original experiment, the following two functions used the longitude and latitude of the pickup and drop-off locations.
# The datasets currently provided by NYC TLC Trip Record Data no longer include coordinates (only pickup and drop-off location IDs),
# so we use arbitrary longitude and latitude values instead. The goal of this experiment is to compare the computational cost
# of the calculations, so the values themselves are not relevant.
# Note: the coordinates are scalars and `df` is never read, so these timings do not scale with the number of rows.
def complicated_arithmetic_operation(df):
    start_lon,end_lon = np.random.randint(-180,180),np.random.randint(-180,180)
    start_lat,end_lat = np.random.randint(-90,90),np.random.randint(-90,90)

    theta_1 = start_lon
    phi_1 = start_lat
    theta_2 = end_lon
    phi_2 = end_lat
    temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
           + np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180) * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
    ret = np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1-temp)),2)
    return ret

def mean_of_complicated_arithmetic_operation(df):
    start_lon,end_lon = np.random.randint(-180,180),np.random.randint(-180,180)
    start_lat,end_lat = np.random.randint(-90,90),np.random.randint(-90,90)

    theta_1 = start_lon
    phi_1 = start_lat
    theta_2 = end_lon
    phi_2 = end_lat
    temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
           + np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180) * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
    ret = np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1-temp)),2)
    return ret.mean()

def groupby_statistics(df):
    gb = df.groupby(by='passenger_count').agg(
      {
        'fare_amount': ['mean', 'std'],
        'tip_amount': ['mean', 'std']
      }
    )
    return gb

other = groupby_statistics(dask_modin_rapids_data)
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

# The data here are Modin DataFrames, so use Modin's merge (as in the Dask + Modin section)
def join_count(df, other):
    return len(mpd.merge(df, other, left_index=True, right_index=True))

def join_data(df, other):
    return mpd.merge(df, other, left_index=True, right_index=True)
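
The join benchmarks merge on both indexes (`left_index=True, right_index=True`). The trip dataframe typically has a default integer row index, while the output of `groupby_statistics` is indexed by `passenger_count`. The join therefore matches row numbers against passenger counts and keeps only the few rows whose position equals a passenger count. That is acceptable for measuring join cost, but it is not a per-passenger-count lookup.

The pandas sketch below contrasts the two kinds of join on made-up toy data. Merging on the `passenger_count` column with `left_on` is the key-based alternative.

```python
import pandas as pd

# Toy data (made up for illustration): default integer index 0..5.
trips = pd.DataFrame({
    "passenger_count": [1, 2, 1, 3, 2, 1],
    "fare_amount": [10.0, 12.5, 8.0, 20.0, 15.0, 9.5],
})
# Aggregated statistics indexed by passenger_count (1, 2, 3).
stats = trips.groupby("passenger_count").agg(fare_amount_mean=("fare_amount", "mean"))

# Index-on-index join: matches row numbers 1, 2, 3 against passenger counts 1, 2, 3.
by_index = pd.merge(trips, stats, left_index=True, right_index=True)

# Key-based join: every trip gets the statistics of its own passenger_count.
by_key = pd.merge(trips, stats, left_on="passenger_count", right_index=True)

print(len(by_index), len(by_key))  # 3 6
```

In `by_index`, row 1 carries 2 passengers but receives the statistics for 1 passenger, which shows the mismatch.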

In [None]:
benchmark(read_file_parquet, df=None, benchmarks=dask_modin_rapids_benchmarks, name='read file')
benchmark(count, df=dask_modin_rapids_data, benchmarks=dask_modin_rapids_benchmarks, name='count')
benchmark(count_index_length, df=dask_modin_rapids_data, benchmarks=dask_modin_rapids_benchmarks, name='count index length')
benchmark(mean, df=dask_modin_rapids_data, benchmarks=dask_modin_rapids_benchmarks, name='mean')
benchmark(standard_deviation, df=dask_modin_rapids_data, benchmarks=dask_modin_rapids_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=dask_modin_rapids_data, benchmarks=dask_modin_rapids_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=dask_modin_rapids_data, benchmarks=dask_modin_rapids_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=dask_modin_rapids_data, benchmarks=dask_modin_rapids_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=dask_modin_rapids_data, benchmarks=dask_modin_rapids_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=dask_modin_rapids_data, benchmarks=dask_modin_rapids_benchmarks, name='value counts')
benchmark(mean_of_complicated_arithmetic_operation, df=dask_modin_rapids_data, benchmarks=dask_modin_rapids_benchmarks, name='mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, df=dask_modin_rapids_data, benchmarks=dask_modin_rapids_benchmarks, name='complex arithmetic ops')
benchmark(groupby_statistics, df=dask_modin_rapids_data, benchmarks=dask_modin_rapids_benchmarks, name='groupby statistics')
benchmark(join_count, dask_modin_rapids_data, benchmarks=dask_modin_rapids_benchmarks, name='join count', other=other)
benchmark(join_data, dask_modin_rapids_data, benchmarks=dask_modin_rapids_benchmarks, name='join', other=other)

#### Operations with filtering

In [None]:
expr_filter = (dask_modin_rapids_data.tip_amount >= 1) & (dask_modin_rapids_data.tip_amount <= 5)

def filter_data(df):
    return df[expr_filter]

dask_modin_rapids_filtered = filter_data(dask_modin_rapids_data)

In [None]:
benchmark(count, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered count')
benchmark(count_index_length, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered count index length')
benchmark(mean, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered mean')
benchmark(standard_deviation, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered groupby statistics')

other = groupby_statistics(dask_modin_rapids_filtered)
other.columns = pd.Index([e[0] +'_'+ e[1] for e in other.columns.tolist()])

benchmark(join_count, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered join count', other=other)
benchmark(join_data, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered join', other=other)

#### Operations with filtering and caching

In [None]:
# `client.persist` only accepts Dask collections, so it cannot be applied to a Modin
# DataFrame. Modin executes operations eagerly (not lazily like Dask), so there is no
# separate persist step: `dask_modin_rapids_filtered` is used as-is below.

In [None]:
benchmark(count, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered count')
benchmark(count_index_length, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered count index length')
benchmark(mean, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered mean')
benchmark(standard_deviation, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered groupby statistics')

other = groupby_statistics(dask_modin_rapids_filtered)
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

benchmark(join_count, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered join count', other=other)
benchmark(join_data, dask_modin_rapids_filtered, benchmarks=dask_modin_rapids_benchmarks, name='filtered join', other=other)