# Data Engineering I: Comparative Analysis of MapReduce and Spark on Reddit Data
### *Spark implementation and Evaluation: Tove Gunnarsson and Tebogo Sanelo Mitane*

Test cases:
| Dataset sizes  | 1 node | 2 nodes | 4 nodes |
| -----------    | ------ | ------- | ------- |
| 50k    |      |      |      |      |
| 100k   |      |      |      |      |
| 200k   |      |      |      |      |
| 400k   |      |      |      |      |
| 800k ?   |      |      |      |      |

Evaluation metrics: Execution time, CPU and Memory usage.

Measurements are written to a ... file and imported to a separate notebook where plots and other stats are generated.


In [1]:
from pyspark.sql import SparkSession

# New API
spark_session = SparkSession.builder\
        .master("spark://192.168.2.156:7077") \
        .appName("Group 35")\
        .config("spark.dynamicAllocation.enabled", False)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores", 1)\
        .config("spark.driver.port",9999)\
        .config("spark.blockManager.port",10005)\
        .config("spark.executor.instances", 4)\
        .config("spark.ui.port", "8082")\
        .config("spark.cores.max", 1)\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/15 07:12:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Preprocessing & Processing functions
The functions to execute the preprocessing and counting job, as well as perform the measurements have been optimized to run in parallel, by choosing functions like
 * F.expr() instead of .withColumn()
 * agg() instead of groupBy().sum()



In [2]:
from pyspark.sql import SparkSession, functions as F
import time
import psutil
from pyspark.sql.functions import lower, regexp_replace, col, when, count, sum as spark_sum

# Define country list
countries = ["China", "India", "United States", "Indonesia", "Pakistan", 
             "Brazil", "Nigeria", "Bangladesh", "Russia", "Mexico"]

def preprocess_data(df):
    """Optimized preprocessing: drops nulls, replaces missing values, lowers text, and removes punctuation."""
    df = df.select("subreddit", "body").na.fill({"subreddit": "unknown", "body": ""})
    df = df.withColumn("body", F.lower(F.regexp_replace(F.col("body"), "[^a-zA-Z0-9\s]", "")))
    return df

def mentions_counter(df):
    """Optimized category count using a single operation for all columns."""
    
    exprs = [F.when(F.col("body").contains(item.lower()), 1).otherwise(0).alias(item) for item in countries]
    df = df.select("subreddit", *exprs)

    # Count distinct subreddits where each category is mentioned
    subreddit_mentions = df.groupBy("subreddit").agg(*[F.max(item).alias(item) for item in countries])
    
    # Sum up the unique subreddit counts across all subreddits
    total_mentions = subreddit_mentions.agg(*[F.sum(item).alias(item) for item in countries])
    
    return subreddit_mentions, total_mentions


def measure_performance(df, dataset_name, metrics_dict):
    """Measure execution time and system utilization."""
    cpu_before = psutil.cpu_percent(interval=None)
    memory_before = psutil.virtual_memory().used / (1024 ** 3)  # Convert bytes to GB
    
    start_time = time.time()

    subreddit_mentions, total_mentions = mentions_counter(df)

    end_time = time.time()
    execution_time = end_time - start_time

    cpu_after = psutil.cpu_percent(interval=None)
    memory_after = psutil.virtual_memory().used / (1024 ** 3)  # Convert bytes to GB

    memory_peak = psutil.Process().memory_info().rss / (1024 ** 3)  # Resident Set Size (RSS) in GB

    cpu_usage = cpu_after - cpu_before
    memory_usage = memory_after - memory_before

    metrics_dict[dataset_name] = {
        "Execution Time (s)": execution_time,
        "CPU Usage": cpu_usage,
        "Memory Usage": memory_usage,
        "Memory Peak": memory_peak,
    }
    
    return metrics_dict

    

Reading the datasets and creating the 400k and 800k datasets

In [3]:
# Step 4) Performance evaluation loop
import csv
metrics_result = []  # Dictionary to store performance results

dataset_paths = [
    "hdfs://192.168.2.156:9000/data/reddit/reddit_50k.json",
    "hdfs://192.168.2.156:9000/data/reddit/reddit_100k.json",
    "hdfs://192.168.2.156:9000/data/reddit/reddit_200k.json"
]

for dataset_path in dataset_paths:
    dataset_name = dataset_path.split("/")[-1]  # Extract dataset name
    print(f"\nProcessing dataset: {dataset_name}")

    # Load dataset
    reddit_df = spark_session.read.json(dataset_path)

    # Preprocess data
    reddit_df = preprocess_data(reddit_df)

    # Measure performance & store results
    performance_metrics = measure_performance(reddit_df, dataset_name, {})

    metrics_result.append({
        "Dataset" : dataset_name, 
        "No. nodes": 4, # <-------------- CHANGE FOR DIFFERENT NO. NODES
        **performance_metrics[dataset_name]
    })


Processing dataset: reddit_50k.json


                                                                                


Processing dataset: reddit_100k.json


                                                                                


Processing dataset: reddit_200k.json


                                                                                

In [4]:
print(metrics_result)

[{'Dataset': 'reddit_50k.json', 'No. nodes': 4, 'Execution Time (s)': 0.27208399772644043, 'CPU Usage': 41.5, 'Memory Usage': 0.067291259765625, 'Memory Peak': 0.07555770874023438}, {'Dataset': 'reddit_100k.json', 'No. nodes': 4, 'Execution Time (s)': 0.2372734546661377, 'CPU Usage': 34.5, 'Memory Usage': 0.043979644775390625, 'Memory Peak': 0.07555770874023438}, {'Dataset': 'reddit_200k.json', 'No. nodes': 4, 'Execution Time (s)': 0.14737772941589355, 'CPU Usage': 27.8, 'Memory Usage': 0.00048065185546875, 'Memory Peak': 0.07563018798828125}]


In [5]:
## Do the 400k and 800k datasets here (SEPARATELY)

#Code for importing / creating the datasets
# Load full 500k dataset
# df_500k = spark_session.read.json("hdfs://192.168.2.156:9000/data/reddit/reddit_500k.json")
# # Sample 4/5 (80%) of the dataset to get ~400K rows
# df_400k = df_500k.sample(fraction=0.8, seed=42)

# Load full dataset (19.8 GB)
df_full = spark_session.read.json("hdfs://192.168.2.156:9000/data/reddit/corpus-webis-tldr-17.json")

# Sample approximately 400K rows
df_400k = df_full.sample(fraction=(400000 / df_full.count()), seed=42)

# Sample approximately 800K rows
df_800k = df_full.sample(fraction=(800000 / df_full.count()), seed=42)

#dataset_path2 = [df_400k, df_800k]


# # Run the preprocess and the measurements

# reddit_df = preprocess_data(400k)
# performance_metrics = measure_performance(reddit_df, dataset_name, {})

# # Save the results to the SAME list : metrics_result
# metrics_result.append({
#     "Dataset" : dataset_name, 
#     "No. nodes": 1,   # <-------------- CHANGE FOR DIFFERENT NO. NODES
#     **performance_metrics[dataset_name]
# })

# # THEN save the file in the results folder by running the next block  |
# #                                                                     V

# List of dataset names corresponding to the 400k and 800k DataFrames
dataset_names = ["df_400k", "df_800k"]

# List of DataFrames to process
dataset_path2 = [df_400k, df_800k]

for i, dataset in enumerate(dataset_path2):
    dataset_name = dataset_names[i]  # Use the predefined dataset names
    print(f"\nProcessing dataset: {dataset_name}")

    # Preprocess data
    reddit_df2 = preprocess_data(dataset)  # Ensure you pass the DataFrame, not a column

    # Measure performance & store results
    performance_metrics2 = measure_performance(reddit_df2, dataset_name, {})

    metrics_result.append({
        "Dataset": dataset_name, 
        "No. nodes": 4,  # Change for different number of nodes
        **performance_metrics2[dataset_name]
    })



                                                                                


Processing dataset: df_400k

Processing dataset: df_800k


In [6]:
print(metrics_result)

[{'Dataset': 'reddit_50k.json', 'No. nodes': 4, 'Execution Time (s)': 0.27208399772644043, 'CPU Usage': 41.5, 'Memory Usage': 0.067291259765625, 'Memory Peak': 0.07555770874023438}, {'Dataset': 'reddit_100k.json', 'No. nodes': 4, 'Execution Time (s)': 0.2372734546661377, 'CPU Usage': 34.5, 'Memory Usage': 0.043979644775390625, 'Memory Peak': 0.07555770874023438}, {'Dataset': 'reddit_200k.json', 'No. nodes': 4, 'Execution Time (s)': 0.14737772941589355, 'CPU Usage': 27.8, 'Memory Usage': 0.00048065185546875, 'Memory Peak': 0.07563018798828125}, {'Dataset': 'df_400k', 'No. nodes': 4, 'Execution Time (s)': 0.2130265235900879, 'CPU Usage': 21.400000000000002, 'Memory Usage': 0.0, 'Memory Peak': 0.075775146484375}, {'Dataset': 'df_800k', 'No. nodes': 4, 'Execution Time (s)': 0.1454622745513916, 'CPU Usage': -24.3, 'Memory Usage': 0.0, 'Memory Peak': 0.075775146484375}]


In [7]:
# Save result to CSV file
csv_filename = "./results/results_4node.csv" # <--------- CHANGE FOR DIFFERENT NO. NODES
with open(csv_filename, mode="w", newline="") as file:
    writer = csv.DictWriter(file, fieldnames=["Dataset", "No. nodes", "Execution Time (s)", "CPU Usage", "Memory Usage", "Memory Peak", "Total Cores Used"])
    writer.writeheader()
    writer.writerows(metrics_result)

print(f"\nPerformance results saved to {csv_filename}")


Performance results saved to ./results/results_4node.csv


In [9]:
# Terminate session
spark_session.stop()