# Part 3 - Data Preparation

In [1]:
import pyspark
from pyspark.sql.functions import col, count, isnan, when
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("PySpark in Jupyter") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/23 11:46:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
df = spark.read.csv('Dataset/dementia_data.csv', header=True, inferSchema=True)

# 3.1 Data Selection

In [3]:
# Initialize the list to store the column names with missing values
columns_with_missing = []

# Loop through each column in the DataFrame
for column in df.columns:
    # Count the number of null values, empty strings, "Unknown", and "None" for the column
    count_missing = df.filter(
        (col(column).isNull()) |
        (col(column) == "") |
        (col(column) == "Unknown") |
        (col(column) == "None")
    ).count()
    
    # If there are any missing values, add the column name to the list
    if count_missing > 0:
        columns_with_missing.append(column)


In [4]:
# Display the header of the table
print("Columns with Missing Values")
print("----------------------------")

# Display the column names with missing values
for column in columns_with_missing:
    print(column)


Columns with Missing Values
----------------------------
Prescription
Dosage in mg
Education_Level
Chronic_Health_Conditions


In [5]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F

# we use Z-score to find outliers and extreme values

In [6]:
def count_outliers_iqr(df):
    # Define an empty dictionary to store the results
    results = {}
    
    # Loop through each numerical column in the DataFrame
    for column in df.columns:
        if df.select(column).dtypes[0][1] in ['int', 'bigint', 'float', 'double']:
            # Calculate quartiles for the column
            quartiles = df.approxQuantile(column, [0.25, 0.75], 0.01)
            q1 = quartiles[0]
            q3 = quartiles[1]
            
            # Calculate IQR for the column
            iqr = q3 - q1
            
            # Calculate lower and upper bounds for outliers
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            
            # Count the number of values less than the lower bound or greater than the upper bound
            count_outliers = df.filter((col(column) < lower_bound) | (col(column) > upper_bound)).count()
            
            # Count the number of values less than the lower bound or greater than the upper bound by 3*IQR
            count_extremes = df.filter((col(column) < (q1 - 3 * iqr)) | (col(column) > (q3 + 3 * iqr))).count()
            
            # Add the counts to the dictionary
            results[column] = (count_outliers, count_extremes)
    
    # Return the dictionary containing counts of outliers and extreme values for each column
    return results


In [7]:
# Count outliers and extreme values for each column using IQR method
outlier_counts_iqr = count_outliers_iqr(df)


In [8]:
# Display the header of the table
print("Column Name            | Outliers   | Extreme Values")
print("-------------------------------------------------")

# Display the counts of outliers and extreme values for each column using IQR method
for column, (outliers, extremes) in outlier_counts_iqr.items():
    print(f"{column: <22} | {outliers: <10} | {extremes: <13}")


Column Name            | Outliers   | Extreme Values
-------------------------------------------------
Diabetic               | 0          | 0            
AlcoholLevel           | 0          | 0            
HeartRate              | 9          | 6            
BloodOxygenLevel       | 0          | 0            
BodyTemperature        | 20         | 20           
Weight                 | 10         | 8            
MRI_Delay              | 0          | 0            
Dosage in mg           | 0          | 0            
Age                    | 11         | 6            
Cognitive_Test_Scores  | 0          | 0            
Dementia               | 0          | 0            


# 3.2 Clean the Data

In [9]:
# Define the columns to drop
columns_to_drop = [
    'Dominant_Hand',
    'Medication_History',
    'Prescription',
    'Dosage in mg',
    'BloodOxygenLevel'
]

# Create a new DataFrame by dropping the specified columns
df_new = df.drop(*columns_to_drop)


In [10]:
# Display the total number of rows and columns of the new DataFrame
total_rows = df.count()
total_columns = len(df.columns)

print(f"Total Rows: {total_rows}, Total Columns: {total_columns}")

# After dropped
# Display the total number of rows and columns of the new DataFrame
total_rows_new = df_new.count()
total_columns_new = len(df_new.columns)

print(f"Total Rows after dropped: {total_rows_new}, Total Columns after dropped: {total_columns_new}")



Total Rows: 1000, Total Columns: 24
Total Rows after dropped: 1000, Total Columns after dropped: 19


In [11]:
def spark_info(df):
    # Get the schema of the DataFrame
    schema = df.schema
    
    # Create a list to hold column information
    columns_info = []
    
    # Iterate through the schema to get column information
    for field in schema:
        column_name = field.name
        column_type = field.dataType.simpleString()
        
        # Count non-null values
        non_null_count = df.filter((col(column_name).isNotNull()) & 
                                   (~col(column_name).isin("Unknown", "None")) & 
                                   (~isnan(col(column_name)))).count()
        
        # Count null values
        null_count = df.filter((col(column_name).isNull()) | 
                               (col(column_name).isin("Unknown", "None")) | 
                               isnan(col(column_name))).count()
        
        columns_info.append({
            "column_name": column_name,
            "column_type": column_type,
            "non_null_count": non_null_count,
            "null_count": null_count
        })
    
    summary = {
        "total_rows": total_rows,
        "total_columns": total_columns,
        "columns_info": columns_info
    }
    
    return summary

# Call the function to describe the DataFrame
summary = spark_info(df_new)


In [12]:
columns_info = summary['columns_info']

print("DataFrame Schema and Summary:")
print(f"{'Column':<25} {'Non-Null Count':<15} {'Null Count':<10} {'Dtype':<10}")
print("-" * 60)
for column_info in columns_info:
    print(f"{column_info['column_name']:<25} {column_info['non_null_count']:<15} {column_info['null_count']:<10} {column_info['column_type']:<10}")


DataFrame Schema and Summary:
Column                    Non-Null Count  Null Count Dtype     
------------------------------------------------------------
Diabetic                  1000            0          int       
AlcoholLevel              1000            0          double    
HeartRate                 1000            0          int       
BodyTemperature           1000            0          double    
Weight                    1000            0          double    
MRI_Delay                 1000            0          double    
Age                       1000            0          int       
Education_Level           845             155        string    
Gender                    1000            0          string    
Family_History            1000            0          string    
Smoking_Status            1000            0          string    
APOE_ε4                   1000            0          string    
Physical_Activity         1000            0          string    
Depression_St

In [13]:
from pyspark.sql.functions import when

# Replace null values in "Education_Level" column with "Unknown"
df_new_edu_1 = df_new.withColumn("Education_Level", 
                                  when(col("Education_Level").isNull() | 
                                       col("Education_Level").isin("None"),
                                       "Unknown").otherwise(col("Education_Level")))


In [14]:
# Calculate the null count for "Education_Level" column
edu_null_count = df_new_edu_1.filter(col("Education_Level").isNull()).count()
print(f"Null Count for 'Education_Level' column: {edu_null_count}")


Null Count for 'Education_Level' column: 0


In [15]:
# Replace null values in "Chronic_Health_Conditions" column with "None"
df_new_edu_1 = df_new_edu_1.withColumn("Chronic_Health_Conditions", 
                                        when(col("Chronic_Health_Conditions").isNull() | 
                                             col("Chronic_Health_Conditions").isin("Unknown"),
                                             "None").otherwise(col("Chronic_Health_Conditions")))


In [16]:
# Calculate the null count for "Chronic_Health_Conditions" column
chronic_health_null_count = df_new_edu_1.filter(col("Chronic_Health_Conditions").isNull()).count()
print(f"Null Count for 'Chronic_Health_Conditions' column: {chronic_health_null_count}")


Null Count for 'Chronic_Health_Conditions' column: 0


In [17]:
from pyspark.sql.functions import rand, when

# Define probabilities for each education level
primary_prob = 0.4
secondary_prob = 0.4
diploma_degree_prob = 0.2

# Randomly assign education levels based on probabilities
df_new_edu_2 = df_new.withColumn("Education_Level", 
                                 when(rand() <= primary_prob, "Primary School")
                                .when(rand() <= primary_prob + secondary_prob, "Secondary School")
                                .otherwise("Diploma/Degree"))


In [18]:
# Calculate the null count for "Education_Level" column
edu_null_count2 = df_new_edu_2.filter(col("Education_Level").isNull()).count()
print(f"Null Count for 'Education_Level' column: {edu_null_count2}")


Null Count for 'Education_Level' column: 0


In [19]:
# Replace null values and "None" in "Chronic_Health_Conditions" column with "Not healthy"
df_new_edu_2 = df_new_edu_2.withColumn("Chronic_Health_Conditions", 
                                        when((col("Chronic_Health_Conditions").isNull()) | 
                                             (col("Chronic_Health_Conditions") == "None") | 
                                             (col("Chronic_Health_Conditions") == "Unknown"),
                                             "Not healthy").otherwise(col("Chronic_Health_Conditions")))


In [20]:
# Calculate the null count for "Chronic_Health_Conditions" column
chronic_health_null_count_2 = df_new_edu_2.filter(col("Chronic_Health_Conditions").isNull()).count()
print(f"Null Count for 'Chronic_Health_Conditions' column: {chronic_health_null_count_2}")


Null Count for 'Chronic_Health_Conditions' column: 0


In [21]:
# Display counts for "Education_Level" column in df_new_edu_1
print("Counts for 'Education_Level' column in df_new_edu_1:")
df_new_edu_1.groupBy("Education_Level").count().show()

# Display counts for "Chronic_Health_Conditions" column in df_new_edu_1
print("Counts for 'Chronic_Health_Conditions' column in df_new_edu_1:")
df_new_edu_1.groupBy("Chronic_Health_Conditions").count().show()


Counts for 'Education_Level' column in df_new_edu_1:
+----------------+-----+
| Education_Level|count|
+----------------+-----+
|Secondary School|  304|
|  Primary School|  389|
|         Unknown|  155|
|  Diploma/Degree|  152|
+----------------+-----+

Counts for 'Chronic_Health_Conditions' column in df_new_edu_1:
+-------------------------+-----+
|Chronic_Health_Conditions|count|
+-------------------------+-----+
|                     None|  179|
|            Heart Disease|  155|
|                 Diabetes|  513|
|             Hypertension|  153|
+-------------------------+-----+



In [22]:
# Display counts for "Education_Level" column in df_new_edu_2
print("Counts for 'Education_Level' column in df_new_edu_2:")
df_new_edu_2.groupBy("Education_Level").count().show()

# Display counts for "Chronic_Health_Conditions" column in df_new_edu_2
print("Counts for 'Chronic_Health_Conditions' column in df_new_edu_2:")
df_new_edu_2.groupBy("Chronic_Health_Conditions").count().show()


Counts for 'Education_Level' column in df_new_edu_2:
+----------------+-----+
| Education_Level|count|
+----------------+-----+
|Secondary School|  476|
|  Primary School|  396|
|  Diploma/Degree|  128|
+----------------+-----+

Counts for 'Chronic_Health_Conditions' column in df_new_edu_2:
+-------------------------+-----+
|Chronic_Health_Conditions|count|
+-------------------------+-----+
|              Not healthy|  179|
|            Heart Disease|  155|
|                 Diabetes|  513|
|             Hypertension|  153|
+-------------------------+-----+



In [23]:
# Create a new DataFrame df_new_2
df_new_2 = df_new_edu_2


In [24]:
def spark_info(df):
    # Get the schema of the DataFrame
    schema = df.schema
    
    # Create a list to hold column information
    columns_info = []
    
    # Iterate through the schema to get column information
    for field in schema:
        column_name = field.name
        column_type = field.dataType.simpleString()
        
        # Count non-null values
        non_null_count = df_new_2.filter(col(column_name).isNotNull()).count()
        
        # Count null values
        null_count = df_new_2.filter(col(column_name).isNull() | isnan(col(column_name))).count()
        
        columns_info.append((column_name, column_type, non_null_count, null_count))
    
    # Display the DataFrame schema and summary
    total_rows = df_new_2.count()
    total_columns = len(schema)
    
    # Print the summary table
    print(f"DataFrame Summary:")
    print(f"{'Total Rows':<15}: {total_rows}")
    print(f"{'Total Columns':<15}: {total_columns}")
    print("\nDataFrame Schema:")
    print(f"{'Column':<25} {'Non-Null Count':<15} {'Null Count':<10} {'Dtype':<10}")
    print("-" * 60)
    for column_info in columns_info:
        print(f"{column_info[0]:<25} {column_info[2]:<15} {column_info[3]:<10} {column_info[1]:<10}")

# Call the function to describe the DataFrame
spark_info(df_new_2)


DataFrame Summary:
Total Rows     : 1000
Total Columns  : 19

DataFrame Schema:
Column                    Non-Null Count  Null Count Dtype     
------------------------------------------------------------
Diabetic                  1000            0          int       
AlcoholLevel              1000            0          double    
HeartRate                 1000            0          int       
BodyTemperature           1000            0          double    
Weight                    1000            0          double    
MRI_Delay                 1000            0          double    
Age                       1000            0          int       
Education_Level           1000            0          string    
Gender                    1000            0          string    
Family_History            1000            0          string    
Smoking_Status            1000            0          string    
APOE_ε4                   1000            0          string    
Physical_Activity         1

In [25]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F

# we use Z-score to find outliers and extreme values

In [26]:
def count_outliers_iqr(df):
    # Define an empty dictionary to store the results
    results = {}
    
    # Loop through each numerical column in the DataFrame
    for column in df.columns:
        if df.select(column).dtypes[0][1] in ['int', 'bigint', 'float', 'double']:
            # Calculate quartiles for the column
            quartiles = df.approxQuantile(column, [0.25, 0.75], 0.01)
            q1 = quartiles[0]
            q3 = quartiles[1]
            
            # Calculate IQR for the column
            iqr = q3 - q1
            
            # Calculate lower and upper bounds for outliers
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            
            # Count the number of values less than the lower bound or greater than the upper bound
            count_outliers = df.filter((col(column) < lower_bound) | (col(column) > upper_bound)).count()
            
            # Count the number of values less than the lower bound or greater than the upper bound by 3*IQR
            count_extremes = df.filter((col(column) < (q1 - 3 * iqr)) | (col(column) > (q3 + 3 * iqr))).count()
            
            # Add the counts to the dictionary
            results[column] = (count_outliers, count_extremes)
    
    # Return the dictionary containing counts of outliers and extreme values for each column
    return results

# Count outliers and extreme values for each column using IQR method for df_new_2
outlier_counts_iqr_new = count_outliers_iqr(df_new_2)

# Display the header of the table
print("Column Name            | Outliers   | Extreme Values")
print("-------------------------------------------------")

# Display the counts of outliers and extreme values for each column using IQR method for df_new_2
for column, (outliers, extremes) in outlier_counts_iqr_new.items():
    print(f"{column: <22} | {outliers: <10} | {extremes: <13}")


Column Name            | Outliers   | Extreme Values
-------------------------------------------------
Diabetic               | 0          | 0            
AlcoholLevel           | 0          | 0            
HeartRate              | 9          | 6            
BodyTemperature        | 20         | 20           
Weight                 | 10         | 8            
MRI_Delay              | 0          | 0            
Age                    | 11         | 6            
Cognitive_Test_Scores  | 0          | 0            
Dementia               | 0          | 0            


In [27]:
# show the values of outliers and extreme values
from pyspark.sql.functions import col

def find_outliers_iqr(df):
    # Define an empty dictionary to store the results
    results = {}
    
    # Loop through each numerical column in the DataFrame
    for column in df.columns:
        if df.select(column).dtypes[0][1] in ['int', 'bigint', 'float', 'double']:
            # Calculate quartiles for the column
            quartiles = df.approxQuantile(column, [0.25, 0.75], 0.01)
            q1 = quartiles[0]
            q3 = quartiles[1]
            
            # Calculate IQR for the column
            iqr = q3 - q1
            
            # Calculate lower and upper bounds for outliers
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            
            # Find outliers and extreme values
            outliers = df.filter((col(column) < lower_bound) | (col(column) > upper_bound)).select(column).collect()
            extremes = df.filter((col(column) < (q1 - 3 * iqr)) | (col(column) > (q3 + 3 * iqr))).select(column).collect()
            
            # If there are outliers or extreme values, add them to the results dictionary
            if outliers or extremes:
                results[column] = {
                    "outliers": [row[column] for row in outliers],
                    "extremes": [row[column] for row in extremes]
                }
    
    # Return the dictionary containing outliers and extreme values for each column
    return results


In [28]:
# Call the function to find outliers and extreme values for df_new_2
outlier_values_iqr = find_outliers_iqr(df_new_2)

# Display the outliers and extreme values
for column, values in outlier_values_iqr.items():
    print(f"Column: {column}")
    if "outliers" in values:
        print("Outliers:")
        print(values["outliers"])
    if "extremes" in values:
        print("Extremes:")
        print(values["extremes"])


Column: HeartRate
Outliers:
[194, 20, 182, 150, 190, 210, 320, 32, 1]
Extremes:
[194, 182, 190, 210, 320, 1]
Column: BodyTemperature
Outliers:
[41.34388999, 40.27682345, 42.25475742, 316.9464443, 32.10684275, 31.39061679, 30.94032093, 46.93897455, 42.41509512, 42.62742142, 46.42802935, 356.760164, 42.12994276, 43.97082575, 41.56382146, 45.2072102, 29.81030025, 30.06031921, 51.16408019, 22.06352268]
Extremes:
[41.34388999, 40.27682345, 42.25475742, 316.9464443, 32.10684275, 31.39061679, 30.94032093, 46.93897455, 42.41509512, 42.62742142, 46.42802935, 356.760164, 42.12994276, 43.97082575, 41.56382146, 45.2072102, 29.81030025, 30.06031921, 51.16408019, 22.06352268]
Column: Weight
Outliers:
[717.5014573, 519.5217171, 161.4996653, 287.3927216, 128.5190244, 540.7445893, 911.6282937, 796.9402277, 631.0099286, 200.3169645]
Extremes:
[717.5014573, 519.5217171, 287.3927216, 540.7445893, 911.6282937, 796.9402277, 631.0099286, 200.3169645]
Column: Age
Outliers:
[120, 200, 157, 111, 234, 113, 112, 

In [29]:
df_new_2.printSchema()

root
 |-- Diabetic: integer (nullable = true)
 |-- AlcoholLevel: double (nullable = true)
 |-- HeartRate: integer (nullable = true)
 |-- BodyTemperature: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- MRI_Delay: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Education_Level: string (nullable = false)
 |-- Gender: string (nullable = true)
 |-- Family_History: string (nullable = true)
 |-- Smoking_Status: string (nullable = true)
 |-- APOE_ε4: string (nullable = true)
 |-- Physical_Activity: string (nullable = true)
 |-- Depression_Status: string (nullable = true)
 |-- Cognitive_Test_Scores: integer (nullable = true)
 |-- Nutrition_Diet: string (nullable = true)
 |-- Sleep_Quality: string (nullable = true)
 |-- Chronic_Health_Conditions: string (nullable = true)
 |-- Dementia: integer (nullable = true)



In [30]:
# Save DataFrame to a CSV file
#df_new_2.write.csv("Dataset/df_new_2.csv", header=True)


AnalysisException: path file:/home/ubuntu/722-Iteration4/Dataset/df_new_2.csv already exists.

In [None]:
# Continue in Part 2