In [1]:
import pandas as pd
import os


# Start timer

# get current directory
path = os.getcwd()
print("Current Directory", path)
 
# prints parent directory
parent_dir = os.path.abspath(os.path.join(path, os.pardir))
input_dir = f"{parent_dir}/data"

Current Directory /Users/NachoCorcuera/PycharmProjects/Pandas-Polars-PySpark-BenchMark/notebooks


# PySpark

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

def join_df(spark):
    # List of regions and their aliases
    regions = ["Europe", "North America", "Asia", "Sub-Saharan Africa",
               "Central America and the Caribbean", "Middle East and North Africa",
               "Australia and Oceania"]
    aliases = ["EU", "NA", "AS", "SSA", "CA", "MENA", "AUS"]

    # Create a DataFrame using Spark
    rows = list(zip(regions, aliases))
    df_regions = spark.createDataFrame(rows, ["Region", "Alias"])

    return df_regions


In [9]:
import time
def processing_df(spark, file):
    times = {}
    start_time_full = time.time()

    # Reading time
    df = spark.read.csv(f"{input_dir}/{file}.csv", header=True, inferSchema=True)
    end_time = time.time()
    times["read_csv"] = end_time - start_time_full

    # Filtering time
    start_time = time.time()
    filtered_pyspark = df.filter(df['Total Profit'] > 2000)
    end_time = time.time()
    times["filter"] = end_time - start_time

    # Aggregation
    start_time = time.time()
    aggregated_pyspark = df.groupBy('Region').agg(
        {"Total Profit": "sum",
         "Total Profit": "avg",
         "Total Profit": "max",
         "Total Profit": "min",
         "Total Profit": "median"}
    ).withColumnRenamed("sum(Total Profit)", "sales") \
      .withColumnRenamed("avg(Total Profit)", "sales_mean") \
      .withColumnRenamed("max(Total Profit)", "sales_max") \
      .withColumnRenamed("min(Total Profit)", "sales_min") \
      .withColumnRenamed("percentile_approx(Total Profit, 0.5)", "sales_median")
    end_time = time.time()
    times["aggregation"] = end_time - start_time

    # Joining time
    start_time = time.time()
    df_regions = join_df(spark)  # Assuming join_df returns a DataFrame to join
    df_joined = df.join(df_regions, on="Region", how="left")
    end_time = time.time()
    times["join"] = end_time - start_time

    # Writing time
    start_time = time.time()
    df_joined.write.csv("testing_write.csv", mode="overwrite", header=True)
    end_time = time.time()
    times["write"] = end_time - start_time

    return times

In [4]:
def init_spark(app_name="My Application", master_config="local[*]"):
    """
    Initialize a Spark session for use in PySpark applications.
    :param app_name: Name of the application.
    :param master_config: Configuration for Spark master. Default is local mode using all available cores.
    :return: A SparkSession object.
    """
    spark = SparkSession.builder \
        .appName(app_name) \
        .master(master_config) \
        .getOrCreate()
    return spark

# Example usage:
spark = init_spark("Enhanced Data Processing")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/01 11:05:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 54246)
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.11/3.11.7/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/homebrew/Cellar/python@3.11/3.11.7/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/homebrew/Cellar/python@3.11/3.11.7/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/homebrew/Cellar/python@3.11/3.11.7/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/opt/homebrew/lib/python3.11/site-packages/pyspark/accumu

In [10]:
file_list = ["sales_50000",
             "sales_250000",
             "sales_1000000",
             "sales_5000000",
             "sales_25000000"]


times = {file: processing_df(spark, file) for file in file_list}



CodeCache: size=131072Kb used=19606Kb max_used=19615Kb free=111465Kb
 bounds [0x00000001099e0000, 0x000000010ad30000, 0x00000001119e0000]
 total_blobs=8014 nmethods=7086 adapters=838
 compilation: disabled (not enough contiguous free space left)


                                                                                

In [11]:
times

{'sales_50000': {'read_csv': 2.824773073196411,
  'filter': 0.0072171688079833984,
  'aggregation': 0.0379641056060791,
  'join': 0.06410336494445801,
  'write': 1.2163400650024414},
 'sales_250000': {'read_csv': 0.5960338115692139,
  'filter': 0.003153085708618164,
  'aggregation': 0.01168203353881836,
  'join': 0.0364987850189209,
  'write': 0.9168949127197266},
 'sales_1000000': {'read_csv': 0.4560251235961914,
  'filter': 0.0028700828552246094,
  'aggregation': 0.012090206146240234,
  'join': 0.02053999900817871,
  'write': 1.8498010635375977},
 'sales_5000000': {'read_csv': 1.9299159049987793,
  'filter': 0.002227783203125,
  'aggregation': 0.010584115982055664,
  'join': 0.02206707000732422,
  'write': 7.372208833694458},
 'sales_25000000': {'read_csv': 11.013280153274536,
  'filter': 0.0033180713653564453,
  'aggregation': 0.009418010711669922,
  'join': 0.019010066986083984,
  'write': 44.7241747379303}}

In [None]:
times

In [12]:
df = spark.read.csv(f"{input_dir}/sales_25000000.csv", header=True, inferSchema=True)

                                                                                

# Parquet test

In [7]:
import time
def processing_df_parquet(spark, file):
    times = {}
    start_time_full = time.time()

    # Reading time
    df = spark.read.parquet(f"{input_dir}/{file}", header=True, inferSchema=True)
    end_time = time.time()
    times["read_csv"] = end_time - start_time_full

    # Filtering time
    start_time = time.time()
    filtered_pyspark = df.filter(df['Total Profit'] > 2000)
    end_time = time.time()
    times["filter"] = end_time - start_time

    # Aggregation
    start_time = time.time()
    aggregated_pyspark = df.groupBy('Region').agg(
        {"Total Profit": "sum",
         "Total Profit": "avg",
         "Total Profit": "max",
         "Total Profit": "min",
         "Total Profit": "median"}
    ).withColumnRenamed("sum(Total Profit)", "sales") \
      .withColumnRenamed("avg(Total Profit)", "sales_mean") \
      .withColumnRenamed("max(Total Profit)", "sales_max") \
      .withColumnRenamed("min(Total Profit)", "sales_min") \
      .withColumnRenamed("percentile_approx(Total Profit, 0.5)", "sales_median")
    end_time = time.time()
    times["aggregation"] = end_time - start_time

    # Joining time
    start_time = time.time()
    df_regions = join_df(spark)  # Assuming join_df returns a DataFrame to join
    df_joined = df.join(df_regions, on="Region", how="left")
    end_time = time.time()
    times["join"] = end_time - start_time
    
    ## Writting time
    start_time = time.time()
    df_regions = join_df(spark)
    df_joined.write.parquet("testing_write_spark_2")
    end_time = time.time()
    times["write"] = end_time - start_time
    return times


In [8]:
file = "parquet/sales_25000000"
processing_df_parquet(spark, file)



CodeCache: size=131072Kb used=18097Kb max_used=18097Kb free=112974Kb
 bounds [0x00000001071e0000, 0x00000001083b0000, 0x000000010f1e0000]
 total_blobs=7472 nmethods=6511 adapters=874
 compilation: disabled (not enough contiguous free space left)


24/05/01 11:06:10 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
                                                                                

{'read_csv': 0.08439183235168457,
 'filter': 0.0056247711181640625,
 'aggregation': 0.014968156814575195,
 'join': 0.02739691734313965,
 'write': 65.57280802726746}

24/05/01 11:51:18 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 926013 ms exceeds timeout 120000 ms
24/05/01 11:51:18 WARN SparkContext: Killing executors is not supported by current scheduler.
24/05/01 11:51:19 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [19]:
times

{'sales_50000': {'read_csv': 2.824773073196411,
  'filter': 0.0072171688079833984,
  'aggregation': 0.0379641056060791,
  'join': 0.06410336494445801,
  'write': 1.2163400650024414},
 'sales_250000': {'read_csv': 0.5960338115692139,
  'filter': 0.003153085708618164,
  'aggregation': 0.01168203353881836,
  'join': 0.0364987850189209,
  'write': 0.9168949127197266},
 'sales_1000000': {'read_csv': 0.4560251235961914,
  'filter': 0.0028700828552246094,
  'aggregation': 0.012090206146240234,
  'join': 0.02053999900817871,
  'write': 1.8498010635375977},
 'sales_5000000': {'read_csv': 1.9299159049987793,
  'filter': 0.002227783203125,
  'aggregation': 0.010584115982055664,
  'join': 0.02206707000732422,
  'write': 7.372208833694458},
 'sales_25000000': {'read_csv': 11.013280153274536,
  'filter': 0.0033180713653564453,
  'aggregation': 0.009418010711669922,
  'join': 0.019010066986083984,
  'write': 44.7241747379303},
 'parquet/sales_25000000': {'read_csv': 0.11203718185424805,
  'filter': 0

24/04/30 19:06:00 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 912962 ms exceeds timeout 120000 ms
24/04/30 19:06:00 WARN SparkContext: Killing executors is not supported by current scheduler.
24/04/30 19:22:39 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o