In [1]:
pip install findspark

Note: you may need to restart the kernel to use updated packages.


In [3]:
# Import findspark and initialise. 
import findspark
findspark.init()

In [4]:
# Start Spark session
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
spark = SparkSession.builder.appName("sparkFunctions").getOrCreate()

In [5]:
# Read in data from S3 Buckets
from pyspark import SparkFiles

df = spark.read.load("Heart Disease - FOR GRAPHS.csv", format="csv", inferSchema="true", header="true")

df.show()



+---+---+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+--------+-------------+----------+
|Age|Sex|Chest pain type| BP|Cholesterol|FBS over 120|EKG results|Max HR|Exercise angina|ST depression|Slope of ST|Number of vessels fluro|Thallium|Heart Disease|Patient ID|
+---+---+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+--------+-------------+----------+
| 70|  1|              4|130|        322|           0|          2|   109|              0|          2.4|          2|                      3|       3|     Presence|     31065|
| 67|  0|              3|115|        564|           0|          2|   160|              0|          1.6|          2|                      0|       7|      Absence|     31606|
| 57|  1|              2|124|        261|           0|          0|   141|              0|          0.3|          1|               

In [8]:
#Gender type
df = df.withColumn('Sex', when(df['Sex'] == 1, 'Male').otherwise('Female'))

# Replace values in 'Chest pain type' column
df = df.withColumn('Chest pain type', 
                   when(df['Chest pain type'] == 1, 'typical angina')
                   .when(df['Chest pain type'] == 2, 'atypical angina')
                   .when(df['Chest pain type'] == 3, 'non-anginal pain')
                   .otherwise('asymptomatic'))

#fasting Blood Sugar level
df = df.withColumn('FBS over 120', when(df['FBS over 120'] == 1, 'True').otherwise('False'))

# Doing Exercise Yes or No
df = df.withColumn('Exercise angina', when(df['Exercise angina'] == 1, 'Yes').otherwise('No'))

# the slope of the peak exercise ST segment
df = df.withColumn('Slope of ST', 
                   when(df['Slope of ST'] == 1, 'upsloping')
                   .when(df['Slope of ST'] == 2, 'flat')
                   .otherwise('downsloping'))

# Thallium defect
df = df.withColumn('Thallium', 
                   when(df['Thallium'] == 3, 'normal')
                   .when(df['Thallium'] == 6, 'fixed defect')
                   .otherwise('reversable defect'))


df.show()


+---+------+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+-----------------+-------------+----------+
|Age|   Sex|Chest pain type| BP|Cholesterol|FBS over 120|EKG results|Max HR|Exercise angina|ST depression|Slope of ST|Number of vessels fluro|         Thallium|Heart Disease|Patient ID|
+---+------+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+-----------------+-------------+----------+
| 70|Female|   asymptomatic|130|        322|       False|          2|   109|             No|          2.4|downsloping|                      3|reversable defect|     Presence|     31065|
| 67|Female|   asymptomatic|115|        564|       False|          2|   160|             No|          1.6|downsloping|                      0|reversable defect|      Absence|     31606|
| 57|Female|   asymptomatic|124|        261|       False|          0| 

In [11]:
# Rename the "Age" column to "Years"
df_renamed = df\
    .withColumnRenamed("FBS over 120", "Fasting_Blood_Sugar_over_120 ")\
    .withColumnRenamed("EKG results", "Electrocardiogram_Results ")\
    .withColumnRenamed("Max HR", "Max_Heart_rate ")
    

# Display the DataFrame with renamed column
print("\nDataFrame with Renamed Column:")
df_renamed.show()


DataFrame with Renamed Column:
+---+------+---------------+---+-----------+-----------------------------+--------------------------+---------------+---------------+-------------+-----------+-----------------------+-----------------+-------------+----------+
|Age|   Sex|Chest pain type| BP|Cholesterol|Fasting_Blood_Sugar_over_120 |Electrocardiogram_Results |Max_Heart_rate |Exercise angina|ST depression|Slope of ST|Number of vessels fluro|         Thallium|Heart Disease|Patient ID|
+---+------+---------------+---+-----------+-----------------------------+--------------------------+---------------+---------------+-------------+-----------+-----------------------+-----------------+-------------+----------+
| 70|Female|   asymptomatic|130|        322|                        False|                         2|            109|             No|          2.4|downsloping|                      3|reversable defect|     Presence|     31065|
| 67|Female|   asymptomatic|115|        564|                

In [13]:
# Path for saving the CSV file
output_path = "DataforGraph.csv"

# Save the DataFrame to CSV
df.write.csv(output_path, header=True, mode="overwrite")