In [76]:
import pyspark
import timeit
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, mean, count, udf
import timeit
from pyspark.sql.types import StringType

In [77]:
cycles = 10

email_1 = 'splitted/emails_part_1.csv'
email_2 = 'splitted/emails_part_2.csv'
email_3 = 'splitted/emails_part_3.csv'
email_4 = 'splitted/emails_part_4.csv'
email_5 = 'splitted/emails_part_5.csv'
email_6 = 'splitted/emails_part_6.csv'


df1 = spark.read.option('header', 'true').csv(email_1, inferSchema=True) # 37  MB
df2 = spark.read.option('header', 'true').csv(email_2, inferSchema=True) # 79  MB
df3 = spark.read.option('header', 'true').csv(email_3, inferSchema=True) # 139 MB
df4 = spark.read.option('header', 'true').csv(email_4, inferSchema=True) # 331 MB
df5 = spark.read.option('header', 'true').csv(email_5, inferSchema=True) # 599 MB
df6 = spark.read.option('header', 'true').csv(email_6, inferSchema=True) # 1.08 GB

dfs = [df1, df2, df3, df4, df5, df6]

file_paths = [
    'splitted/emails_part_1.csv',
    'splitted/emails_part_2.csv',
    'splitted/emails_part_3.csv',
    'splitted/emails_part_4.csv',
    'splitted/emails_part_5.csv',
    'splitted/emails_part_6.csv'
]


def average_time(myList):
    total_time_spark = 0
    for i in myList:
        total_time_spark += i
    avg_time = total_time_spark / cycles
    return avg_time

                                                                                

In [78]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("emails_session").getOrCreate()
spark

# Data Loading


In [79]:
# Read the CSV file

# Read the CSV file
def read_csv(x):
    df = spark.read.option('header', 'true').csv(x, inferSchema=True)
    return df

# Measurement - read_csv
def read_time(dis):
    start_time = timeit.default_timer()
    dataframe = read_csv(dis)
    end_time = timeit.default_timer()
    return end_time - start_time

    
def average_time(myList):
    total_time_spark = 0
    for i in myList:
        total_time_spark += i
    avg_time = total_time_spark / cycles
    return avg_time

def measure_average_read_time(files, cycles=1):
    avg_times = []
    for file_path in files:
        timings_spark = [read_time(file_path) for _ in range(cycles)]
        avg_time_spark = average_time(timings_spark)
        avg_times.append(avg_time_spark)
    return avg_times

avg_times = measure_average_read_time(file_paths)

for i, avg_time in enumerate(avg_times, 1):
    print(f"Average read time for email_{i}: {avg_time:.5f} seconds")


[Stage 762:>                                                        (0 + 8) / 9]

Average read time for email_1: 0.02039 seconds
Average read time for email_2: 0.01532 seconds
Average read time for email_3: 0.01878 seconds
Average read time for email_4: 0.04353 seconds
Average read time for email_5: 0.06368 seconds
Average read time for email_6: 0.10706 seconds


                                                                                

# Filtering and selection


In [80]:
def filter_and_select_spark(df, sender=None):
    filtered_df = df
    if sender:
        filtered_df = filtered_df.filter(col("From") == sender) 
    return filtered_df

def filter_and_select_time_spark(file_path, sender):
    start_time = timeit.default_timer()
    filtered_df = filter_and_select_spark(df, sender)
    end_time = timeit.default_timer()
    return end_time - start_time


def measure_average_filter_and_select_time_spark(files, sender):
    avg_times = []
    for file_path in files:
        timings_spark = [filter_and_select_time_spark(file_path, sender) for _ in range(cycles)]
        avg_time_spark = average_time(timings_spark)
        avg_times.append(avg_time_spark)
    return avg_times


sender = "john.arnold@enron.com"
avg_times = measure_average_filter_and_select_time_spark(dfs, sender)

for i, avg_time in enumerate(avg_times, 1):
    print(f"Average filter and select time for email_{i} with sender '{sender}': {avg_time:.5f} seconds")

Average filter and select time for email_1 with sender 'john.arnold@enron.com': 0.00317 seconds
Average filter and select time for email_2 with sender 'john.arnold@enron.com': 0.00176 seconds
Average filter and select time for email_3 with sender 'john.arnold@enron.com': 0.00138 seconds
Average filter and select time for email_4 with sender 'john.arnold@enron.com': 0.00113 seconds
Average filter and select time for email_5 with sender 'john.arnold@enron.com': 0.00104 seconds
Average filter and select time for email_6 with sender 'john.arnold@enron.com': 0.00086 seconds


# GroupBy and aggregation

In [81]:


def groupby_and_aggregate_spark(df, groupby_column, agg_function):
    return df.groupby(groupby_column).agg(agg_function)

def groupby_and_aggregate_time_spark(file_path, groupby_column, agg_function):
    start_time = timeit.default_timer()
    grouped_df = groupby_and_aggregate_spark(df, groupby_column, agg_function)
    end_time = timeit.default_timer()
    return end_time - start_time


def measure_average_groupby_and_aggregate_time_spark(files, groupby_column, agg_function):
    avg_times = []
    for file_path in files:
        timings_spark = [groupby_and_aggregate_time_spark(file_path, groupby_column, agg_function) for _ in range(cycles)]
        avg_time_spark = average_time(timings_spark)
        avg_times.append(avg_time_spark)
    return avg_times

groupby_column = "From"
agg_function = {"Date": "count"}

avg_times = measure_average_groupby_and_aggregate_time_spark(dfs, groupby_column, agg_function)

for i, avg_time in enumerate(avg_times, 1):
    print(f"Average groupby and aggregate time for email_{i}: {avg_time:.5f} seconds")


Average groupby and aggregate time for email_1: 0.00488 seconds
Average groupby and aggregate time for email_2: 0.00281 seconds
Average groupby and aggregate time for email_3: 0.00222 seconds
Average groupby and aggregate time for email_4: 0.00259 seconds
Average groupby and aggregate time for email_5: 0.00256 seconds
Average groupby and aggregate time for email_6: 0.00249 seconds


# Data transformation

In [82]:
def transform_data_spark(df, column, transformation_function):
    udf_transform = udf(transformation_function, StringType())
    return df.withColumn(column, udf_transform(col(column)))

# Data transformation example
def uppercase_subject_spark(subject):
    if subject:
        return subject.upper()
    return subject


def transform_data_time_spark(file_path, column, transformation_function):
    start_time = timeit.default_timer()
    transformed_df = transform_data_spark(df, column, transformation_function)
    end_time = timeit.default_timer()
    return end_time - start_time


def measure_average_transform_data_time_spark(files, column, transformation_function):
    avg_times = []
    for file_path in files:
        timings_spark = [transform_data_time_spark(file_path, column, transformation_function) for _ in range(cycles)]
        avg_time_spark = average_time(timings_spark)
        avg_times.append(avg_time_spark)
    return avg_times

column = "Subject"

avg_times = measure_average_transform_data_time_spark(dfs, column, uppercase_subject_spark)

for i, avg_time in enumerate(avg_times, 1):
    print(f"Average data transformation time for email_{i}: {avg_time:.5f} seconds")


Average data transformation time for email_1: 0.00851 seconds
Average data transformation time for email_2: 0.00657 seconds
Average data transformation time for email_3: 0.00756 seconds
Average data transformation time for email_4: 0.00662 seconds
Average data transformation time for email_5: 0.00710 seconds
Average data transformation time for email_6: 0.00498 seconds
