In [0]:
# Importing Libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Creating Spark Object

spark = SparkSession.builder.appName("Data Cleaning").getOrCreate()

In [0]:
# Read raw data
df = spark.read.format("delta").load("/dbfs/FileStore/Heart Disease/heart_ingestion")

In [0]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- dataset: string (nullable = true)
 |-- cp: string (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: boolean (nullable = true)
 |-- restecg: string (nullable = true)
 |-- thalch: integer (nullable = true)
 |-- exang: boolean (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: string (nullable = true)
 |-- ca: integer (nullable = true)
 |-- thal: string (nullable = true)
 |-- num: integer (nullable = true)



In [0]:
# Total Rows Of Data Frame

print(f"The Data Frame has {df.count()} rows.")

The Data Frame has 920 rows.


In [0]:
# Check For Duplicate Rows

df.groupBy("id").count().filter("count>1").show()

+---+-----+
| id|count|
+---+-----+
+---+-----+



In [0]:
# Checking For Null Values

df.select([count(when(col(c).isNull(),1)).alias(c) for c in df.columns]).show()


+---+---+---+-------+---+--------+----+---+-------+------+-----+-------+-----+---+----+---+
| id|age|sex|dataset| cp|trestbps|chol|fbs|restecg|thalch|exang|oldpeak|slope| ca|thal|num|
+---+---+---+-------+---+--------+----+---+-------+------+-----+-------+-----+---+----+---+
|  0|  0|  0|      0|  0|      59|  30| 90|      2|    55|   55|     62|  309|611| 486|  0|
+---+---+---+-------+---+--------+----+---+-------+------+-----+-------+-----+---+----+---+



In [0]:
# Rename "num" to "stage"
df = df.withColumnRenamed("num", "stage")


In [0]:
# Filling missing value with mean For trestbps
mean_value = df.select(mean("trestbps")).collect()[0][0]
df = df.fillna({"trestbps": mean_value})

In [0]:
# Filling missing value with mean For chol

mean_value=df.select(mean("chol")).collect()[0][0]
df = df.fillna({"chol": mean_value})

In [0]:
# Distribution Of fbs

df.groupBy("fbs").count().show()

+-----+-----+
|  fbs|count|
+-----+-----+
| null|   90|
| true|  138|
|false|  692|
+-----+-----+



In [0]:
# Imputing fbs By False

df = df.fillna({"fbs": False})

In [0]:
# Distribution Of restecg

df.groupBy("restecg").count().show()

+----------------+-----+
|         restecg|count|
+----------------+-----+
|  lv hypertrophy|  188|
|            null|    2|
|          normal|  551|
|st-t abnormality|  179|
+----------------+-----+



In [0]:
# Imputing restecg By Normal

df = df.fillna({"fbs": "normal"})

In [0]:
# Filling missing value with mean For thalch

mean_value=df.select(mean("thalch")).collect()[0][0]
df = df.na.fill({"thalch": mean_value})


In [0]:
# Distribution Of exang

df.groupBy("exang").count().show()

+-----+-----+
|exang|count|
+-----+-----+
| null|   55|
| true|  337|
|false|  528|
+-----+-----+



In [0]:
# Filling missing value Fot exang

df = df.fillna({"exang":False})

In [0]:
# Filling missing value with mean For oldpeak

mean_value=df.select(mean("oldpeak")).collect()[0][0]
df = df.na.fill({"oldpeak": mean_value})

In [0]:
# Distribution Of slope

df.groupBy("slope").count().orderBy("count",ascending = False).show()

+-----------+-----+
|      slope|count|
+-----------+-----+
|       flat|  345|
|       null|  309|
|  upsloping|  203|
|downsloping|   63|
+-----------+-----+



In [0]:
# Imputing slope with flat
df = df.fillna({"slope":"flat"})

In [0]:
# Distribution Of ca

df.groupBy("ca").count().show()

+----+-----+
|  ca|count|
+----+-----+
|null|  611|
|   1|   67|
|   3|   20|
|   2|   41|
|   0|  181|
+----+-----+



In [0]:
# Imputng ca with 0
df = df.fillna({"ca":0})

In [0]:
# Distribution Of thal

df.groupBy("thal").count().orderBy("count",ascending = False).show()

+-----------------+-----+
|             thal|count|
+-----------------+-----+
|             null|  486|
|           normal|  196|
|reversable defect|  192|
|     fixed defect|   46|
+-----------------+-----+



In [0]:
# Imputng thal with normal
df = df.fillna({"thal":"normal"})

In [0]:
# Distribution Of restecg

df.groupBy("restecg").count().orderBy("count",ascending = False).show()

+----------------+-----+
|         restecg|count|
+----------------+-----+
|          normal|  551|
|  lv hypertrophy|  188|
|st-t abnormality|  179|
|            null|    2|
+----------------+-----+



In [0]:
# Imputng restecg with normal
df = df.fillna({"restecg":"normal"})

In [0]:
# Checking For Null Values
df.select([count(when(col(c).isNull(),1)).alias(c) for c in df.columns]).show()

+---+---+---+-------+---+--------+----+---+-------+------+-----+-------+-----+---+----+-----+
| id|age|sex|dataset| cp|trestbps|chol|fbs|restecg|thalch|exang|oldpeak|slope| ca|thal|stage|
+---+---+---+-------+---+--------+----+---+-------+------+-----+-------+-----+---+----+-----+
|  0|  0|  0|      0|  0|       0|   0|  0|      0|     0|    0|      0|    0|  0|   0|    0|
+---+---+---+-------+---+--------+----+---+-------+------+-----+-------+-----+---+----+-----+



In [0]:
# Save cleaned
df.write.format("delta").mode("overwrite").save("/dbfs/FileStore/Heart Disease/heart_cleaned")