# Performance Analysis Project

Work by: Gonçalo Dias and Vicente Bandeira

## Introduction

This notebook serves as both an implementation and a report for the work proposed to us in the CDLE cadeira in IACD. In this report, we present the findings of our comprehensive analysis of machine learning performance using various Python libraries for big data processing. As the scale and complexity of data continue to grow, the choice of tools and libraries becomes critical in ensuring efficient and effective machine learning workflows. To address this, we executed a full Machine Learning Pipeline using several prominent big data libraries: PySpark, Dask, Modin, JobLib, Rapids and Koalas

## Repeating the NYC taxi driver dataset study

Source: https://www.databricks.com/blog/2021/04/07/benchmark-koalas-pyspark-and-dask.html

In [20]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [21]:
import databricks.koalas as ks
import dask
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
import pandas as pd
import numpy as np
import time

print('pandas version: %s' % pd.__version__)
print('numpy version: %s' % np.__version__)
print('koalas version: %s' % ks.__version__)
print('dask version: %s' % dask.__version__)

ImportError: cannot import name 'Iterable' from 'collections' (C:\Users\vicen\AppData\Local\Programs\Python\Python311\Lib\collections\__init__.py)

### Dataset information

In [None]:
url = "../yellow_tripdata_2011-01.parquet"
koalas_data = ks.read_parquet(url)

In [None]:
print("Number of rows:", len(koalas_data))
print()
print(koalas_data.head(20))

In [None]:
koalas_data.dtypes

In [None]:
expr_filter = (koalas_data['tip_amount'] >= 1) & (koalas_data['tip_amount'] <= 5)
 
print(f'Filtered data is {len(koalas_data[expr_filter]) / len(koalas_data) * 100}% of total data')

### Experiment preparation

In [None]:
def benchmark(f, df, benchmarks, name, **kwargs):
    """Benchmark the given function against the given DataFrame.

    Parameters
    ----------
    f: function to benchmark
    df: data frame
    benchmarks: container for benchmark results
    name: task name


    Returns
    -------
    Duration (in seconds) of the given operation
    """

    start_time = time.time()
    ret = f(df, **kwargs)
    benchmarks['duration'].append(time.time() - start_time)
    benchmarks['task'].append(name)
    print(f"{name} took: {benchmarks['duration'][-1]} seconds")

def get_results(benchmarks):
    """Return a pandas DataFrame containing benchmark results."""
    return pd.DataFrame.from_dict(benchmarks)

In [None]:
paths = ["../yellow_tripdata_201"+str(i)+"-01.parquet" for i in range(1,4)]

## Koalas

In this section we utilized the Koalas library for our experiments, trying it on Standard operations, operations with filtering and operations with filtering and caching.

In [None]:
koalas_data = ks.read_parquet(paths[0])

In [None]:
koalas_benchmarks = {
    'duration': [],     # in seconds
    'task': []
}

#### Standard operations

In [None]:
def read_file_parquet(df=None):
    return ks.read_parquet("../yellow_tripdata_2011-01.parquet")

def count(df=None):
    return len(df)

def count_index_length(df=None):
    return len(df.index)

def mean(df):
    return df.fare_amount.mean()

def standard_deviation(df):
    return df.fare_amount.std()

def mean_of_sum(df):
    return (df.fare_amount + df.tip_amount).mean()

def sum_columns(df):
    x = df.fare_amount + df.tip_amount
    return x

def mean_of_product(df):
    return (df.fare_amount * df.tip_amount).mean()

def product_columns(df):
    x = df.fare_amount * df.tip_amount
    return x

def value_counts(df):
    val_counts = df.fare_amount.value_counts()
    return val_counts


#   In the original experiment, the following two functions used the longitude and latitude values of the pickup and the dropout places.
#   Since the datasets provided by NYC TLC Trip Record Data no longer have longitude and latitude values (only the pickup and dropout places IDs),
# we used arbitrary longitude and latitude values. The goal of this experiment is to compare the computational cost of the calculations, hence
# the values are not relevant.
def complicated_arithmetic_operation(df):
    start_lon,end_lon = np.random.randint(-180,180),np.random.randint(-180,180)
    start_lat,end_lat = np.random.randint(-90,90),np.random.randint(-90,90)
    
    theta_1 = start_lon
    phi_1 = start_lat
    theta_2 = end_lon
    phi_2 = end_lat
    temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
           + np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180) * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
    ret = np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1-temp)),2)
    return ret

def mean_of_complicated_arithmetic_operation(df):
    start_lon,end_lon = np.random.randint(-180,180),np.random.randint(-180,180)
    start_lat,end_lat = np.random.randint(-90,90),np.random.randint(-90,90)
    
    theta_1 = start_lon
    phi_1 = start_lat
    theta_2 = end_lon
    phi_2 = end_lat
    temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
           + np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180) * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
    ret = np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1-temp)),2) 
    return ret.mean()

def groupby_statistics(df):
    gb = df.groupby(by='passenger_count').agg(
      {
        'fare_amount': ['mean', 'std'], 
        'tip_amount': ['mean', 'std']
      }
    )
    return gb

other = ks.DataFrame(groupby_statistics(koalas_data).to_pandas())
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

def join_count(df, other):
    return len(df.merge(other.spark.hint("broadcast"), left_index=True, right_index=True))

def join_data(df, other):
    ret = df.merge(other.spark.hint("broadcast"), left_index=True, right_index=True)
    return ret

In [None]:
benchmark(read_file_parquet, df=None, benchmarks=koalas_benchmarks, name='read file')
benchmark(count, df=koalas_data, benchmarks=koalas_benchmarks, name='count')
benchmark(count_index_length, df=koalas_data, benchmarks=koalas_benchmarks, name='count index length')
benchmark(mean, df=koalas_data, benchmarks=koalas_benchmarks, name='mean')
benchmark(standard_deviation, df=koalas_data, benchmarks=koalas_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=koalas_data, benchmarks=koalas_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=koalas_data, benchmarks=koalas_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=koalas_data, benchmarks=koalas_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=koalas_data, benchmarks=koalas_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=koalas_data, benchmarks=koalas_benchmarks, name='value counts')
benchmark(complicated_arithmetic_operation, df=koalas_data, benchmarks=koalas_benchmarks, name='complex arithmetic ops')
benchmark(mean_of_complicated_arithmetic_operation, df=koalas_data, benchmarks=koalas_benchmarks, name='mean of complex arithmetic ops')
benchmark(groupby_statistics, df=koalas_data, benchmarks=koalas_benchmarks, name='groupby statistics')
benchmark(join_count, koalas_data, benchmarks=koalas_benchmarks, name='join count', other=other)
benchmark(join_data, koalas_data, benchmarks=koalas_benchmarks, name='join', other=other)

#### Operations with filtering

In [None]:
expr_filter = (koalas_data.tip_amount >= 1) & (koalas_data.tip_amount <= 5)

def filter_data(df):
    return df[expr_filter]

koalas_filtered = filter_data(koalas_data)

In [None]:
benchmark(count, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered count')
benchmark(count_index_length, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered count index length')
benchmark(mean, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered mean')
benchmark(standard_deviation, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, koalas_filtered, benchmarks=koalas_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=koalas_filtered, benchmarks=koalas_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, koalas_filtered, benchmarks=koalas_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=koalas_filtered, benchmarks=koalas_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, koalas_filtered, benchmarks=koalas_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered groupby statistics')

other = ks.DataFrame(groupby_statistics(koalas_filtered).to_pandas())
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

benchmark(join_data, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered join', other=other)
benchmark(join_count, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered join count', other=other)

#### Operations with filtering and caching

In [None]:
koalas_filtered_cached = koalas_filtered.spark.cache()
print(f'Enforce caching: {len(koalas_filtered_cached)} rows of filtered data')

In [None]:
benchmark(count, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered count')
benchmark(count_index_length, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered count index length')
benchmark(mean, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered mean')
benchmark(standard_deviation, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, koalas_filtered, benchmarks=koalas_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=koalas_filtered, benchmarks=koalas_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, koalas_filtered, benchmarks=koalas_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=koalas_filtered, benchmarks=koalas_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, koalas_filtered, benchmarks=koalas_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered groupby statistics')

other = ks.DataFrame(groupby_statistics(koalas_filtered).to_pandas())
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

benchmark(join_data, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered join', other=other)
benchmark(join_count, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered join count', other=other)

# Dask

We repeated this experiment with Dask, on the same 3 sets of operations: standard, with filtering and with filtering and caching.

In [None]:
cluster = LocalCluster(memory_limit='8GB')
client = Client(cluster)

dask_data = dd.read_parquet(paths[0])

In [None]:
dask_benchmarks = {
    'duration': [],  # in seconds
    'task': [],
}

#### Standard operations

In [None]:
def read_file_parquet(df=None):
    return dd.read_parquet("../yellow_tripdata_2011-01.parquet")

def count(df=None):
    return len(df)

def count_index_length(df=None):
    return len(df.index)

def mean(df):
    return df.fare_amount.mean().compute()

def standard_deviation(df):
    return df.fare_amount.std().compute()

def mean_of_sum(df):
    return (df.fare_amount + df.tip_amount).mean().compute()

def sum_columns(df):
    return (df.fare_amount + df.tip_amount).compute()

def mean_of_product(df):
    return (df.fare_amount * df.tip_amount).mean().compute()

def product_columns(df):
    return (df.fare_amount * df.tip_amount).compute()

def value_counts(df):
    return df.fare_amount.value_counts().compute()

#   In the original experiment, the following two functions used the longitude and latitude values of the pickup and the dropout places.
#   Since the datasets provided by NYC TLC Trip Record Data no longer have longitude and latitude values (only the pickup and dropout places IDs),
# we used arbitrary longitude and latitude values. The goal of this experiment is to compare the computational cost of the calculations, hence
# the values are not relevant.
def mean_of_complicated_arithmetic_operation(df):
    start_lon,end_lon = np.random.randint(-180,180),np.random.randint(-180,180)
    start_lat,end_lat = np.random.randint(-90,90),np.random.randint(-90,90)
    theta_1 = start_lon
    phi_1 = start_lat
    theta_2 = end_lon
    phi_2 = end_lat
    temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
           + np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
    ret = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    return ret.mean()

def complicated_arithmetic_operation(df):
    start_lon,end_lon = np.random.randint(-180,180),np.random.randint(-180,180)
    start_lat,end_lat = np.random.randint(-90,90),np.random.randint(-90,90)
    theta_1 = start_lon
    phi_1 = start_lat
    theta_2 = end_lon
    phi_2 = end_lat
    temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
           + np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
    ret = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    return ret

def groupby_statistics(df):
    return df.groupby(by='passenger_count').agg({
        'fare_amount': ['mean', 'std'],
        'tip_amount': ['mean', 'std'] 
    })

other = groupby_statistics(dask_data)
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

def join_count(df, other):
    return len(dd.merge(df, other, left_index=True, right_index=True))

def join_data(df, other):
    return dd.merge(df, other, left_index=True, right_index=True).compute()

In [None]:
benchmark(read_file_parquet, df=None, benchmarks=dask_benchmarks, name='read file')
benchmark(count, df=dask_data, benchmarks=dask_benchmarks, name='count')
benchmark(count_index_length, df=dask_data, benchmarks=dask_benchmarks, name='count index length')
benchmark(mean, df=dask_data, benchmarks=dask_benchmarks, name='mean')
benchmark(standard_deviation, df=dask_data, benchmarks=dask_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=dask_data, benchmarks=dask_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=dask_data, benchmarks=dask_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=dask_data, benchmarks=dask_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=dask_data, benchmarks=dask_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=dask_data, benchmarks=dask_benchmarks, name='value counts')
benchmark(mean_of_complicated_arithmetic_operation, df=dask_data, benchmarks=dask_benchmarks, name='mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, df=dask_data, benchmarks=dask_benchmarks, name='complex arithmetic ops')
benchmark(groupby_statistics, df=dask_data, benchmarks=dask_benchmarks, name='groupby statistics')
benchmark(join_count, dask_data, benchmarks=dask_benchmarks, name='join count', other=other)
benchmark(join_data, dask_data, benchmarks=dask_benchmarks, name='join', other=other)

#### Operations with filtering

In [None]:
expr_filter = (dask_data.tip_amount >= 1) & (dask_data.tip_amount <= 5)

def filter_data(df):
    return df[expr_filter]

dask_filtered = filter_data(dask_data)

In [None]:
benchmark(count, dask_filtered, benchmarks=dask_benchmarks, name='filtered count')
benchmark(count_index_length, dask_filtered, benchmarks=dask_benchmarks, name='filtered count index length')
benchmark(mean, dask_filtered, benchmarks=dask_benchmarks, name='filtered mean')
benchmark(standard_deviation, dask_filtered, benchmarks=dask_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, dask_filtered, benchmarks=dask_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=dask_filtered, benchmarks=dask_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, dask_filtered, benchmarks=dask_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=dask_filtered, benchmarks=dask_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, dask_filtered, benchmarks=dask_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, dask_filtered, benchmarks=dask_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, dask_filtered, benchmarks=dask_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, dask_filtered, benchmarks=dask_benchmarks, name='filtered groupby statistics')

other = groupby_statistics(dask_filtered)
other.columns = pd.Index([e[0] +'_'+ e[1] for e in other.columns.tolist()])

benchmark(join_count, dask_filtered, benchmarks=dask_benchmarks, name='filtered join count', other=other)
benchmark(join_data, dask_filtered, benchmarks=dask_benchmarks, name='filtered join', other=other)

#### Operations with filtering and caching

In [None]:
dask_filtered = client.persist(dask_filtered)

from distributed import wait
print('Waiting until all futures are finished')
wait(dask_filtered)
print('All futures are finished')

In [None]:
benchmark(count, dask_filtered, benchmarks=dask_benchmarks, name='filtered count')
benchmark(count_index_length, dask_filtered, benchmarks=dask_benchmarks, name='filtered count index length')
benchmark(mean, dask_filtered, benchmarks=dask_benchmarks, name='filtered mean')
benchmark(standard_deviation, dask_filtered, benchmarks=dask_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, dask_filtered, benchmarks=dask_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=dask_filtered, benchmarks=dask_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, dask_filtered, benchmarks=dask_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=dask_filtered, benchmarks=dask_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, dask_filtered, benchmarks=dask_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, dask_filtered, benchmarks=dask_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, dask_filtered, benchmarks=dask_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, dask_filtered, benchmarks=dask_benchmarks, name='filtered groupby statistics')

other = groupby_statistics(dask_filtered)
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

benchmark(join_count, dask_filtered, benchmarks=dask_benchmarks, name='filtered join count', other=other)
benchmark(join_data, dask_filtered, benchmarks=dask_benchmarks, name='filtered join', other=other)

In [None]:
client.restart()

## Result Analysis

In [None]:
print(koalas_benchmarks)
print(dask_benchmarks)

In [None]:
dask_koalas_duration_ratio = []
i=0
for task in dask_benchmarks['task']:
    ratio = dask_benchmarks['duration'][i]/koalas_benchmarks['duration'][i]
    dask_koalas_duration_ratio.append(ratio)
    if ratio >= 1:
        print(f"Task {task}: Koalas performs {ratio}x better than Dask")
    else:
        print(f"Task {task}: Dask performs {1/ratio}x better than Koalas")
    i+=1

## Conclusions