## Setting up Spark Session / Context

In [None]:
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, BooleanType, DoubleType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression


In [None]:
# Spark cluster init
def bytes_to_mb(size_bytes):
    """Return *size_bytes* expressed in megabytes, using 1 MB = 1024**2 bytes."""
    bytes_per_mb = 1024 ** 2
    return size_bytes / bytes_per_mb

def bytes_to_gb(size_bytes):
    """Return *size_bytes* expressed in gigabytes, using 1 GB = 1024**3 bytes."""
    bytes_per_gb = 1 << 30
    return size_bytes / bytes_per_gb

def configure_spark(dataset_size_gb, core_factor=2, memory_factor=1, max_memory_cores=4):
    """
    Derive Spark executor cores and memory from the dataset size.

    The core count grows linearly with the dataset size (in GB) and is rounded
    up to an even number. Executor memory is proportional to the core count,
    but only up to ``max_memory_cores`` cores, which caps the memory value.

    Args:
        dataset_size_gb (float): Size of the dataset in gigabytes.
        core_factor (int, optional): Cores added per GB of data. Defaults to 2.
        memory_factor (int, optional): Gigabytes of memory per counted core.
            Should be an int so the result stays a valid Spark size string
            (e.g. "4g" rather than "4.0g"). Defaults to 1.
        max_memory_cores (int, optional): At most this many cores are counted
            when sizing memory. Defaults to 4.

    Returns:
        tuple: A tuple containing:
            - executor_cores (int): ``int(dataset_size_gb * core_factor) + 2``,
              rounded up to the next even number if it is odd.
            - executor_memory (str): ``f"{min(executor_cores, max_memory_cores) * memory_factor}g"``,
              e.g. "4g" with the defaults once executor_cores >= 4.
    """
    # The constant +2 guarantees at least two cores even for tiny files.
    executor_cores = int(dataset_size_gb * core_factor) + 2
    # Round odd core counts up so they are always even.
    if executor_cores % 2:
        executor_cores += 1

    memory_cores = min(executor_cores, max_memory_cores)
    executor_memory = f"{memory_cores * memory_factor}g"

    return executor_cores, executor_memory


def _hdfs_file_size(hdfs_path, file_path):
    """Return the size in bytes of ``file_path`` on the HDFS filesystem at ``hdfs_path``.

    A short-lived Spark session is created only to reach the Hadoop FileSystem
    API through the JVM gateway. It is always stopped before returning, so the
    real session can then be built with size-dependent settings.
    """
    spark = SparkSession.builder.appName("Project Group 32 HDFSFileSize").getOrCreate()
    try:
        jvm = spark._jvm
        conf = jvm.org.apache.hadoop.conf.Configuration()
        fs = jvm.org.apache.hadoop.fs.FileSystem.get(jvm.java.net.URI.create(hdfs_path), conf)
        path = jvm.org.apache.hadoop.fs.Path(file_path)
        return fs.getFileStatus(path).getLen()
    finally:
        # Stop even when the lookup fails (e.g. a missing file). Otherwise the
        # next getOrCreate() could hand back this unconfigured session.
        spark.stop()


def build_spark_session(hdfs_path, file_path, verbose=False, master="spark://192.168.2.156:7077"):
    """Build a Spark session whose executor sizing depends on the input file size.

    Args:
        hdfs_path (str): HDFS URI prefix, e.g. "hdfs://host:port".
        file_path (str): File path within HDFS.
        verbose (bool, optional): Enable verbose output. Defaults to False.
        master (str, optional): Spark master URL. Defaults to the project's
            standalone cluster "spark://192.168.2.156:7077".

    Returns:
        tuple: (SparkSession, SparkContext, file size in bytes).
    """
    fileSize = _hdfs_file_size(hdfs_path, file_path)

    if verbose:
        print(f"File size in bytes: {fileSize}")

    executor_cores, executor_memory = configure_spark(bytes_to_gb(fileSize))

    # Cluster configuration:
    # - dynamicAllocation.enabled: Spark adds and removes executors as the workload changes.
    # - dynamicAllocation.shuffleTracking.enabled: tracks shuffle files on the executors.
    #   This lets dynamic allocation work WITHOUT an external shuffle service,
    #   which is disabled on the next line.
    # - shuffle.service.enabled=False: no external shuffle service; executors
    #   serve their own shuffle files.
    # - dynamicAllocation.executorIdleTimeout=60s: executors idle for 60 s are released.
    # - executor.cores: at most 4 cores per executor.
    # - executor.memory: size string from configure_spark (e.g. "4g").
    # - cores.max: total cores the application may use, capped at 32.
    # - driver.port / blockManager.port: fixed ports (9999 / 10005), presumably so
    #   they can be opened in the cluster firewall. TODO confirm.
    spark_session = (
        SparkSession.builder
        .master(master)
        .appName("Project Group 32")
        .config("spark.dynamicAllocation.enabled", True)
        .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
        .config("spark.shuffle.service.enabled", False)
        .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
        .config("spark.executor.cores", min(executor_cores, 4))
        .config("spark.executor.memory", executor_memory)
        .config("spark.cores.max", min(executor_cores, 32))
        .config("spark.driver.port", 9999)
        .config("spark.blockManager.port", 10005)
        .getOrCreate()
    )

    # RDD API
    spark_context = spark_session.sparkContext
    spark_context.setLogLevel("ERROR")

    if verbose:
        conf = spark_session.conf
        max_cores = conf.get('spark.cores.max')
        cores = conf.get('spark.executor.cores')
        memory = conf.get('spark.executor.memory')
        # memory is a string like "4g"; drop the unit suffix to get whole gigabytes.
        mem_per_core = int(memory[:-1]) / int(cores)
        print(f"A files size of {bytes_to_gb(fileSize):.4f} GB give a maximum \n"
              f"of {max_cores} cores divided on spark executors with:\n"
              f"Executor cores: {cores}\n"
              f"Executor memory: {memory}\n"
              f"Mem/core: {mem_per_core:.0f}GB")

    return spark_session, spark_context, fileSize

In [None]:
def load_data(spark_session, hdfs_path, file_path, schema=None, verbose=False):
    """Read a JSON file from HDFS into a Spark DataFrame.

    Args:
        spark_session (SparkSession): Active Spark session.
        hdfs_path (str): HDFS URI prefix; ``file_path`` is appended to it verbatim.
        file_path (str): File path within HDFS.
        schema: Explicit schema for the JSON records. When None, Spark infers
            the schema, and the inferred schema is printed if ``verbose`` is set.
        verbose (bool, optional): Print schema and partition details. Defaults to False.

    Returns:
        DataFrame: The loaded Spark DataFrame.
    """
    source = hdfs_path + file_path
    reader = spark_session.read

    if schema is not None:
        df = reader.json(source, schema=schema)
    else:
        df = reader.json(source)
        if verbose:
            # Only an inferred schema is worth showing; a given one is already known.
            print("The schema:")
            df.printSchema()
            print("\n")

    if verbose:
        # Partition count of the underlying RDD right after loading.
        print(f"Number of default partitions after loading the data: {df.rdd.getNumPartitions()}")
        print("\n")

    return df

In [None]:
def filter_data(df, top_n=25):
    """
    Keep only complete rows that belong to the ``top_n`` most frequent subreddits.

    Args:
        df (pyspark.sql.DataFrame): Input with 'subreddit', 'summary' and 'content' columns.
        top_n (int, optional): Number of most frequent subreddits to keep. Defaults to 25.

    Returns:
        pyspark.sql.DataFrame: Rows where:
            - 'subreddit', 'summary' and 'content' are all non-null, and
            - 'subreddit' is one of the ``top_n`` most frequent values,
              counted after the null rows were removed.

    Notes:
        Subreddits with equal counts are ordered alphabetically. This makes the
        selected set deterministic across runs instead of leaving tie order
        unspecified.
    """
    # Drop rows missing the label or either text field.
    df_filtered = df.filter(
        col("subreddit").isNotNull() & col("summary").isNotNull() & col("content").isNotNull()
    )

    # Most frequent first; the secondary key breaks ties deterministically.
    df_counts = (
        df_filtered.groupBy("subreddit")
        .count()
        .orderBy(col("count").desc(), col("subreddit"))
    )

    # The list is only top_n names long, so collecting it to the driver is cheap.
    top_subreddits = [row.subreddit for row in df_counts.limit(top_n).collect()]

    return df_filtered.filter(col("subreddit").isin(top_subreddits))



In [None]:
def split_data(df, seed=42, test_fraction=0.2):
    """
    Randomly split a PySpark DataFrame into a training set and a test set.

    Args:
        df (pyspark.sql.DataFrame): The DataFrame to split.
        seed (int, optional): Random seed so the split is reproducible. Defaults to 42.
        test_fraction (float, optional): Share of rows for the test set; the
            remaining ``1 - test_fraction`` goes to training. Defaults to 0.2.

    Returns:
        tuple: ``(train_data, test_data)`` as returned by ``DataFrame.randomSplit``.

    Example:
        >>> train, test = split_data(my_dataframe, seed=123, test_fraction=0.3)
        >>> print(f"Training set count: {train.count()}")
        >>> print(f"Test set count: {test.count()}")
    """
    weights = [1 - test_fraction, test_fraction]
    train_split, test_split = df.randomSplit(weights, seed=seed)
    return train_split, test_split

In [21]:
def pre_processing_pipe(num_features=1000, text_cols=("summary", "content"), label_col="subreddit"):
    """
    Create the pre-processing stages for the text classification pipeline.

    For every text column ``c`` the stages produce ``c_tokens`` (Tokenizer),
    ``c_clean`` (StopWordsRemover), ``c_tf`` (HashingTF) and ``c_features`` (IDF).
    The label column is indexed into ``label``, and all ``c_features`` vectors
    are assembled into ``features``.

    Args:
        num_features (int, optional): Hash buckets per text column. Defaults to 1000.
        text_cols (sequence of str, optional): Text columns to vectorize.
            Defaults to ("summary", "content").
        label_col (str, optional): Column holding the text label. Defaults to "subreddit".

    Returns:
        list: Spark ML stages, in this order: tokenizers, stop-word removers,
        (HashingTF, IDF) pairs per column, label indexer, feature assembler.

    Raises:
        ValueError: If ``text_cols`` is empty (there would be nothing to assemble).
    """
    text_cols = list(text_cols)
    if not text_cols:
        raise ValueError("text_cols must contain at least one column")

    # Tokenize each text column.
    tokenizers = [Tokenizer(inputCol=c, outputCol=f"{c}_tokens") for c in text_cols]

    # Remove stop words.
    stopword_removers = [
        StopWordsRemover(inputCol=f"{c}_tokens", outputCol=f"{c}_clean") for c in text_cols
    ]

    # TF-IDF per column; each HashingTF must come before its IDF.
    tfidf_stages = []
    for c in text_cols:
        tfidf_stages.append(HashingTF(inputCol=f"{c}_clean", outputCol=f"{c}_tf", numFeatures=num_features))
        tfidf_stages.append(IDF(inputCol=f"{c}_tf", outputCol=f"{c}_features"))

    # Turn the text label into a numeric label. Unseen labels get an extra index
    # instead of failing.
    label_indexer = StringIndexer(inputCol=label_col, outputCol="label", handleInvalid="keep")

    # Concatenate the per-column TF-IDF vectors into a single feature vector.
    feature_assembler = VectorAssembler(
        inputCols=[f"{c}_features" for c in text_cols], outputCol="features"
    )

    return [*tokenizers, *stopword_removers, *tfidf_stages, label_indexer, feature_assembler]

In [None]:
def model_eval(model, test_data, description="", verbose=False):
    """
    Score a trained model on held-out data and return its accuracy.

    Args:
        model: Fitted Spark ML model; its ``transform`` must add a "prediction" column.
        test_data (DataFrame): Test rows with a "label" column.
        description (str, optional): Model name used in the verbose message. Defaults to "".
        verbose (bool, optional): Print the accuracy. Defaults to False.

    Returns:
        float: Accuracy as computed by MulticlassClassificationEvaluator.
    """
    scorer = MulticlassClassificationEvaluator(
        labelCol="label",
        predictionCol="prediction",
        metricName="accuracy",
    )
    accuracy = scorer.evaluate(model.transform(test_data))

    if verbose:
        print(f"Evaluation of {description}. \nModel Accuracy: {accuracy:.4f}")

    return accuracy


In [None]:
def random_forest(train_data, pre_pipe, num_trees=100, save_path=None):
    """
    Train a Random Forest classifier on top of the pre-processing stages.

    Args:
        train_data (DataFrame): The training dataset.
        pre_pipe (iterable): Pre-processing stages, e.g. from ``pre_processing_pipe()``.
            Any iterable is accepted (list, tuple, ...).
        num_trees (int, optional): Number of trees in the forest. Defaults to 100.
        save_path (str, optional): If given, the fitted model is written there with
            ``PipelineModel.save``, e.g.
            "hdfs://192.168.2.156:9000/data/reddit/model/reddit_text_classifier_rf".
            Defaults to None (not saved).

    Returns:
        PipelineModel: The fitted pre-processing + Random Forest pipeline.
    """
    classifier = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=num_trees)

    # Unpacking works for any iterable of stages, not just lists.
    pipeline = Pipeline(stages=[*pre_pipe, classifier])

    model = pipeline.fit(train_data)

    if save_path is not None:
        model.save(save_path)

    return model

In [None]:
def logistic_regression(train_data, pre_pipe, max_iter=10, save_path=None):
    """
    Train a Logistic Regression classifier on top of the pre-processing stages.

    Args:
        train_data (DataFrame): The training dataset.
        pre_pipe (iterable): Pre-processing stages, e.g. from ``pre_processing_pipe()``.
            Any iterable is accepted (list, tuple, ...).
        max_iter (int, optional): Maximum number of optimizer iterations. Defaults to 10.
        save_path (str, optional): If given, the fitted model is written there with
            ``PipelineModel.save``, e.g.
            "hdfs://192.168.2.156:9000/data/reddit/model/reddit_text_classifier".
            Defaults to None (not saved).

    Returns:
        PipelineModel: The fitted pre-processing + Logistic Regression pipeline.
    """
    classifier = LogisticRegression(featuresCol="features", labelCol="label", maxIter=max_iter)

    # Unpacking works for any iterable of stages, not just lists.
    pipeline = Pipeline(stages=[*pre_pipe, classifier])

    model = pipeline.fit(train_data)

    if save_path is not None:
        model.save(save_path)

    return model

In [25]:
def print_results(i, fileSize, executor_cores, executor_memory, max_cores, data_load_time, training_time, evaluation_time, overall_exec_time, model_accuracy):
    """Print a formatted report for one benchmark iteration.

    Args:
        i (int): Iteration number.
        fileSize (float): File size in GB (printed with 2 decimals).
        executor_cores, executor_memory, max_cores: Spark settings, printed as-is.
        data_load_time, training_time, evaluation_time, overall_exec_time (float):
            Durations in seconds (2 decimals).
        model_accuracy (float): Model accuracy (4 decimals).
    """
    rule = "-" * 80
    report_lines = [
        rule,
        "Spark Processing and Model Evaluation Results\n",
        f"Iteration {i}",
        rule,
        f"File Size:        {fileSize:.2f} GB",
        f"Max cores:        {max_cores}",
        f"Executor Cores:   {executor_cores}",
        f"Executor Memory:  {executor_memory}",
        rule,
        "Performance Metrics:",
        rule,
        f"Data load Time:         {data_load_time:.2f} seconds",
        f"Training Time:          {training_time:.2f} seconds",
        f"Evaluation Time:        {evaluation_time:.2f} seconds",
        f"Overall execution Time: {overall_exec_time:.2f} seconds",
        f"Model Accuracy:         {model_accuracy:.4f}",
        rule,
    ]
    print("\n".join(report_lines))

In [26]:
def evaluate_performance(hdfs_path, file_path, schema, n=5, verbose=False):
    """Benchmark the load -> filter -> train -> evaluate workflow ``n`` times.

    Each run builds a fresh Spark session sized for the file, then:
    - loads and filters the data and splits it,
    - trains a logistic regression pipeline,
    - scores it on the held-out split,
    - stops the session, even if the run fails.

    Args:
        hdfs_path (str): HDFS URI prefix, e.g. "hdfs://host:port".
        file_path (str): Path of the JSON file within HDFS.
        schema: Schema passed to ``load_data`` (None lets Spark infer it).
        n (int, optional): Number of repetitions. Defaults to 5.
        verbose (bool, optional): Print per-run details. Defaults to False.

    Returns:
        list: The following values, in order:
            - file size in GB,
            - executor cores, executor memory and max cores (Spark settings
              from the last run),
            - mean data-load time, mean training time, mean evaluation time
              and mean overall time (seconds),
            - mean accuracy.

    Raises:
        ValueError: If ``n`` is smaller than 1.
    """
    if n < 1:
        raise ValueError(f"n must be at least 1, got {n}")

    overall_exec_time = np.zeros(n)
    data_load_time = np.zeros(n)
    training_time = np.zeros(n)
    evaluation_time = np.zeros(n)
    model_accuracy = np.zeros(n)

    for i in range(n):

        print(f"File {file_path}, run {i}")
        # perf_counter is monotonic, so the intervals are immune to wall-clock adjustments.
        start_time = time.perf_counter()

        spark_session, spark_context, fileSize = build_spark_session(hdfs_path, file_path, verbose=verbose)
        try:
            # schema must be passed by keyword: load_data's 4th positional parameter is schema.
            df = load_data(spark_session, hdfs_path, file_path, schema=schema, verbose=verbose)
            df = filter_data(df)
            train_data, test_data = split_data(df)

            # Spark is lazy, so this interval only covers work triggered so far
            # (e.g. the top-subreddit collect in filter_data).
            data_time = time.perf_counter()
            data_load_time[i] = data_time - start_time

            pre_pipe = pre_processing_pipe()
            # To benchmark the random forest instead, call random_forest(train_data, pre_pipe)
            # and use the description "Random forest classifier" below.
            model = logistic_regression(train_data, pre_pipe)

            train_time = time.perf_counter()
            training_time[i] = train_time - data_time

            model_accuracy[i] = model_eval(model, test_data, description="Logistic regression classifier", verbose=verbose)

            eval_time = time.perf_counter()
            evaluation_time[i] = eval_time - train_time

            executor_cores = spark_session.conf.get("spark.executor.cores")
            executor_memory = spark_session.conf.get("spark.executor.memory")
            max_cores = spark_session.conf.get('spark.cores.max')
        finally:
            # Always release the cluster resources, even when a run fails.
            spark_context.stop()

        overall_exec_time[i] = time.perf_counter() - start_time

        if verbose:
            print_results(i, bytes_to_gb(fileSize), executor_cores, executor_memory, max_cores, data_load_time[i], training_time[i], evaluation_time[i], overall_exec_time[i], model_accuracy[i])

    return [bytes_to_gb(fileSize), executor_cores, executor_memory, max_cores, data_load_time.mean(), training_time.mean(), evaluation_time.mean(), overall_exec_time.mean(), model_accuracy.mean()]

In [27]:
# Schemas for the reddit JSON data. Every field is a nullable string column.
subreddit_field = StructField("subreddit", StringType(), True)
summary_field = StructField("summary", StringType(), True)
content_field = StructField("content", StringType(), True)
# NOTE: body_field is defined but not used by either schema below.
body_field = StructField("body", StringType(), True)

# schema_v0 reads only the label column.
# schema_v1 adds the two text columns that the pre-processing pipeline vectorizes.
schema_v0 = StructType([subreddit_field])
schema_v1 = StructType([subreddit_field, summary_field, content_field])

In [28]:
# Benchmark every dataset and collect one summary row per file.
results = []
hdfs_path = "hdfs://192.168.2.156:9000"

file_path = "/data/reddit/"

# Files are listed from smallest to largest dataset.
files = [
    "reddit_50k.json",
    "reddit_100k.json",
    "reddit_200k.json",
    "reddit_500k.json",
    "corpus-webis-tldr-17.json",
]

for file in files:
    try:
        res = evaluate_performance(hdfs_path, file_path + file, schema=schema_v1, n=5)
        # file[:-5] strips the ".json" extension for the result table.
        results.append([file[:-5], *res])

    except Exception as e:
        # Best effort: report the failure and continue with the next file.
        print(f"Crashed when evaluating {file} with error:")
        print(str(e))


result_df = pd.DataFrame(
    results,
    columns=[
        'File', 'File size', 'Executor cores', 'Executor memory', 'Max cores',
        'Data load time', 'Training time', 'Evaluation time', 'Overall exec time',
        'Model accuracy',
    ],
)
print(result_df)

File /data/reddit/reddit_50k.json, run 0


                                                                                

File /data/reddit/reddit_50k.json, run 1


                                                                                

File /data/reddit/reddit_50k.json, run 2


                                                                                

File /data/reddit/reddit_50k.json, run 3


                                                                                

File /data/reddit/reddit_50k.json, run 4


                                                                                

File /data/reddit/reddit_100k.json, run 0


                                                                                

File /data/reddit/reddit_100k.json, run 1


                                                                                

File /data/reddit/reddit_100k.json, run 2


                                                                                

File /data/reddit/reddit_100k.json, run 3


                                                                                

File /data/reddit/reddit_100k.json, run 4


                                                                                

File /data/reddit/reddit_200k.json, run 0


                                                                                

File /data/reddit/reddit_200k.json, run 1


                                                                                

File /data/reddit/reddit_200k.json, run 2


                                                                                

File /data/reddit/reddit_200k.json, run 3


                                                                                

File /data/reddit/reddit_200k.json, run 4


                                                                                

File /data/reddit/reddit_500k.json, run 0


                                                                                

File /data/reddit/reddit_500k.json, run 1


                                                                                

File /data/reddit/reddit_500k.json, run 2


                                                                                

File /data/reddit/reddit_500k.json, run 3


                                                                                

File /data/reddit/reddit_500k.json, run 4


                                                                                

File /data/reddit/corpus-webis-tldr-17.json, run 0


                                                                                

File /data/reddit/corpus-webis-tldr-17.json, run 1


                                                                                

File /data/reddit/corpus-webis-tldr-17.json, run 2


                                                                                

File /data/reddit/corpus-webis-tldr-17.json, run 3


                                                                                

File /data/reddit/corpus-webis-tldr-17.json, run 4


                                                                                

                   File  File size Executor cores Executor memory Max cores  \
0            reddit_50k   0.365163              2              2g         2   
1           reddit_100k   0.730300              4              4g         4   
2           reddit_200k   1.456891              4              4g         4   
3           reddit_500k   3.637079              4              4g        10   
4  corpus-webis-tldr-17  18.272816              4              4g        32   

   Data load time  Training time  Evaluation time  Overall exec time  \
0       10.302017      45.125135         3.271335          59.005185   
1       10.082480      29.313975         3.010626          42.592113   
2       13.161324      43.896724         4.674155          61.997070   
3       19.901318     180.384358        13.887223         214.570157   
4       88.463843     481.229342        28.389683         598.375960   

   Model accuracy  
0        0.480953  
1        0.500373  
2        0.526313  
3        0.5

In [29]:
import os

# Export the benchmark summary as CSV.
out_path = "/home/ubuntu/out/"
out_name = "log_reg_performance_data"
# pandas' to_csv does not create missing directories, so make sure the target exists.
os.makedirs(out_path, exist_ok=True)
result_df.to_csv(path_or_buf=f"{out_path}{out_name}.csv")

In [30]:
!pip install openpyxl 
# Write the same summary as an .xlsx workbook, using openpyxl (installed above) as the engine.
with pd.ExcelWriter(f"{out_path}{out_name}.xlsx", engine="openpyxl") as xlsx_writer:
    result_df.to_excel(xlsx_writer)

Defaulting to user installation because normal site-packages is not writeable
