<a href="https://colab.research.google.com/github/Geeth-Rath/HeartDiseasesPredictionModel/blob/main/untitled2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import col, desc, when

# Create Spark session
spark = SparkSession.builder.appName("HeartDiseaseAnalysis").getOrCreate()

# Define the schema for the dataset
schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("sex", IntegerType(), True),
    StructField("chest_pain_type", IntegerType(), True),
    StructField("resting_blood_pressure", IntegerType(), True),
    StructField("serum_cholesterol", IntegerType(), True),
    StructField("fasting_blood_sugar", IntegerType(), True),
    StructField("resting_electrocardiographic_results", IntegerType(), True),
    StructField("max_heart_rate_achieved", IntegerType(), True),
    StructField("exercise_induced_angina", IntegerType(), True),
    StructField("oldpeak", IntegerType(), True),
    StructField("slope", IntegerType(), True),
    StructField("num_major_vessels", IntegerType(), True),
    StructField("thal", IntegerType(), True),
    StructField("target", IntegerType(), True)
])

# Load the CSV file into a Spark DataFrame
df = spark.read.csv("/content/heart.csv", header=True, schema=schema)

# # Load the CSV file 
# df = spark.read.csv("/content/heart.csv", header=True, inferSchema=True )

# Show the first 5 rows of the DataFrame
df.show(truncate=False)

+---+---+---------------+----------------------+-----------------+-------------------+------------------------------------+-----------------------+-----------------------+-------+-----+-----------------+----+------+
|age|sex|chest_pain_type|resting_blood_pressure|serum_cholesterol|fasting_blood_sugar|resting_electrocardiographic_results|max_heart_rate_achieved|exercise_induced_angina|oldpeak|slope|num_major_vessels|thal|target|
+---+---+---------------+----------------------+-----------------+-------------------+------------------------------------+-----------------------+-----------------------+-------+-----+-----------------+----+------+
|52 |1  |0              |125                   |212              |0                  |1                                   |168                    |0                      |1      |2    |2                |3   |0     |
|53 |1  |0              |140                   |203              |1                  |0                                   |155          

In [None]:
# get attribute schema
df.printSchema()

check null values

In [None]:
from pyspark.sql.functions import col, sum

# Assuming the DataFrame is named "df"
null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Display the null counts
null_counts.show()


+---+---+---------------+----------------------+-----------------+-------------------+------------------------------------+-----------------------+-----------------------+-------+-----+-----------------+----+------+
|age|sex|chest_pain_type|resting_blood_pressure|serum_cholesterol|fasting_blood_sugar|resting_electrocardiographic_results|max_heart_rate_achieved|exercise_induced_angina|oldpeak|slope|num_major_vessels|thal|target|
+---+---+---------------+----------------------+-----------------+-------------------+------------------------------------+-----------------------+-----------------------+-------+-----+-----------------+----+------+
|  0|  0|              0|                     0|                0|                  0|                                   0|                      0|                      0|    584|    0|                0|   0|     0|
+---+---+---------------+----------------------+-----------------+-------------------+------------------------------------+-------------

In [None]:
#drop oldpeak colum as it has 584 null values
df = df.drop("oldpeak")
df.show()

+---+---+---------------+----------------------+-----------------+-------------------+------------------------------------+-----------------------+-----------------------+-----+-----------------+----+------+
|age|sex|chest_pain_type|resting_blood_pressure|serum_cholesterol|fasting_blood_sugar|resting_electrocardiographic_results|max_heart_rate_achieved|exercise_induced_angina|slope|num_major_vessels|thal|target|
+---+---+---------------+----------------------+-----------------+-------------------+------------------------------------+-----------------------+-----------------------+-----+-----------------+----+------+
| 52|  1|              0|                   125|              212|                  0|                                   1|                    168|                      0|    2|                2|   3|     0|
| 53|  1|              0|                   140|              203|                  1|                                   0|                    155|                     

 check duplicates

In [None]:
from pyspark.sql.functions import col

# Assuming that the DataFrame is already loaded and named as 'df'
num_duplicates = df.count() - df.dropDuplicates().count()
num_duplicates
print("Number of duplicates in the DataFrame: ", num_duplicates)


# Remove duplicates
df.distinct()
df.show(truncate=False)

Number of duplicates in the DataFrame:  723
+---+---+---------------+----------------------+-----------------+-------------------+------------------------------------+-----------------------+-----------------------+-----+-----------------+----+------+
|age|sex|chest_pain_type|resting_blood_pressure|serum_cholesterol|fasting_blood_sugar|resting_electrocardiographic_results|max_heart_rate_achieved|exercise_induced_angina|slope|num_major_vessels|thal|target|
+---+---+---------------+----------------------+-----------------+-------------------+------------------------------------+-----------------------+-----------------------+-----+-----------------+----+------+
|52 |1  |0              |125                   |212              |0                  |1                                   |168                    |0                      |2    |2                |3   |0     |
|53 |1  |0              |140                   |203              |1                  |0                                   |1

check outliers

In [None]:
from pyspark.sql.functions import col, expr
import matplotlib.pyplot as plt

# Define the columns with numerical values
num_cols = ['age', 'resting_blood_pressure', 'serum_cholesterol', 'max_heart_rate_achieved', 'num_major_vessels']

# Iterate through each column and calculate the IQR range and identify outliers
for c in num_cols:
    # Calculate the IQR range for each column
    Q1 = df.approxQuantile(c, [0.25], 0.05)[0]
    Q3 = df.approxQuantile(c, [0.75], 0.05)[0]
    IQR = Q3 - Q1

    # Define the lower and upper bounds for outliers
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    # print("lower_bound", lower_bound)
    # print("upper_bound", upper_bound)

    # Identify the outliers in the column
    outliers = df.filter((col(c) < lower_bound) | (col(c) > upper_bound))

    # Print the outliers for each column
    # outliers.show()
    df.select(c).show()


In [None]:
from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType

# Define a function to calculate lower and upper bounds for outlier detection
def outlier_bounds(col_name):
    q1, q3 = df.approxQuantile(col_name, [0.25, 0.75], 0)
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    return lower_bound, upper_bound

# Define the list of numerical columns
num_cols = ['age', 'resting_blood_pressure', 'serum_cholesterol', 'max_heart_rate_achieved', 'num_major_vessels']

# Calculate lower and upper bounds for each numerical column
bounds = [outlier_bounds(c) for c in num_cols]


# Define a function to check for outliers in a column and replace them with the median
def replace_outliers(col_name):
    lower_bound, upper_bound = outlier_bounds(col_name)
    return when((col(col_name) < lower_bound) | (col(col_name) > upper_bound), None).otherwise(col(col_name))

# Apply the outlier replacement function to each numerical column
df = df.select([replace_outliers(c).alias(c) if c in num_cols else col(c) for c in df.columns])



# Cast numerical columns to integer type
for col_name in num_cols:
    df = df.withColumn(col_name, col(col_name).cast(IntegerType()))

# Show the resulting dataset
df.show()


In [None]:
df

DataFrame[age: int, sex: int, chest_pain_type: int, resting_blood_pressure: int, serum_cholesterol: int, fasting_blood_sugar: int, resting_electrocardiographic_results: int, max_heart_rate_achieved: int, exercise_induced_angina: int, slope: int, num_major_vessels: int, thal: int, target: int]

use map reduce

In [None]:
# Define map function to extract the heart disease status
def extract_status(row):
    status = row["target"]
    return ("heart disease", status)

# Define reduce function to count the number of patients with heart disease
def count_status(a, b):
    return a + b

# MapReduce
status_count = df.rdd.map(extract_status).reduceByKey(count_status)

# Output the results
for status, count in status_count.collect():
    print(status + " patients count: " + str(count))

# Stop Spark context
spark.stop()


heart disease patients count: 526


In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import col, desc, when, avg

# Filter the DataFrame by gender and calculate the average cholesterol level
male_avg_cholesterol = df.filter(col("sex") == 1).agg(avg(col("serum_cholesterol")).alias("male_avg_cholesterol")).collect()[0]["male_avg_cholesterol"]
female_avg_cholesterol = df.filter(col("sex") == 0).agg(avg(col("serum_cholesterol")).alias("female_avg_cholesterol")).collect()[0]["female_avg_cholesterol"]

# Print the results
print("Average cholesterol level for male patients: {:.2f}".format(male_avg_cholesterol))
print("Average cholesterol level for female patients: {:.2f}".format(female_avg_cholesterol))

Average cholesterol level for male patients: 239.24
Average cholesterol level for female patients: 261.46


In [None]:
# Map function to extract the chest pain type and count as 1
def extract_chest_pain_type(row):
    chest_pain_type = row["chest_pain_type"]
    return (chest_pain_type, 1)

# Reduce function to count the number of occurrences of each chest pain type
def count_chest_pain_type(count1, count2):
    return count1 + count2

# Map-reduce operation to find the most common chest pain type
chest_pain_count = df.rdd.map(extract_chest_pain_type).reduceByKey(count_chest_pain_type)

most_common_chest_pain = chest_pain_count.sortBy(lambda x: x[1], ascending=False).first()

print("The most common chest pain type is:", most_common_chest_pain[0])

The most common chest pain type is: 0


Prediction model

In [None]:
# Import necessary libraries
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Split the dataset into training and testing sets
(trainingData, testData) = df.randomSplit([0.7, 0.3], seed=1234)

# Define the feature columns
featureCols = ['age', 'sex', 'chest_pain_type', 'resting_blood_pressure', 'serum_cholesterol', 'fasting_blood_sugar', 'resting_electrocardiographic_results', 'max_heart_rate_achieved', 'exercise_induced_angina', 'slope', 'num_major_vessels', 'thal']


# Vectorize the features
assembler = VectorAssembler(inputCols=featureCols, outputCol='features')
trainingData = assembler.transform(trainingData)
testData = assembler.transform(testData)

# Create a logistic regression model
lr = LogisticRegression(featuresCol='features', labelCol='target', maxIter=10)

# Define the hyperparameters to tune
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Define the evaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='target')

# Define the cross validator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Train the model
cvModel = cv.fit(trainingData)

# Make predictions on the test data
predictions = cvModel.transform(testData)

# Evaluate the model
accuracy = evaluator.evaluate(predictions)
print('Accuracy: {}'.format(accuracy))

Accuracy: 0.8790361658076742
