With the given data set, solve the following tasks using PySpark.

Task 1: Write a function that removes duplicate rows, ensures correct data types for numerical and date columns, and converts all ‘yes’/‘no’ fields into 1/0 format.

Task 2: Write a function that adds a new column, treatment_duration_days, which calculates the number of days between the diagnosis and the end of treatment. Then, return the average treatment duration for each treatment type.  

Task 3: Write a function that returns the smoking_status group with the highest survival rate.  

Task 4: Write a function that returns the top three countries with the highest percentage of patients diagnosed in Stage IV.  

Task 5: Write a function that filters patients who:  

Are male  

Were diagnosed in Stage III or IV  

Have a family history of cancer  

Are current smokers  

Have a BMI > 30  

Survived 

Return the average age and the percentage of these patients who had hypertension.

 

In [45]:
# Import PySpark libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, bround, col, count, datediff, desc, lower, trim, when
from pyspark.sql.types import DateType, DoubleType, IntegerType

In [4]:
# Initialize SparkSession (getOrCreate returns the already-active session if
# one exists instead of starting a second one)
spark = SparkSession.builder.appName("Lung Cancer Patient Health and Treatment Records").getOrCreate()

In [5]:
# Load the CSV file and display records.
# header=True takes column names from the first row; inferSchema=True makes
# Spark scan the data to infer each column's type.
df = spark.read.csv("Lung Cancer.csv", header=True, inferSchema=True)
# Print the first 5 rows as a quick sanity check
df.show(5)

+---+----+------+-----------+--------------+------------+--------------+--------------+----+-----------------+------------+------+---------+------------+--------------+------------------+--------+
| id| age|gender|    country|diagnosis_date|cancer_stage|family_history|smoking_status| bmi|cholesterol_level|hypertension|asthma|cirrhosis|other_cancer|treatment_type|end_treatment_date|survived|
+---+----+------+-----------+--------------+------------+--------------+--------------+----+-----------------+------------+------+---------+------------+--------------+------------------+--------+
|  1|64.0|  Male|     Sweden|    2016-04-05|     Stage I|           Yes|Passive Smoker|29.4|              199|           0|     0|        1|           0|  Chemotherapy|        2017-09-10|       0|
|  2|50.0|Female|Netherlands|    2023-04-20|   Stage III|           Yes|Passive Smoker|41.2|              280|           1|     1|        0|           0|       Surgery|        2024-06-17|       1|
|  3|65.0|Femal

In [6]:
# Print the types inferSchema chose for the raw data
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- diagnosis_date: date (nullable = true)
 |-- cancer_stage: string (nullable = true)
 |-- family_history: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- cholesterol_level: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- asthma: integer (nullable = true)
 |-- cirrhosis: integer (nullable = true)
 |-- other_cancer: integer (nullable = true)
 |-- treatment_type: string (nullable = true)
 |-- end_treatment_date: date (nullable = true)
 |-- survived: integer (nullable = true)



#### Task 1: Write a function that removes duplicate rows, ensures correct data types for numerical and date columns, and converts all ‘yes’/‘no’ fields into 1/0 format.

In [8]:
def cleaned_patient_data(df):
    """Return a cleaned copy of the patient DataFrame (Task 1).

    Steps, in order:
      1. Cast the numerical columns (age, bmi, cholesterol_level) and the date
         columns (diagnosis_date, end_treatment_date) to their expected types.
      2. Convert 'yes'/'no' style fields to 1/0. Matching ignores case and
         surrounding whitespace. Other values are cast to int, so numeric
         values such as 0/1 pass through unchanged. Non-numeric values become
         null (with ANSI mode off).
      3. Drop duplicate rows. This runs *after* normalisation so that rows
         that differ only in representation (e.g. "Yes" vs "yes") are
         recognised as duplicates.

    Args:
        df: Raw patient DataFrame as loaded from the CSV.

    Returns:
        A new, cleaned DataFrame (Spark DataFrames are immutable; the input is
        left unchanged).
    """
    # Numerical columns. NOTE: casting age to int truncates any fractional part.
    numeric_types = {
        "age": IntegerType(),
        "bmi": DoubleType(),
        "cholesterol_level": IntegerType(),
    }
    for col_name, data_type in numeric_types.items():
        df = df.withColumn(col_name, col(col_name).cast(data_type))

    # Date columns. A plain cast only understands ISO yyyy-MM-dd strings;
    # anything else becomes null.
    date_cols = ["diagnosis_date", "end_treatment_date"]
    for col_name in date_cols:
        df = df.withColumn(col_name, col(col_name).cast(DateType()))

    # Convert all 'yes'/'no' type fields into 1/0 format.
    binary_cols = ["family_history", "hypertension", "asthma", "cirrhosis", "other_cancer", "survived"]
    for col_name in binary_cols:
        # Cast to string explicitly so lower() also works on integer columns
        # without relying on implicit coercion, and trim stray whitespace
        # (e.g. "Yes ") that would otherwise fall through to the int cast.
        normalized = lower(trim(col(col_name).cast("string")))
        df = df.withColumn(
            col_name,
            when(normalized == "yes", 1)
            .when(normalized == "no", 0)
            .otherwise(col(col_name).cast(IntegerType())),
        )

    # Deduplicate last, on the normalised values.
    return df.dropDuplicates()

In [9]:
# Verify: apply the Task 1 cleaning and preview the first 5 rows
df_cleaned_patient= cleaned_patient_data(df)
df_cleaned_patient.show(5)

+---+---+------+--------------+--------------+------------+--------------+--------------+----+-----------------+------------+------+---------+------------+--------------+------------------+--------+
| id|age|gender|       country|diagnosis_date|cancer_stage|family_history|smoking_status| bmi|cholesterol_level|hypertension|asthma|cirrhosis|other_cancer|treatment_type|end_treatment_date|survived|
+---+---+------+--------------+--------------+------------+--------------+--------------+----+-----------------+------------+------+---------+------------+--------------+------------------+--------+
| 72| 54|Female|Czech Republic|    2017-06-30|    Stage IV|             0|Passive Smoker|24.0|              160|           1|     0|        0|           1|       Surgery|        2019-04-06|       0|
|190| 34|  Male|       Estonia|    2015-02-13|     Stage I|             1| Former Smoker|38.8|              252|           1|     1|        1|           0|      Combined|        2016-12-18|       0|
|404|

In [10]:
# Confirm the column types after cleaning (numeric casts, 1/0 flags, dates)
df_cleaned_patient.printSchema()

root
 |-- id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- diagnosis_date: date (nullable = true)
 |-- cancer_stage: string (nullable = true)
 |-- family_history: integer (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- cholesterol_level: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- asthma: integer (nullable = true)
 |-- cirrhosis: integer (nullable = true)
 |-- other_cancer: integer (nullable = true)
 |-- treatment_type: string (nullable = true)
 |-- end_treatment_date: date (nullable = true)
 |-- survived: integer (nullable = true)



#### Task 2: Write a function that adds a new column, treatment_duration_days, which calculates the number of days between the diagnosis and the end of treatment. Then, return the average treatment duration for each treatment type.

In [21]:
def avg_treatment_duration(df):
    """Return the mean treatment duration, in days, per treatment type (Task 2).

    First derives ``treatment_duration_days`` as the number of days from
    ``diagnosis_date`` to ``end_treatment_date``. Then averages it per
    ``treatment_type``, rounding to whole days with ``bround`` (HALF_EVEN).
    A row with a null date gets a null duration, which ``avg`` skips.

    Args:
        df: Cleaned patient DataFrame.

    Returns:
        DataFrame with columns ``treatment_type`` and
        ``avg_treatment_duration_days``, sorted by ``treatment_type``.
    """
    # datediff(end, start): positive when treatment ends after diagnosis
    duration_days = datediff("end_treatment_date", "diagnosis_date")
    with_duration = df.withColumn("treatment_duration_days", duration_days)

    mean_days = bround(avg("treatment_duration_days"), 0)
    per_type = with_duration.groupBy("treatment_type").agg(
        mean_days.alias("avg_treatment_duration_days")
    )
    return per_type.orderBy("treatment_type")

In [23]:
# Task 2: average treatment duration per treatment type, on the cleaned data
avg_treatment_duration_days = avg_treatment_duration(df_cleaned_patient)

In [25]:
print("average treatment duration for each treatment type:")
# show(5) prints at most 5 rows
avg_treatment_duration_days.show(5)

average treatment duration for each treatment type:
+--------------+---------------------------+
|treatment_type|avg_treatment_duration_days|
+--------------+---------------------------+
|  Chemotherapy|                      458.0|
|      Combined|                      458.0|
|     Radiation|                      458.0|
|       Surgery|                      458.0|
+--------------+---------------------------+



#### Task 3: Write a function that returns the smoking_status group with the highest survival rate.

In [33]:
def highest_survival_rate_by_smoking_status(df):
    """Return the smoking_status group with the highest survival rate (Task 3).

    The survival rate of a group is the mean of the 0/1 ``survived`` column;
    rows with a null ``survived`` value are skipped by ``avg``.

    Groups are ranked on the *unrounded* rate. Ranking on the rounded
    percentage (as before) could turn distinct rates into ties and pick an
    arbitrary group. Exact ties are broken alphabetically by
    ``smoking_status`` so the result is deterministic.

    Args:
        df: Cleaned patient DataFrame (``survived`` already 1/0).

    Returns:
        Single-row DataFrame with columns ``smoking_status`` and
        ``survival_rate_percentage`` (percentage rounded to 2 decimals).
    """
    survival_rates = df.groupBy("smoking_status").agg(
        avg(col("survived")).alias("_survival_rate")
    )

    top_group = survival_rates.orderBy(
        desc("_survival_rate"), col("smoking_status").asc()
    ).limit(1)

    # Round only for presentation, after the ranking has been decided.
    return top_group.select(
        "smoking_status",
        bround(col("_survival_rate") * 100, 2).alias("survival_rate_percentage"),
    )

In [35]:
# Display result: the smoking_status group with the highest survival rate
highest_survival_group= highest_survival_rate_by_smoking_status(df_cleaned_patient)

highest_survival_group.show()


+--------------+------------------------+
|smoking_status|survival_rate_percentage|
+--------------+------------------------+
|  Never Smoked|                   22.09|
+--------------+------------------------+



#### Task 4: Write a function that returns the top three countries with the highest percentage of patients diagnosed in Stage IV.

In [47]:
def top_countries_stageIV(df, top_n=3):
    """Return the countries with the highest share of Stage IV diagnoses (Task 4).

    Totals and Stage IV counts come from a single ``groupBy`` using a
    conditional count. This replaces the previous two aggregations plus a
    join. As before, only countries with at least one Stage IV patient are
    considered.

    Countries are ranked on the unrounded share. Exact ties are broken
    alphabetically by country so the result is deterministic.

    Args:
        df: Cleaned patient DataFrame.
        top_n: Number of countries to return (default 3, as the task asks).

    Returns:
        DataFrame with columns ``country``, ``stageIV_Patients``,
        ``total_patients`` and ``stageIV_percent`` (rounded to 2 decimals),
        holding at most ``top_n`` rows.
    """
    is_stage_iv = col("cancer_stage") == "Stage IV"

    # count() skips nulls, and when() without otherwise() yields null for
    # non-matching rows, so this counts only the Stage IV patients.
    per_country = df.groupBy("country").agg(
        count(when(is_stage_iv, 1)).alias("stageIV_Patients"),
        count("*").alias("total_patients"),
    )

    stage_iv_share = col("stageIV_Patients") / col("total_patients")

    return (
        per_country
        # Mirror the former inner join: drop countries with no Stage IV patients.
        .filter(col("stageIV_Patients") > 0)
        .withColumn("stageIV_percent", bround(stage_iv_share * 100, 2))
        .orderBy(stage_iv_share.desc(), col("country").asc())
        .limit(top_n)
    )


In [49]:
# Task 4: countries with the highest percentage of Stage IV diagnoses
top_countries_stageIV_result = top_countries_stageIV(df_cleaned_patient)
top_countries_stageIV_result.show()

+--------------+----------------+--------------+---------------+
|       country|stageIV_Patients|total_patients|stageIV_percent|
+--------------+----------------+--------------+---------------+
|        Greece|            8429|         33052|           25.5|
|       Croatia|            8426|         33138|          25.43|
|Czech Republic|            8317|         32885|          25.29|
+--------------+----------------+--------------+---------------+



#### Task 5: Write a function that filters patients who:

a. Are male

b. Were diagnosed in Stage III or IV

c. Have a family history of cancer

d. Are current smokers

e. Have a BMI > 30

f. Survived

Return the average age and the percentage of these patients who had hypertension.

In [51]:
def filtered_patients_count(df):
    """Return (average age, hypertension %) for the Task 5 patient cohort.

    The cohort is patients who are male, were diagnosed in Stage III or IV,
    have a family history of cancer (``family_history == 1``), are current
    smokers, have a BMI above 30 and survived (``survived == 1``).

    Both statistics come from one aggregation, i.e. one Spark job. The
    previous version ran two separate ``collect()`` calls, each re-scanning
    and re-filtering the data.

    Args:
        df: Cleaned patient DataFrame (binary fields already 1/0).

    Returns:
        Tuple ``(average_age, hypertension_percentage)``, each rounded to
        2 decimals. Both are ``None`` if no patient matches the filters.
    """
    cohort = df.filter(
        (col("gender") == "Male")
        & col("cancer_stage").isin("Stage III", "Stage IV")
        & (col("family_history") == 1)
        & (col("smoking_status") == "Current Smoker")
        & (col("bmi") > 30)
        & (col("survived") == 1)
    )

    # A global aggregation always yields exactly one row (nulls when empty),
    # so first() never returns None here.
    stats = cohort.agg(
        bround(avg("age"), 2).alias("average_age"),
        # Mean of a 0/1 flag = fraction with hypertension; x100 for a percentage.
        bround(avg("hypertension") * 100, 2).alias("hypertension_percentage"),
    ).first()

    return stats["average_age"], stats["hypertension_percentage"]
    

In [53]:
# display result: average age and hypertension percentage for the Task 5 cohort
average_age, hypertension_patient_percent = filtered_patients_count(df_cleaned_patient)
print(f"Average Age: {average_age}")
print(f"Percentage of patients with Hypertension: {hypertension_patient_percent}%")

Average Age: 55.18
Percentage of patients with Hypertension: 74.77%
