# Importing Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import matplotlib.pyplot as plt

# Reading data

In [12]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Heart Disease Analysis") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Loading the dataset
df = spark.read.csv("heart_2022_no_nans.csv", header=True, inferSchema=True)

# Showing the first few rows and columns of the dataframe to understand the structure
df.show(5)

+-------+------+-------------+------------------+----------------+--------------------+------------------+----------+--------------------+--------------+---------+---------+---------+-------------+-------+---------------------+----------------+------------+-----------+-------------------+-----------------------+-----------------------+-----------------+-------------------------+-----------------+-------------+--------------------+---------+---------------------+---------------+--------------+-----------------+-----+---------------+----------+------------+-------------+--------------------+----------------+--------+
|  State|   Sex|GeneralHealth|PhysicalHealthDays|MentalHealthDays|     LastCheckupTime|PhysicalActivities|SleepHours|        RemovedTeeth|HadHeartAttack|HadAngina|HadStroke|HadAsthma|HadSkinCancer|HadCOPD|HadDepressiveDisorder|HadKidneyDisease|HadArthritis|HadDiabetes|DeafOrHardOfHearing|BlindOrVisionDifficulty|DifficultyConcentrating|DifficultyWalking|DifficultyDressingBath

# Analytical queries

Query 1: Average age of patients who have heart disease versus patients who do not have heart disease

Query 2: Percentage of Smoker with Heart Desease

Query 3: Percentage of alcohol consumers who have heart disease

Query 4: BMI Difference of Patients with and without Heart Disease

Query 5: Average Sleeping Time Difference of Patients with and without Heart Disease

Query 6: Percentage of Patients with Heart Disease Who Partake in Physical Activity

In [13]:
# Function to map AgeCategory to Numeric Values
def map_age_category(df):
    df = df.withColumn('AgeNumeric', 
        when(col('AgeCategory') == 'Age 18 to 24', 21)
        .when(col('AgeCategory') == 'Age 25 to 29', 27)
        .when(col('AgeCategory') == 'Age 30 to 34', 32)
        .when(col('AgeCategory') == 'Age 35 to 39', 37)
        .when(col('AgeCategory') == 'Age 40 to 44', 42)
        .when(col('AgeCategory') == 'Age 45 to 49', 47)
        .when(col('AgeCategory') == 'Age 50 to 54', 52)
        .when(col('AgeCategory') == 'Age 55 to 59', 57)
        .when(col('AgeCategory') == 'Age 60 to 64', 62)
        .when(col('AgeCategory') == 'Age 65 to 69', 67)
        .when(col('AgeCategory') == 'Age 70 to 74', 72)
        .when(col('AgeCategory') == 'Age 75 to 79', 77)
        .when(col('AgeCategory') == 'Age 80 or older', 82)
        .otherwise(None))
    return df

# Apply age mapping
df = map_age_category(df)

# Analytical Queries
def analytical_queries(df):
    avg_age_with_hd = df.filter(df['HadHeartAttack'] == 'Yes').select('AgeNumeric').groupBy().avg().collect()[0][0]
    avg_age_without_hd = df.filter(df['HadHeartAttack'] == 'No').select('AgeNumeric').groupBy().avg().collect()[0][0]
    print(f"Average age of patients with heart disease: {avg_age_with_hd}")
    print(f"Average age of patients without heart disease: {avg_age_without_hd}\n")
    
    total_smokers = df.filter(col('SmokerStatus').startswith('Current smoker')).count()
    smokers_with_hd = df.filter((df['HadHeartAttack'] == 'Yes') & (col('SmokerStatus').startswith('Current smoker'))).count()
    percentage_smokers_hd = (smokers_with_hd / total_smokers) * 100
    print(f"Percentage of smokers who have heart disease: {percentage_smokers_hd:.2f}%\n")
    
    total_hd_patients = df.filter(df['HadHeartAttack'] == 'Yes').count()
    alcohol_consumers_hd = df.filter((df['HadHeartAttack'] == 'Yes') & (df['AlcoholDrinkers'] == 'Yes')).count()
    percentage_alcohol_hd = (alcohol_consumers_hd / total_hd_patients) * 100
    print(f"Percentage of alcohol consumers who have heart disease: {percentage_alcohol_hd:.2f}%\n")
    
    avg_bmi_heart_disease = df.filter(df['HadHeartAttack'] == 'Yes').agg(avg("BMI")).collect()[0][0]
    avg_bmi_no_heart_disease = df.filter(df['HadHeartAttack'] == 'No').agg(avg("BMI")).collect()[0][0]
    print(f"Average BMI for patients with heart disease: {avg_bmi_heart_disease}")
    print(f"Average BMI for patients without heart disease: {avg_bmi_no_heart_disease}\n")
    
    avg_sleep_heart_disease = df.filter(df['HadHeartAttack'] == 'Yes').agg(avg("SleepHours")).collect()[0][0]
    avg_sleep_no_heart_disease = df.filter(df['HadHeartAttack'] == 'No').agg(avg("SleepHours")).collect()[0][0]
    print(f"Average sleeping time for patients with heart disease: {avg_sleep_heart_disease}")
    print(f"Average sleeping time for patients without heart disease: {avg_sleep_no_heart_disease}\n")
    
    active_heart_disease = df.filter((df['HadHeartAttack'] == 'Yes') & (df['PhysicalActivities'] == 'Yes')).count()
    percentage_active_heart_disease = (active_heart_disease / total_hd_patients) * 100
    print(f"Percentage of patients with heart disease who partake in physical activity: {percentage_active_heart_disease:.2f}%\n")
    
    # Return the calculated values for use in the ML model function
    return avg_age_with_hd, avg_age_without_hd, percentage_smokers_hd, percentage_alcohol_hd, avg_bmi_heart_disease, avg_bmi_no_heart_disease, avg_sleep_heart_disease, avg_sleep_no_heart_disease, percentage_active_heart_disease

# Perform analytical queries
avg_age_with_hd, avg_age_without_hd, percentage_smokers_hd, percentage_alcohol_hd, avg_bmi_heart_disease, avg_bmi_no_heart_disease, avg_sleep_heart_disease, avg_sleep_no_heart_disease, percentage_active_heart_disease = analytical_queries(df)


Average age of patients with heart disease: 67.99962783773725
Average age of patients without heart disease: 54.81721678339718

Percentage of smokers who have heart disease: 7.91%

Percentage of alcohol consumers who have heart disease: 39.64%

Average BMI for patients with heart disease: 29.492435429847426
Average BMI for patients without heart disease: 28.620521482284047

Average sleeping time for patients with heart disease: 7.043096390026052
Average sleeping time for patients without heart disease: 7.020074208790689

Percentage of patients with heart disease who partake in physical activity: 63.37%



# Machine learning model

In [None]:
# Convert 'HadHeartAttack' to numerical
df = df.withColumn('HadHeartAttackNumeric', when(col('HadHeartAttack') == 'Yes', 1).otherwise(0))

# Identify categorical columns
categorical_cols = [col for col, dtype in df.dtypes if dtype == 'string' and col != 'HadHeartAttack']

# Index and encode categorical columns
indexers = [StringIndexer(inputCol=col, outputCol=col + "_index") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=col + "_index", outputCol=col + "_vec") for col in categorical_cols]

# Apply indexers and encoders
for indexer in indexers:
    df = indexer.fit(df).transform(df)
for encoder in encoders:
    df = encoder.fit(df).transform(df)

# List of all columns excluding the target and original categorical columns
feature_cols = [col for col in df.columns if col not in categorical_cols + ['HadHeartAttack', 'HadHeartAttackNumeric'] and not col.endswith("_index")]

# Assemble all features into a feature vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# Select final dataset with features and label
final_df = df.select("features", "HadHeartAttackNumeric")

# Split the data into training and test sets
train_df, test_df = final_df.randomSplit([0.7, 0.3], seed=42)

# Initialize and train the logistic regression model
lr = LogisticRegression(labelCol="HadHeartAttackNumeric")
lr_model = lr.fit(train_df)

# Make predictions on the test set
predictions = lr_model.transform(test_df)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="HadHeartAttackNumeric", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)

# Calculate accuracy, sensitivity, and specificity
predictions = predictions.withColumn("prediction", when(col("prediction") >= 0.5, 1).otherwise(0))
tp = predictions.filter((predictions['prediction'] == 1) & (predictions['HadHeartAttackNumeric'] == 1)).count()
tn = predictions.filter((predictions['prediction'] == 0) & (predictions['HadHeartAttackNumeric'] == 0)).count()
fp = predictions.filter((predictions['prediction'] == 1) & (predictions['HadHeartAttackNumeric'] == 0)).count()
fn = predictions.filter((predictions['prediction'] == 0) & (predictions['HadHeartAttackNumeric'] == 1)).count()

accuracy = (tp + tn) / (tp + tn + fp + fn)
sensitivity = tp / (tp + fn) if (tp + fn) != 0 else 0
specificity = tn / (tn + fp) if (tn + fp) != 0 else 0

# Prepare results DataFrame
results = spark.createDataFrame([
    ("AvgAgeWithHD", avg_age_with_hd),
    ("AvgAgeWithoutHD", avg_age_without_hd),
    ("PercentageSmokersHD", percentage_smokers_hd),
    ("PercentageAlcoholHD", percentage_alcohol_hd),
    ("AvgBMIWithHD", avg_bmi_heart_disease),
    ("AvgBMINoHD", avg_bmi_no_heart_disease),
    ("AvgSleepWithHD", avg_sleep_heart_disease),
    ("AvgSleepNoHD", avg_sleep_no_heart_disease),
    ("PercentageActiveHD", percentage_active_heart_disease),
    ("ROC_AUC", roc_auc),
    ("Accuracy", accuracy),
    ("Sensitivity", sensitivity),
    ("Specificity", specificity)
], ["Metric", "Value"])

# Extract coefficients
coefficients = lr_model.coefficients
intercept = lr_model.intercept

# Combine coefficients with feature names
coef_df = spark.createDataFrame([(feature, float(coef)) for feature, coef in zip(feature_cols, coefficients)], ["Feature", "Coefficient"])

# Convert Spark DataFrame to Pandas DataFrame
coef_pd_df = coef_df.toPandas()

# Calculate the absolute values of the coefficients
coef_pd_df['abs_coeff'] = coef_pd_df['Coefficient'].abs()

# Sort by absolute coefficient values to get the top 10 features
top_10_features = coef_pd_df.sort_values(by='abs_coeff', ascending=False).head(10)

# Plot the top 10 features
plt.figure(figsize=(10, 6))
plt.barh(top_10_features['Feature'], top_10_features['Coefficient'], color='skyblue')
plt.xlabel('Coefficient')
plt.title('Top 10 Most Important Variables')
plt.gca().invert_yaxis()  # Invert y-axis to have the largest coefficient on top
plt.show()

results.show()
coef_df.show()