# Data Ingestion

In [1]:
from pyspark.sql import SparkSession

import os

# Location of the raw CSV. Set the HEART_DATA_PATH environment variable to point
# elsewhere instead of editing this cell; the default is the original local path.
RAW_DATA_PATH = os.environ.get("HEART_DATA_PATH", r"D:\heart_data\raw\heart_disease.csv")

# Initialize (or reuse) a Spark session
spark = SparkSession.builder \
    .appName("Heart Disease Data Preprocessing") \
    .getOrCreate()

# Validate the session before its first use; checking after spark.read would be too late.
if spark is None:
    raise Exception("Spark session is not active.")

# Load the CSV into a DataFrame: the header row supplies column names and
# inferSchema makes an extra pass over the file to detect column types.
df = spark.read.csv(RAW_DATA_PATH, header=True, inferSchema=True)

# Display the first 20 rows with truncated strings (default behavior)
df.show()

+---------+------------+----+-------+---------------+------+--------------+------------+------+-----------+--------+---------+---+-----------+-----------+----------+-----------+---------+------+
|PatientID|HeartDisease| BMI|Smoking|AlcoholDrinking|Stroke|PhysicalHealth|MentalHealth|Gender|AgeCategory|Diabetic|GenHealth|age|cholesterol|resting ecg|SystolicBP|DiastolicBP|HeartRate|target|
+---------+------------+----+-------+---------------+------+--------------+------------+------+-----------+--------+---------+---+-----------+-----------+----------+-----------+---------+------+
|     3942|          No|32.9|    Yes|            Yes|    No|            23|          15|Female|      65-69|     Yes|     Good| 66|      195.6|     Normal|       120|         84|       71|     0|
|     4635|         Yes|20.7|    Yes|            Yes|    No|            10|          14|Female|80 or older|     Yes|     Good| 82|      215.5|        LVH|       152|         97|       90|     1|
|     3978|          No|3

# Data Preprocessing

### 1. Handling missing values


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count

# Count nulls per column in a single pass: when(isNull, c) yields a non-null
# marker only for null cells, and count() skips nulls, so each alias holds the
# number of missing cells in that column.
# NOTE(review): isNull() does not flag NaN in floating-point columns; add
# isnan() checks for numeric columns if NaN values may be present.
missing_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0]

# Print information about missing values
print("Missing values in the DataFrame:")
# Loop variable is `n_missing`, not `count`, so the pyspark `count` function
# imported above is not shadowed for later cells.
for col_name, n_missing in zip(df.columns, missing_counts):
    if n_missing > 0:
        print(f"Column '{col_name}' has {n_missing} missing values.")

# Only drop when at least one column reported a missing value.
if any(missing_counts):
    # dropna() defaults to how='any': a row is dropped if ANY column is null.
    df = df.dropna()
    print("Missing values have been dropped.")
else:
    print("No missing values to drop.")

Missing values in the DataFrame:
No missing values to drop.


### 2. Checking for duplicate rows

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession, Column

# Whole-row duplicates: any gap between the total and distinct row counts.
# NOTE(review): if PatientID is a unique key, rows identical in every other
# field are not detected here — consider excluding PatientID before distinct().
row_total = df.count()
row_distinct = df.distinct().count()
duplicate_count = row_total - row_distinct

if duplicate_count <= 0:
    print("No duplicate rows are present in the DataFrame.")
else:
    print(f"There are {duplicate_count} duplicate rows in the DataFrame.")

No duplicate rows are present in the DataFrame.


In [5]:
# Report how many rows remain after the cleaning steps above.
# df.count() triggers a Spark job over the whole DataFrame.
total_rows: int = df.count()
print(f"Total number of rows in the DataFrame: {total_rows}")


Total number of rows in the DataFrame: 75000


### Summary Statistics

In [7]:
from pyspark.sql import SparkSession
# Summary statistics (count, mean, stddev, min, max) for the DataFrame's
# numeric and string columns.
# describe() has been part of the PySpark DataFrame API since Spark 1.3, so it is
# called directly: catching AttributeError here would only hide unrelated errors
# (e.g. `df` being None) behind a misleading "upgrade PySpark" message.
df.describe().show()

+-------+-----------------+------------+------------------+-------+---------------+------+------------------+------------------+------+-----------+--------+---------+------------------+------------------+-----------+-----------------+------------------+------------------+-----------------+
|summary|        PatientID|HeartDisease|               BMI|Smoking|AlcoholDrinking|Stroke|    PhysicalHealth|      MentalHealth|Gender|AgeCategory|Diabetic|GenHealth|               age|       cholesterol|resting ecg|       SystolicBP|       DiastolicBP|         HeartRate|           target|
+-------+-----------------+------------+------------------+-------+---------------+------+------------------+------------------+------+-----------+--------+---------+------------------+------------------+-----------+-----------------+------------------+------------------+-----------------+
|  count|            75000|       75000|             75000|  75000|          75000| 75000|             75000|             75000

### 3. Normalization

In [8]:
from pyspark.sql import SparkSession, functions as F

# Min-max scale selected columns into [0, 1]; each result is written to a new
# `normalized_<name>` column and the original column is kept.
# NOTE(review): 'Age' and 'Cholesterol' differ in case from the schema's
# 'age' / 'cholesterol'; they resolve only because Spark's default
# spark.sql.caseSensitive=false — TODO confirm or rename to match the schema.
# NOTE(review): min/max are taken over the full dataset; if a train/test split
# follows, fit them on the training split only to avoid leakage.
columns_to_normalize = ['BMI', 'PhysicalHealth', 'MentalHealth', 'Age', 'Cholesterol', 'SystolicBP', 'DiastolicBP', 'HeartRate']

# Compute every min and max in ONE aggregation job instead of two collect()
# round-trips per column.
bounds = df.agg(
    *[F.min(c).alias(f"min_{c}") for c in columns_to_normalize],
    *[F.max(c).alias(f"max_{c}") for c in columns_to_normalize],
).first()

# Apply min-max scaling to specified columns
for c in columns_to_normalize:
    min_value = bounds[f"min_{c}"]
    max_value = bounds[f"max_{c}"]
    value_range = max_value - min_value
    if value_range == 0:
        # Constant column: dividing by zero would give null (or raise under
        # ANSI mode), so map every value to 0.0 instead.
        scaled = F.lit(0.0)
    else:
        scaled = (F.col(c) - min_value) / value_range
    df = df.withColumn(f"normalized_{c}", scaled)

# Show the normalized DataFrame
df.show()


+---------+------------+----+-------+---------------+------+--------------+------------+------+-----------+--------+---------+---+-----------+-----------+----------+-----------+---------+------+-------------------+-------------------------+-----------------------+-------------------+----------------------+---------------------+----------------------+--------------------+
|PatientID|HeartDisease| BMI|Smoking|AlcoholDrinking|Stroke|PhysicalHealth|MentalHealth|Gender|AgeCategory|Diabetic|GenHealth|age|cholesterol|resting ecg|SystolicBP|DiastolicBP|HeartRate|target|     normalized_BMI|normalized_PhysicalHealth|normalized_MentalHealth|     normalized_Age|normalized_Cholesterol|normalized_SystolicBP|normalized_DiastolicBP|normalized_HeartRate|
+---------+------------+----+-------+---------------+------+--------------+------------+------+-----------+--------+---------+---+-----------+-----------+----------+-----------+---------+------+-------------------+-------------------------+------------