In [13]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve
import numpy as np

# Section banner for this model's output.
print("Random Forest Implementation\n\n")

# Obtain a Spark session (an existing one is reused, otherwise one is created).
sessionBuilder = SparkSession.builder.appName("CancerDiagnosisRandomForest")
spark = sessionBuilder.getOrCreate()

# Read the CSV: the first row supplies column names and Spark infers the
# column types from the data.
csvReader = spark.read.option("header", True).option("inferSchema", True)
data = csvReader.csv('./project3_data.csv')

# Data preprocessing
# Map the string `diagnosis` label onto a numeric index column that the
# classifier can consume.
labelIndexer = StringIndexer(inputCol="diagnosis", outputCol="indexeddiagnosis").fit(data)

# Every column except the label and the record identifier is a feature.
# NOTE(review): assumes `id` is only a row key with no predictive meaning;
# feeding it to the forest would let the model split on an arbitrary number.
# Confirm against the dataset description.
nonFeatureCols = {"diagnosis", "id"}
featureCols = [c for c in data.columns if c not in nonFeatureCols]
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

# Fixed seed so the 70/30 partition (and therefore every reported metric)
# is reproducible from run to run.
(trainData, testData) = data.randomSplit([0.7, 0.3], seed=42)

# Define and train the model
# A fixed seed makes the forest's random sampling repeatable between runs.
rf = RandomForestClassifier(labelCol="indexeddiagnosis", featuresCol="features", seed=42)
# Stages run in order: index the label, assemble the feature vector, fit the forest.
pipeline = Pipeline(stages=[labelIndexer, assembler, rf])
model = pipeline.fit(trainData)

# Make predictions
# Cached because this frame is consumed by several separate Spark actions
# (one per metric evaluation plus the ROC collection); without caching each
# action would recompute the full transform from the source data.
predictions = model.transform(testData).cache()

def _multiclassEvaluator(metric):
    """Build an evaluator scoring `prediction` against `indexeddiagnosis` on one metric."""
    return MulticlassClassificationEvaluator(
        labelCol="indexeddiagnosis",
        predictionCol="prediction",
        metricName=metric,
    )


# Each metric is computed and reported immediately, in this order:
# accuracy, F1, weighted precision, weighted recall.

# Accuracy
accuracy_evaluator = _multiclassEvaluator("accuracy")
accuracy = accuracy_evaluator.evaluate(predictions)
print("Test Accuracy = %g" % accuracy)

# F1
f1_evaluator = _multiclassEvaluator("f1")
f1_score = f1_evaluator.evaluate(predictions)
print("F1 Score = %g" % f1_score)

# Precision (weighted across classes)
precision_evaluator = _multiclassEvaluator("weightedPrecision")
precision = precision_evaluator.evaluate(predictions)
print("Precision = %g" % precision)

# Recall (weighted across classes)
recall_evaluator = _multiclassEvaluator("weightedRecall")
recall = recall_evaluator.evaluate(predictions)
print("Recall = %g" % recall)

# Plot ROC curve (only defined here for a two-class label)
if data.select("diagnosis").distinct().count() == 2:
    binary_evaluator = BinaryClassificationEvaluator(labelCol="indexeddiagnosis")
    roc_auc = binary_evaluator.evaluate(predictions, {binary_evaluator.metricName: "areaUnderROC"})
    print("ROC AUC = %g" % roc_auc)

    # Fetch labels and class probabilities in ONE collect so every score stays
    # paired with its own label; two independent collects are separate Spark
    # jobs and are only matched up by position.
    scoredRows = predictions.select("indexeddiagnosis", "probability").collect()
    # Unpack Row objects into plain floats so roc_curve receives flat 1-D inputs.
    labels = [float(row["indexeddiagnosis"]) for row in scoredRows]
    # Element 1 of the `probability` vector is the score for label index 1.0.
    probs = [float(row["probability"][1]) for row in scoredRows]

    fpr, tpr, _ = roc_curve(labels, probs)
    plt.plot(fpr, tpr, color='blue', label='ROC curve (area = %0.2f)' % roc_auc)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic Curve')
    plt.legend(loc="lower right")
    plt.show()
else:
    print("ROC curve not plotted as the classification problem is not binary.")


Random Forest Implementation




ConnectionRefusedError: [Errno 61] Connection refused

Logistic Regression Implementation




IllegalArgumentException: indexedDiagnosis does not exist. Available: id, diagnosis, Radius_mean, Texture_mean, perimeter_mean, area_mean, smoothness_mean, compactness_mean, concavity_mean, concave points_mean, symmetry_mean, fractal_dimension_mean, radius_se, texture_se, perimeter_se, area_se, smoothness_se, compactness_se, concavity_se, concave points_se, symmetry_se, fractal_dimension_se, radius_worst, texture_worst, perimeter_worst, area_worst, smoothness_worst, compactness_worst, concavity_worst, concave points_worst, symmetry_worst, fractal_dimension_worst, indexeddiagnosis, features