# Benchmark: Koalas (PySpark) and Dask - Local execution
The benchmark was run against the 2009–2013 Yellow Taxi Trip Records (157 GB) from the NYC Taxi and Limousine Commission (TLC) Trip Record Data. We identified operations that are common in our pandas workloads, such as basic statistical calculations, joins, filtering, and grouping, and applied them to this dataset.

Each operation was measured both with and without a preceding filter step, to reflect real-world workloads.

## Set-up


In [1]:
from pyspark.sql import SparkSession

# Reuse the active Spark session, or create one named "bdcc" if none exists yet.
spark = SparkSession.builder.appName("bdcc").getOrCreate()

# Set the `spark.databricks.io.cache.enabled` conf to "false", then print the
# value that Spark reports back for it.
spark.conf.set("spark.databricks.io.cache.enabled", "false")
print(f"spark.databricks.io.cache.enabled is {spark.conf.get('spark.databricks.io.cache.enabled')}")



spark.databricks.io.cache.enabled is false


In [2]:
# %pip install -U koalas dask[complete] numpy pandas pyarrow

In [1]:
import pandas as pd
import numpy as np
import databricks.koalas as ks
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
 
import dask

# Print the version of every library taking part in the benchmark.
print(f'pandas version: {pd.__version__}')
print(f'numpy version: {np.__version__}')
print(f'koalas version: {ks.__version__}')
print(f'dask version: {dask.__version__}')
 
import time
 
def benchmark(f, df, benchmarks, name, **kwargs):
    """Time a single call of ``f(df, **kwargs)`` and record the result.

    Parameters
    ----------
    f: function to benchmark; it is called as ``f(df, **kwargs)``
    df: data frame passed to ``f`` as its first argument
    benchmarks: container for benchmark results; its ``'duration'`` and
        ``'task'`` entries are appended to in place
    name: task name, stored next to the duration and used in the printed line
    **kwargs: extra keyword arguments forwarded unchanged to ``f``

    Returns
    -------
    Duration (in seconds) of the given operation
    """
    # perf_counter() is a monotonic, high-resolution clock. time.time() follows
    # the wall clock, which may be adjusted while the operation is running.
    start_time = time.perf_counter()
    f(df, **kwargs)  # only the elapsed time matters; the result is discarded
    duration = time.perf_counter() - start_time
    benchmarks['duration'].append(duration)
    benchmarks['task'].append(name)
    print(f"{name} took: {duration} seconds")
    return duration
 
def get_results(benchmarks):
    """Build a pandas DataFrame from the collected benchmark results.

    Each key of ``benchmarks`` becomes one column of the returned frame.
    """
    results_frame = pd.DataFrame.from_dict(benchmarks, orient='columns')
    return results_frame



pandas version: 1.3.5
numpy version: 1.21.6
koalas version: 1.8.2
dask version: 2022.02.0


## Dask

### Preparation

In [2]:
# Local cluster with one worker, three threads and a 15 GiB memory limit.
cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=3,
    memory_limit='15GiB',
)
client = Client(cluster)

# Taxi dataset that all Dask benchmarks below operate on.
dask_data = dd.read_parquet('../../datasets/ks_taxi_parquet')

# Filled by benchmark(): parallel lists of durations (in seconds) and task names.
dask_benchmarks = dict(duration=[], task=[])

# Last expression of the cell, so the notebook renders the client summary.
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 1
Total threads: 3,Total memory: 15.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:50245,Workers: 1
Dashboard: http://127.0.0.1:8787/status,Total threads: 3
Started: Just now,Total memory: 15.00 GiB

0,1
Comm: tcp://127.0.0.1:50254,Total threads: 3
Dashboard: http://127.0.0.1:50255/status,Memory: 15.00 GiB
Nanny: tcp://127.0.0.1:50248,
Local directory: c:\Users\yakim\Documents\MEGA\03. Vida Académica\03. Mestrado Ciencias Computadores\2 Ano\Semestre 2\Big Data & Cloud Computing\Projetos\Project 02\00_git_code\code\nyc_taxi_dataset_study\dask-worker-space\worker-qh7x0eh8,Local directory: c:\Users\yakim\Documents\MEGA\03. Vida Académica\03. Mestrado Ciencias Computadores\2 Ano\Semestre 2\Big Data & Cloud Computing\Projetos\Project 02\00_git_code\code\nyc_taxi_dataset_study\dask-worker-space\worker-qh7x0eh8


### Standard operations

In [3]:
def read_file_parquet(df=None):
    """Open the taxi Parquet dataset with Dask; ``df`` is accepted but unused."""
    parquet_frame = dd.read_parquet('../../datasets/ks_taxi_parquet')
    return parquet_frame
  
def count(df=None):
    """Return ``len(df)``, i.e. the number of rows of the frame."""
    n_rows = len(df)
    return n_rows
 
def count_index_length(df=None):
    """Return the length of the frame's index."""
    frame_index = df.index
    return len(frame_index)
 
def mean(df):
    """Return the computed mean of the ``fare_amt`` column."""
    lazy_mean = df.fare_amt.mean()
    return lazy_mean.compute()
 
def standard_deviation(df):
    """Return the computed standard deviation of the ``fare_amt`` column."""
    lazy_std = df.fare_amt.std()
    return lazy_std.compute()
 
def mean_of_sum(df):
    """Return the computed mean of ``fare_amt + tip_amt``."""
    total_paid = df.fare_amt + df.tip_amt
    return total_paid.mean().compute()
 
def sum_columns(df):
    """Return the element-wise ``fare_amt + tip_amt``, materialised via ``compute()``."""
    total_paid = df.fare_amt + df.tip_amt
    return total_paid.compute()
 
def mean_of_product(df):
    """Return the computed mean of ``fare_amt * tip_amt``."""
    fare_times_tip = df.fare_amt * df.tip_amt
    return fare_times_tip.mean().compute()
 
def product_columns(df):
    """Return the element-wise ``fare_amt * tip_amt``, materialised via ``compute()``."""
    fare_times_tip = df.fare_amt * df.tip_amt
    return fare_times_tip.compute()
  
def value_counts(df):
    """Return the computed value counts of the ``fare_amt`` column."""
    fare_counts = df.fare_amt.value_counts()
    return fare_counts.compute()
  
def mean_of_complicated_arithmetic_operation(df):
    """Return the computed mean of a haversine-style expression over the trip coordinates.

    The expression combines ``start_lon``, ``start_lat``, ``end_lon`` and
    ``end_lat`` (converted from degrees with ``pi / 180``) through sin, cos,
    sqrt and arctan2.
    """
    lon_a, lat_a = df.start_lon, df.start_lat
    lon_b, lat_b = df.end_lon, df.end_lat
    lon_term = np.sin((lon_b - lon_a) / 2 * np.pi / 180) ** 2
    cos_product = np.cos(lon_a * np.pi / 180) * np.cos(lon_b * np.pi / 180)
    lat_term = np.sin((lat_b - lat_a) / 2 * np.pi / 180) ** 2
    inner = lon_term + cos_product * lat_term
    angle = 2 * np.arctan2(np.sqrt(inner), np.sqrt(1 - inner))
    return angle.mean().compute()
  
def complicated_arithmetic_operation(df):
    """Return a haversine-style expression over the trip coordinates, materialised via ``compute()``.

    The expression combines ``start_lon``, ``start_lat``, ``end_lon`` and
    ``end_lat`` (converted from degrees with ``pi / 180``) through sin, cos,
    sqrt and arctan2.
    """
    lon_a, lat_a = df.start_lon, df.start_lat
    lon_b, lat_b = df.end_lon, df.end_lat
    lon_term = np.sin((lon_b - lon_a) / 2 * np.pi / 180) ** 2
    cos_product = np.cos(lon_a * np.pi / 180) * np.cos(lon_b * np.pi / 180)
    lat_term = np.sin((lat_b - lat_a) / 2 * np.pi / 180) ** 2
    inner = lon_term + cos_product * lat_term
    angle = 2 * np.arctan2(np.sqrt(inner), np.sqrt(1 - inner))
    return angle.compute()
  
def groupby_statistics(df):
    """Return the computed mean and std of ``fare_amt`` and ``tip_amt`` per ``passenger_count``."""
    # Same two statistics for both columns; a fresh list is built per column.
    agg_spec = {column: ['mean', 'std'] for column in ('fare_amt', 'tip_amt')}
    grouped = df.groupby(by='passenger_count')
    return grouped.agg(agg_spec).compute()
  
# Per-passenger_count statistics of the full dataset, used as the right-hand side of the join benchmarks.
other = groupby_statistics(dask_data)
# Flatten the two-level (column, statistic) labels into names such as "fare_amt_mean".
other.columns = pd.Index(
    [pair[0] + '_' + pair[1] for pair in other.columns.tolist()]
)
 
def join_count(df, other):
    """Return the number of rows of the index-on-index merge of ``df`` and ``other``."""
    joined = dd.merge(df, other, left_index=True, right_index=True)
    return len(joined)
 
def join_data(df, other):
    """Return the index-on-index merge of ``df`` and ``other``, materialised via ``compute()``."""
    joined = dd.merge(df, other, left_index=True, right_index=True)
    return joined.compute()

In [4]:
# Reading the file takes no input frame, so it is timed on its own with df=None.
benchmark(read_file_parquet, df=None, benchmarks=dask_benchmarks, name='read file')

# Single-frame operations, timed in this order against the full dataset.
for _task_fn, _task_name in [
    (count, 'count'),
    (count_index_length, 'count index length'),
    (mean, 'mean'),
    (standard_deviation, 'standard deviation'),
    (mean_of_sum, 'mean of columns addition'),
    (sum_columns, 'addition of columns'),
    (mean_of_product, 'mean of columns multiplication'),
    (product_columns, 'multiplication of columns'),
    (value_counts, 'value counts'),
    (mean_of_complicated_arithmetic_operation, 'mean of complex arithmetic ops'),
    (complicated_arithmetic_operation, 'complex arithmetic ops'),
    (groupby_statistics, 'groupby statistics'),
]:
    benchmark(_task_fn, df=dask_data, benchmarks=dask_benchmarks, name=_task_name)

# The joins also take the aggregated `other` frame; the final call stays the
# last expression of the cell so the notebook still displays its duration.
benchmark(join_count, dask_data, benchmarks=dask_benchmarks, name='join count', other=other)
benchmark(join_data, dask_data, benchmarks=dask_benchmarks, name='join', other=other)

read file took: 0.013374090194702148 seconds
count took: 4.527094125747681 seconds
count index length took: 31.045822620391846 seconds
mean took: 6.072950124740601 seconds
standard deviation took: 7.277131795883179 seconds
mean of columns addition took: 8.592138528823853 seconds
addition of columns took: 58.856536626815796 seconds
mean of columns multiplication took: 9.39152979850769 seconds
multiplication of columns took: 52.26193833351135 seconds
value counts took: 11.063629388809204 seconds
mean of complex arithmetic ops took: 27.78617787361145 seconds
complex arithmetic ops took: 82.74341011047363 seconds
groupby statistics took: 56.64373421669006 seconds
join count took: 53.347198247909546 seconds
join took: 53.63099265098572 seconds


53.63099265098572

### Operations with filtering

In [5]:
# Boolean row mask over dask_data: true where tip_amt is between 1 and 5, inclusive.
expr_filter = (dask_data.tip_amt >= 1) & (dask_data.tip_amt <= 5)
 
def filter_data(df):
    """Return the rows of ``df`` whose ``tip_amt`` is between 1 and 5, inclusive.

    The mask is derived from ``df`` itself rather than from a module-level
    mask bound to one specific frame, so the function stays correct for any
    frame that has a ``tip_amt`` column and does not depend on which notebook
    cell last rebound a global.
    """
    tip_in_range = (df.tip_amt >= 1) & (df.tip_amt <= 5)
    return df[tip_in_range]
  
# Filtered dataset that the "filtered ..." Dask benchmarks run against.
dask_filtered = filter_data(dask_data)

In [6]:
# Single-frame operations, timed in this order against the filtered dataset.
for _task_fn, _task_name in [
    (count, 'filtered count'),
    (count_index_length, 'filtered count index length'),
    (mean, 'filtered mean'),
    (standard_deviation, 'filtered standard deviation'),
    (mean_of_sum, 'filtered mean of columns addition'),
    (sum_columns, 'filtered addition of columns'),
    (mean_of_product, 'filtered mean of columns multiplication'),
    (product_columns, 'filtered multiplication of columns'),
    (mean_of_complicated_arithmetic_operation, 'filtered mean of complex arithmetic ops'),
    (complicated_arithmetic_operation, 'filtered complex arithmetic ops'),
    (value_counts, 'filtered value counts'),
    (groupby_statistics, 'filtered groupby statistics'),
]:
    benchmark(_task_fn, dask_filtered, benchmarks=dask_benchmarks, name=_task_name)

# Rebuild the join partner from the filtered data and flatten its two-level column labels.
other = groupby_statistics(dask_filtered)
other.columns = pd.Index(
    [pair[0] + '_' + pair[1] for pair in other.columns.tolist()]
)

# The final call stays the last expression of the cell so the notebook still displays its duration.
benchmark(join_count, dask_filtered, benchmarks=dask_benchmarks, name='filtered join count', other=other)
benchmark(join_data, dask_filtered, benchmarks=dask_benchmarks, name='filtered join', other=other)

filtered count took: 38.05749750137329 seconds
filtered count index length took: 37.3171284198761 seconds
filtered mean took: 37.608333349227905 seconds
filtered standard deviation took: 37.790027379989624 seconds
filtered mean of columns addition took: 37.60324001312256 seconds
filtered addition of columns took: 48.09047842025757 seconds
filtered mean of columns multiplication took: 37.49120903015137 seconds
filtered multiplication of columns took: 49.33941030502319 seconds
filtered mean of complex arithmetic ops took: 42.8966019153595 seconds
filtered complex arithmetic ops took: 54.844189405441284 seconds
filtered value counts took: 39.08409762382507 seconds
filtered groupby statistics took: 39.94798517227173 seconds
filtered join count took: 38.06303954124451 seconds
filtered join took: 38.20121240615845 seconds


38.20121240615845

In [7]:
# Restart the Dask client once the Dask benchmarks are done.
# NOTE(review): presumably meant to drop worker state before the Koalas runs so it does not skew their timings — confirm.
client.restart()

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 1
Total threads: 3,Total memory: 15.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:50245,Workers: 1
Dashboard: http://127.0.0.1:8787/status,Total threads: 3
Started: 19 minutes ago,Total memory: 15.00 GiB

0,1
Comm: tcp://127.0.0.1:50606,Total threads: 3
Dashboard: http://127.0.0.1:50607/status,Memory: 15.00 GiB
Nanny: tcp://127.0.0.1:50248,
Local directory: c:\Users\yakim\Documents\MEGA\03. Vida Académica\03. Mestrado Ciencias Computadores\2 Ano\Semestre 2\Big Data & Cloud Computing\Projetos\Project 02\00_git_code\code\nyc_taxi_dataset_study\dask-worker-space\worker-r_u_a2zx,Local directory: c:\Users\yakim\Documents\MEGA\03. Vida Académica\03. Mestrado Ciencias Computadores\2 Ano\Semestre 2\Big Data & Cloud Computing\Projetos\Project 02\00_git_code\code\nyc_taxi_dataset_study\dask-worker-space\worker-r_u_a2zx


## Koalas

### Preparation

In [8]:
# Taxi dataset that all Koalas benchmarks below operate on; the 'index' column becomes the index.
# NOTE(review): the recorded run failed on this call with java.lang.UnsatisfiedLinkError in
# NativeIO$Windows.access0 — presumably the Hadoop native Windows binaries were not available; verify before re-running.
koalas_data = ks.read_parquet(
    '../../datasets/ks_taxi_parquet',
    index_col='index',
)

# Filled by benchmark(): parallel lists of durations (in seconds) and task names.
koalas_benchmarks = dict(duration=[], task=[])



Py4JJavaError: An error occurred while calling o29.load.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:180)
	at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$1(HadoopFSUtils.scala:95)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFilesInternal(HadoopFSUtils.scala:85)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFiles(HadoopFSUtils.scala:69)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:162)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:133)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:96)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:68)
	at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:539)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:405)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)


### Standard operations

In [None]:
def read_file_parquet(df=None):
    """Open the taxi Parquet dataset with Koalas, indexed by 'index'; ``df`` is accepted but unused."""
    parquet_frame = ks.read_parquet(
        '../../datasets/ks_taxi_parquet',
        index_col='index',
    )
    return parquet_frame
  
def count(df=None):
    """Return ``len(df)``, i.e. the number of rows of the frame."""
    n_rows = len(df)
    return n_rows
 
def count_index_length(df=None):
    """Return the length of the frame's index."""
    frame_index = df.index
    return len(frame_index)
 
def mean(df):
    """Return the mean of the ``fare_amt`` column."""
    fare = df.fare_amt
    return fare.mean()
 
def standard_deviation(df):
    """Return the standard deviation of the ``fare_amt`` column."""
    fare = df.fare_amt
    return fare.std()
 
def mean_of_sum(df):
    """Return the mean of ``fare_amt + tip_amt``."""
    total_paid = df.fare_amt + df.tip_amt
    return total_paid.mean()
 
def sum_columns(df):
    """Compute ``fare_amt + tip_amt`` and convert it to pandas once.

    The ``to_pandas()`` result is discarded; the unconverted sum is returned.
    """
    total_paid = df.fare_amt + df.tip_amt
    total_paid.to_pandas()  # conversion result intentionally unused
    return total_paid
 
def mean_of_product(df):
    """Return the mean of ``fare_amt * tip_amt``."""
    fare_times_tip = df.fare_amt * df.tip_amt
    return fare_times_tip.mean()
 
def product_columns(df):
    """Compute ``fare_amt * tip_amt`` and convert it to pandas once.

    The ``to_pandas()`` result is discarded; the unconverted product is returned.
    """
    fare_times_tip = df.fare_amt * df.tip_amt
    fare_times_tip.to_pandas()  # conversion result intentionally unused
    return fare_times_tip
 
def value_counts(df):
    """Compute the value counts of ``fare_amt`` and convert them to pandas once.

    The ``to_pandas()`` result is discarded; the unconverted counts are returned.
    """
    fare_counts = df.fare_amt.value_counts()
    fare_counts.to_pandas()  # conversion result intentionally unused
    return fare_counts
  
def complicated_arithmetic_operation(df):
    """Evaluate a haversine-style expression over the trip coordinates.

    The expression combines ``start_lon``, ``start_lat``, ``end_lon`` and
    ``end_lat`` (converted from degrees with ``pi / 180``) through sin, cos,
    sqrt and arctan2. The result is converted with ``to_pandas()`` once (that
    conversion is discarded) and the unconverted object is returned.
    """
    lon_a, lat_a = df.start_lon, df.start_lat
    lon_b, lat_b = df.end_lon, df.end_lat
    lon_term = np.sin((lon_b - lon_a) / 2 * np.pi / 180) ** 2
    cos_product = np.cos(lon_a * np.pi / 180) * np.cos(lon_b * np.pi / 180)
    lat_term = np.sin((lat_b - lat_a) / 2 * np.pi / 180) ** 2
    inner = lon_term + cos_product * lat_term
    angle = np.multiply(np.arctan2(np.sqrt(inner), np.sqrt(1 - inner)), 2)
    angle.to_pandas()  # conversion result intentionally unused
    return angle
  
def mean_of_complicated_arithmetic_operation(df):
    """Return the mean of a haversine-style expression over the trip coordinates.

    The expression combines ``start_lon``, ``start_lat``, ``end_lon`` and
    ``end_lat`` (converted from degrees with ``pi / 180``) through sin, cos,
    sqrt and arctan2.
    """
    lon_a, lat_a = df.start_lon, df.start_lat
    lon_b, lat_b = df.end_lon, df.end_lat
    lon_term = np.sin((lon_b - lon_a) / 2 * np.pi / 180) ** 2
    cos_product = np.cos(lon_a * np.pi / 180) * np.cos(lon_b * np.pi / 180)
    lat_term = np.sin((lat_b - lat_a) / 2 * np.pi / 180) ** 2
    inner = lon_term + cos_product * lat_term
    angle = np.multiply(np.arctan2(np.sqrt(inner), np.sqrt(1 - inner)), 2)
    return angle.mean()
  
def groupby_statistics(df):
    """Aggregate mean and std of ``fare_amt`` and ``tip_amt`` per ``passenger_count``.

    The aggregate is converted with ``to_pandas()`` once (that conversion is
    discarded) and the unconverted aggregate is returned.
    """
    # Same two statistics for both columns; a fresh list is built per column.
    agg_spec = {column: ['mean', 'std'] for column in ('fare_amt', 'tip_amt')}
    per_group = df.groupby(by='passenger_count').agg(agg_spec)
    per_group.to_pandas()  # conversion result intentionally unused
    return per_group
  
# Per-passenger_count statistics of the full dataset, round-tripped through pandas
# into a fresh Koalas frame; used as the right-hand side of the join benchmarks.
other = ks.DataFrame(
    groupby_statistics(koalas_data).to_pandas()
)
# Flatten the two-level (column, statistic) labels into names such as "fare_amt_mean".
other.columns = pd.Index(
    [pair[0] + '_' + pair[1] for pair in other.columns.tolist()]
)
 
def join_count(df, other):
    """Return the row count of the index-on-index merge of ``df`` with ``other``.

    ``other`` is passed with the Spark "broadcast" hint applied.
    """
    hinted_other = other.spark.hint("broadcast")
    joined = df.merge(hinted_other, left_index=True, right_index=True)
    return len(joined)
 
def join_data(df, other):
    """Merge ``df`` with ``other`` index-on-index and convert the result to pandas once.

    ``other`` is passed with the Spark "broadcast" hint applied. The
    ``to_pandas()`` result is discarded; the unconverted merge is returned.
    """
    hinted_other = other.spark.hint("broadcast")
    joined = df.merge(hinted_other, left_index=True, right_index=True)
    joined.to_pandas()  # conversion result intentionally unused
    return joined


In [None]:
# Reading the file takes no input frame, so it is timed on its own with df=None.
benchmark(read_file_parquet, df=None, benchmarks=koalas_benchmarks, name='read file')

# Single-frame operations, timed in this order against the full dataset.
for _task_fn, _task_name in [
    (count, 'count'),
    (count_index_length, 'count index length'),
    (mean, 'mean'),
    (standard_deviation, 'standard deviation'),
    (mean_of_sum, 'mean of columns addition'),
    (sum_columns, 'addition of columns'),
    (mean_of_product, 'mean of columns multiplication'),
    (product_columns, 'multiplication of columns'),
    (value_counts, 'value counts'),
    (complicated_arithmetic_operation, 'complex arithmetic ops'),
    (mean_of_complicated_arithmetic_operation, 'mean of complex arithmetic ops'),
    (groupby_statistics, 'groupby statistics'),
]:
    benchmark(_task_fn, df=koalas_data, benchmarks=koalas_benchmarks, name=_task_name)

# The joins also take the aggregated `other` frame; the final call stays the
# last expression of the cell so the notebook still displays its duration.
benchmark(join_count, koalas_data, benchmarks=koalas_benchmarks, name='join count', other=other)
benchmark(join_data, koalas_data, benchmarks=koalas_benchmarks, name='join', other=other)

### Operations with filtering

In [None]:
# Boolean row mask over koalas_data: true where tip_amt is between 1 and 5, inclusive.
expr_filter = (koalas_data.tip_amt >= 1) & (koalas_data.tip_amt <= 5)
 
def filter_data(df):
    """Return the rows of ``df`` whose ``tip_amt`` is between 1 and 5, inclusive.

    The mask is derived from ``df`` itself rather than from a module-level
    mask bound to one specific frame, so the function stays correct for any
    frame that has a ``tip_amt`` column and does not depend on which notebook
    cell last rebound a global.
    """
    tip_in_range = (df.tip_amt >= 1) & (df.tip_amt <= 5)
    return df[tip_in_range]
 
# Filtered dataset that the "filtered ..." Koalas benchmarks run against.
koalas_filtered = filter_data(koalas_data)

In [None]:
# Single-frame operations, timed in this order against the filtered dataset.
for _task_fn, _task_name in [
    (count, 'filtered count'),
    (count_index_length, 'filtered count index length'),
    (mean, 'filtered mean'),
    (standard_deviation, 'filtered standard deviation'),
    (mean_of_sum, 'filtered mean of columns addition'),
    (sum_columns, 'filtered addition of columns'),
    (mean_of_product, 'filtered mean of columns multiplication'),
    (product_columns, 'filtered multiplication of columns'),
    (mean_of_complicated_arithmetic_operation, 'filtered mean of complex arithmetic ops'),
    (complicated_arithmetic_operation, 'filtered complex arithmetic ops'),
    (value_counts, 'filtered value counts'),
    (groupby_statistics, 'filtered groupby statistics'),
]:
    benchmark(_task_fn, koalas_filtered, benchmarks=koalas_benchmarks, name=_task_name)

# Rebuild the join partner from the filtered data and flatten its two-level column labels.
other = ks.DataFrame(
    groupby_statistics(koalas_filtered).to_pandas()
)
other.columns = pd.Index(
    [pair[0] + '_' + pair[1] for pair in other.columns.tolist()]
)
# Here the materialising join is timed before the counting join.
benchmark(join_data, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered join', other=other)
benchmark(join_count, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered join count', other=other)

## Result

In [None]:
# One frame per engine, indexed by task name so the durations line up row by row.
koalas_res_temp = get_results(koalas_benchmarks).set_index('task')
dask_res_temp = get_results(dask_benchmarks).set_index('task')

# Side-by-side durations: one column per engine, one row per task.
df = pd.concat(
    [koalas_res_temp['duration'], dask_res_temp['duration']],
    axis=1,
    keys=['koalas', 'dask'],
)

In [None]:
from datetime import datetime
import os

# Output path: fixed prefix plus the current local time formatted as HHMMSS.
filename = '../../datasets/koalas-benchmark-no-parquet-cache/single_node_' + datetime.now().strftime("%H%M%S")
print(filename)

# Create the target folder first so the write does not fail on a fresh checkout
# where the results directory does not exist yet.
os.makedirs(os.path.dirname(filename), exist_ok=True)
df.to_parquet(filename)