# Distributed Random Forest using Apache Spark
## Programming Assignment – Parallel Machine Learning

This notebook trains a random forest classifier with Apache Spark (running in local mode),
evaluates it by AUC, and measures how training time and speedup change with the number of data partitions.


In [ ]:
# Install pyspark if needed
# !pip install pyspark


In [ ]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import time
import pandas as pd
import matplotlib.pyplot as plt


## Create Spark Session

In [ ]:
# Create the session, or reuse one that is already running in this process.
# The 'local[*]' master runs Spark inside this process rather than on a cluster.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('DistributedRF')
    .getOrCreate()
)

# Keep notebook output readable: only ERROR-level Spark log messages are shown.
spark.sparkContext.setLogLevel('ERROR')


## Load Dataset
Make sure `distributed_rf_dataset.csv` is in the same folder as this notebook. The file must have a header row and a `label` column; every other column is used as a feature.

In [ ]:
# Read the CSV with its header row, letting Spark infer each column's type.
data_path = 'distributed_rf_dataset.csv'
df = spark.read.csv(
    data_path,
    header=True,
    inferSchema=True,
)

# Every column other than the label is treated as an input feature.
label_column = 'label'
feature_cols = [name for name in df.columns if name != label_column]

# Pack the feature columns into a single vector column called 'features',
# then keep only that vector and the label.
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
df = assembler.transform(df)
df = df.select('features', label_column)

# Preview the first five assembled rows.
df.show(5)

## Training Function

In [ ]:
def train_model(partitions):
    """Train and evaluate a random forest with the training data in `partitions` partitions.

    Reads the module-level `df` (columns 'features' and `label_column`).

    The 80/20 split (seed 42) is taken *before* repartitioning so that every
    partition count is trained and evaluated on the same rows. The training
    split is then repartitioned, cached and materialized so the reported time
    covers only `fit`, not the lazy CSV read, split or shuffle.

    Args:
        partitions: Number of partitions for the training data.

    Returns:
        A tuple `(training_time, auc)`: seconds spent in `rf.fit`, and the
        metric reported by `BinaryClassificationEvaluator` with its default
        settings (area under ROC per the PySpark docs) on the held-out split.
    """
    # Split first: the outcome of randomSplit depends on how rows are laid out
    # across partitions, so splitting after repartition() would hand each
    # partition count a different train/test set and make AUCs incomparable.
    train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

    # Spark evaluates lazily. Cache and force evaluation with count() so the
    # CSV read, split and repartition shuffle are paid for before timing starts.
    train_data = train_data.repartition(partitions).cache()
    train_data.count()

    rf = RandomForestClassifier(
        featuresCol='features',
        labelCol=label_column,
        numTrees=100,
        maxDepth=10
    )

    try:
        # perf_counter is monotonic, unlike time.time(), so it is the right
        # clock for measuring an elapsed interval.
        start = time.perf_counter()
        model = rf.fit(train_data)
        training_time = time.perf_counter() - start

        predictions = model.transform(test_data)
        evaluator = BinaryClassificationEvaluator(labelCol=label_column)
        auc = evaluator.evaluate(predictions)
    finally:
        # Release the cached split so successive runs do not accumulate in memory.
        train_data.unpersist()

    return training_time, auc


## Run Experiments

In [ ]:
# Partition counts to benchmark; 1 is the baseline for the speedup calculation.
partitions_list = [1, 2, 4, 8]

# One (partition count, training time, AUC) tuple per run.
results = []

for num_partitions in partitions_list:
    print(f'Training with {num_partitions} partitions...')
    elapsed, score = train_model(num_partitions)
    results.append((num_partitions, elapsed, score))

# The 'Nodes' column holds the partition count used for each run.
results_df = pd.DataFrame(
    results,
    columns=['Nodes', 'Training_Time', 'AUC'],
)

# Speedup = training time of the 1-partition run / training time of each run.
is_baseline = results_df['Nodes'] == 1
baseline_time = results_df.loc[is_baseline, 'Training_Time'].values[0]
results_df['Speedup'] = baseline_time / results_df['Training_Time']

results_df

## Plot 1: Training Time vs Nodes

In [ ]:
# Line chart of training time against the values in the 'Nodes' column.
time_fig, time_ax = plt.subplots()
time_ax.plot(results_df['Nodes'], results_df['Training_Time'], marker='o')
time_ax.set_title('Training Time vs Number of Nodes')
time_ax.set_xlabel('Number of Nodes')
time_ax.set_ylabel('Training Time (seconds)')
time_ax.grid(True)
plt.show()

## Plot 2: Speedup vs Nodes

In [ ]:
# Line chart of speedup against the values in the 'Nodes' column.
speedup_fig, speedup_ax = plt.subplots()
speedup_ax.plot(results_df['Nodes'], results_df['Speedup'], marker='o')
speedup_ax.set_title('Speedup vs Number of Nodes')
speedup_ax.set_xlabel('Number of Nodes')
speedup_ax.set_ylabel('Speedup')
speedup_ax.grid(True)
plt.show()

## Save Results

In [ ]:
# Write the results table to CSV without the pandas index column,
# then display it as the cell's output.
output_path = 'experiment_results.csv'
results_df.to_csv(output_path, index=False)
results_df