<a href="https://colab.research.google.com/github/BaherMo/Bdatatask10/blob/main/Classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

# ***PRE-PROCESSING***

In [3]:
# Install gdown library
!pip install gdown

# Import necessary libraries
from pyspark.sql import SparkSession
import gdown
import os

# Create a Spark session
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Define the file ID and output file name
file_id = '1XPTMjsJrFFZTiVhSm5Q--gfkbm8jQGnO'
output_file = 'your_dataset.csv'

# Download the file from Google Drive
gdown.download(f'https://drive.google.com/uc?id={file_id}', output_file, quiet=False)

# Read the dataset using PySpark
df_spark = spark.read.csv(output_file, header=True, inferSchema=True)

# Show the DataFrame
df_spark.show()




Downloading...
From: https://drive.google.com/uc?id=1XPTMjsJrFFZTiVhSm5Q--gfkbm8jQGnO
To: /content/your_dataset.csv
100%|██████████| 3.81M/3.81M [00:00<00:00, 27.8MB/s]


+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+
|gender| age|hypertension|heart_disease|smoking_history|  bmi|HbA1c_level|blood_glucose_level|diabetes|
+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+
|Female|80.0|           0|            1|          never|25.19|        6.6|                140|       0|
|Female|54.0|           0|            0|        No Info|27.32|        6.6|                 80|       0|
|  Male|28.0|           0|            0|          never|27.32|        5.7|                158|       0|
|Female|36.0|           0|            0|        current|23.45|        5.0|                155|       0|
|  Male|76.0|           1|            1|        current|20.14|        4.8|                155|       0|
|Female|20.0|           0|            0|          never|27.32|        6.6|                 85|       0|
|Female|44.0|           0|            0|          never|19.31|  

Checking for missing values

In [4]:
from pyspark.sql.functions import col

# Check for missing values
missing_values_count = df_spark.select([col(c).isNull().cast("int").alias(c) for c in df_spark.columns]).groupBy().sum().collect()[0]

# Print the number of missing values for each column
for col_name, missing_count in zip(df_spark.columns, missing_values_count):
    print(f"Number of missing values in {col_name}: {missing_count}")

# Total number of missing values
total_missing_values = sum(missing_values_count)
print(f"Total number of missing values = {total_missing_values}")



Number of missing values in gender: 0
Number of missing values in age: 0
Number of missing values in hypertension: 0
Number of missing values in heart_disease: 0
Number of missing values in smoking_history: 0
Number of missing values in bmi: 0
Number of missing values in HbA1c_level: 0
Number of missing values in blood_glucose_level: 0
Number of missing values in diabetes: 0
Total number of missing values = 0


In [5]:
# Display summary statistics
df_spark.describe().show()

# Display schema information
df_spark.printSchema()

# Display the number of rows and columns
num_rows = df_spark.count()
num_cols = len(df_spark.columns)

print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_cols}")


+-------+------+-----------------+------------------+------------------+---------------+-----------------+------------------+-------------------+-------------------+
|summary|gender|              age|      hypertension|     heart_disease|smoking_history|              bmi|       HbA1c_level|blood_glucose_level|           diabetes|
+-------+------+-----------------+------------------+------------------+---------------+-----------------+------------------+-------------------+-------------------+
|  count|100000|           100000|            100000|            100000|         100000|           100000|            100000|             100000|             100000|
|   mean|  null|41.88585600000013|           0.07485|           0.03942|           null|27.32076709999422|5.5275069999983275|          138.05806|              0.085|
| stddev|  null|22.51683987161704|0.2631504702289171|0.1945930169980986|           null|6.636783416648357|1.0706720918835468|  40.70813604870383|0.27888308976661896|
|   

Dealing with Duplicated Data

In [6]:
# Check for duplicates
num_duplicates = df_spark.count() - df_spark.dropDuplicates().count()

print(f"Number of duplicate rows: {num_duplicates}")


Number of duplicate rows: 3854


In [7]:
# Remove duplicate rows
df_spark = df_spark.dropDuplicates()


Defining Categorical and Continous Variables

In [8]:
# Replace 'df2' with the actual name of your PySpark DataFrame
cont_cols = [col_name for col_name, data_type in df_spark.dtypes if data_type in ['int', 'bigint', 'float', 'double']]
cat_cols = list(set(df_spark.columns) - set(cont_cols))

# Print the continuous variables
print("The continuous variables are:", cont_cols)

# Print the categorical variables
print("The categorical variables are:", cat_cols)


The continuous variables are: ['age', 'hypertension', 'heart_disease', 'bmi', 'HbA1c_level', 'blood_glucose_level', 'diabetes']
The categorical variables are: ['smoking_history', 'gender']


Converting Categorical data into Numeric data

In [9]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Define the columns to be converted
categorical_columns = ['gender', 'smoking_history']

# Create a StringIndexer for each categorical column
indexers = [StringIndexer(inputCol=col_name, outputCol=f"{col_name}_index") for col_name in categorical_columns]

# Create a pipeline to execute the indexers
pipeline = Pipeline(stages=indexers)

# Fit and transform the DataFrame using the pipeline
df_spark_indexed = pipeline.fit(df_spark).transform(df_spark)

# Show the transformed DataFrame
df_spark_indexed.show()


+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+------------+---------------------+
|gender| age|hypertension|heart_disease|smoking_history|  bmi|HbA1c_level|blood_glucose_level|diabetes|gender_index|smoking_history_index|
+------+----+------------+-------------+---------------+-----+-----------+-------------------+--------+------------+---------------------+
|Female|21.0|           0|            0|          never|27.32|        5.8|                126|       0|         0.0|                  0.0|
|  Male|26.0|           0|            0|          never|27.32|        6.6|                100|       0|         1.0|                  0.0|
|Female|49.0|           0|            0|          never| 21.7|        5.8|                158|       0|         0.0|                  0.0|
|Female|24.0|           0|            0|         former|20.47|        4.8|                100|       0|         0.0|                  2.0|
|Female|53.0|           0| 

In [10]:
# Drop original columns
df_spark_indexed = df_spark_indexed.drop('gender', 'smoking_history')

# Rename indexed columns
df_spark_indexed = df_spark_indexed.withColumnRenamed('gender_index', 'gender').withColumnRenamed('smoking_history_index', 'smoking_history')

# Show the updated DataFrame
df_spark_indexed.show()


+----+------------+-------------+-----+-----------+-------------------+--------+------+---------------+
| age|hypertension|heart_disease|  bmi|HbA1c_level|blood_glucose_level|diabetes|gender|smoking_history|
+----+------------+-------------+-----+-----------+-------------------+--------+------+---------------+
|21.0|           0|            0|27.32|        5.8|                126|       0|   0.0|            0.0|
|26.0|           0|            0|27.32|        6.6|                100|       0|   1.0|            0.0|
|49.0|           0|            0| 21.7|        5.8|                158|       0|   0.0|            0.0|
|24.0|           0|            0|20.47|        4.8|                100|       0|   0.0|            2.0|
|53.0|           0|            0| 31.4|        5.7|                 85|       0|   0.0|            0.0|
|74.0|           0|            0| 40.5|        3.5|                160|       0|   0.0|            2.0|
|76.0|           0|            0|27.76|        6.5|             

Info

In [11]:
from pyspark.sql.functions import col


# Define the numeric columns
numeric_columns = ['age', 'hypertension', 'heart_disease', 'bmi', 'HbA1c_level', 'blood_glucose_level', 'diabetes']

# Display descriptive statistics for each feature
for col_name in numeric_columns:
    df_spark_indexed.select(col(col_name)).describe().show()


+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|             96146|
|   mean| 41.79432571297818|
| stddev|22.462947577419353|
|    min|              0.08|
|    max|              80.0|
+-------+------------------+

+-------+-------------------+
|summary|       hypertension|
+-------+-------------------+
|  count|              96146|
|   mean|0.07760073221974913|
| stddev|0.26754364703227407|
|    min|                  0|
|    max|                  1|
+-------+-------------------+

+-------+-------------------+
|summary|      heart_disease|
+-------+-------------------+
|  count|              96146|
|   mean|0.04080252948640609|
| stddev|0.19783349095456876|
|    min|                  0|
|    max|                  1|
+-------+-------------------+

+-------+------------------+
|summary|               bmi|
+-------+------------------+
|  count|             96146|
|   mean|27.321461111226636|
| stddev| 6.767715560480339|
|    min|             

Undersampling

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Add a unique index column to df_spark_indexed
df_spark_indexed_with_index = df_spark_indexed.withColumn("index", F.monotonically_increasing_id())

# Before undersampling
original_class_distribution = df_spark_indexed_with_index.groupBy('diabetes').count().show()
print("Original Class Distribution:")
original_class_distribution

# Define a Window specification
window_spec = Window.partitionBy('diabetes').orderBy('index')

# Undersample each group (diabetes=0 and diabetes=1) to have exactly 8482 rows
undersampled_df = df_spark_indexed_with_index.withColumn(
    'row_num',
    F.row_number().over(window_spec)
).filter('row_num <= 8482').drop('index', 'row_num')

# After undersampling
undersampled_class_distribution = undersampled_df.groupBy('diabetes').count().show()
print("Undersampled Class Distribution:")
undersampled_class_distribution

# Check the number of rows in the undersampled DataFrame
num_rows_undersampled = undersampled_df.count()
print("Number of Rows in Undersampled DataFrame: {}".format(num_rows_undersampled))


+--------+-----+
|diabetes|count|
+--------+-----+
|       1| 8482|
|       0|87664|
+--------+-----+

Original Class Distribution:
+--------+-----+
|diabetes|count|
+--------+-----+
|       1| 8482|
|       0| 8482|
+--------+-----+

Undersampled Class Distribution:
Number of Rows in Undersampled DataFrame: 16964


In [14]:
undersampled_df.show()



+----+------------+-------------+-----+-----------+-------------------+--------+------+---------------+
| age|hypertension|heart_disease|  bmi|HbA1c_level|blood_glucose_level|diabetes|gender|smoking_history|
+----+------------+-------------+-----+-----------+-------------------+--------+------+---------------+
|55.0|           0|            0|42.64|        6.0|                155|       1|   0.0|            0.0|
|68.0|           1|            0|42.15|        6.2|                145|       1|   1.0|            0.0|
|73.0|           1|            0| 22.8|        9.0|                126|       1|   0.0|            0.0|
|60.0|           0|            0|45.98|        6.5|                159|       1|   0.0|            0.0|
|80.0|           0|            1| 27.8|        8.8|                145|       1|   1.0|            2.0|
|74.0|           0|            1|28.83|        6.0|                280|       1|   0.0|            0.0|
|62.0|           0|            0|27.32|        5.7|             

Removing Outliers

In [18]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window


# Define the feature columns (excluding the target column)
feature_columns = [col_name for col_name in df_spark_cleaned.columns if col_name != 'diabetes']

# Standardize the features
for col_name in feature_columns:
    mean_col = F.mean(col(col_name)).over(Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
    std_col = F.stddev(col(col_name)).over(Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
    col_name_z = f'{col_name}_z'
    df_spark_cleaned = df_spark_cleaned.withColumn(col_name_z, (col(col_name) - mean_col) / std_col)

# Define a threshold to identify outliers (e.g., 3 standard deviations)
threshold = 3

# Create a boolean mask to identify rows with outliers
outlier_mask_expr = ' OR '.join([f'abs({col_name}_z) > {threshold}' for col_name in feature_columns])
df_spark_cleaned = df_spark_cleaned.filter(~F.expr(outlier_mask_expr))

# Drop the Z-score columns
df_spark_cleaned = df_spark_cleaned.drop(*[f'{col_name}_z' for col_name in feature_columns])

# Show the cleaned DataFrame
df_spark_cleaned.show()


+----+------------+-------------+-----+-----------+-------------------+--------+------+---------------+
| age|hypertension|heart_disease|  bmi|HbA1c_level|blood_glucose_level|diabetes|gender|smoking_history|
+----+------------+-------------+-----+-----------+-------------------+--------+------+---------------+
|55.0|           0|            0|42.64|        6.0|                155|       1|   0.0|            0.0|
|68.0|           1|            0|42.15|        6.2|                145|       1|   1.0|            0.0|
|73.0|           1|            0| 22.8|        9.0|                126|       1|   0.0|            0.0|
|60.0|           0|            0|45.98|        6.5|                159|       1|   0.0|            0.0|
|62.0|           0|            0|27.32|        5.7|                260|       1|   0.0|            0.0|
|14.0|           0|            0|19.97|        8.2|                260|       1|   0.0|            0.0|
|52.0|           0|            0| 42.1|        6.1|             

In [19]:
df=df_spark_cleaned
df.show()
df.count()

+----+------------+-------------+-----+-----------+-------------------+--------+------+---------------+
| age|hypertension|heart_disease|  bmi|HbA1c_level|blood_glucose_level|diabetes|gender|smoking_history|
+----+------------+-------------+-----+-----------+-------------------+--------+------+---------------+
|55.0|           0|            0|42.64|        6.0|                155|       1|   0.0|            0.0|
|68.0|           1|            0|42.15|        6.2|                145|       1|   1.0|            0.0|
|73.0|           1|            0| 22.8|        9.0|                126|       1|   0.0|            0.0|
|60.0|           0|            0|45.98|        6.5|                159|       1|   0.0|            0.0|
|62.0|           0|            0|27.32|        5.7|                260|       1|   0.0|            0.0|
|14.0|           0|            0|19.97|        8.2|                260|       1|   0.0|            0.0|
|52.0|           0|            0| 42.1|        6.1|             

15234

Splitting Data to Training and Test (80% and 20%)

In [20]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession

# Assemble features into a single vector column
feature_columns = [col_name for col_name in df_spark_cleaned.columns if col_name != 'diabetes']
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
df_spark_assembled = vector_assembler.transform(df_spark_cleaned)

# Standardize the features
scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withMean=True, withStd=True)
scaler_model = scaler.fit(df_spark_assembled)
df_spark_scaled = scaler_model.transform(df_spark_assembled)

# Split the data into training and testing sets
# Use randomSplit method
train_ratio = 0.8
test_ratio = 1 - train_ratio
seed = 42

df_spark_train, df_spark_test = df_spark_scaled.randomSplit([train_ratio, test_ratio], seed=seed)

# Show the dimensions of the resulting DataFrames
print("Training set size:", df_spark_train.count())
print("Testing set size:", df_spark_test.count())


Training set size: 12243
Testing set size: 2991


# ***MODELS*** **bold text**

Linear Regression Model

In [24]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# Assemble features into a single vector column with a different name
feature_columns = [col_name for col_name in df_spark_train.columns if col_name != 'diabetes']
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol='assembled_features')
df_spark_train_assembled = vector_assembler.transform(df_spark_train)
df_spark_test_assembled = vector_assembler.transform(df_spark_test)

# Initialize the Linear Regression model
lr = LinearRegression(featuresCol='assembled_features', labelCol='diabetes')

# Fit the model
model = lr.fit(df_spark_train_assembled)

# Make predictions on the training set
train_predictions = model.transform(df_spark_train_assembled)

# Make predictions on the testing set
test_predictions = model.transform(df_spark_test_assembled)

# Evaluate R-squared on the training set
evaluator_train = RegressionEvaluator(labelCol="diabetes", predictionCol="prediction", metricName="r2")
r2_train = evaluator_train.evaluate(train_predictions)
print("R-squared on training set:", r2_train)

# Evaluate R-squared on the testing set
evaluator_test = RegressionEvaluator(labelCol="diabetes", predictionCol="prediction", metricName="r2")
r2_test = evaluator_test.evaluate(test_predictions)
print("R-squared on testing set:", r2_test)


R-squared on training set: 0.5875790008080288
R-squared on testing set: 0.5810031612628319


Decision Trees Model

In [32]:
from pyspark.ml.feature import StringIndexer

# Create StringIndexer for categorical columns
gender_indexer = StringIndexer(inputCol='gender', outputCol='gender_index')
smoking_history_indexer = StringIndexer(inputCol='smoking_history', outputCol='smoking_history_index')

# Assemble features into a single vector column
feature_columns = [col_name for col_name in df_spark_cleaned.columns if col_name not in ['diabetes', 'gender', 'smoking_history']]
vector_assembler = VectorAssembler(inputCols=feature_columns + ['gender_index', 'smoking_history_index'], outputCol='features')

# Initialize DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol='features', labelCol='diabetes', seed=42)

# Create a pipeline
pipeline = Pipeline(stages=[gender_indexer, smoking_history_indexer, vector_assembler, dt])

# Split the data into training and testing sets
train_ratio = 0.8
test_ratio = 1 - train_ratio
seed = 42

df_spark_train, df_spark_test = df_spark_cleaned.randomSplit([train_ratio, test_ratio], seed=seed)

# Fit the model
model = pipeline.fit(df_spark_train)

# Make predictions on the testing set
df_spark_predictions = model.transform(df_spark_test)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol='diabetes', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(df_spark_predictions)
df_spark_train_predictions = model.transform(df_spark_train)
# Evaluate the model on training set
evaluator_train = MulticlassClassificationEvaluator(labelCol='diabetes', predictionCol='prediction', metricName='accuracy')
accuracy_train = evaluator_train.evaluate(df_spark_train_predictions)

print("Decision Tree Training Accuracy:", accuracy_train)
print("Decision Tree Test Accuracy:", accuracy)


Decision Tree Training Accuracy: 0.8909580985052683
Decision Tree Test Accuracy: 0.8933467067870278


Decision Trees Precision, Recall, F1 Scores and Confusion Matrix

In [33]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as F


# Make predictions on the testing set
df_spark_test_predictions = model.transform(df_spark_test)

# Evaluate precision
precision_evaluator = MulticlassClassificationEvaluator(labelCol='diabetes', predictionCol='prediction', metricName='weightedPrecision')
precision = precision_evaluator.evaluate(df_spark_test_predictions)

print("Precision:", precision)

# Evaluate recall
recall_evaluator = MulticlassClassificationEvaluator(labelCol='diabetes', predictionCol='prediction', metricName='weightedRecall')
recall = recall_evaluator.evaluate(df_spark_test_predictions)

print("Recall:", recall)


Precision: 0.8972908310644124
Recall: 0.8933467067870278


In [34]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions on the testing set
df_spark_test_predictions = model.transform(df_spark_test)

# Evaluate F1 score
f1_evaluator = MulticlassClassificationEvaluator(labelCol='diabetes', predictionCol='prediction', metricName='f1')
f1_score = f1_evaluator.evaluate(df_spark_test_predictions)

print("F1 Score:", f1_score)


F1 Score: 0.8933864804371636


In [35]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Convert the PySpark DataFrame to an RDD of (prediction, label) tuples
prediction_and_label = df_spark_test_predictions.select('prediction', 'diabetes').rdd.map(lambda row: (float(row['prediction']), float(row['diabetes'])))

# Instantiate the MulticlassMetrics class
metrics = MulticlassMetrics(prediction_and_label)

# Get the confusion matrix
confusion_matrix = metrics.confusionMatrix().toArray()

print("Confusion Matrix:")
print(confusion_matrix)


Confusion Matrix:
[[1348.  229.]
 [  90. 1324.]]


Random Forest Model Accuracy

In [40]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Check if 'new_features' column already exists
if 'new_features' in df_spark_cleaned.columns:
    # If it exists, drop the column
    df_spark_cleaned = df_spark_cleaned.drop('new_features')

# Assemble features into a single vector column
feature_columns = [col_name for col_name in df_spark_cleaned.columns if col_name != 'diabetes']
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol='new_features')
df_spark_assembled = vector_assembler.transform(df_spark_cleaned)

# Split the data into training and testing sets
# Use randomSplit method
train_ratio = 0.8
test_ratio = 1 - train_ratio
seed = 42
df_spark_train, df_spark_test = df_spark_assembled.randomSplit([train_ratio, test_ratio], seed=seed)

# Create a RandomForestClassifier without using VectorAssembler
rf = RandomForestClassifier(featuresCol='new_features', labelCol='diabetes', seed=42)

# Create a pipeline without VectorAssembler
pipeline_rf = Pipeline(stages=[rf])

# Train the model
model_rf = pipeline_rf.fit(df_spark_train)

# Make predictions on the testing set
df_spark_test_predictions_rf = model_rf.transform(df_spark_test)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol='diabetes', metricName='accuracy')
accuracy_rf = evaluator.evaluate(df_spark_test_predictions_rf)

print("Random Forest Test Accuracy:", accuracy_rf)


Random Forest Test Accuracy: 0.8936810431293881


Random Forest Precission, Recall, F1 Scores and Confusion Matrix

In [42]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# Make predictions on the testing set
df_spark_test_predictions_rf = model_rf.transform(df_spark_test)

# Evaluate the model using MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol='diabetes', predictionCol='prediction', metricName='accuracy')
accuracy_rf = evaluator.evaluate(df_spark_test_predictions_rf)

# Calculate precision, recall, and F1-score using MulticlassMetrics
predictionAndLabels = df_spark_test_predictions_rf.select('prediction', 'diabetes').rdd.map(lambda row: (float(row['prediction']), float(row['diabetes'])))
metrics = MulticlassMetrics(predictionAndLabels)

# Precision
precision_rf = metrics.precision(label=1.0)
print("Precision:", precision_rf)

# Recall
recall_rf = metrics.recall(label=1.0)
print("Recall:", recall_rf)

# F1 Score
f1_rf = metrics.fMeasure(label=1.0)
print("F1 Score:", f1_rf)

# Confusion Matrix
confusion_matrix_rf = metrics.confusionMatrix().toArray()
print("Confusion Matrix:")
print(confusion_matrix_rf)


Precision: 0.9376996805111821
Recall: 0.8302687411598303
F1 Score: 0.8807201800450113
Confusion Matrix:
[[1499.   78.]
 [ 240. 1174.]]


Accuracy for SVM Model

In [44]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline


# Assemble features into a single vector column
feature_columns = [col_name for col_name in df_spark_train.columns if col_name != 'diabetes']
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Standardize the features
scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withMean=True, withStd=True)

# Define the SVM model
svm = LinearSVC(featuresCol='scaled_features', labelCol='diabetes')

# Create a pipeline
pipeline_svm = Pipeline(stages=[vector_assembler, scaler, svm])

# Train the SVM model
model_svm = pipeline_svm.fit(df_spark_train)

# Make predictions on the testing set
df_spark_test_predictions_svm = model_svm.transform(df_spark_test)

# Evaluate the model using MulticlassClassificationEvaluator
evaluator_svm = MulticlassClassificationEvaluator(labelCol='diabetes', metricName='accuracy')
accuracy_svm = evaluator_svm.evaluate(df_spark_test_predictions_svm)

# Print SVM Test Accuracy
print("SVM Test Accuracy:", accuracy_svm)


SVM Test Accuracy: 0.8839852892009361


SVM Model Precission, Recall, F1 Scores and Confusion Matrix


In [46]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# Assemble features into a single vector column
feature_columns = [col_name for col_name in df_spark_train.columns if col_name != 'diabetes']
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Standardize the features
scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withMean=True, withStd=True)

# Define the SVM model
svm = LinearSVC(featuresCol='scaled_features', labelCol='diabetes')

# Create a pipeline
pipeline_svm = Pipeline(stages=[vector_assembler, scaler, svm])

# Train the SVM model
model_svm = pipeline_svm.fit(df_spark_train)

# Make predictions on the testing set
df_spark_test_predictions_svm = model_svm.transform(df_spark_test)

# Evaluate the model using MulticlassClassificationEvaluator
evaluator_svm = MulticlassClassificationEvaluator(labelCol='diabetes', metricName='weightedPrecision')
precision_svm = evaluator_svm.evaluate(df_spark_test_predictions_svm)

evaluator_svm = MulticlassClassificationEvaluator(labelCol='diabetes', metricName='weightedRecall')
recall_svm = evaluator_svm.evaluate(df_spark_test_predictions_svm)

evaluator_svm = MulticlassClassificationEvaluator(labelCol='diabetes', metricName='f1')
f1_svm = evaluator_svm.evaluate(df_spark_test_predictions_svm)

# Extract predictions and labels as RDD for confusion matrix
prediction_and_label = df_spark_test_predictions_svm.select("prediction", "diabetes").rdd

# Instantiate MulticlassMetrics
metrics_svm = MulticlassMetrics(prediction_and_label)


# Print results
print("SVM Precision:", precision_svm)
print("SVM Recall:", recall_svm)
print("SVM F1 Score:", f1_svm)



SVM Precision: 0.8839668604721829
SVM Recall: 0.8839852892009361
SVM F1 Score: 0.8839402784519845


In [49]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


# Create a temporary view for Spark SQL
df_spark_test_predictions_svm.createOrReplaceTempView("predictions")

# Use Spark SQL to calculate confusion matrix
confusion_matrix_svm = spark.sql("""
    SELECT
        SUM(CASE WHEN prediction = 1 AND diabetes = 1 THEN 1 ELSE 0 END) AS true_positive,
        SUM(CASE WHEN prediction = 0 AND diabetes = 1 THEN 1 ELSE 0 END) AS false_negative,
        SUM(CASE WHEN prediction = 1 AND diabetes = 0 THEN 1 ELSE 0 END) AS false_positive,
        SUM(CASE WHEN prediction = 0 AND diabetes = 0 THEN 1 ELSE 0 END) AS true_negative
    FROM predictions
""").collect()[0]

# Convert the result to a Python dictionary
confusion_matrix_svm_dict = confusion_matrix_svm.asDict()

# Print the confusion matrix
print("Confusion Matrix for SVM:")
print(confusion_matrix_svm_dict)


Confusion Matrix for SVM:
{'true_positive': 1231, 'false_negative': 183, 'false_positive': 164, 'true_negative': 1413}
