In [1]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.5.0'
spark_version = 'spark-3.5.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

In [2]:
# Import packages
import itertools
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Build the session named "SparkSQL", or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [3]:
# Register the remote CSV with the SparkContext so SparkFiles.get can resolve it below.
url = "https://ale-work-123.s3.us-west-1.amazonaws.com/heart_statlog_cleveland_hungary_final_clean.csv"
spark.sparkContext.addFile(url)

# (column name, Spark type) pairs, listed in the order the reader should apply them.
column_types = [
    ("age", IntegerType()),
    ("sex", IntegerType()),
    ("chest_pain_type", IntegerType()),
    ("resting_bp_s", IntegerType()),
    ("cholesterol", IntegerType()),
    ("fasting_blood_sugar", IntegerType()),
    ("resting_ecg", IntegerType()),
    ("max_heart_rate", IntegerType()),
    ("exercise_angina", IntegerType()),
    ("oldpeak", FloatType()),
    ("ST_slope", IntegerType()),
    ("target", IntegerType()),
]
schema = StructType(
    [StructField(col_name, col_type, False) for col_name, col_type in column_types]
)

# Read the file using the explicit schema rather than inferring types.
local_csv = SparkFiles.get("heart_statlog_cleveland_hungary_final_clean.csv")
df = spark.read.csv(local_csv, sep=",", header=True, schema=schema)

df.show()

+---+---+---------------+------------+-----------+-------------------+-----------+--------------+---------------+-------+--------+------+
|age|sex|chest_pain_type|resting_bp_s|cholesterol|fasting_blood_sugar|resting_ecg|max_heart_rate|exercise_angina|oldpeak|ST_slope|target|
+---+---+---------------+------------+-----------+-------------------+-----------+--------------+---------------+-------+--------+------+
| 40|  1|              2|         140|        289|                  0|          0|           172|              0|    0.0|       1|     0|
| 49|  0|              3|         160|        180|                  0|          0|           156|              0|    1.0|       2|     1|
| 37|  1|              2|         130|        283|                  0|          1|            98|              0|    0.0|       1|     0|
| 48|  0|              4|         138|        214|                  0|          0|           108|              1|    1.5|       2|     1|
| 54|  1|              3|         

In [4]:
# Confirm the column types Spark applied. Note that the output reports
# nullable = true for every column even though the schema handed to the
# reader declared each field as non-nullable.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- chest_pain_type: integer (nullable = true)
 |-- resting_bp_s: integer (nullable = true)
 |-- cholesterol: integer (nullable = true)
 |-- fasting_blood_sugar: integer (nullable = true)
 |-- resting_ecg: integer (nullable = true)
 |-- max_heart_rate: integer (nullable = true)
 |-- exercise_angina: integer (nullable = true)
 |-- oldpeak: float (nullable = true)
 |-- ST_slope: integer (nullable = true)
 |-- target: integer (nullable = true)



In [7]:
# Candidate features: every column except the last one, which holds the target.
feature_cols = df.columns[:-1]

def iter_col_combos(cols=None):
    """Yield every non-empty combination of column names, smallest sizes first.

    Args:
        cols: Iterable of column names to combine. Defaults to the notebook-level
            ``feature_cols`` when omitted, so existing no-argument calls keep working.

    Yields:
        Tuples of column names for each size from 1 up to and including
        ``len(cols)``, so the combination containing every column is produced too.
    """
    if cols is None:
        cols = feature_cols
    cols = list(cols)
    # The upper bound is len(cols) + 1 because range() excludes its stop value;
    # without the +1 the full-feature combination would never be generated.
    for size in range(1, len(cols) + 1):
        yield from itertools.combinations(cols, size)

In [8]:
# The estimator and evaluator below are configured with labelCol="label",
# so rename the target column to match.
df = df.withColumnRenamed('target', 'label')
results = []

# Neither the estimator nor the evaluator depends on the feature combination,
# so build each once instead of once per loop iteration.
logistic_regression = LogisticRegression(featuresCol="features", labelCol="label")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

for combo in iter_col_combos():
    # Pack this combination's columns into the single vector column the model reads.
    assembler = VectorAssembler(inputCols=combo, outputCol='features')
    ml_df = assembler.transform(df)

    # Same 80/20 proportions and seed for every combination.
    train_data, test_data = ml_df.randomSplit([0.8, 0.2], seed=42)

    model = logistic_regression.fit(train_data)

    predictions = model.transform(test_data)

    # Accuracy plus class-weighted precision and recall on the held-out split.
    # Note these are weighted averages over both classes, not per-class scores.
    accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
    precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
    recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})

    print(f"Features: {combo}\nAccuracy: {accuracy:.4f}; Precision: {precision:.4f}; Recall: {recall:.4f}\n--------")
    # Keep the unrounded scores so the combinations can be ranked afterwards.
    results.append(
        {
            "features": combo,
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall
        }
    )



In [17]:
# Rank every evaluated combination by accuracy (highest first) and report the best three.
top_three = sorted(results, key=lambda entry: entry["accuracy"], reverse=True)[:3]
for result in top_three:
    print(
        f"Features: {result['features']}\nAccuracy: {result['accuracy']}; Precision: {result['precision']}; Recall: {result['recall']}\n--------"
    )

Features: ('sex', 'chest_pain_type', 'cholesterol', 'fasting_blood_sugar', 'exercise_angina', 'ST_slope')
Accuracy: 0.8590604026845637; Precision: 0.8591489047864888; Recall: 0.8590604026845639
--------
Features: ('age', 'chest_pain_type', 'resting_bp_s', 'cholesterol', 'fasting_blood_sugar', 'exercise_angina', 'ST_slope')
Accuracy: 0.8590604026845637; Precision: 0.8611433401014212; Recall: 0.8590604026845637
--------
Features: ('sex', 'chest_pain_type', 'cholesterol', 'fasting_blood_sugar', 'resting_ecg', 'exercise_angina', 'ST_slope')
Accuracy: 0.8590604026845637; Precision: 0.8591489047864888; Recall: 0.8590604026845639
--------


In [16]:
# Re-evaluate one fixed feature set using only rows whose cholesterol is above 0.
feature_set = [
    'age',
    'chest_pain_type',
    'resting_bp_s',
    'cholesterol',
    'fasting_blood_sugar',
    'exercise_angina',
    'ST_slope',
]

# Estimator, evaluator and assembler are plain configuration; build them up front.
logistic_regression = LogisticRegression(featuresCol="features", labelCol="label")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
assembler = VectorAssembler(inputCols=feature_set, outputCol='features')

# Filter first, then assemble the feature vector and split 80/20 with a fixed seed.
ml_df = assembler.transform(df.filter("cholesterol > 0"))
train_data, test_data = ml_df.randomSplit([0.8, 0.2], seed=42)

model = logistic_regression.fit(train_data)
predictions = model.transform(test_data)

# Accuracy, weighted precision and weighted recall, evaluated in that order.
metric_values = {
    metric: multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: metric})
    for metric in ("accuracy", "weightedPrecision", "weightedRecall")
}
accuracy = metric_values["accuracy"]
precision = metric_values["weightedPrecision"]
recall = metric_values["weightedRecall"]

print(f"Features: {feature_set}\nAccuracy: {accuracy:.4f}; Precision: {precision:.4f}; Recall: {recall:.4f}\n--------")

Features: ['age', 'chest_pain_type', 'resting_bp_s', 'cholesterol', 'fasting_blood_sugar', 'exercise_angina', 'ST_slope']
Accuracy: 0.7667; Precision: 0.7721; Recall: 0.7667
--------


# Analysis

The best models for predicting heart disease from the acquired data are about 86% accurate. This isn't too bad overall, though I don't think this model would be acceptable for general use: the recall reported here is a class-weighted average of about 86%, which means roughly 14% of patients are misclassified, so a meaningful share of heart disease patients could go undiagnosed.

Interestingly, there are a number of patients whose cholesterol was not logged (the data contains 0 for them). Removing those rows and retraining lowers accuracy from about 86% to about 77%, so the non-cholesterol data provided by those patients is vastly more important to the overall validity of the model than the difficulties presented by the outlying zeros recorded as their cholesterol levels.