<a href="https://colab.research.google.com/github/Sayed-Hossein-Hosseini/SparkML_Heart_Risk_Classifier/blob/master/SparkML_Heart_Risk_Classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **SparkML Heart Risk Classifier**

## **Libraries**

In [21]:
%pip install pyspark



In [22]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.sql.functions import col

## **Loading Dataset**

In [23]:
# Create a local Spark session
spark = SparkSession.builder.appName("HeartDiseaseClassification").getOrCreate()

# Load the CSV file (expected in the working directory), inferring column types
data = spark.read.csv("heart_disease_uci.csv", header=True, inferSchema=True)

# Inspect the schema and the first five rows
data.printSchema()
data.show(5)

root
 |-- id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- dataset: string (nullable = true)
 |-- cp: string (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: boolean (nullable = true)
 |-- restecg: string (nullable = true)
 |-- thalch: integer (nullable = true)
 |-- exang: boolean (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: string (nullable = true)
 |-- ca: integer (nullable = true)
 |-- thal: string (nullable = true)
 |-- num: integer (nullable = true)

+---+---+------+---------+---------------+--------+----+-----+--------------+------+-----+-------+-----------+---+-----------------+---+
| id|age|   sex|  dataset|             cp|trestbps|chol|  fbs|       restecg|thalch|exang|oldpeak|      slope| ca|             thal|num|
+---+---+------+---------+---------------+--------+----+-----+--------------+------+-----+-------+-----------+---+--------------

## **Data Preprocessing**

In [31]:
# Step 1: Categorical columns to be indexed
categorical_cols = ['sex', 'dataset', 'cp', 'restecg', 'slope', 'thal']
indexers = [StringIndexer(inputCol=c, outputCol=c + "_indexed", handleInvalid="keep") for c in categorical_cols]

# Step 2: Define final feature columns after indexing
numerical_cols = ['age', 'trestbps', 'chol', 'fbs', 'thalch', 'exang', 'oldpeak', 'ca']
final_feature_cols = numerical_cols + [c + "_indexed" for c in categorical_cols]

# Step 3: VectorAssembler
assembler = VectorAssembler(inputCols=final_feature_cols, outputCol="features", handleInvalid="keep")

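# Step 4: Index the label column. StringIndexer orders labels by frequency by default,
# so the values in "label" need not equal the original values in "num"
label_indexer = StringIndexer(inputCol="num", outputCol="label")  # or "target" if your dataset uses that

# Step 5: Drop rows with missing values, then split into train and test sets
data = data.dropna()
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)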
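
Because the label indexer in Step 4 uses `StringIndexer` defaults, the most frequent `num` value in the training data becomes `label` 0.0, the next most frequent 1.0, and so on. The sketch below mimics that ordering in plain Python so it can be followed without a Spark session. The helper `frequency_desc_index` and the toy class counts are hypothetical and are not taken from the dataset; this is an illustration of the documented ordering, not Spark's implementation.

```python
from collections import Counter


def frequency_desc_index(values):
    """Map each distinct value to an index, most frequent value first.

    Mimics the default StringIndexer ordering (frequencyDesc);
    ties are broken alphabetically on the string form of the value.
    """
    counts = Counter(str(v) for v in values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}


# Hypothetical class counts, chosen only to show the reordering
toy_num = [0] * 5 + [1] * 2 + [3] * 3 + [2] * 1
mapping = frequency_desc_index(toy_num)
print(mapping)
```

In this toy data, `num` 3 is more frequent than `num` 1, so it receives the smaller index. When reading the predictions later, the fitted mapping can likely be recovered from the label indexer stage of the trained pipeline (its `labels` attribute) rather than assumed.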

## **Building a Classification Model**

In [32]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)

# Build Pipeline
pipeline = Pipeline(stages=indexers + [assembler, label_indexer, rf])

# Fit all pipeline stages on the training split
model = pipeline.fit(train_data)

## **Model Evaluation**

In [35]:
# Prediction on test data
predictions = model.transform(test_data)
predictions.select("prediction", "label", "probability").show(5)

# Evaluation metrics (precision and recall are support-weighted averages over all classes)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision").evaluate(predictions)
recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall").evaluate(predictions)
f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1").evaluate(predictions)

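# ROC-AUC is left disabled: BinaryClassificationEvaluator expects a binary label, but the label here has more than two classes
# roc_auc = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="probability", metricName="areaUnderROC").evaluate(predictions)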

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)
# print("ROC-AUC:", roc_auc)


+----------+-----+--------------------+
|prediction|label|         probability|
+----------+-----+--------------------+
|       3.0|  1.0|[0.07863744951865...|
|       1.0|  3.0|[0.26943736597709...|
|       2.0|  2.0|[0.18585347032870...|
|       0.0|  0.0|[0.68050937397274...|
|       0.0|  0.0|[0.92264573164801...|
+----------+-----+--------------------+
only showing top 5 rows

Accuracy: 0.5106382978723404
Precision: 0.39620060790273554
Recall: 0.5106382978723404
F1 Score: 0.4311381290104694
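
Recall and Accuracy are identical in the output above. That is expected for support-weighted recall: each class's recall is weighted by its share of the true labels, so the sum collapses to the fraction of correct predictions. The sketch below recomputes the three metrics in plain Python on a small made-up example. The function `weighted_metrics` and the toy label lists are hypothetical, and a class that is never predicted is assumed to contribute a precision of 0.

```python
def weighted_metrics(y_true, y_pred):
    """Return (accuracy, weighted precision, weighted recall)."""
    n = len(y_true)
    pairs = list(zip(y_true, y_pred))
    accuracy = sum(t == p for t, p in pairs) / n

    w_precision = 0.0
    w_recall = 0.0
    for c in sorted(set(y_true) | set(y_pred)):
        tp = sum(t == c and p == c for t, p in pairs)  # correct predictions of class c
        support = sum(t == c for t in y_true)          # true rows of class c
        predicted = sum(p == c for p in y_pred)        # rows predicted as class c
        weight = support / n
        # Assumption: undefined per-class ratios are treated as 0
        w_precision += weight * (tp / predicted if predicted else 0.0)
        w_recall += weight * (tp / support if support else 0.0)
    return accuracy, w_precision, w_recall


# Made-up labels: classes 2 and 3 are never predicted
y_true = [0, 0, 0, 0, 1, 1, 2, 3]
y_pred = [0, 0, 0, 1, 1, 0, 0, 0]
acc, w_prec, w_rec = weighted_metrics(y_true, y_pred)
print("Accuracy:", acc)
print("Weighted precision:", w_prec)
print("Weighted recall:", w_rec)
```

Weighted precision falls below accuracy in the toy example because the rare classes are never predicted, which matches the pattern in the notebook output, where Precision is noticeably lower than Recall.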
