# BIG DATA PROJECTS SPARK

# Install and configure Spark on colab

In [1]:
# Install PySpark and Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark




In [2]:
import pyspark
print(pyspark.__version__)  # Check if PySpark is installed


3.5.5


In [3]:
# Library
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler, Imputer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline


In [4]:
# Create Spark Session
spark = SparkSession.builder.appName("HeartDiseasePrediction").getOrCreate()


# PIPELINE

## 1. Load and preprocess the data (handle missing values, categorical encoding).

In [9]:
from google.colab import files

# Upload the CSV file manually
uploaded = files.upload()

# Read the dataset using Spark
df = spark.read.csv("HeartDiseaseUCI.csv", header=True, inferSchema=True)

# Display first few rows
df.show(5)


Saving HeartDiseaseUCI.csv to HeartDiseaseUCI (1).csv
+---+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+---+
|_c0|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|num|
+---+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+---+
|  1| 63|  1|  1|     145| 233|  1|      2|    150|    0|    2.3|    3|  0|   6|  0|
|  2| 67|  1|  4|     160| 286|  0|      2|    108|    1|    1.5|    2|  3|   3|  2|
|  3| 67|  1|  4|     120| 229|  0|      2|    129|    1|    2.6|    2|  2|   7|  1|
|  4| 37|  1|  3|     130| 250|  0|      0|    187|    0|    3.5|    3|  0|   3|  0|
|  5| 41|  0|  2|     130| 204|  0|      2|    172|    0|    1.4|    1|  0|   3|  0|
+---+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+---+
only showing top 5 rows



In [11]:
from pyspark.sql.types import IntegerType, DoubleType

# Get a list of numeric columns
numeric_cols = [c for c, dtype in df.dtypes if dtype in ('int', 'double')]

# Fill missing values with mean for numeric columns only
imputer = Imputer(inputCols=numeric_cols, outputCols=numeric_cols).setStrategy("mean")
df = imputer.fit(df).transform(df)


## 2. Feature engineering

In [12]:
categorical_columns = ["sex", "cp", "fbs", "restecg", "exang", "slope", "thal"]
indexers = [StringIndexer(inputCol=col, outputCol=col+"_indexed") for col in categorical_columns]


In [13]:
feature_cols = [c+"_indexed" for c in categorical_columns] + ["age", "trestbps", "chol", "thalach", "oldpeak"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")


## 3. Split data into training and testing sets

In [14]:
(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=42)


## 4. Train a model using Random Forest

In [24]:
rf = RandomForestClassifier(labelCol="target", featuresCol="features", numTrees=100)


In [25]:

rf = RandomForestClassifier(labelCol="num", featuresCol="features", numTrees=100)

pipeline = Pipeline(stages=indexers + [assembler, rf])
model = pipeline.fit(train_data)


## 5. Evaluate using ROC AUC

In [31]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")


Accuracy: 0.5957446808510638


# 6. Improve the models

In [32]:
# Check class distribution
df.groupBy("target").count().show()


+------+-----+
|target|count|
+------+-----+
|     1|   55|
|     3|   35|
|     4|   13|
|     2|   36|
|     0|  164|
+------+-----+



In [33]:
# Resampling
from pyspark.sql.functions import col

# Find the max class count (class 0 has 164 instances)
max_count = df.groupBy("target").count().orderBy("count", ascending=False).first()["count"]

# Function to balance dataset
def balance_classes(df, target_col):
    balanced_df = None
    for label, count in df.groupBy(target_col).count().collect():
        oversample_ratio = max_count / count  # Compute ratio
        sampled_df = df.filter(col(target_col) == label).sample(True, oversample_ratio)  # Oversample
        balanced_df = sampled_df if balanced_df is None else balanced_df.union(sampled_df)
    return balanced_df

# Apply the balancing function
df_balanced = balance_classes(df, "target")
df_balanced.groupBy("target").count().show()  # Check new class distribution


+------+-----+
|target|count|
+------+-----+
|     1|  168|
|     3|  145|
|     4|  182|
|     2|  149|
|     0|  144|
+------+-----+



In [35]:
# Try a another model : Gradient Boosting

from pyspark.ml.classification import GBTClassifier
from pyspark.sql.functions import when

# Ensure the 'target' column only contains 0 and 1
# If target values are other than 0 or 1, convert them to 0
train_data = train_data.withColumn("target", when(train_data["target"] == 1, 1).otherwise(0))
test_data = test_data.withColumn("target", when(test_data["target"] == 1, 1).otherwise(0))

gbt = GBTClassifier(labelCol="target", featuresCol="features", maxIter=100)
pipeline = Pipeline(stages=indexers + [assembler, gbt])
model = pipeline.fit(train_data)

predictions = model.transform(test_data)


In [36]:
# Use Weighted Class Balancing (Assign Higher Weights to Minority Classes)
from pyspark.sql.functions import when

# Compute class weights
total_samples = df.count()
class_weights = df.groupBy("target").count().withColumn("weight", total_samples / col("count"))

# Join with the original dataset
df_weighted = df.join(class_weights.select("target", "weight"), on="target")

# Adjust model with weightCol
rf = RandomForestClassifier(labelCol="target", featuresCol="features", weightCol="weight", numTrees=100)


In [37]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Use "prediction" column for accuracy calculation
evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")

# Evaluate the new model
accuracy = evaluator.evaluate(predictions)
print(f"Updated Accuracy: {accuracy}")


Updated Accuracy: 0.7872340425531915


In [38]:
# Transform the test data using the trained pipeline model
predictions = model.transform(test_data)


In [40]:
rf = RandomForestClassifier(labelCol="num", featuresCol="features", numTrees=100)

In [41]:
# Show predictions alongside the actual label and predicted probability
predictions.select("target", "prediction", "probability").show(10)

+------+----------+--------------------+
|target|prediction|         probability|
+------+----------+--------------------+
|     1|       0.0|[0.97419072860503...|
|     0|       0.0|[0.98526407084945...|
|     0|       0.0|[0.72039463660067...|
|     0|       0.0|[0.93369415996233...|
|     0|       0.0|[0.98379154274471...|
|     0|       0.0|[0.95981687447955...|
|     0|       0.0|[0.77529469657789...|
|     0|       0.0|[0.95213935621222...|
|     0|       0.0|[0.98366959237480...|
|     0|       0.0|[0.76748140252849...|
+------+----------+--------------------+
only showing top 10 rows

