<a href="https://colab.research.google.com/github/hargagan/EDA-NYC-Taxi-Data-Analysis/blob/main/pyspark/graded/Mini_Assignment_1_Har_Gagan_Sahai.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession
# Create the Spark session used by every later cell (or reuse one that already exists);
# the application is registered under the name 'cancerRecord'.
spark = SparkSession.builder.appName('cancerRecord').getOrCreate()

In [2]:
from google.colab import drive
# Mount Google Drive at /content/drive; the dataset is read from a path under this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


In [30]:
# Load the raw lung-cancer CSV from the mounted Drive, taking column names from the first row.
# No schema is supplied or inferred, so every column is read as a string.
df_lungcancer = (
    spark.read
    .option("header", True)
    .csv("/content/drive/MyDrive/Assignments/EDA/Lung Cancer.csv")
)

In [16]:
# Preview the first five rows of the raw data.
df_lungcancer.show(5)


+---+----+------+-----------+--------------+------------+--------------+--------------+----+-----------------+------------+------+---------+------------+--------------+------------------+--------+
| id| age|gender|    country|diagnosis_date|cancer_stage|family_history|smoking_status| bmi|cholesterol_level|hypertension|asthma|cirrhosis|other_cancer|treatment_type|end_treatment_date|survived|
+---+----+------+-----------+--------------+------------+--------------+--------------+----+-----------------+------------+------+---------+------------+--------------+------------------+--------+
|  1|64.0|  Male|     Sweden|    2016-04-05|     Stage I|           Yes|Passive Smoker|29.4|              199|           0|     0|        1|           0|  Chemotherapy|        2017-09-10|       0|
|  2|50.0|Female|Netherlands|    2023-04-20|   Stage III|           Yes|Passive Smoker|41.2|              280|           1|     1|        0|           0|       Surgery|        2024-06-17|       1|
|  3|65.0|Femal

In [17]:
# Inspect the column names and types of the raw data as loaded.
df_lungcancer.printSchema()

root
 |-- id: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- diagnosis_date: string (nullable = true)
 |-- cancer_stage: string (nullable = true)
 |-- family_history: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- bmi: string (nullable = true)
 |-- cholesterol_level: string (nullable = true)
 |-- hypertension: string (nullable = true)
 |-- asthma: string (nullable = true)
 |-- cirrhosis: string (nullable = true)
 |-- other_cancer: string (nullable = true)
 |-- treatment_type: string (nullable = true)
 |-- end_treatment_date: string (nullable = true)
 |-- survived: string (nullable = true)



#### **Task 1:** Write a function that removes duplicate rows, ensures correct data types for numerical and date columns, and converts all ‘yes’/‘no’ type fields into 1/0 format.  

In [31]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType, DoubleType, DateType

def clean_data(df):
  """Return a de-duplicated, correctly typed version of the raw lung-cancer data.

  Steps, in order:
    1. drop exact duplicate rows;
    2. cast ``age``, ``bmi`` and ``cholesterol_level`` to double;
    3. cast the flag columns ``hypertension``, ``asthma``, ``cirrhosis``,
       ``other_cancer`` and ``survived`` to integer;
    4. map the yes/no column ``family_history`` to 1/0;
    5. parse ``diagnosis_date`` and ``end_treatment_date`` into date columns.

  Args:
    df: Spark DataFrame holding the raw columns listed above.

  Returns:
    A Spark DataFrame with the same columns and the types described above.

  Raises:
    ValueError: if ``df`` is None.
  """
  if df is None:
    raise ValueError("Input DataFrame is None. Please ensure df_lungcancer is loaded correctly by running previous cells.")

  # Remove duplicate rows
  df = df.dropDuplicates()

  # Numeric measurements stored as text -> double
  float_cols = ["age", "bmi", "cholesterol_level"]
  for c in float_cols:
    df = df.withColumn(c, col(c).cast(DoubleType()))

  # 0/1 indicator columns stored as text -> integer
  int_cols = ["hypertension", "asthma", "cirrhosis", "other_cancer", "survived"]
  for c in int_cols:
    df = df.withColumn(c, col(c).cast(IntegerType()))

  # Yes/no text columns -> 1/0.
  # Matching ignores case and surrounding whitespace. Values that are neither
  # yes nor no (including nulls) stay null instead of being silently counted as
  # "no". "1"/"0" are accepted too, so running this function on data that has
  # already been cleaned leaves the column unchanged.
  yes_no_cols = ["family_history"]
  for c in yes_no_cols:
    normalized = F.lower(F.trim(col(c).cast("string")))
    df = df.withColumn(
        c,
        when(normalized.isin("yes", "1"), 1)
        .when(normalized.isin("no", "0"), 0)
        .cast(IntegerType()))

  # Date strings -> date type (to_date already yields a date column)
  date_cols = ["diagnosis_date", "end_treatment_date"]
  for c in date_cols:
    df = df.withColumn(c, F.to_date(col(c)))

  return df

# Clean the raw frame into a separately named result (the raw frame stays available)
# and preview the first five cleaned rows.
df_lungcancer_clean = clean_data(df_lungcancer)
df_lungcancer_clean.show(5)

+----+----+------+--------+--------------+------------+--------------+--------------+----+-----------------+------------+------+---------+------------+--------------+------------------+--------+
|  id| age|gender| country|diagnosis_date|cancer_stage|family_history|smoking_status| bmi|cholesterol_level|hypertension|asthma|cirrhosis|other_cancer|treatment_type|end_treatment_date|survived|
+----+----+------+--------+--------------+------------+--------------+--------------+----+-----------------+------------+------+---------+------------+--------------+------------------+--------+
| 441|55.0|Female| Austria|    2015-06-07|     Stage I|             1|Current Smoker|31.7|            245.0|           1|     0|        0|           0|      Combined|        2017-01-30|       1|
| 552|49.0|Female|Portugal|    2019-01-02|    Stage II|             0| Former Smoker|25.3|            207.0|           0|     0|        0|           0|      Combined|        2020-05-29|       0|
|1046|51.0|  Male| German

#### **Task 2:** Write a function that adds a new column, treatment_duration_days, which calculates the number of days between the diagnosis and the end of treatment. Then, return the average treatment duration for each treatment type.  

In [32]:
def add_duration_column(df):
  """Add ``treatment_duration_days`` to ``df``.

  The new column is the number of days from ``diagnosis_date`` to
  ``end_treatment_date`` (``datediff(end, start)``).

  Args:
    df: Spark DataFrame with ``diagnosis_date`` and ``end_treatment_date`` columns.

  Returns:
    A Spark DataFrame with the extra ``treatment_duration_days`` column.
  """
  return df.withColumn(
      "treatment_duration_days",
      F.datediff(col("end_treatment_date"), col("diagnosis_date")))


def average_duration_by_treatment(df):
  """Return the average treatment duration (in days) for each treatment type.

  The duration column is (re)computed here, so the function works whether or
  not ``add_duration_column`` has already been applied to ``df``.

  Args:
    df: Spark DataFrame with ``treatment_type``, ``diagnosis_date`` and
      ``end_treatment_date`` columns.

  Returns:
    A Spark DataFrame with one row per ``treatment_type`` and the column
    ``avg_treatment_duration_days`` (rounded to 2 decimals), sorted by
    treatment type.
  """
  return (
      add_duration_column(df)
      .groupBy("treatment_type")
      .agg(F.round(F.avg("treatment_duration_days"), 2).alias("avg_treatment_duration_days"))
      .orderBy("treatment_type"))


df_lungcancer_clean = add_duration_column(df_lungcancer_clean)
df_lungcancer_clean.show(5)

# Second half of the task: average treatment duration per treatment type.
df_avg_treatment_duration = average_duration_by_treatment(df_lungcancer_clean)
df_avg_treatment_duration.show()

+----+----+------+--------+--------------+------------+--------------+--------------+----+-----------------+------------+------+---------+------------+--------------+------------------+--------+-----------------------+
|  id| age|gender| country|diagnosis_date|cancer_stage|family_history|smoking_status| bmi|cholesterol_level|hypertension|asthma|cirrhosis|other_cancer|treatment_type|end_treatment_date|survived|treatment_duration_days|
+----+----+------+--------+--------------+------------+--------------+--------------+----+-----------------+------------+------+---------+------------+--------------+------------------+--------+-----------------------+
| 441|55.0|Female| Austria|    2015-06-07|     Stage I|             1|Current Smoker|31.7|            245.0|           1|     0|        0|           0|      Combined|        2017-01-30|       1|                    603|
| 552|49.0|Female|Portugal|    2019-01-02|    Stage II|             0| Former Smoker|25.3|            207.0|           0|   

#### **Task 3:** Write a function that returns the smoking_status group with the highest survival rate.  

In [33]:
def smoking_status_with_highest_survival(df):
  """Return the ``smoking_status`` group with the highest survival rate.

  The survival rate of a group is ``AVG(survived)``. Groups are ranked on the
  unrounded average, because rates that differ only beyond the second decimal
  would tie after rounding and the winner would then be arbitrary. Exact ties
  are broken alphabetically by ``smoking_status``. Rounding is applied only to
  the value that is returned.

  Side effect: registers (or replaces) the temp view ``lung_cancer`` on the
  notebook's ``spark`` session.

  Args:
    df: Spark DataFrame with ``smoking_status`` and a numeric ``survived`` column.

  Returns:
    A single-row Spark DataFrame with columns ``smoking_status`` and
    ``survival_rate`` (rounded to 2 decimals).
  """
  df.createOrReplaceTempView("lung_cancer")
  df_smoking_status = spark.sql("""
    SELECT smoking_status, ROUND(raw_survival_rate, 2) AS survival_rate
    FROM (
      SELECT smoking_status, AVG(survived) AS raw_survival_rate
      FROM lung_cancer
      GROUP BY smoking_status
      ORDER BY raw_survival_rate DESC, smoking_status ASC
      LIMIT 1
    ) best_group
  """)
  return df_smoking_status

# Compute the best-surviving smoking_status group on the cleaned data and display it.
df_smoking_status = smoking_status_with_highest_survival(df_lungcancer_clean)
df_smoking_status.show()

+--------------+-------------+
|smoking_status|survival_rate|
+--------------+-------------+
|  Never Smoked|         0.22|
+--------------+-------------+



#### **Task 4:** Write a function that returns the top three countries with the highest percentage of patients diagnosed in Stage IV.

In [27]:
def top_3_countries_with_highest_stage_IV(df):
  """Return the three countries with the highest share of Stage IV diagnoses.

  For each country the share is
  ``100 * (rows with cancer_stage = 'Stage IV') / (all rows)``.
  Countries are ranked on the unrounded percentage, so values that only differ
  beyond the second decimal are still ordered correctly. Exact ties are broken
  alphabetically by country. Rounding is applied only to the value returned.

  Side effect: registers (or replaces) the temp view ``lung_cancer`` on the
  notebook's ``spark`` session.

  Args:
    df: Spark DataFrame with ``country`` and ``cancer_stage`` columns.

  Returns:
    A Spark DataFrame of at most three rows with columns ``Country`` and
    ``stage_iv_percentage`` (rounded to 2 decimals), highest percentage first.
  """
  df.createOrReplaceTempView("lung_cancer")
  df_top_three_countries = spark.sql("""
    SELECT Country, ROUND(raw_stage_iv_pct, 2) AS stage_iv_percentage
    FROM (
      SELECT
        Country,
        100.0 * SUM(CASE WHEN cancer_stage = 'Stage IV' THEN 1 ELSE 0 END) / COUNT(*) AS raw_stage_iv_pct
      FROM lung_cancer
      GROUP BY Country
      ORDER BY raw_stage_iv_pct DESC, Country ASC
      LIMIT 3
    ) ranked
    ORDER BY raw_stage_iv_pct DESC, Country ASC
    """)
  return df_top_three_countries
# Rank countries by Stage IV share on the cleaned data and display the result.
# show(5) is an upper bound on the rows printed; the query itself limits the result to 3 rows.
df_top_three_countries = top_3_countries_with_highest_stage_IV(df_lungcancer_clean)
df_top_three_countries.show(5)

+--------------+-------------------+
|       Country|stage_iv_percentage|
+--------------+-------------------+
|        Greece|              25.50|
|       Croatia|              25.43|
|Czech Republic|              25.29|
+--------------+-------------------+



#### **Task 5:** Write a function that filters patients who:  

Are male  

Diagnosed in Stage III or IV  

Have a family history of cancer  

Are current smokers  

Have a BMI > 30  

Survived

Return the **average age** and the **percentage of these patients who had hypertension**.

In [29]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, when, avg, count, round
def filter_patients_and_more(df):
  """Summarise a high-risk cohort of survivors.

  The cohort consists of patients who are male, were diagnosed in Stage III or
  Stage IV, have a family history (``family_history == 1``), are current
  smokers, have a BMI above 30, and survived (``survived == 1``).

  Args:
    df: cleaned Spark DataFrame with the columns ``gender``, ``cancer_stage``,
      ``family_history``, ``smoking_status``, ``bmi``, ``survived``, ``age``
      and ``hypertension``.

  Returns:
    A single-row Spark DataFrame with
      * ``average_age`` - mean age of the cohort, rounded to 2 decimals;
      * ``hypertension_percentage`` - percentage of the cohort with
        ``hypertension == 1``, rounded to 2 decimals.
    Both values are null when no patient matches the filter.
  """
  cohort = df.filter(
      (col("gender") == "Male") &
      (col("cancer_stage").isin('Stage III', 'Stage IV')) &
      (col("family_history") == 1) &
      (col("smoking_status") == "Current Smoker") &
      (col("bmi") > 30) &
      (col("survived") == 1))

  # Functions are referenced through the F namespace so this code does not
  # depend on the builtin round() being shadowed by a star-style import.
  cohort_size = F.count("*")
  with_hypertension = F.count(when(col("hypertension") == 1, 1))

  # Guard the division so an empty cohort yields null rather than a
  # divide-by-zero error when Spark runs in ANSI mode.
  hypertension_pct = when(cohort_size > 0, 100 * (with_hypertension / cohort_size))

  return cohort.agg(
      F.round(F.avg("age"), 2).alias("average_age"),
      F.round(hypertension_pct, 2).alias("hypertension_percentage"))

# Compute the cohort summary (average age, hypertension percentage) on the cleaned data and display it.
df_evaluated = filter_patients_and_more(df_lungcancer_clean)
df_evaluated.show()

+-----------+-----------------------+
|average_age|hypertension_percentage|
+-----------+-----------------------+
|      55.18|                  74.77|
+-----------+-----------------------+

