## Installing and Importing

In [None]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
spark_version = 'spark-3.4.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
0% [Waiting for headers] [Waiting for headers] [1 InRelease 0 B/3,626 B 0%] [Co0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpadcont                                                                               Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpadcont                                                                               Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
0% [Waiting for headers] [3 InRelease 12.7 kB/110 kB 12%] [Connecting to ppa.la                                                                               Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:6 http://archive.ubuntu.com/

In [None]:
# Import packages
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType,StructField,StringType, DateType,IntegerType

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [None]:
# Import our dependencies
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import pandas as pd
import tensorflow as tf

## Reading CSV and Exploring Data

In [None]:
#  Import and read the Heart_Disease_Prediction.csv
from pyspark import SparkFiles
url = "https://raw.githubusercontent.com/AleidvdZ/Project4HeartDisease/main/Heart_Disease_Prediction.csv"
spark.sparkContext.addFile(url)
hd_df = spark.read.csv(SparkFiles.get("Heart_Disease_Prediction.csv"), sep=",", header=True)

# Show DataFrame
hd_df.show()

+-----+---+---+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+--------+-------------+
|index|Age|Sex|Chest pain type| BP|Cholesterol|FBS over 120|EKG results|Max HR|Exercise angina|ST depression|Slope of ST|Number of vessels fluro|Thallium|Heart Disease|
+-----+---+---+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+--------+-------------+
|    0| 70|  1|              4|130|        322|           0|          2|   109|              0|          2.4|          2|                      3|       3|     Presence|
|    1| 67|  0|              3|115|        564|           0|          2|   160|              0|          1.6|          2|                      0|       7|      Absence|
|    2| 57|  1|              2|124|        261|           0|          0|   141|              0|          0.3|          1|                      0|       7| 

In [None]:
hd_df = hd_df.withColumnRenamed('Chest pain type', 'Chest_pain_type')
hd_df = hd_df.withColumnRenamed('FBS over 120', 'FBS_over_120')
hd_df = hd_df.withColumnRenamed('EKG Results', 'EKG_Results')
hd_df = hd_df.withColumnRenamed('Max HR', 'Max_HR')
hd_df = hd_df.withColumnRenamed('Exercise angina', 'Exercise_angina')
hd_df = hd_df.withColumnRenamed('ST depression', 'ST_depression')
hd_df = hd_df.withColumnRenamed('Slope of ST', 'Slope_of_ST')
hd_df = hd_df.withColumnRenamed('Number of vessels fluro', 'Number_of_vessels_fluro')
hd_df = hd_df.withColumnRenamed('Heart Disease', 'Heart_Disease')
hd_df.show()

+-----+---+---+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+--------+-------------+
|index|Age|Sex|Chest_pain_type| BP|Cholesterol|FBS_over_120|EKG_Results|Max_HR|Exercise_angina|ST_depression|Slope_of_ST|Number_of_vessels_fluro|Thallium|Heart_Disease|
+-----+---+---+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+--------+-------------+
|    0| 70|  1|              4|130|        322|           0|          2|   109|              0|          2.4|          2|                      3|       3|     Presence|
|    1| 67|  0|              3|115|        564|           0|          2|   160|              0|          1.6|          2|                      0|       7|      Absence|
|    2| 57|  1|              2|124|        261|           0|          0|   141|              0|          0.3|          1|                      0|       7| 

In [None]:
from pyspark.sql.functions import when, col
hd_df = hd_df.withColumn("Heart_Disease", when(col("Heart_Disease") == "Presence", 1).when(col("Heart_Disease") == "Absence", 0).otherwise(col("Heart_Disease")))

# Display the DataFrame
hd_df.show()

+-----+---+---+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+--------+-------------+
|index|Age|Sex|Chest_pain_type| BP|Cholesterol|FBS_over_120|EKG_Results|Max_HR|Exercise_angina|ST_depression|Slope_of_ST|Number_of_vessels_fluro|Thallium|Heart_Disease|
+-----+---+---+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+--------+-------------+
|    0| 70|  1|              4|130|        322|           0|          2|   109|              0|          2.4|          2|                      3|       3|            1|
|    1| 67|  0|              3|115|        564|           0|          2|   160|              0|          1.6|          2|                      0|       7|            0|
|    2| 57|  1|              2|124|        261|           0|          0|   141|              0|          0.3|          1|                      0|       7| 

In [None]:
from pyspark.sql.functions import col
for col_name in hd_df.columns:
    hd_df = hd_df.withColumn(col_name, col(col_name).cast('integer'))

# Display the DataFrame
hd_df.show()

+-----+---+---+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+--------+-------------+
|index|Age|Sex|Chest_pain_type| BP|Cholesterol|FBS_over_120|EKG_Results|Max_HR|Exercise_angina|ST_depression|Slope_of_ST|Number_of_vessels_fluro|Thallium|Heart_Disease|
+-----+---+---+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+--------+-------------+
|    0| 70|  1|              4|130|        322|           0|          2|   109|              0|            2|          2|                      3|       3|            1|
|    1| 67|  0|              3|115|        564|           0|          2|   160|              0|            1|          2|                      0|       7|            0|
|    2| 57|  1|              2|124|        261|           0|          0|   141|              0|            0|          1|                      0|       7| 

In [None]:
# Show schema
hd_df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Sex: integer (nullable = true)
 |-- Chest_pain_type: integer (nullable = true)
 |-- BP: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- FBS_over_120: integer (nullable = true)
 |-- EKG_Results: integer (nullable = true)
 |-- Max_HR: integer (nullable = true)
 |-- Exercise_angina: integer (nullable = true)
 |-- ST_depression: integer (nullable = true)
 |-- Slope_of_ST: integer (nullable = true)
 |-- Number_of_vessels_fluro: integer (nullable = true)
 |-- Thallium: integer (nullable = true)
 |-- Heart_Disease: integer (nullable = true)



In [None]:
# Create a temporary view of the DataFrame.
hd_df.createOrReplaceTempView('data')

# Look at data using SparkSQL
spark.sql("select * from data limit 10").show()

+-----+---+---+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+--------+-------------+
|index|Age|Sex|Chest_pain_type| BP|Cholesterol|FBS_over_120|EKG_Results|Max_HR|Exercise_angina|ST_depression|Slope_of_ST|Number_of_vessels_fluro|Thallium|Heart_Disease|
+-----+---+---+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+--------+-------------+
|    0| 70|  1|              4|130|        322|           0|          2|   109|              0|            2|          2|                      3|       3|            1|
|    1| 67|  0|              3|115|        564|           0|          2|   160|              0|            1|          2|                      0|       7|            0|
|    2| 57|  1|              2|124|        261|           0|          0|   141|              0|            0|          1|                      0|       7| 

To analyze:

*  Total number of indiviuals
*  Number/percent of individuals of each sex
*  Mean/Median/Mode of ages by sex
*  % of male/female with heart disease

In [None]:
# Total number of indiviuals
spark.sql("""
  SELECT COUNT(index) AS Number_of_Indiv
  FROM data
    """).show()

+---------------+
|Number_of_Indiv|
+---------------+
|            270|
+---------------+



In [None]:
# Mean/Median/Mode of ages (by sex)
spark.sql("""
  SELECT
    MIN(Age) AS min_value,
    MAX(Age) AS max_value,
    ROUND(AVG(Age),1) AS mean_value,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY Age) AS median_value,
    ROUND(STDDEV(Age),1) AS std_deviation,
    COUNT(Age) AS count
FROM data;
    """).show()

+---------+---------+----------+------------+-------------+-----+
|min_value|max_value|mean_value|median_value|std_deviation|count|
+---------+---------+----------+------------+-------------+-----+
|       29|       77|      54.4|        55.0|          9.1|  270|
+---------+---------+----------+------------+-------------+-----+



In [None]:
# Percent of individuals of each sex (0 = female, 1 = male)
spark.sql("""
  SELECT Sex,
  COUNT(*) AS Frequency
  FROM data
  GROUP BY Sex
  ORDER BY Frequency DESC
  """).show()


  # SELECT Sex
  #   (COUNT(*) / (SELECT COUNT(*) FROM data)) * 100 AS Percentage
  #   FROM data
  #   GROUP BY Sex
  #   ORDER BY Percentage DESC


+---+---------+
|Sex|Frequency|
+---+---------+
|  1|      183|
|  0|       87|
+---+---------+



## Preprocessing Data


Scaling: Age, BP, Cholesterol, MaxHR, StDep  
Dummy (more than 2 categories): Chest Pain, EKG Result, Slope of ST, Thalium  

In [None]:
#convert to pandas
pandas_df = hd_df.toPandas()
pandas_df.head()

Unnamed: 0,index,Age,Sex,Chest_pain_type,BP,Cholesterol,FBS_over_120,EKG_Results,Max_HR,Exercise_angina,ST_depression,Slope_of_ST,Number_of_vessels_fluro,Thallium,Heart_Disease
0,0,70,1,4,130,322,0,2,109,0,2,2,3,3,1
1,1,67,0,3,115,564,0,2,160,0,1,2,0,7,0
2,2,57,1,2,124,261,0,0,141,0,0,1,0,7,1
3,3,64,1,4,128,263,0,0,105,1,0,2,1,7,0
4,4,74,0,2,120,269,0,2,121,1,0,1,1,3,0


In [None]:
# Convert categorical data to numeric with `pd.get_dummies`
dummy_cols = ['Chest_pain_type','EKG_Results','Slope_of_ST','Thallium']
pandas_df = pd.get_dummies(pandas_df, columns=dummy_cols)
pandas_df.head()

Unnamed: 0,index,Age,Sex,BP,Cholesterol,FBS_over_120,Max_HR,Exercise_angina,ST_depression,Number_of_vessels_fluro,...,Chest_pain_type_4,EKG_Results_0,EKG_Results_1,EKG_Results_2,Slope_of_ST_1,Slope_of_ST_2,Slope_of_ST_3,Thallium_3,Thallium_6,Thallium_7
0,0,70,1,130,322,0,109,0,2,3,...,1,0,0,1,0,1,0,1,0,0
1,1,67,0,115,564,0,160,0,1,0,...,0,0,0,1,0,1,0,0,0,1
2,2,57,1,124,261,0,141,0,0,0,...,0,1,0,0,1,0,0,0,0,1
3,3,64,1,128,263,0,105,1,0,1,...,1,1,0,0,0,1,0,0,0,1
4,4,74,0,120,269,0,121,1,0,1,...,0,0,0,1,1,0,0,1,0,0


## Machine Learning Model - Neural Network

In [None]:
#%%python
# Remove Heart Disease target from features data (NOTE: NEED TO UPDATE COLUMNS)
y = pandas_df['Heart_Disease'].values
X = pandas_df.drop(columns=["Heart_Disease"]).values

# Split training/test datasets
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=1, stratify=y)

In [None]:
# Create a StandardScaler instances
scaler = StandardScaler()

# Fit the StandardScaler
X_scaler = scaler.fit(X_train)

# Scale the data
X_train_scaled = X_scaler.transform(X_train)
X_test_scaled = X_scaler.transform(X_test)

In [None]:
# Define the model - deep neural net, i.e., the number of input features and hidden nodes for each layer.
number_input_features = len(X_train[0])
hidden_nodes_layer1 =  8
hidden_nodes_layer2 = 3

nn = tf.keras.models.Sequential()

# First hidden layer
nn.add(
    tf.keras.layers.Dense(units=hidden_nodes_layer1, input_dim=number_input_features, activation="relu")
)

# Second hidden layer
nn.add(tf.keras.layers.Dense(units=hidden_nodes_layer2, activation="relu"))

# Output layer
nn.add(tf.keras.layers.Dense(units=1, activation="sigmoid"))

# Check the structure of the model
nn.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_3 (Dense)             (None, 8)                 192       
                                                                 
 dense_4 (Dense)             (None, 3)                 27        
                                                                 
 dense_5 (Dense)             (None, 1)                 4         
                                                                 
Total params: 223 (892.00 Byte)
Trainable params: 223 (892.00 Byte)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [None]:
# Compile the model
nn.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

In [None]:
# Train the model
fit_model = nn.fit(X_train_scaled,y_train,epochs=100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

In [None]:
# Evaluate the model using the test data
model_loss, model_accuracy = nn.evaluate(X_test_scaled,y_test,verbose=2)
print(f"Loss: {model_loss}, Accuracy: {model_accuracy}")

3/3 - 0s - loss: 0.4371 - accuracy: 0.8824 - 327ms/epoch - 109ms/step
Loss: 0.43711617588996887, Accuracy: 0.8823529481887817
