In [None]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import os

import findspark

# Spark install to use. Prefer $SPARK_HOME when it is set so the notebook is not tied
# to one machine; otherwise fall back to the original hard-coded install path.
SPARK_HOME = os.environ.get("SPARK_HOME", "/home/ubuntu/spark-3.2.1-bin-hadoop2.7")
findspark.init(SPARK_HOME)

import pyspark
from pyspark.sql import SparkSession

# App name matches this notebook's step (03 - Data Preparation).
# Note: getOrCreate() reuses an existing session, in which case this name is ignored.
spark = SparkSession.builder.appName('DBAS-Step3-Data Preparation').getOrCreate()

# Use Apache Arrow for Spark <-> pandas data transfer (a performance option).
# This does not enable pandas-on-Spark; that API is available without it.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
print(pyspark.__version__)


In [None]:
## Load the raw dataset from CSV
# header=True takes column names from the first row; inferSchema=True makes Spark
# scan the data to assign typed columns instead of reading everything as strings.
spk_df = spark.read.csv(
    "Data/heart_failure_dataset_raw.csv",
    inferSchema=True,
    header=True,
)
spk_df.printSchema()

# Switch to the pandas-on-Spark API for pandas-style inspection of the same data.
spkpd_df = spk_df.to_pandas_on_spark()
spkpd_df.info()
spkpd_df.head()


In [None]:
#### ----------03-DP

# Data-preparation steps that feed the following DM steps.

## Convert the indicator columns to booleans
# NOTE(review): these columns are assumed to be 0/1 indicators; for 'sex' the
# bool encoding hides which value means which sex — confirm against the data dictionary.
flag_columns = [
    'anaemia',
    'high_blood_pressure',
    'smoking',
    'diabetes',
    'DEATH_EVENT',
    'sex',
]

print("--------- Before converting -----------------------------------")
print(spkpd_df.dtypes)

# Every listed column is cast to the same target type.
spkpd_df = spkpd_df.astype(dict.fromkeys(flag_columns, bool))

print("--------- After converting -----------------------------------")
print(spkpd_df.dtypes)

In [None]:
# Inspect the pandas-on-Spark schema after the boolean conversion.
spkpd_df.info()
print("---------")

In [None]:
## Change age from numeric to categorical (Age_Level)
from pyspark.sql.functions import when

# Bin edges and their labels: labels[i] covers bins[i] <= age < bins[i + 1].
# The last bin is also closed on the right, so an age equal to bins[-1] is still
# labelled. Ages outside [bins[0], bins[-1]] and null ages get a null Age_Level.
bins = [40, 50, 60, 70, 80, 100]
labels = ['age_40', 'age_50', 'age_60', 'age_70', 'age_80']
assert labels and len(labels) == len(bins) - 1, "need exactly one label per bin"

spk_df = spkpd_df.to_spark()
age = spk_df["age"]

# Build one CASE WHEN expression rather than rewriting the column once per bin.
# With no otherwise(), rows that match no branch evaluate to null.
age_level = None
for i, label in enumerate(labels):
    lower, upper = bins[i], bins[i + 1]
    below_upper = (age <= upper) if i == len(labels) - 1 else (age < upper)
    in_bin = (age >= lower) & below_upper
    age_level = when(in_bin, label) if age_level is None else age_level.when(in_bin, label)

spk_df = spk_df.withColumn("Age_Level", age_level)

# Convert the Spark DataFrame back to pandas-on-Spark for inspection.
spkpd_df = spk_df.to_pandas_on_spark()
print("--------- Age_Level was added -----------------------------------")
spkpd_df.info()
spkpd_df.head()

In [None]:
# Expose spk_df to Spark SQL as the temporary view "patients" (replacing any
# earlier view of that name), then spot-check Age_Level against the raw age.
spk_df.createOrReplaceTempView('patients')

age_level_check = spark.sql("SELECT age, Age_Level FROM patients")
age_level_check.show()

In [None]:
## Visualise the distribution of Age_Level
import matplotlib.pyplot as plt
from pyspark.pandas.config import set_option

# Render pandas-on-Spark plots with matplotlib so they can draw onto a given Axes.
set_option("plotting.backend", "matplotlib")


def plot_age_level_counts(counts, title):
    """Draw `counts` (Age_Level -> count) as a bar chart on a new figure.

    Returns the matplotlib Axes so callers can tweak the plot further.
    """
    fig, ax = plt.subplots()
    counts.plot.bar(ax=ax)
    ax.set_xlabel('Age_Level')
    ax.set_ylabel('Count')
    ax.set_title(title)
    return ax


age_level_counts = spkpd_df['Age_Level'].value_counts()

# value_counts() orders bands by frequency, most common first.
plot_age_level_counts(age_level_counts, 'Distribution of Age_Level (ordered by count)')

# reindex(labels) puts the bands in natural age order (labels comes from the binning cell).
plot_age_level_counts(
    age_level_counts.reindex(labels),
    'Distribution of Age_Level (ordered by age band)',
);


In [None]:
import seaborn as sns

# Numeric columns whose extreme values are inspected here and capped in the next cell.
columns_to_cap = ['serum_sodium', 'serum_creatinine', 'creatinine_phosphokinase', 'platelets']

# One boxplot per column, drawn from a local NumPy copy of the column
# (to_numpy() collects the data to the driver).
for column_name in columns_to_cap:
    raw_values = spkpd_df[column_name].to_numpy()
    sns.boxplot(x=raw_values)
    plt.title(f"Boxplot of {column_name} before capping")
    plt.show()


In [None]:

#### Handle extremes

# Cap each column at the IQR "outer fences": [Q1 - 3*IQR, Q3 + 3*IQR].
IQR_MULTIPLIER = 3
columns_to_cap = ['serum_sodium', 'serum_creatinine', 'creatinine_phosphokinase', 'platelets']

for col in columns_to_cap:
    # pandas-on-Spark quantiles are approximate (percentile-based), unlike pandas.
    Q1 = spkpd_df[col].quantile(0.25)
    Q3 = spkpd_df[col].quantile(0.75)
    IQR = Q3 - Q1

    lower_limit = Q1 - IQR_MULTIPLIER * IQR
    upper_limit = Q3 + IQR_MULTIPLIER * IQR

    # clip() runs as a native Spark expression. It avoids a per-row Python UDF whose
    # return type would be inferred from a data sample. The column is cast to float
    # first because the fractional limits make the result float anyway, and this
    # keeps the column's declared and actual types consistent.
    spkpd_df[col] = spkpd_df[col].astype(float).clip(lower=lower_limit, upper=upper_limit)

# Keep the Spark DataFrame in sync. Later cells add columns to spk_df, rebuild spkpd_df
# from it and save it to CSV; without this the capping above would be silently lost.
spk_df = spkpd_df.to_spark()
    

In [None]:
columns_to_cap = ['serum_sodium', 'serum_creatinine', 'creatinine_phosphokinase', 'platelets']

# Redraw the boxplots after capping (skyblue) for comparison with the "before" plots.
for column_name in columns_to_cap:
    capped_values = spkpd_df[column_name].to_numpy()
    sns.boxplot(x=capped_values, color='skyblue')
    plt.title(f"Boxplot of {column_name} after capping")
    plt.show()

In [None]:
# Show both views of the data: the pandas-on-Spark frame (info) and the Spark DataFrame
# (printSchema). They are separate objects; spk_df only reflects changes made to
# spkpd_df after an explicit to_spark() re-sync.
spkpd_df.info()
spk_df.printSchema()

In [None]:
## Change days to months
from pyspark.sql import functions as F

# Approximate a month as 30 days.
# NOTE(review): `time` is assumed to be the follow-up period in days (per the section
# title) — confirm against the data dictionary.
DAYS_PER_MONTH = 30
spk_df = spk_df.withColumn('follow_up_month', spk_df['time'] / DAYS_PER_MONTH)

# Only spk_df gains follow_up_month in this cell, so only its schema is printed;
# spkpd_df is not updated here and would not show the new column.
spk_df.printSchema()
print("-----------------------")

# Register the DataFrame as a SQL temporary view and spot-check the conversion.
spk_df.createOrReplaceTempView('patients')
results = spark.sql("SELECT time, follow_up_month FROM patients")
results.show()


In [None]:
# Format the follow_up_month
spk_df = spk_df.withColumn('follow_up_month', F.round(spk_df['follow_up_month'], 2))

# Register the DataFrame as a SQL temporary view
spk_df.createOrReplaceTempView('patients')

# Execute SQL query
results = spark.sql("SELECT time, follow_up_month FROM patients")
results.show()


In [None]:

# Rebuild the pandas-on-Spark view from spk_df so it includes follow_up_month,
# then inspect its schema and first rows.
spkpd_df = spk_df.to_pandas_on_spark()

spkpd_df.info()
spkpd_df.head()

In [None]:
# Save as CSV. Spark writes a directory ("Data/3DP") of part-*.csv files, not a single file.
# mode="overwrite" replaces any previous output so the notebook can be re-run end to end;
# the default mode raises an error if the directory already exists.
spk_df.write.csv("Data/3DP", mode="overwrite", header=True)

In [None]:
# Stop the Spark session (and its underlying SparkContext). Keep this as the last cell:
# any later use of `spark`, `spk_df` or `spkpd_df` needs a live session.
spark.stop()