In [2]:
from pyspark.sql.functions import col, avg, stddev, sum, row_number, lit
from pyspark.sql.functions import radians, sin, cos, sqrt, atan2
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast
from pyspark.sql.window import Window

In [3]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from pyspark.sql import SparkSession
import pyspark
 
print('pandas version: %s' % pd.__version__)
print('numpy version: %s' % np.__version__)
print('pyspark version: %s' % pyspark.__version__)
import dask
print('dask version: %s' % dask.__version__)
 
import time
 
def benchmark(f, df, benchmarks, name, **kwargs):
    """Benchmark the given function against the given DataFrame.
    
    Parameters
    ----------
    f: function to benchmark
    df: data frame
    benchmarks: container for benchmark results
    name: task name
    
    Returns
    -------
    Duration (in seconds) of the given operation
    """
    start_time = time.time()
    ret = f(df, **kwargs)
    benchmarks['duration'].append(time.time() - start_time)
    benchmarks['task'].append(name)
    print(f"{name} took: {benchmarks['duration'][-1]} seconds")
    return benchmarks['duration'][-1]
 
def get_results(benchmarks):
    """Return a pandas DataFrame containing benchmark results."""
    return pd.DataFrame.from_dict(benchmarks)

pandas version: 1.4.4
numpy version: 1.22.4
pyspark version: 3.3.2
dask version: 2022.01.1


In [10]:
dask_data = dd.read_parquet("gs://bucket-for-cluster-dataproc/data/*.parquet")
 
dask_benchmarks = {
    'duration': [],  # in seconds
    'task': [],
}
dask_data

Unnamed: 0_level_0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
npartitions=4,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1
,Int32,datetime64[ns],datetime64[ns],Int64,float64,Int64,object,Int32,Int32,Int64,float64,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [11]:
def read_file_parquet(df=None):
    return dd.read_parquet("gs://bucket-for-cluster-dataproc/data/*.parquet")
  
def count(df=None):
    return len(df)
 
def mean(df):
    return df.fare_amount.mean().compute()
 
def standard_deviation(df):
    return df.fare_amount.std().compute()
 
def mean_of_sum(df):
    return (df.fare_amount + df.tip_amount).mean().compute()
 
def sum_columns(df):
    return (df.fare_amount + df.tip_amount).compute()
 
def mean_of_product(df):
    return (df.fare_amount * df.tip_amount).mean().compute()
 
def product_columns(df):
    return (df.fare_amount * df.tip_amount).compute()
  
def value_counts(df):
    return df.fare_amount.value_counts().compute()
  
def mean_of_complicated_arithmetic_operation(df):
    theta_1 = df.PULocationID
    phi_1 = df.PULocationID
    theta_2 = df.DOLocationID
    phi_2 = df.DOLocationID
    temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
           + np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
    ret = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    return ret.mean().compute()
  
def complicated_arithmetic_operation(df):
    theta_1 = df.PULocationID
    phi_1 = df.PULocationID
    theta_2 = df.DOLocationID
    phi_2 = df.DOLocationID
    temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
           + np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
    ret = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    return ret.compute()
  
def groupby_statistics(df):
    return df.groupby(by='passenger_count').agg(
      {
        'fare_amount': ['mean', 'std'], 
        'tip_amount': ['mean', 'std']
      }
    ).compute()
  
other = groupby_statistics(dask_data)
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])
 
def join_count(df, other):
    return len(dd.merge(df, other, left_index=True, right_index=True))
 
def join_data(df, other):
    return dd.merge(df, other, left_index=True, right_index=True).compute()

In [12]:
benchmark(read_file_parquet, df=None, benchmarks=dask_benchmarks, name='read file')
benchmark(count, df=dask_data, benchmarks=dask_benchmarks, name='count')
benchmark(mean, df=dask_data, benchmarks=dask_benchmarks, name='mean')
benchmark(standard_deviation, df=dask_data, benchmarks=dask_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=dask_data, benchmarks=dask_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=dask_data, benchmarks=dask_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=dask_data, benchmarks=dask_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=dask_data, benchmarks=dask_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=dask_data, benchmarks=dask_benchmarks, name='value counts')
benchmark(mean_of_complicated_arithmetic_operation, df=dask_data, benchmarks=dask_benchmarks, name='mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, df=dask_data, benchmarks=dask_benchmarks, name='complex arithmetic ops')
benchmark(groupby_statistics, df=dask_data, benchmarks=dask_benchmarks, name='groupby statistics')
benchmark(join_count, dask_data, benchmarks=dask_benchmarks, name='join count', other=other)
benchmark(join_data, dask_data, benchmarks=dask_benchmarks, name='join', other=other)

read file took: 0.1979684829711914 seconds
count took: 1.1088001728057861 seconds
mean took: 0.4638330936431885 seconds
standard deviation took: 0.5320038795471191 seconds
mean of columns addition took: 0.5899946689605713 seconds
addition of columns took: 0.6646292209625244 seconds
mean of columns multiplication took: 0.5967772006988525 seconds
multiplication of columns took: 0.6533539295196533 seconds
value counts took: 0.646568775177002 seconds
mean of complex arithmetic ops took: 2.3988208770751953 seconds
complex arithmetic ops took: 2.4099225997924805 seconds
groupby statistics took: 8.0478835105896 seconds
join count took: 9.842153549194336 seconds
join took: 9.674828290939331 seconds


9.674828290939331

In [13]:
expr_filter = (dask_data.tip_amount >= 1) & (dask_data.tip_amount <= 5)
 
def filter_data(df):
    return df[expr_filter]
  
dask_filtered = filter_data(dask_data)

removed the count_index_lenght operation since the spark does not make use of these indexes in parquet files. So I have decided to remove it both from Dask and Pyspark. 

In [14]:
benchmark(count, dask_filtered, benchmarks=dask_benchmarks, name='filtered count')
benchmark(mean, dask_filtered, benchmarks=dask_benchmarks, name='filtered mean')
benchmark(standard_deviation, dask_filtered, benchmarks=dask_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, dask_filtered, benchmarks=dask_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=dask_filtered, benchmarks=dask_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, dask_filtered, benchmarks=dask_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=dask_filtered, benchmarks=dask_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, dask_filtered, benchmarks=dask_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, dask_filtered, benchmarks=dask_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, dask_filtered, benchmarks=dask_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, dask_filtered, benchmarks=dask_benchmarks, name='filtered groupby statistics')
 
other = groupby_statistics(dask_filtered)
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])
 
benchmark(join_count, dask_filtered, benchmarks=dask_benchmarks, name='filtered join count', other=other)
benchmark(join_data, dask_filtered, benchmarks=dask_benchmarks, name='filtered join', other=other)

filtered count took: 8.17177677154541 seconds
filtered mean took: 8.295343399047852 seconds
filtered standard deviation took: 8.105826139450073 seconds
filtered mean of columns addition took: 8.274187326431274 seconds
filtered addition of columns took: 8.354636430740356 seconds
filtered mean of columns multiplication took: 8.236481666564941 seconds
filtered multiplication of columns took: 8.065267562866211 seconds
filtered mean of complex arithmetic ops took: 8.513192653656006 seconds
filtered complex arithmetic ops took: 8.402990579605103 seconds
filtered value counts took: 8.176492691040039 seconds
filtered groupby statistics took: 8.32035231590271 seconds
filtered join count took: 9.742776870727539 seconds
filtered join took: 9.847952127456665 seconds


9.847952127456665

In [4]:
pyspark_data = spark.read.parquet("gs://bucket-for-cluster-dataproc/data/*.parquet")


pyspark_benchmarks = {
    'duration': [],  # in seconds
    'task': [],
}
pyspark_data.count()

                                                                                

12931345

In [5]:
def read_file_parquet(df=None):
    return spark.read.parquet("gs://bucket-for-cluster-dataproc/data/*.parquet")
  
def count(df=None):
    return df.count()
 

def mean(df):
    return df.agg(avg(df.fare_amount)).collect()
 
def standard_deviation(df):
    return df.agg(stddev(df.fare_amount)).collect()
 
def mean_of_sum(df):
    return df.select((col("fare_amount") + col("tip_amount")).alias("total_amount")).agg(avg("total_amount")).collect()
 
def sum_columns(df):
    result = df.select((col("fare_amount") + col("tip_amount")).alias("total_amount")).collect()
    return result
 
def mean_of_product(df):
    return df.select((col("fare_amount") * col("tip_amount")).alias("total_amount")).agg(avg("total_amount")).collect()
 
def product_columns(df):
    result = df.select((col("fare_amount") * col("tip_amount")).alias("total_prod")).agg(avg("total_prod")).collect()
    return result
 
def value_counts(df):
    val_counts = df.groupBy("fare_amount").count()
    return val_counts.collect()

def complicated_arithmetic_operation(df):
    temp = (
        (sin(radians(df['DOLocationID'] - df['PULocationID']) / 2) ** 2) +
        (cos(radians(df['PULocationID'])) * cos(radians(df['DOLocationID'])) * (sin(radians(df['DOLocationID'] - df['PULocationID']) / 2) ** 2))
    )
    ret = 2 * atan2(sqrt(temp), sqrt(1 - temp))
    return df.withColumn('result', ret).select('result').collect()

def mean_of_complicated_arithmetic_operation(df):
    temp = (
        (sin(radians(df['DOLocationID'] - df['PULocationID']) / 2) ** 2) +
        (cos(radians(df['PULocationID'])) * cos(radians(df['DOLocationID'])) * (sin(radians(df['DOLocationID'] - df['PULocationID']) / 2) ** 2))
    )
    ret = 2 * atan2(sqrt(temp), sqrt(1 - temp))
    return df.withColumn('result', ret).agg(F.mean('result')).collect()[0][0]


def groupby_statistics(df):
    gb = df.groupBy('passenger_count').agg(
        avg("fare_amount"), stddev("fare_amount"),
        avg("tip_amount"), stddev("tip_amount")
    )
    return gb.toPandas()


windowSpec = Window.orderBy(lit(1))
other_spark = spark.createDataFrame(groupby_statistics(pyspark_data))
other_spark = other_spark.withColumn("index", row_number().over(windowSpec))
#other_spark.columns = pd.Index([e[0]+'_' + e[1] for e in other_spark.columns.tolist()])
pyspark_data_with_index = pyspark_data.withColumn("index", row_number().over(windowSpec))

def join_count(df, other):
    joined_df = df.join(broadcast(other), on="index")
    # Count the number of rows in the joined DataFrame
    count = joined_df.count()
    return count

def join_data(df, other):
    # Use broadcast hint to optimize the join
    ret = df.join(broadcast(other), on="index")
    return ret

                                                                                

In [6]:
benchmark(read_file_parquet, df=None, benchmarks=pyspark_benchmarks, name='read file')
benchmark(count, df=pyspark_data, benchmarks=pyspark_benchmarks, name='count')
benchmark(mean, df=pyspark_data, benchmarks=pyspark_benchmarks, name='mean')
benchmark(standard_deviation, df=pyspark_data, benchmarks=pyspark_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=pyspark_data, benchmarks=pyspark_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=pyspark_data, benchmarks=pyspark_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=pyspark_data, benchmarks=pyspark_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=pyspark_data, benchmarks=pyspark_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=pyspark_data, benchmarks=pyspark_benchmarks, name='value counts')
benchmark(complicated_arithmetic_operation, df=pyspark_data, benchmarks=pyspark_benchmarks, name='complex arithmetic ops')
benchmark(mean_of_complicated_arithmetic_operation, df=pyspark_data, benchmarks=pyspark_benchmarks, name='mean of complex arithmetic ops')
benchmark(groupby_statistics, df=pyspark_data, benchmarks=pyspark_benchmarks, name='groupby statistics')
benchmark(join_data, pyspark_data_with_index, benchmarks=pyspark_benchmarks, name='join', other=other_spark)
benchmark(join_count, pyspark_data_with_index, benchmarks=pyspark_benchmarks, name='join count', other=other_spark)

                                                                                

read file took: 2.2425272464752197 seconds


                                                                                

count took: 1.7583775520324707 seconds


                                                                                

mean took: 1.246199131011963 seconds


                                                                                

standard deviation took: 1.4145667552947998 seconds


                                                                                

mean of columns addition took: 1.541130542755127 seconds


                                                                                

addition of columns took: 33.94906544685364 seconds




mean of columns multiplication took: 0.9470608234405518 seconds


                                                                                

multiplication of columns took: 0.9508183002471924 seconds


                                                                                

value counts took: 2.2307310104370117 seconds


                                                                                

complex arithmetic ops took: 30.635658025741577 seconds


                                                                                

mean of complex arithmetic ops took: 1.8361237049102783 seconds


                                                                                

groupby statistics took: 1.983147144317627 seconds
join took: 0.047533273696899414 seconds


24/05/30 15:00:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/30 15:00:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/30 15:00:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/30 15:00:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/30 15:00:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/30 15:00:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/30 1

join count took: 7.361225366592407 seconds


                                                                                

7.361225366592407

In [7]:
expr_filter = (pyspark_data.tip_amount >= 1) & (pyspark_data.tip_amount <= 5)
 
def filter_data(df):
    return df[expr_filter]
 
pyspark_filtered = filter_data(pyspark_data)

In [8]:
pyspark_data.unpersist()

DataFrame[VendorID: int, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: bigint, trip_distance: double, RatecodeID: bigint, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, Airport_fee: double]

In [9]:
benchmark(count, pyspark_filtered, benchmarks=pyspark_benchmarks, name='filtered count')
benchmark(mean, pyspark_filtered, benchmarks=pyspark_benchmarks, name='filtered mean')
benchmark(standard_deviation, pyspark_filtered, benchmarks=pyspark_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, pyspark_filtered, benchmarks=pyspark_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=pyspark_filtered, benchmarks=pyspark_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, pyspark_filtered, benchmarks=pyspark_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=pyspark_filtered, benchmarks=pyspark_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, pyspark_filtered, benchmarks=pyspark_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, pyspark_filtered, benchmarks=pyspark_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, pyspark_filtered, benchmarks=pyspark_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, pyspark_filtered, benchmarks=pyspark_benchmarks, name='filtered groupby statistics')
 
other_spark = spark.createDataFrame(groupby_statistics(pyspark_filtered))
other_spark = other_spark.withColumn("index", row_number().over(windowSpec))
#other_spark.columns = pd.Index([e[0]+'_' + e[1] for e in other_spark.columns.tolist()])
pyspark_data_with_index_filtered = pyspark_filtered.withColumn("index", row_number().over(windowSpec))
    
benchmark(join_data, pyspark_data_with_index_filtered, benchmarks=pyspark_benchmarks, name='filtered join', other=other_spark)
benchmark(join_count, pyspark_data_with_index_filtered, benchmarks=pyspark_benchmarks, name='filtered join count', other=other_spark)

                                                                                

filtered count took: 1.5542008876800537 seconds


                                                                                

filtered mean took: 1.7325758934020996 seconds


                                                                                

filtered standard deviation took: 1.5913488864898682 seconds


                                                                                

filtered mean of columns addition took: 1.6500835418701172 seconds


                                                                                

filtered addition of columns took: 15.418529272079468 seconds


                                                                                

filtered mean of columns multiplication took: 1.5014209747314453 seconds


                                                                                

filtered multiplication of columns took: 1.4281108379364014 seconds


                                                                                

filtered mean of complex arithmetic ops took: 2.0476768016815186 seconds


                                                                                

filtered complex arithmetic ops took: 18.683204174041748 seconds


                                                                                

filtered value counts took: 2.228614568710327 seconds


                                                                                

filtered groupby statistics took: 2.413958787918091 seconds


24/05/30 15:02:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/30 15:02:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/30 15:02:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/30 15:02:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


filtered join took: 0.015772581100463867 seconds


24/05/30 15:02:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/30 15:02:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/30 15:02:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/30 15:02:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/30 15:02:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/30 15:02:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/30 1

filtered join count took: 3.854327440261841 seconds


                                                                                

3.854327440261841

In [23]:
pyspark_res_temp = get_results(pyspark_benchmarks).set_index('task')
dask_res_temp = get_results(dask_benchmarks).set_index('task')
pyspark_res_temp

Unnamed: 0_level_0,duration
task,Unnamed: 1_level_1
read file,0.187303
count,0.19618
mean,0.271599
standard deviation,0.353475
mean of columns addition,0.410502
addition of columns,7.157562
mean of columns multiplication,0.384705
multiplication of columns,0.401257
value counts,0.554913
complex arithmetic ops,7.668461


In [24]:
df = pd.concat([pyspark_res_temp.duration, dask_res_temp.duration],axis=1,keys=['pyspark', 'dask'])
df

Unnamed: 0_level_0,pyspark,dask
task,Unnamed: 1_level_1,Unnamed: 2_level_1
read file,0.187303,0.136477
count,0.19618,0.442867
mean,0.271599,0.312409
standard deviation,0.353475,0.330494
mean of columns addition,0.410502,0.371634
addition of columns,7.157562,0.352466
mean of columns multiplication,0.384705,0.354498
multiplication of columns,0.401257,0.353019
value counts,0.554913,0.369631
complex arithmetic ops,7.668461,0.881192


In [30]:
from datetime import datetime
from os import getcwd
 
filename = "gs://bucket-for-cluster-dataproc/single_node_results_" + datetime.now().strftime("%H%M%S") *"_4files"
print(filename)
 
df.to_parquet(path=filename)

gs://bucket-for-cluster-dataproc/single_node_results_120930
