In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName("Heart Disease Prediction")
    .getOrCreate()
)

# Read the heart-disease CSV: the first row holds the column names and
# column types are inferred from the data
df = spark.read.csv(
    'data/heart_disease.csv',
    header=True,
    inferSchema=True,
)

In [None]:
# Preview the first five rows of the raw dataset
df.show(n=5)

In [None]:
# Columns kept for the rest of the analysis, in the order they are displayed
columns_to_keep = [
    'age', 'sex', 'painloc', 'painexer', 'cp', 'trestbps',
    'smoke', 'fbs', 'prop', 'nitr', 'pro', 'diuretic',
    'thaldur', 'thalach', 'exang', 'oldpeak', 'slope', 'target',
]

# Project the DataFrame down to those columns and preview the result
df = df.select(*columns_to_keep)
df.show(n=5)

In [None]:
from pyspark.sql.functions import col as spark_col, when
from pyspark.ml.feature import Imputer

# Column groups referenced by the cleaning steps
binary_categorical_cols = [
    'sex', 'painloc', 'painexer', 'fbs', 'prop', 'nitr',
    'pro', 'diuretic', 'exang', 'slope', 'target',
]
numerical_cols = ['age', 'thaldur', 'trestbps', 'oldpeak', 'thalach']

# 'trestbps': values below 100 mm Hg are replaced by the approximate median
# of the values that are >= 100
trestbps_in_range = df.filter(spark_col('trestbps') >= 100)
median_trestbps = trestbps_in_range.approxQuantile('trestbps', [0.5], 0.001)[0]
trestbps_cleaned = (
    when(spark_col('trestbps') < 100, median_trestbps)
    .otherwise(spark_col('trestbps'))
)
df = df.withColumn('trestbps', trestbps_cleaned)

# 'oldpeak': values below 0 or above 4 are replaced by the approximate median
# of the values inside [0, 4]
oldpeak_in_range = df.filter((spark_col('oldpeak') >= 0) & (spark_col('oldpeak') <= 4))
median_oldpeak = oldpeak_in_range.approxQuantile('oldpeak', [0.5], 0.001)[0]
oldpeak_cleaned = (
    when((spark_col('oldpeak') < 0) | (spark_col('oldpeak') > 4), median_oldpeak)
    .otherwise(spark_col('oldpeak'))
)
df = df.withColumn('oldpeak', oldpeak_cleaned)

# Median-impute the missing values of every numerical column into
# temporary '<name>_imputed' columns
imputed_names = [f"{name}_imputed" for name in numerical_cols]
imputer = Imputer(strategy="median", inputCols=numerical_cols, outputCols=imputed_names)
df_imputed = imputer.fit(df).transform(df)

# Remove the original numerical columns, then give each imputed column
# the original name back
df_imputed = df_imputed.drop(*numerical_cols)
for num_col in numerical_cols:
    df_imputed = df_imputed.withColumnRenamed(f"{num_col}_imputed", num_col)

# For these binary columns, any value greater than 1 is blanked out (set to null)
for binary_col in ['fbs', 'prop', 'nitr', 'pro', 'diuretic']:
    validated = when(spark_col(binary_col) > 1, None).otherwise(spark_col(binary_col))
    df_imputed = df_imputed.withColumn(binary_col, validated)

# Note: KNN imputation and classification should be handled differently in Spark; using Spark's MLlib for similar functionality

In [None]:
from pyspark.sql import functions as F

# Function to get the mode of a column
def get_mode(df, col_name):
    """Return the most frequent non-null value of column `col_name` in `df`.

    Nulls are filtered out before counting: `groupBy` puts nulls in a group
    of their own, so without the filter the "mode" of a mostly-missing column
    would be None, which cannot be used as a fill value.

    Ties are broken by taking the smallest value so repeated runs agree.

    Args:
        df: Spark DataFrame to inspect.
        col_name: name of the column whose mode is wanted.

    Returns:
        The most frequent non-null value, or None when the column holds no
        non-null values at all.
    """
    top_row = (
        df.filter(F.col(col_name).isNotNull())
        .groupBy(col_name)
        .count()
        .orderBy(F.desc('count'), F.asc(col_name))
        .first()
    )
    # first() yields None on an empty result; avoid indexing into it
    return top_row[0] if top_row is not None else None

# Fill missing values in categorical columns with the mode.
# The loop variable is named `cat_col`, not `col`: a top-level loop variable
# stays bound in the notebook namespace, and a later cell calls `col(...)`
# expecting pyspark's column function.
for cat_col in binary_categorical_cols:
    mode_value = get_mode(df_imputed, cat_col)
    df_imputed = df_imputed.fillna({cat_col: mode_value})

# Verify if there are any missing values left
for cat_col in binary_categorical_cols:
    print(f"Missing values in {cat_col}: {df_imputed.filter(F.col(cat_col).isNull()).count()}")

# Drop rows where the slope value is 0
df_imputed = df_imputed.filter(df_imputed.slope != 0)

# Verify if there are any slope values of 0 left
print(f"Remaining rows with slope value 0: {df_imputed.filter(F.col('slope') == 0).count()}")

In [None]:
# Binary and categorical columns to summarise
binary_categorical_cols = [
    'sex', 'painloc', 'painexer', 'fbs', 'prop', 'nitr',
    'pro', 'diuretic', 'exang', 'slope', 'target',
]

# Print the frequency table of every binary / categorical column
for column in binary_categorical_cols:
    print(f"\nValue Counts for {column}:")
    value_counts = df_imputed.groupBy(column).count()
    value_counts.show()

In [None]:
# Numerical columns to summarise
numerical_cols = ['age', 'thaldur', 'trestbps', 'oldpeak', 'thalach']

# Show the describe() summary restricted to the numerical columns
print("Basic Statistics for Numerical Columns:")
numerical_view = df_imputed.select(*numerical_cols)
numerical_view.describe().show()


In [None]:
!pip install pandas
!pip install matplotlib
!pip install seaborn
import pandas
import matplotlib.pyplot as plt
import seaborn as sns

# Configure the notebook to show plots inline
%matplotlib inline

# Optional: Configure the style for seaborn plots
sns.set(style="whitegrid")

# Convert the DataFrame to Pandas for plotting
numerical_df = df_imputed.select(numerical_cols).toPandas()

# Creating a combined box plot
plt.figure(figsize=(12, 6))  # Set the figure size
sns.boxplot(data=numerical_df, orient="v")  # Vertical box plots
plt.title('Combined Box Plot for Numerical Columns')
plt.show()


In [None]:
# Convert the DataFrame to Pandas for plotting
binary_df = df_imputed.select(binary_categorical_cols).toPandas()

# Determine the layout size
n_cols = 4  # Number of columns in the subplot grid
n_rows = (len(binary_categorical_cols) + n_cols - 1) // n_cols  # Ceiling division: rows needed to fit every plot

# Create the figure that will hold the subplots
plt.figure(figsize=(12, 10))

# The loop variable is named `column`, not `col`: a top-level loop variable
# stays bound in the notebook namespace, and a later cell calls `col(...)`
# expecting pyspark's column function.
for i, column in enumerate(binary_categorical_cols):
    plt.subplot(n_rows, n_cols, i + 1)  # Create a subplot for each column
    sns.countplot(x=binary_df[column], color='skyblue')
    plt.title(f'Value Counts for {column}')
    plt.xticks(rotation=0)  # Keep the x tick labels horizontal

# Spacing between subplots applies to the whole figure, so set it once
# instead of on every loop iteration
plt.subplots_adjust(wspace=0.5, hspace=0.5)

# Add a central title for the entire figure
plt.suptitle('Distribution of Binary and Categorical Variables', fontsize=16, fontweight='bold')

plt.tight_layout()  # Adjust subplots to fit into the figure area nicely
plt.show()


In [None]:
import requests
from bs4 import BeautifulSoup
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


# URL of the page to be scraped
url = "https://www.abs.gov.au/statistics/health/health-conditions-and-risks/smoking/latest-release"

# Send a GET request. Without a timeout, requests waits indefinitely on a
# stalled connection, which would hang the whole notebook run.
response = requests.get(url, timeout=30)
if response.status_code == 200:
    soup = BeautifulSoup(response.text, 'html.parser')

    # Locate the table through its caption: search for the caption once,
    # then walk up to the enclosing <table> if it was found
    caption_text = "Proportion of people 15 years and over who were current daily smokers by age, 2011–12 to 2022"
    caption = soup.find("caption", string=lambda text: caption_text in text if text else False)
    table = caption.find_parent('table') if caption else None

    if table:
        # Extracting headers from the table's header row
        headers = [th.get_text(strip=True) for th in table.find('thead').find_all('th')[1:]]  # Skip the first empty <th>
        headers.insert(0, "Age Group")  # Manually add "Age Group" as the first header

        # Collecting data rows: the row's <th> holds the age group, the <td> cells hold the values
        data = []
        for row in table.find('tbody').find_all('tr'):
            cells = [td.get_text(strip=True) for td in row.find_all('td')]
            age_group = row.find('th').get_text(strip=True) if row.find('th') else ""
            cells.insert(0, age_group)  # Insert the age group at the start of the list of cells

            if len(cells) == len(headers):  # Keep only rows whose cell count matches the header count
                data.append(cells)

        # Creating a Spark DataFrame (every column is a string at this point)
        df_age_group = spark.createDataFrame(data, schema=headers)

        # Convert relevant columns to float
        for col_name in ['2011–12 (%)', '2014–15 (%)', '2017–18 (%)', '2022 (%)']:
            df_age_group = df_age_group.withColumn(col_name, col(col_name).cast('float'))

        # Compute the average smoking rate across all specified years for each row
        df_age_group = df_age_group.withColumn(
            'Average Smoking Rate (%)',
            (col('2011–12 (%)') + col('2014–15 (%)') + col('2017–18 (%)') + col('2022 (%)')) / 4
        )

        # Select only the age group and the average smoking rate
        df_age_group = df_age_group.select('Age Group', 'Average Smoking Rate (%)')

        df_age_group.show(8)
    else:
        print("Table not found.")
else:
    print("Failed to retrieve data, status code:", response.status_code)


In [None]:
import requests
from bs4 import BeautifulSoup
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


# URL of the page to be scraped
url = "https://www.abs.gov.au/statistics/health/health-conditions-and-risks/smoking/latest-release"

# Send a GET request to the URL. Without a timeout, requests waits
# indefinitely on a stalled connection, which would hang the whole notebook run.
response = requests.get(url, timeout=30)
if response.status_code == 200:
    soup = BeautifulSoup(response.text, 'html.parser')

    # Locate the table through its caption: search for the caption once,
    # then walk up to the enclosing <table> if it was found
    caption_text = "Proportion of people 15 years and over who were current daily smokers by age and sex, 2022"
    caption = soup.find("caption", string=lambda text: caption_text in text if text else False)
    table = caption.find_parent('table') if caption else None

    if table:
        # Extract headers from the table's thead element
        headers = [th.get_text(strip=True) for th in table.find('thead').find_all('th')[1:]]  # Skip the first empty <th>
        headers.insert(0, "Age Group")  # Manually add "Age Group" as the first header

        # Collecting data rows: the row's <th> holds the age group, the <td> cells hold the values
        data = []
        for row in table.find('tbody').find_all('tr'):
            cells = [td.get_text(strip=True) for td in row.find_all('td')]
            age_group = row.find('th').get_text(strip=True) if row.find('th') else ""
            cells.insert(0, age_group)  # Insert the age group at the start of the list of cells

            if len(cells) == len(headers):  # Keep only rows whose cell count matches the header count
                data.append(cells)

        # Creating a Spark DataFrame (every column is a string at this point)
        df_age_sex_group = spark.createDataFrame(data, schema=headers)

        # Convert relevant columns to float
        for col_name in ['Males (%)', 'Females (%)']:
            df_age_sex_group = df_age_sex_group.withColumn(col_name, col(col_name).cast('float'))

        # Select only the age group and the converted columns
        df_age_sex_group = df_age_sex_group.select('Age Group', 'Males (%)', 'Females (%)')

        df_age_sex_group.show(8)
    else:
        print("Table not found under this caption.")
else:
    print("Failed to retrieve data, status code:", response.status_code)


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the function to categorize age
def categorize_age(age):
    """Map an age in years to the label of its age bracket.

    Each bracket is defined by an exclusive upper bound: the first bracket
    whose bound is greater than `age` wins. Ages below 18 (including any
    below 15) get "15–17"; ages of 75 or more get "75 years and over".

    Args:
        age: numeric age, or None.

    Returns:
        The bracket label as a string, or None when `age` is None. This
        function is wrapped as a Spark UDF, where a null age would otherwise
        raise a TypeError on the first comparison.
    """
    if age is None:
        return None

    # (exclusive upper bound, label), in ascending order of the bound
    brackets = (
        (18, "15–17"),
        (25, "18–24"),
        (35, "25–34"),
        (45, "35–44"),
        (55, "45–54"),
        (65, "55–64"),
        (75, "65–74"),
    )
    for upper_bound, label in brackets:
        if age < upper_bound:
            return label
    return "75 years and over"

# Wrap the Python function as a Spark UDF that returns a string
categorize_age_udf = udf(categorize_age, returnType=StringType())

# Derive the 'Age Group' column from 'age'
age_group_expr = categorize_age_udf(col('age'))
df_imputed = df_imputed.withColumn('Age Group', age_group_expr)

# Preview the first five rows
df_imputed.show(n=5)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType, StringType
from pyspark import SparkContext

sc = SparkContext.getOrCreate()


# Collect the two scraped lookup tables to the driver and broadcast them
# so the UDF can read them:
#   age group -> average smoking rate
#   age group -> (male rate, female rate)
age_rate_lookup = dict(
    (row['Age Group'], row['Average Smoking Rate (%)'])
    for row in df_age_group.collect()
)
age_sex_rate_lookup = dict(
    (row['Age Group'], (row['Males (%)'], row['Females (%)']))
    for row in df_age_sex_group.collect()
)
age_group_broadcast = sc.broadcast(age_rate_lookup)
age_sex_group_broadcast = sc.broadcast(age_sex_rate_lookup)

# Define the impute_smoking function
def impute_smoking(age_group, gender):
    """Look up a smoking rate for an age group and gender from the broadcast tables.

    Reads `age_group_broadcast` (age group -> overall rate) and
    `age_sex_group_broadcast` (age group -> (male rate, female rate)).

    * gender == 1 (male): overall rate * male rate / female rate; when the
      female rate is 0 the male rate is returned instead.
    * any other gender value (female): the overall rate for the age group.

    Args:
        age_group: age-bracket label used as the lookup key.
        gender: 1 for male; every other value is treated as female.

    Returns:
        The rate, or None when the age group is missing from the table that
        is needed or when a rate required by the male formula is None.
        Previously those None rates reached the arithmetic and raised a
        TypeError.
    """
    age_group_dict = age_group_broadcast.value
    age_sex_group_dict = age_sex_group_broadcast.value

    # .get() yields None for an unknown age group
    age_rate = age_group_dict.get(age_group)

    if gender != 1:  # Female
        return age_rate

    # Male
    if age_group not in age_sex_group_dict:
        return None

    male_rate, female_rate = age_sex_group_dict[age_group]
    if female_rate == 0:
        # Avoid dividing by zero: fall back to the male rate
        return male_rate
    if age_rate is None or male_rate is None or female_rate is None:
        # A missing rate cannot take part in the arithmetic below
        return None
    return (age_rate * male_rate) / female_rate

# Wrap impute_smoking as a Spark UDF whose result type is float
impute_smoking_udf = udf(impute_smoking, returnType=FloatType())

# Assuming df_imputed is your main DataFrame with 'Age Group' and 'sex' columns

# Define the function to categorize age
def categorize_age(age):
    """Map an age in years to the label of its age bracket.

    Each bracket is defined by an exclusive upper bound: the first bracket
    whose bound is greater than `age` wins. Ages below 18 (including any
    below 15) get "15–17"; ages of 75 or more get "75 years and over".

    Args:
        age: numeric age, or None.

    Returns:
        The bracket label as a string, or None when `age` is None. This
        function is wrapped as a Spark UDF, where a null age would otherwise
        raise a TypeError on the first comparison.
    """
    if age is None:
        return None

    # (exclusive upper bound, label), in ascending order of the bound
    brackets = (
        (18, "15–17"),
        (25, "18–24"),
        (35, "25–34"),
        (45, "35–44"),
        (55, "45–54"),
        (65, "55–64"),
        (75, "65–74"),
    )
    for upper_bound, label in brackets:
        if age < upper_bound:
            return label
    return "75 years and over"

# Convert the function to a UDF
categorize_age_udf = udf(categorize_age, StringType())

# Load your actual main DataFrame
# df_imputed = spark.read.csv('path_to_your_actual_data.csv', header=True, inferSchema=True)

# Apply the UDF to create a new column 'Age Group'
df_imputed = df_imputed.withColumn('Age Group', categorize_age_udf(col('age')))

# Apply the UDF to create a new column 'smoke'
# NOTE(review): this replaces 'smoke' for every row, including rows that
# already had a value — confirm that overwriting (rather than filling only
# the missing values) is intended.
df_imputed = df_imputed.withColumn('smoke', impute_smoking_udf(col('Age Group'), col('sex')))

# Round and convert the 'smoke' column to integer.
# A bare cast('int') truncates the fractional part, so round first.
df_imputed = df_imputed.withColumn('smoke', F.round(col('smoke')).cast('int'))

# Show the result
df_imputed.show(5)
