# Step 1: Import necessary libraries and create SparkSession


In [1]:
from tqdm.auto import tqdm
import time
import pandas as pd
from IPython.display import display
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, round as spark_round

# Create a SparkSession
spark = SparkSession.builder.appName("DataAnalysis").getOrCreate()

24/11/15 17:34:10 WARN Utils: Your hostname, nord-laptop resolves to a loopback address: 127.0.1.1; using 192.168.50.104 instead (on interface wlp0s20f3)
24/11/15 17:34:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/15 17:34:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Step 2: Define the DataProcessor class with timing measurements


In [2]:
class DataProcessor:
    def __init__(self):
        # Initialize DataFrames
        self.users_df = None
        self.purchases_df = None
        self.products_df = None
        # Initialize dictionary for timing measurements
        self.timings = {}

    def load_data(self, users_path, purchases_path, products_path):
        start_time = time.time()
        # Load CSV files into DataFrames
        self.users_df = spark.read.csv(users_path, header=True, inferSchema=True)
        self.purchases_df = spark.read.csv(
            purchases_path, header=True, inferSchema=True
        )
        self.products_df = spark.read.csv(products_path, header=True, inferSchema=True)
        end_time = time.time()
        # Record timing
        self.timings["load_data"] = end_time - start_time

    def clean_data(self):
        start_time = time.time()
        # Remove rows with missing values
        self.users_df = self.users_df.dropna()
        self.purchases_df = self.purchases_df.dropna()
        self.products_df = self.products_df.dropna()
        # Trigger action to enforce execution
        self.users_df.count()
        self.purchases_df.count()
        self.products_df.count()
        end_time = time.time()
        # Record timing
        self.timings["clean_data"] = end_time - start_time

    def total_purchases_by_category(self):
        start_time = time.time()
        # Calculate total purchases by product category
        joined_df = self.purchases_df.join(self.products_df, "product_id")
        total_purchases = joined_df.groupBy("category").agg(
            spark_sum(col("quantity") * col("price")).alias("total_sales")
        )
        # Trigger action
        total_purchases.count()
        end_time = time.time()
        # Record timing
        self.timings["total_purchases_by_category"] = end_time - start_time
        return total_purchases

    def purchases_by_category_age_group(self):
        start_time = time.time()
        # Calculate total purchases by category for age group 18-25
        users_age_group = self.users_df.filter((col("age") >= 18) & (col("age") <= 25))
        purchases_with_users = self.purchases_df.join(users_age_group, "user_id")
        joined_df = purchases_with_users.join(self.products_df, "product_id")
        total_purchases = joined_df.groupBy("category").agg(
            spark_sum(col("quantity") * col("price")).alias("total_sales")
        )
        # Trigger action
        total_purchases.count()
        end_time = time.time()
        # Record timing
        self.timings["purchases_by_category_age_group"] = end_time - start_time
        return total_purchases

    def percentage_purchases_by_category_age_group(self):
        start_time = time.time()
        # Calculate percentage of purchases by category for age group 18-25
        total_purchases_df = self.purchases_by_category_age_group()
        total_sales_row = total_purchases_df.agg(
            spark_sum("total_sales").alias("total_sales")
        ).collect()[0]
        total_sales = total_sales_row["total_sales"]
        percentage_df = total_purchases_df.withColumn(
            "percentage", spark_round((col("total_sales") / total_sales) * 100, 2)
        )
        # Trigger action
        percentage_df.count()
        end_time = time.time()
        # Record timing
        self.timings["percentage_purchases_by_category_age_group"] = (
            end_time - start_time
        )
        return percentage_df

    def top_three_categories(self):
        start_time = time.time()
        # Get top 3 categories with highest percentage of spending for age group 18-25
        percentage_df = self.percentage_purchases_by_category_age_group()
        top_three = percentage_df.orderBy(col("percentage").desc()).limit(3)
        # Trigger action
        top_three.count()
        end_time = time.time()
        # Record timing
        self.timings["top_three_categories"] = end_time - start_time
        return top_three

    def print_timings(self):
        # Create a pandas DataFrame from timings dictionary
        df = pd.DataFrame(
            list(self.timings.items()), columns=["Method", "Execution Time (seconds)"]
        )
        df["Execution Time (seconds)"] = df["Execution Time (seconds)"].apply(
            lambda x: round(x, 4)
        )
        display(df)

    def get_timings(self):
        # Return a copy of the timings dictionary
        return self.timings.copy()

# Step 3: Create an instance of the class and perform operations


In [3]:
processor = DataProcessor()

users_csv_path = "db/users.csv"
purchases_csv_path = "db/purchases.csv"
products_csv_path = "db/products.csv"

# Load data
processor.load_data(users_csv_path, purchases_csv_path, products_csv_path)

# Step 4: Clean data


In [4]:
processor.clean_data()

# Step 5: Calculate total purchases by product category


In [5]:
total_purchases = processor.total_purchases_by_category()
total_purchases.show()

+-----------+------------------+
|   category|       total_sales|
+-----------+------------------+
|       Home|1523.4999999999998|
|     Sports|1802.4999999999998|
|Electronics|1174.7999999999997|
|   Clothing|             790.3|
|     Beauty| 459.8999999999999|
+-----------+------------------+



# Step 6: Calculate total purchases by category for age group 18-25


In [6]:
age_group_purchases = processor.purchases_by_category_age_group()
age_group_purchases.show()

+-----------+------------------+
|   category|       total_sales|
+-----------+------------------+
|       Home|             361.1|
|     Sports|310.49999999999994|
|Electronics|             249.6|
|   Clothing|             245.0|
|     Beauty|41.400000000000006|
+-----------+------------------+



# Step 7: Calculate percentage of purchases by category for age group 18-25


In [7]:
percentage_purchases = processor.percentage_purchases_by_category_age_group()
percentage_purchases.show()

+-----------+------------------+----------+
|   category|       total_sales|percentage|
+-----------+------------------+----------+
|       Home|             361.1|      29.9|
|     Sports|310.49999999999994|     25.71|
|Electronics|             249.6|     20.67|
|   Clothing|             245.0|     20.29|
|     Beauty|41.400000000000006|      3.43|
+-----------+------------------+----------+



# Step 8: Get top 3 categories with highest percentage of spending for age group 18-25


In [8]:
top_three_categories = processor.top_three_categories()
top_three_categories.show()

+-----------+------------------+----------+
|   category|       total_sales|percentage|
+-----------+------------------+----------+
|       Home|             361.1|      29.9|
|     Sports|310.49999999999994|     25.71|
|Electronics|             249.6|     20.67|
+-----------+------------------+----------+



# Step 9: Functional implementation with timing measurements


In [9]:
# Initialize dictionary for timing measurements
functional_timings = {}

# Load data
start_time = time.time()
users_df = spark.read.csv(users_csv_path, header=True, inferSchema=True)
purchases_df = spark.read.csv(purchases_csv_path, header=True, inferSchema=True)
products_df = spark.read.csv(products_csv_path, header=True, inferSchema=True)
end_time = time.time()
functional_timings["load_data"] = end_time - start_time

# Clean data
start_time = time.time()
users_df = users_df.dropna()
purchases_df = purchases_df.dropna()
products_df = products_df.dropna()
# Trigger action
users_df.count()
purchases_df.count()
products_df.count()
end_time = time.time()
functional_timings["clean_data"] = end_time - start_time

# Calculate total purchases by product category
start_time = time.time()
joined_df = purchases_df.join(products_df, "product_id")
total_purchases_func = joined_df.groupBy("category").agg(
    spark_sum(col("quantity") * col("price")).alias("total_sales")
)
# Trigger action
total_purchases_func.count()
end_time = time.time()
functional_timings["total_purchases_by_category"] = end_time - start_time
total_purchases_func.show()

# Calculate total purchases by category for age group 18-25
start_time = time.time()
users_age_group = users_df.filter((col("age") >= 18) & (col("age") <= 25))
purchases_with_users = purchases_df.join(users_age_group, "user_id")
joined_df_age = purchases_with_users.join(products_df, "product_id")
age_group_purchases_func = joined_df_age.groupBy("category").agg(
    spark_sum(col("quantity") * col("price")).alias("total_sales")
)
# Trigger action
age_group_purchases_func.count()
end_time = time.time()
functional_timings["purchases_by_category_age_group"] = end_time - start_time
age_group_purchases_func.show()

# Calculate percentage of purchases by category for age group 18-25
start_time = time.time()
total_sales_row = age_group_purchases_func.agg(
    spark_sum("total_sales").alias("total_sales")
).collect()[0]
total_sales = total_sales_row["total_sales"]
percentage_purchases_func = age_group_purchases_func.withColumn(
    "percentage", spark_round((col("total_sales") / total_sales) * 100, 2)
)
# Trigger action
percentage_purchases_func.count()
end_time = time.time()
functional_timings["percentage_purchases_by_category_age_group"] = end_time - start_time
percentage_purchases_func.show()

# Get top 3 categories with highest percentage of spending for age group 18-25
start_time = time.time()
top_three_categories_func = percentage_purchases_func.orderBy(
    col("percentage").desc()
).limit(3)
# Trigger action
top_three_categories_func.count()
end_time = time.time()
functional_timings["top_three_categories"] = end_time - start_time
top_three_categories_func.show()

# Display timing measurements using pandas DataFrame
functional_df = pd.DataFrame(
    list(functional_timings.items()), columns=["Operation", "Execution Time (seconds)"]
)
functional_df["Execution Time (seconds)"] = functional_df[
    "Execution Time (seconds)"
].apply(lambda x: round(x, 4))
display(functional_df)

+-----------+------------------+
|   category|       total_sales|
+-----------+------------------+
|       Home|1523.4999999999998|
|     Sports|1802.4999999999998|
|Electronics|1174.7999999999997|
|   Clothing|             790.3|
|     Beauty| 459.8999999999999|
+-----------+------------------+

+-----------+------------------+
|   category|       total_sales|
+-----------+------------------+
|       Home|             361.1|
|     Sports|310.49999999999994|
|Electronics|             249.6|
|   Clothing|             245.0|
|     Beauty|41.400000000000006|
+-----------+------------------+

+-----------+------------------+----------+
|   category|       total_sales|percentage|
+-----------+------------------+----------+
|       Home|             361.1|      29.9|
|     Sports|310.49999999999994|     25.71|
|Electronics|             249.6|     20.67|
|   Clothing|             245.0|     20.29|
|     Beauty|41.400000000000006|      3.43|
+-----------+------------------+----------+

+------

Unnamed: 0,Operation,Execution Time (seconds)
0,load_data,1.8541
1,clean_data,0.7422
2,total_purchases_by_category,0.7872
3,purchases_by_category_age_group,1.2058
4,percentage_purchases_by_category_age_group,2.0163
5,top_three_categories,0.789


# Step 10: Compare execution times


In [10]:
# Print timing measurements for DataProcessor
processor.print_timings()

Unnamed: 0,Method,Execution Time (seconds)
0,load_data,11.454
1,clean_data,3.1591
2,total_purchases_by_category,2.7305
3,purchases_by_category_age_group,1.6662
4,percentage_purchases_by_category_age_group,3.7049
5,top_three_categories,4.7577


In [11]:
comparison_data = []
for key in processor.timings:
    class_time = processor.timings[key]
    func_time = functional_timings.get(key, 0)
    abs_diff = class_time - func_time  # Keep the sign to indicate faster or slower
    # Avoid division by zero
    if func_time != 0:
        percent_diff = (abs_diff / func_time) * 100
    else:
        percent_diff = 0
    comparison_data.append(
        {
            "Operation": key,
            "Class Execution Time (seconds)": round(class_time, 4),
            "Functional Execution Time (seconds)": round(func_time, 4),
            "Difference (seconds)": round(abs_diff, 4),
            "Percentage Difference (%)": round(percent_diff, 2),
        }
    )

comparison_df = pd.DataFrame(comparison_data)
display(comparison_df)

Unnamed: 0,Operation,Class Execution Time (seconds),Functional Execution Time (seconds),Difference (seconds),Percentage Difference (%)
0,load_data,11.454,1.8541,9.5999,517.75
1,clean_data,3.1591,0.7422,2.4168,325.61
2,total_purchases_by_category,2.7305,0.7872,1.9433,246.86
3,purchases_by_category_age_group,1.6662,1.2058,0.4604,38.18
4,percentage_purchases_by_category_age_group,3.7049,2.0163,1.6886,83.75
5,top_three_categories,4.7577,0.789,3.9687,503.01


# Final Testing


In [12]:
class Trainer:
    def __init__(
        self, users_csv_path, purchases_csv_path, products_csv_path, iterations=1000
    ):
        self.users_csv_path = users_csv_path
        self.purchases_csv_path = purchases_csv_path
        self.products_csv_path = products_csv_path
        self.iterations = iterations
        self.class_timings_list = []
        self.functional_timings_list = []

    def run_class_based(self):
        # Run class-based implementation multiple times with a progress bar
        print("Running class-based implementation...")
        start_time = time.time()
        for _ in tqdm(range(self.iterations), desc="Class-based Progress"):
            processor = DataProcessor()
            processor.load_data(
                self.users_csv_path, self.purchases_csv_path, self.products_csv_path
            )
            processor.clean_data()
            processor.total_purchases_by_category()
            processor.purchases_by_category_age_group()
            processor.percentage_purchases_by_category_age_group()
            processor.top_three_categories()
            # Collect timings
            self.class_timings_list.append(processor.get_timings())
            # Clear cache to free up memory
            spark.catalog.clearCache()
        end_time = time.time()
        total_time = end_time - start_time
        print(f"Total time for class-based implementation: {total_time:.2f} seconds")

    def run_functional(self):
        # Run functional implementation multiple times with a progress bar
        print("Running functional implementation...")
        start_time = time.time()
        for _ in tqdm(range(self.iterations), desc="Functional Progress"):
            functional_timings = {}
            # Load data
            start_time_loop = time.time()
            users_df = spark.read.csv(
                self.users_csv_path, header=True, inferSchema=True
            )
            purchases_df = spark.read.csv(
                self.purchases_csv_path, header=True, inferSchema=True
            )
            products_df = spark.read.csv(
                self.products_csv_path, header=True, inferSchema=True
            )
            end_time_loop = time.time()
            functional_timings["load_data"] = end_time_loop - start_time_loop

            # Clean data
            start_time_loop = time.time()
            users_df = users_df.dropna()
            purchases_df = purchases_df.dropna()
            products_df = products_df.dropna()
            # Trigger action
            users_df.count()
            purchases_df.count()
            products_df.count()
            end_time_loop = time.time()
            functional_timings["clean_data"] = end_time_loop - start_time_loop

            # Calculate total purchases by product category
            start_time_loop = time.time()
            joined_df = purchases_df.join(products_df, "product_id")
            total_purchases_func = joined_df.groupBy("category").agg(
                spark_sum(col("quantity") * col("price")).alias("total_sales")
            )
            # Trigger action
            total_purchases_func.count()
            end_time_loop = time.time()
            functional_timings["total_purchases_by_category"] = (
                end_time_loop - start_time_loop
            )

            # Calculate total purchases by category for age group 18-25
            start_time_loop = time.time()
            users_age_group = users_df.filter((col("age") >= 18) & (col("age") <= 25))
            purchases_with_users = purchases_df.join(users_age_group, "user_id")
            joined_df_age = purchases_with_users.join(products_df, "product_id")
            age_group_purchases_func = joined_df_age.groupBy("category").agg(
                spark_sum(col("quantity") * col("price")).alias("total_sales")
            )
            # Trigger action
            age_group_purchases_func.count()
            end_time_loop = time.time()
            functional_timings["purchases_by_category_age_group"] = (
                end_time_loop - start_time_loop
            )

            # Calculate percentage of purchases by category for age group 18-25
            start_time_loop = time.time()
            total_sales_row = age_group_purchases_func.agg(
                spark_sum("total_sales").alias("total_sales")
            ).collect()[0]
            total_sales = total_sales_row["total_sales"]
            percentage_purchases_func = age_group_purchases_func.withColumn(
                "percentage", spark_round((col("total_sales") / total_sales) * 100, 2)
            )
            # Trigger action
            percentage_purchases_func.count()
            end_time_loop = time.time()
            functional_timings["percentage_purchases_by_category_age_group"] = (
                end_time_loop - start_time_loop
            )

            # Get top 3 categories with highest percentage
            start_time_loop = time.time()
            top_three_categories_func = percentage_purchases_func.orderBy(
                col("percentage").desc()
            ).limit(3)
            # Trigger action
            top_three_categories_func.count()
            end_time_loop = time.time()
            functional_timings["top_three_categories"] = end_time_loop - start_time_loop

            # Collect timings
            self.functional_timings_list.append(functional_timings)
            # Clear cache to free up memory
            spark.catalog.clearCache()
        end_time = time.time()
        total_time = end_time - start_time
        print(f"Total time for functional implementation: {total_time:.2f} seconds")

    def average_timings(self, timings_list):
        # Calculate average timings from list of timing dictionaries
        avg_timings = {}
        num_runs = len(timings_list)
        keys = timings_list[0].keys()
        for key in keys:
            total_time = sum(timing[key] for timing in timings_list)
            avg_timings[key] = total_time / num_runs
        return avg_timings

    def compare_timings(self):
        # Average the timings
        avg_class_timings = self.average_timings(self.class_timings_list)
        avg_functional_timings = self.average_timings(self.functional_timings_list)

        # Prepare data for comparison
        comparison_data = []
        for key in avg_class_timings.keys():
            class_time = avg_class_timings[key]
            func_time = avg_functional_timings.get(key, 0)
            diff = class_time - func_time  # Retain the sign
            # Avoid division by zero
            if func_time != 0:
                percent_diff = (diff / func_time) * 100
            else:
                percent_diff = 0
            comparison_data.append(
                {
                    "Operation": key,
                    "Class Avg Time (s)": round(class_time, 4),
                    "Functional Avg Time (s)": round(func_time, 4),
                    "Difference (s)": round(diff, 4),
                    "Percentage Difference (%)": round(percent_diff, 2),
                }
            )
        # Create DataFrame
        comparison_df = pd.DataFrame(comparison_data)
        display(comparison_df)

In [13]:
# Create an instance of Trainer with desired number of iterations (e.g., 1000)
trainer = Trainer(users_csv_path, purchases_csv_path, products_csv_path, iterations=100)

# Run class-based implementation
trainer.run_class_based()

# Run functional implementation
trainer.run_functional()

# Compare average timings
print(
    "Comparing average execution times over {} iterations:".format(trainer.iterations)
)
trainer.compare_timings()

Running class-based implementation...


Class-based Progress:   0%|          | 0/100 [00:00<?, ?it/s]

Total time for class-based implementation: 254.22 seconds
Running functional implementation...


Functional Progress:   0%|          | 0/100 [00:00<?, ?it/s]

Total time for functional implementation: 120.16 seconds
Comparing average execution times over 100 iterations:


Unnamed: 0,Operation,Class Avg Time (s),Functional Avg Time (s),Difference (s),Percentage Difference (%)
0,load_data,0.3799,0.266,0.114,42.85
1,clean_data,0.2289,0.166,0.0629,37.89
2,total_purchases_by_category,0.1897,0.1404,0.0493,35.14
3,purchases_by_category_age_group,0.2359,0.1711,0.0648,37.9
4,percentage_purchases_by_category_age_group,0.6599,0.3057,0.3542,115.84
5,top_three_categories,0.8657,0.151,0.7148,473.43


# Close the SparkSession


In [14]:
spark.stop()

# Brief Conclusion:


# Classes are slower than functions:

load_data: 42.85% slower

clean_data: 37.89% slower

total_purchases_by_category: 35.14% slower

purchases_by_category_age_group: 37.90% slower

# Classes are significantly slower than functions:

percentage_purchases_by_category_age_group: 115.84% slower

top_three_categories: 473.43% slower

The results indicate that the class-based implementation consistently lags behind the functional implementation in all operations. The difference in performance is most notable in computationally intensive operations, such as percentage_purchases_by_category_age_group and top_three_categories. This suggests that the functional approach might better optimize Spark's operations due to reduced method call overhead or execution context.

The performance discrepancy also highlights the variability of Spark's execution times, which can depend on system load, caching, and Spark optimizations during different runs.