# Part I

Step 1 (50 points): Order each pair of strings alphabetically.

In [4]:
# Standard library
import shutil

# Import PySpark
import pyspark
from pyspark.sql import SparkSession

# Build (or attach to an already running) SparkSession. The RDD-based steps
# of Part I go through its underlying SparkContext, exposed here as `sc`.
builder = SparkSession.builder
spark = builder.getOrCreate()
sc = spark.sparkContext
sc  # display the context summary as this cell's output

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/25 10:54:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/25 10:54:51 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/03/25 10:54:51 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [6]:
# Read rel.csv as an RDD of raw text lines, one element per line.
# NOTE(review): no header row is skipped here; confirm rel.csv has none.
rel_rdd = sc.textFile(name="rel.csv")

In [8]:
# Normalize each raw line into an alphabetically ordered pair: drop every
# double quote, split on commas, then sort the fields. Sorting makes (a, b)
# and (b, a) collapse onto the same tuple key for the counting in Step 2.
def order_pair(line):
    """Return the comma-separated fields of `line`, unquoted and sorted, as a tuple."""
    unquoted = line.replace('"', '')
    return tuple(sorted(unquoted.split(',')))

ordered_pairs_rdd = rel_rdd.map(order_pair)
ordered_pairs_rdd.take(5)  # Preview first 5 ordered pairs

[('C0005778', 'C0005790'),
 ('C1255279', 'C3537249'),
 ('C0002520', 'C1255446'),
 ('C0596019', 'C1255552'),
 ('C0004611', 'C1254417')]

Step 2 (40 points): Count the number of instances of each ordered pair resulting from Step 1. Output the result in a plain text file named “pair-count.txt”.

In [11]:
# Count number of instances for each ordered pair: emit (pair, 1) for every
# occurrence, then sum the 1s per pair key.
pair_counts_rdd = ordered_pairs_rdd.map(lambda pair: (pair, 1)).reduceByKey(lambda a, b: a + b)

# Format output: one line per pair, i.e. "<first>","<second>" <count>
formatted_rdd = pair_counts_rdd.map(lambda x: f'"{x[0][0]}","{x[0][1]}" {x[1]}')

# saveAsTextFile refuses to write into an existing directory. Clear any
# leftover from an earlier, interrupted run so this cell can be re-run.
shutil.rmtree("pair-count-tmp", ignore_errors=True)

# Output the result as a single part file in a temporary folder; the part
# file is then moved to pair-count.txt in the following cell.
formatted_rdd.coalesce(1).saveAsTextFile("pair-count-tmp")

25/03/25 10:55:06 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

In [13]:
import os
import shutil

# Rename part file to final output.
# After coalesce(1) the data lives in a single "part-*" file inside the
# temporary folder; any other entries in that folder are ignored.
for file in os.listdir("pair-count-tmp"):
    if file.startswith("part-"):
        shutil.move(os.path.join("pair-count-tmp", file), "pair-count.txt")
        break
else:
    # Fail loudly instead of deleting the folder and silently leaving
    # pair-count.txt missing (or stale from an earlier run).
    raise FileNotFoundError(
        "No part-* file found in 'pair-count-tmp'; pair-count.txt was not written"
    )

# Remove the temporary folder
shutil.rmtree("pair-count-tmp")

Step 3 (10 points): Count the total number of ordered pairs, that is, how many unique ordered pairs are obtained (rather than the total number of instances of ordered pairs).

In [16]:
# reduceByKey leaves exactly one record per distinct ordered pair, so the
# record count of pair_counts_rdd is the number of unique pairs.
unique_pair_count = pair_counts_rdd.count()
message = f"Total unique ordered pairs: {unique_pair_count}"
print(message)



Total unique ordered pairs: 12946540


                                                                                

# Part II

Step 1 (20 points): Remove the rows where the column “BloodPressure”, “BMI”, or “Glucose” is zero (note: remove a row if any one of the three columns is zero).

In [20]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [22]:
# Load diabetes.csv into a DataFrame: the first line supplies the column
# names and Spark infers each column's type from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("diabetes.csv")
)

In [24]:
# Keep a row only when all three measurements are non-zero; a zero in any one
# of BloodPressure, BMI or Glucose removes the row.
has_valid_readings = (
    (df["BloodPressure"] != 0)
    & (df["BMI"] != 0)
    & (df["Glucose"] != 0)
)
filtered_df = df.filter(has_valid_readings)
filtered_df.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



Step 2 (10 points): Convert the categorical column “Pregnancies” into one-hot encoding.

In [27]:
# One-hot encode "Pregnancies" in two stages.
# Stage 1: map each distinct Pregnancies value to a numeric category index.
indexer = StringIndexer(inputCol="Pregnancies", outputCol="PregnanciesIndex")
indexer_model = indexer.fit(filtered_df)
indexed_df = indexer_model.transform(filtered_df)

# Stage 2: expand that index into a sparse one-hot vector column.
encoder = OneHotEncoder(inputCol="PregnanciesIndex", outputCol="PregnanciesVec")
encoder_model = encoder.fit(indexed_df)
encoded_df = encoder_model.transform(indexed_df)

preview_cols = ["Pregnancies", "PregnanciesIndex", "PregnanciesVec"]
encoded_df.select(*preview_cols).show(5, truncate=False)

+-----------+----------------+--------------+
|Pregnancies|PregnanciesIndex|PregnanciesVec|
+-----------+----------------+--------------+
|6          |6.0             |(16,[6],[1.0])|
|1          |0.0             |(16,[0],[1.0])|
|8          |8.0             |(16,[8],[1.0])|
|1          |0.0             |(16,[0],[1.0])|
|0          |1.0             |(16,[1],[1.0])|
+-----------+----------------+--------------+
only showing top 5 rows



Step 3 (10 points): Create a single column with all the features collated together using VectorAssembler.

In [30]:
# Collate the one-hot Pregnancies vector and the remaining numeric columns
# into one "features" vector column, in this fixed order. The raw Pregnancies
# column is deliberately left out: its one-hot vector replaces it.
numeric_feature_cols = ["Glucose", "BloodPressure", "SkinThickness",
                        "Insulin", "BMI", "DiabetesPedigreeFunction", "Age"]
assembler = VectorAssembler(inputCols=["PregnanciesVec", *numeric_feature_cols],
                            outputCol="features")
final_df = assembler.transform(encoded_df)
final_df.select("features", "Outcome").show(5, truncate=False)

+-------------------------------------------------------------------------+-------+
|features                                                                 |Outcome|
+-------------------------------------------------------------------------+-------+
|(23,[6,16,17,18,20,21,22],[1.0,148.0,72.0,35.0,33.6,0.627,50.0])         |1      |
|(23,[0,16,17,18,20,21,22],[1.0,85.0,66.0,29.0,26.6,0.351,31.0])          |0      |
|(23,[8,16,17,20,21,22],[1.0,183.0,64.0,23.3,0.672,32.0])                 |1      |
|(23,[0,16,17,18,19,20,21,22],[1.0,89.0,66.0,23.0,94.0,28.1,0.167,21.0])  |0      |
|(23,[1,16,17,18,19,20,21,22],[1.0,137.0,40.0,35.0,168.0,43.1,2.288,33.0])|1      |
+-------------------------------------------------------------------------+-------+
only showing top 5 rows



Step 4 (10 points): Randomly split the collated data into training (70%) and testing (30%) datasets, using 2017 as the seed.

In [33]:
# 70% / 30% train/test split with the fixed seed 2017 required by Step 4.
train_df, test_df = final_df.randomSplit(weights=[0.7, 0.3], seed=2017)

train_size = train_df.count()
test_size = test_df.count()
print(f"Training set size: {train_size}")
print(f"Test set size: {test_size}")

Training set size: 512
Test set size: 212


Step 5 (40 points): Implement a random forest classifier and set the number of decision trees to 20.

In [36]:
# Random forest classifier with 20 trees (Step 5).
# An explicit seed is set because PySpark's default seed for this estimator is
# hash(type(self).__name__), and Python randomizes str hashing per process
# (unless PYTHONHASHSEED is fixed). Without it, the trees, and hence the ROC AUC
# in Step 6, would change after every kernel restart. 2017 reuses the seed of
# the Step 4 split.
rf = RandomForestClassifier(labelCol="Outcome", featuresCol="features", numTrees=20, seed=2017)
rf_model = rf.fit(train_df)
predictions = rf_model.transform(test_df)
predictions.select("features", "Outcome", "prediction", "probability").show(5, truncate=False)

+------------------------------------------------------------------------+-------+----------+----------------------------------------+
|features                                                                |Outcome|prediction|probability                             |
+------------------------------------------------------------------------+-------+----------+----------------------------------------+
|(23,[1,16,17,20,21,22],[1.0,57.0,60.0,21.7,0.735,67.0])                 |0      |0.0       |[0.8051880801040465,0.19481191989595348]|
|(23,[1,16,17,20,21,22],[1.0,67.0,76.0,45.3,0.194,46.0])                 |0      |0.0       |[0.7304940685195123,0.2695059314804877] |
|(23,[1,16,17,18,19,20,21,22],[1.0,84.0,64.0,22.0,66.0,35.8,0.545,21.0]) |0      |0.0       |[0.8084763801181907,0.19152361988180933]|
|(23,[1,16,17,18,19,20,21,22],[1.0,93.0,60.0,25.0,92.0,28.7,0.532,22.0]) |0      |0.0       |[0.837044023188082,0.16295597681191798] |
|(23,[1,16,17,18,19,20,21,22],[1.0,93.0,100.0,39.0,72.0

Step 6 (10 points): Evaluate the performance of the random forest classifier using the area under the ROC curve (ROC AUC) metric.

In [39]:
# Score the test-set predictions by area under the ROC curve.
# rawPredictionCol is spelled out for clarity; "rawPrediction" is the
# evaluator's default value for it.
evaluator = BinaryClassificationEvaluator(
    labelCol="Outcome",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
roc_auc = evaluator.evaluate(predictions)
print(f"ROC AUC: {roc_auc}")

ROC AUC: 0.8446731857951623
