<a href="https://colab.research.google.com/github/hatkiet/Project_4/blob/main/1_HeartAttack_SparkSQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## DataFrame Basics

In [None]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.5.1'
spark_version = 'spark-3.5.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Create the SparkSession used by every cell below (an existing session is reused)
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName("HeartAttack")
spark = session_builder.getOrCreate()

In [None]:
# Download the dataset from Kaggle.
# `-f` restricts the download to the single 2022 CSV (delivered as a .zip archive);
# `--force` downloads it again even if a copy already exists.
# NOTE(review): the kaggle CLI presumably needs API credentials configured
# beforehand; no credential setup is shown in this notebook — confirm.
!kaggle datasets download kamilpytlak/personal-key-indicators-of-heart-disease -f 2022/heart_2022_with_nans.csv --force

# Extract the CSV into /content/: `-j` drops any directory structure stored in
# the archive and `-o` overwrites an existing file without prompting.
!unzip -jo heart_2022_with_nans.csv.zip "heart_2022_with_nans.csv" -d /content/

In [None]:
# Read the dataset from the local CSV file into a Spark DataFrame.
# The file already sits on the local filesystem, so its path is passed straight
# to the reader. SparkFiles.get() is meant for resolving files that were
# registered with SparkContext.addFile(), which this notebook never does.
# No schema is supplied or inferred, so every column is loaded as a string.
from pyspark import SparkFiles  # import kept for compatibility; not needed by this cell
csv_path = "/content/heart_2022_with_nans.csv"
df = spark.read.csv(csv_path, sep=",", header=True)

In [None]:
# Preview the DataFrame: first 20 rows, long cell values truncated (the defaults)
df.show(n=20, truncate=True)

In [None]:
# Print the schema (column names, types and nullability) as a tree
df.printSchema()

In [None]:
# List the column names of the raw DataFrame
df.columns

In [None]:
# Describe our data.
# describe() only builds a new DataFrame holding the summary statistics; without
# .show() the cell would display just that DataFrame's repr (column names and
# types) instead of the statistics themselves.
df.describe().show()

In [None]:
# Summary statistics restricted to four selected columns
numeric_columns = ["PhysicalHealthDays", "MentalHealthDays", "SleepHours", "BMI"]
numeric_summary = df.select(numeric_columns).describe()
numeric_summary.show()

In [None]:
# Columns excluded from the rest of the notebook
columns_to_remove = [
    'LastCheckupTime',
    'RemovedTeeth',
    'HadCOPD',
    'HadArthritis',
    'DeafOrHardOfHearing',
    'BlindOrVisionDifficulty',
    'DifficultyConcentrating',
    'DifficultyDressingBathing',
    'DifficultyErrands',
    'HeightInMeters',
    'WeightInKilograms',
    'HIVTesting',
    'FluVaxLast12',
    'PneumoVaxEver',
    'TetanusLast10Tdap',
    'HighRiskLastYear',
    'CovidPos',
]

# drop() takes the names as separate arguments, hence the unpacking
df = df.drop(*columns_to_remove)

# Preview the slimmer DataFrame
df.show()

In [None]:
# List the columns that remain after the drop above
df.columns

In [None]:
# Desired column layout, organised by theme; dict insertion order defines the
# final left-to-right order of the DataFrame.
column_groups = {
    "user info": [
        'State', 'Sex', 'AgeCategory', 'RaceEthnicityCategory', 'BMI',
    ],
    "general health": [
        'GeneralHealth', 'PhysicalHealthDays', 'MentalHealthDays', 'PhysicalActivities', 'ChestScan',
    ],
    "risks": [
        'SleepHours', 'DifficultyWalking', 'SmokerStatus', 'ECigaretteUsage', 'AlcoholDrinkers',
    ],
    "medical history": [
        'HadHeartAttack', 'HadAngina', 'HadStroke', 'HadAsthma',
        'HadSkinCancer', 'HadDepressiveDisorder', 'HadKidneyDisease', 'HadDiabetes',
    ],
}

# Flatten the groups into the single ordered list of column names
column_order = [name for group in column_groups.values() for name in group]

# Keep exactly these columns, in this order
df = df.select(*column_order)
df.show()

In [None]:
# Old column name -> shorter name used for the rest of the notebook
rename_mapping = {
    'AgeCategory': "Age",
    'RaceEthnicityCategory': "Race",
    'GeneralHealth': "GenHealth",
    'PhysicalHealthDays': "PhysicalHealth",
    'PhysicalActivities': "PhysicalActivity",
    'MentalHealthDays': "MentalHealth",
    'HadAngina': "Angina",
    'HadHeartAttack': "HeartAttack",
    'HadStroke': "Stroke",
    'HadAsthma': "Asthma",
    'HadSkinCancer': "SkinCancer",
    'HadDepressiveDisorder': "Depressed",
    'HadKidneyDisease': "KidneyDisease",
    'DifficultyWalking': "DiffWalking",
    'HadDiabetes': "Diabetes",
    'SmokerStatus': "Smoking",
    'AlcoholDrinkers': "Drinking"
}

# Rename every column in one pass: mapped columns get their new name, all
# others keep their current one (column order is unchanged).
renamed_columns = [rename_mapping.get(current, current) for current in df.columns]
df = df.toDF(*renamed_columns)

df.show()

In [None]:
from pyspark.sql.functions import col, count, when

# Count missing (null) values for each column.
# when() without otherwise() yields null for rows where the condition is false,
# and count() skips nulls, so each aggregate counts only the rows where that
# column is null. The result is a single Row with one field per column.
# (A bare groupBy().count() would instead return only the total row count.)
null_count_exprs = [count(when(col(c).isNull(), 1)).alias(c) for c in df.columns]
nan_counts = df.select(null_count_exprs).collect()[0]

# Display NaN counts
print("NaN Value Distribution:")
for col_name, nan_count in zip(df.columns, nan_counts):
    print(f"{col_name}: {nan_count}")

In [None]:
# Keep only complete rows: a row is discarded if any of its columns is missing
df1 = df.na.drop(how="any")

# Preview the cleaned DataFrame
df1.show()

In [None]:
# List (column name, data type) pairs of the cleaned DataFrame
df1.dtypes

In [None]:
# Register the cleaned DataFrame as the temporary view "heart_data" so it can be
# queried with spark.sql(); an existing view of the same name is replaced.
df1.createOrReplaceTempView("heart_data")

In [None]:
# Percentage of 'Yes' and 'No' answers in the 'HeartAttack' column.
# Each COUNT(CASE ...) tallies the rows matching one answer (non-matching rows
# yield NULL and are not counted); dividing by COUNT(*) and multiplying by 100
# turns the tally into a percentage, rounded to two decimals.
percentage_query = """
    SELECT ROUND(COUNT(CASE WHEN HeartAttack = 'Yes' THEN 1 END) / COUNT(*) * 100, 2) AS yes_percentage,
           ROUND(COUNT(CASE WHEN HeartAttack = 'No' THEN 1 END) / COUNT(*) * 100, 2) AS no_percentage
    FROM heart_data
"""
heart_attack_percentages = spark.sql(percentage_query)

# Display the single-row result
heart_attack_percentages.show()

## Converting the PySpark DataFrame to a pandas DataFrame

In [None]:
import pandas as pd
# Collect the cleaned Spark DataFrame into a pandas DataFrame.
# toPandas() brings every row to the driver, so the data must fit in its memory.
pandas_df = df1.toPandas()

In [None]:
# Display the pandas DataFrame (the notebook renders it as the cell output)
pandas_df

In [None]:
# Write the cleaned data to a CSV file, leaving out the pandas row index
output_path = "/content/heart_2022_Spark.csv"
pandas_df.to_csv(output_path, index=False)