In [26]:
# Set the PySpark environment variables
import os

os.environ['SPARK_HOME'] = r"C:\Users\Faisal\Downloads\spark-4.0.0-bin-hadoop3\spark-4.0.0-bin-hadoop3"
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

# You set the exact location to where spark is downloaded on your laptop

In [27]:
!pip install pyspark



In [28]:
# Import PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create SparkSession and create dataframe

In [29]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Heart-Disease-Analysis") \
    .getOrCreate()

# Creates a new spark session or uses a previous one if it was created

In [30]:
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

# It automatically shows DataFrame outputs (like the first few rows) without needing to call .show() manually.

In [31]:
df = spark.read.csv("heart_2022_no_nans.csv", header=True, sep=",", inferSchema=True)

# Treat first row as header and infer data types of columns

# Exploratory Data Analysis

In [32]:
df.printSchema()


root
 |-- State: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- GeneralHealth: string (nullable = true)
 |-- PhysicalHealthDays: double (nullable = true)
 |-- MentalHealthDays: double (nullable = true)
 |-- LastCheckupTime: string (nullable = true)
 |-- PhysicalActivities: string (nullable = true)
 |-- SleepHours: double (nullable = true)
 |-- RemovedTeeth: string (nullable = true)
 |-- HadHeartAttack: string (nullable = true)
 |-- HadAngina: string (nullable = true)
 |-- HadStroke: string (nullable = true)
 |-- HadAsthma: string (nullable = true)
 |-- HadSkinCancer: string (nullable = true)
 |-- HadCOPD: string (nullable = true)
 |-- HadDepressiveDisorder: string (nullable = true)
 |-- HadKidneyDisease: string (nullable = true)
 |-- HadArthritis: string (nullable = true)
 |-- HadDiabetes: string (nullable = true)
 |-- DeafOrHardOfHearing: string (nullable = true)
 |-- BlindOrVisionDifficulty: string (nullable = true)
 |-- DifficultyConcentrating: string (nullable = t

In [33]:
df.show(5,0)  

# the 0 is so that no truncation happens

+-------+------+-------------+------------------+----------------+--------------------------------------------------+------------------+----------+----------------------+--------------+---------+---------+---------+-------------+-------+---------------------+----------------+------------+-----------+-------------------+-----------------------+-----------------------+-----------------+-------------------------+-----------------+-------------+-----------------------------------------+---------+------------------------+---------------+--------------+-----------------+-----+---------------+----------+------------+-------------+---------------------------------------------------------+----------------+--------+
|State  |Sex   |GeneralHealth|PhysicalHealthDays|MentalHealthDays|LastCheckupTime                                   |PhysicalActivities|SleepHours|RemovedTeeth          |HadHeartAttack|HadAngina|HadStroke|HadAsthma|HadSkinCancer|HadCOPD|HadDepressiveDisorder|HadKidneyDisease|HadArthr

In [34]:
print(f"Row count: {df.count()}")  


Row count: 246022


# Quarter of a million people in this data

In [35]:
from pyspark.sql.functions import col, when, sum, count, round, split

df = df.withColumn('RaceEthnicityCategoryShort', split(col('RaceEthnicityCategory'), ',').getItem(0))

df.groupBy('RaceEthnicityCategoryShort').agg(
    round(
        (sum(when(col('HadHeartAttack') == 'Yes', 1).otherwise(0)) / count('*') * 100),
        2
    ).alias('HeartAttackPercent')
).orderBy(col('HeartAttackPercent').desc()).show()

# Clean the data for the rate ethnicity column first by removing all the unecessary data in the column 
# Assign 1 when heart attack otherwise 0 then sum it up and find %

+--------------------------+------------------+
|RaceEthnicityCategoryShort|HeartAttackPercent|
+--------------------------+------------------+
|               Multiracial|              6.09|
|                White only|              5.77|
|           Other race only|              4.84|
|                Black only|               4.6|
|                  Hispanic|              3.81|
+--------------------------+------------------+



# Multiracial have highest % of heart attacks, Hispanic least

In [36]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, round

total_rows = df.count()

categorical_columns = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]

for col_name in categorical_columns:
    print(f"\nColumn: {col_name}")
    df.groupBy(col_name) \
      .count() \
      .withColumn('percentage', round((col('count') / total_rows) * 100, 2)) \
      .orderBy('percentage', ascending=False) \
      .select(col_name, 'percentage') \
      .show(truncate=False)



Column: State
+--------------+----------+
|State         |percentage|
+--------------+----------+
|Washington    |6.1       |
|Maryland      |3.73      |
|Minnesota     |3.72      |
|Ohio          |3.66      |
|New York      |3.63      |
|Texas         |3.01      |
|Florida       |2.97      |
|Kansas        |2.5       |
|Wisconsin     |2.49      |
|Maine         |2.44      |
|Iowa          |2.31      |
|Hawaii        |2.27      |
|Virginia      |2.26      |
|Indiana       |2.24      |
|Massachusetts |2.22      |
|Arizona       |2.22      |
|South Carolina|2.22      |
|Michigan      |2.18      |
|Utah          |2.18      |
|Colorado      |2.1       |
+--------------+----------+
only showing top 20 rows

Column: Sex
+------+----------+
|Sex   |percentage|
+------+----------+
|Female|51.95     |
|Male  |48.05     |
+------+----------+


Column: GeneralHealth
+-------------+----------+
|GeneralHealth|percentage|
+-------------+----------+
|Very good    |35.36     |
|Good         |31.46   

# Close, but more females than males in the dataset
# 4.29% of dataset had a heart attack
# Significant amount had arthritis 34.5%

In [37]:
from pyspark.sql.functions import round, col
from pyspark.sql.types import NumericType

#  numeric columns
numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)]


# describe table for numeric columns
desc_df = df.select(numeric_cols).describe()

# numeric columns converted to float and rounded to 3 decimals
for c in desc_df.columns[1:]:  
    desc_df = desc_df.withColumn(c, round(col(c).cast("float"), 3))

desc_df.show(truncate=False)


+-------+------------------+----------------+----------+--------------+-----------------+--------+
|summary|PhysicalHealthDays|MentalHealthDays|SleepHours|HeightInMeters|WeightInKilograms|BMI     |
+-------+------------------+----------------+----------+--------------+-----------------+--------+
|count  |246022.0          |246022.0        |246022.0  |246022.0      |246022.0         |246022.0|
|mean   |4.119             |4.167           |7.021     |1.705         |83.615           |28.668  |
|stddev |8.406             |8.103           |1.441     |0.107         |21.323           |6.514   |
|min    |0.0               |0.0             |1.0       |0.91          |28.12            |12.02   |
|max    |30.0              |30.0            |24.0      |2.41          |292.57           |97.65   |
+-------+------------------+----------------+----------+--------------+-----------------+--------+



In [38]:
from pyspark.sql.functions import col, when, sum, count

df.groupBy('ECigaretteUsage').agg(
    (sum(when(col('HadHeartAttack') == 'Yes', 1).otherwise(0)) / count('*') * 100).alias('HeartAttackPercent')
).orderBy(col('HeartAttackPercent').desc()).show(truncate=False)


+-----------------------------------------+------------------+
|ECigaretteUsage                          |HeartAttackPercent|
+-----------------------------------------+------------------+
|Not at all (right now)                   |5.6260252766802985|
|Never used e-cigarettes in my entire life|5.560464529159303 |
|Use them some days                       |3.769900871132472 |
|Use them every day                       |2.972292191435768 |
+-----------------------------------------+------------------+



# Basically, this result shows correlation between E-cigarette users and heart attacks, but not necessarily causation

In [39]:
from pyspark.sql.functions import col, when, sum, count

df.groupBy('RemovedTeeth').agg(
    (sum(when(col('HadHeartAttack') == 'Yes', 1).otherwise(0)) / count('*') * 100).alias('HeartAttackPercent')
).orderBy(col('HeartAttackPercent').desc()).show(truncate=False)


+----------------------+------------------+
|RemovedTeeth          |HeartAttackPercent|
+----------------------+------------------+
|All                   |16.707795035563944|
|6 or more, but not all|11.579961464354529|
|1 to 5                |5.835185135605473 |
|None of them          |2.8641558757371266|
+----------------------+------------------+



# Suprising there is a link between removing teeth and heart attack risk, higher risk with more teeth removed

# Logistic regression model

In [40]:
from pyspark.sql.functions import col, when
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

# Convert target to numeric
label_indexer = StringIndexer(inputCol="HadHeartAttack", outputCol="label")


#get all columns except target
features = [c for c in df.columns if c != "HadHeartAttack"]

# Index categorical features
indexers = [StringIndexer(inputCol=c, outputCol=c+"_indexed") for c in features if df.select(c).dtypes[0][1] == 'string']

#combine into a feature vector
assembler = VectorAssembler(
    inputCols=[c+"_indexed" if df.select(c).dtypes[0][1] == 'string' else c for c in features],
    outputCol="features"
)

pipeline = Pipeline(stages=indexers + [label_indexer, assembler])
pipeline_model = pipeline.fit(df)
df_transformed = pipeline_model.transform(df)


# Calculate class weights
majority_class = df_transformed.filter(col("label") == 0).count()
minority_class = df_transformed.filter(col("label") == 1).count()
imbalance_ratio = majority_class / minority_class  # Weight for minority class

# Create a new column for weights
df_transformed = df_transformed.withColumn(
    "classWeight",
    when(col("label") == 1, imbalance_ratio).otherwise(1.0)  # Higher weight for minority class
)

# Split the data
train, test = df_transformed.randomSplit([0.8, 0.2], seed=42)

# Train Logistic Regression with class weighting
lr = LogisticRegression(featuresCol="features", labelCol="label", weightCol="classWeight")
lr_model = lr.fit(train)

# Predict
predictions = lr_model.transform(test)

# Evaluate
evaluator = BinaryClassificationEvaluator()
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")


AUC: 0.8828234749439321


In [41]:
import pandas as pd

feature_names = assembler.getInputCols()
coefficients = lr_model.coefficients.toArray()

feature_importance = pd.DataFrame({
    "Feature": feature_names,
    "Coefficient": coefficients
}).sort_values("Coefficient", ascending=False)

print(feature_importance)


                               Feature  Coefficient
9                    HadAngina_indexed     2.696328
10                   HadStroke_indexed     1.105988
26                   ChestScan_indexed     0.781622
1                          Sex_indexed     0.762306
35               PneumoVaxEver_indexed     0.310559
18         DeafOrHardOfHearing_indexed     0.295473
32             AlcoholDrinkers_indexed     0.289344
15            HadKidneyDisease_indexed     0.269728
21           DifficultyWalking_indexed     0.262479
17                 HadDiabetes_indexed     0.257223
8                 RemovedTeeth_indexed     0.242579
16                HadArthritis_indexed     0.220762
19     BlindOrVisionDifficulty_indexed     0.213105
24                SmokerStatus_indexed     0.167446
13                     HadCOPD_indexed     0.165425
23           DifficultyErrands_indexed     0.160611
12               HadSkinCancer_indexed     0.148483
6           PhysicalActivities_indexed     0.130562
2           

In [43]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
recall_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
recall = recall_evaluator.evaluate(predictions)
print(f"Weighted Recall: {recall}")


Weighted Recall: 0.839512733515922


In [44]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
precision_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
precision = precision_evaluator.evaluate(predictions)
print(f"Weighted Precision: {precision}")


Weighted Precision: 0.9399740926973306


# Great model scores
# The variable with the highest positive coefficient is HadAngina_indexed (2.5457), which means having angina is the strongest predictor that increases the likelihood of a heart attack in the model.
# The variable with highest negative coefficient is height (-0.6921), which means being taller is associated with a lower risk of heart attack in the dataset.

# SQL

In [18]:
# Register DataFrame as SQL temporary view
df.createOrReplaceTempView("heart_data")
spark.sql("SELECT COUNT(*) AS total_rows FROM heart_data").show()


+----------+
|total_rows|
+----------+
|    246022|
+----------+



In [19]:
spark.sql("""
    SELECT HadHeartAttack, ROUND(AVG(BMI), 2) AS avg_BMI
    FROM heart_data
    GROUP BY HadHeartAttack
""").show()


+--------------+-------+
|HadHeartAttack|avg_BMI|
+--------------+-------+
|            No|  28.62|
|           Yes|  29.49|
+--------------+-------+



# Higher average BMI for those who had a heart attack

In [20]:
spark.sql("""
    SELECT 
    TRIM(CASE 
        WHEN INSTR(SmokerStatus, '-') > 0 THEN element_at(split(SmokerStatus, '-'), 2)
        ELSE SmokerStatus
    END) AS SmokerStatus_Extracted,
    ROUND(100 * SUM(CASE WHEN HadHeartAttack = 'Yes' THEN 1 ELSE 0 END) / COUNT(*), 2) AS pct_had_heart_attack
FROM heart_data
GROUP BY SmokerStatus_Extracted
ORDER BY pct_had_heart_attack DESC

""").show()

# Clean data by removing unecessary information from the smoker status then get % of heart attack for each category

+----------------------+--------------------+
|SmokerStatus_Extracted|pct_had_heart_attack|
+----------------------+--------------------+
|  now smokes every day|                8.32|
|         Former smoker|                8.19|
|  now smokes some days|                6.83|
|          Never smoked|                 3.7|
+----------------------+--------------------+



# Every day smokers highest % of heart attack. Unfortunately even those who quit smoking have almost a similar % of heart attacks to every day smokers, and higher % than those who smoke occasionally.