<a href="https://colab.research.google.com/github/animesh-11/AI_ML/blob/main/Mini_Assignment_1_Animesh_Kumar.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install pyspark



In [1]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("ReadData") \
    .getOrCreate()


try:
    df = spark.read.csv('Lung Cancer.csv', header=True, inferSchema=True)
    print("Successfully read data from 'path/to/your/dataset.csv'")
    df.show(5)
    df.printSchema()
except Exception as e:
    print(f"Error reading data: {e}")
    print("Please ensure the file path is correct and the file exists.")



Successfully read data from 'path/to/your/dataset.csv'
+---+----+------+-----------+--------------+------------+--------------+--------------+----+-----------------+------------+------+---------+------------+--------------+------------------+--------+
| id| age|gender|    country|diagnosis_date|cancer_stage|family_history|smoking_status| bmi|cholesterol_level|hypertension|asthma|cirrhosis|other_cancer|treatment_type|end_treatment_date|survived|
+---+----+------+-----------+--------------+------------+--------------+--------------+----+-----------------+------------+------+---------+------------+--------------+------------------+--------+
|  1|64.0|  Male|     Sweden|    2016-04-05|     Stage I|           Yes|Passive Smoker|29.4|              199|           0|     0|        1|           0|  Chemotherapy|        2017-09-10|       0|
|  2|50.0|Female|Netherlands|    2023-04-20|   Stage III|           Yes|Passive Smoker|41.2|              280|           1|     1|        0|           0|    

In [2]:
from pyspark.sql.functions import col, when

def clean_lung_cancer_data(df):
    # 1. Remove duplicate rows
    df_cleaned = df.dropDuplicates()

    # 2. Convert 'yes'/'no' type fields into 1/0 format
    # Identify columns that are likely 'yes'/'no' (e.g., family_history)
    # Based on the schema, 'family_history' is the primary candidate.
    # Other columns like hypertension, asthma, cirrhosis, other_cancer, survived are already int (0/1).

    if 'family_history' in df_cleaned.columns:
        df_cleaned = df_cleaned.withColumn(
            'family_history',
            when(col('family_history') == 'Yes', 1)
            .when(col('family_history') == 'No', 0)
            .otherwise(col('family_history'))
            .cast('int')
        )

    # 3. Ensure correct data types for numerical and date columns
    # Based on the initial df.printSchema(), Spark's inferSchema=True has already done a good job:
    # - id: integer
    # - age: double
    # - gender: string
    # - country: string
    # - diagnosis_date: date
    # - cancer_stage: string
    # - family_history: string (will be converted to int above)
    # - smoking_status: string
    # - bmi: double
    # - cholesterol_level: integer
    # - hypertension: integer
    # - asthma: integer
    # - cirrhosis: integer
    # - other_cancer: integer
    # - treatment_type: string
    # - end_treatment_date: date
    # - survived: integer

    # No further explicit casting is needed for numerical and date columns beyond what inferSchema provided,
    # and the family_history conversion handles its type.

    return df_cleaned

# Apply the cleaning function to your DataFrame
df_processed = clean_lung_cancer_data(df)

print("\n--- Processed DataFrame Schema ---")
df_processed.printSchema()

print("\n--- First 5 rows of Processed DataFrame ---")
df_processed.show(5)


--- Processed DataFrame Schema ---
root
 |-- id: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- diagnosis_date: date (nullable = true)
 |-- cancer_stage: string (nullable = true)
 |-- family_history: integer (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- cholesterol_level: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- asthma: integer (nullable = true)
 |-- cirrhosis: integer (nullable = true)
 |-- other_cancer: integer (nullable = true)
 |-- treatment_type: string (nullable = true)
 |-- end_treatment_date: date (nullable = true)
 |-- survived: integer (nullable = true)


--- First 5 rows of Processed DataFrame ---
+---+----+------+--------------+--------------+------------+--------------+--------------+----+-----------------+------------+------+---------+------------+--------------+------------------+----

In [3]:
from pyspark.sql.functions import datediff, col, avg

def analyze_treatment_duration(df):
    # Calculate treatment_duration_days
    df_with_duration = df.withColumn(
        "treatment_duration_days",
        datediff(col("end_treatment_date"), col("diagnosis_date"))
    )

    # Calculate the average treatment duration for each treatment type
    average_duration_by_treatment = df_with_duration.groupBy("treatment_type") \
                                                   .agg(avg("treatment_duration_days").alias("average_treatment_duration"))

    return average_duration_by_treatment, df_with_duration


average_duration_df, df_final = analyze_treatment_duration(df_processed)

print("\n DataFrame with Treatment Duration ")
df_final.select("diagnosis_date", "end_treatment_date", "treatment_duration_days", "treatment_type").show(5)

print("\n Average Treatment Duration by Treatment Type ")
average_duration_df.show()


 DataFrame with Treatment Duration 
+--------------+------------------+-----------------------+--------------+
|diagnosis_date|end_treatment_date|treatment_duration_days|treatment_type|
+--------------+------------------+-----------------------+--------------+
|    2017-06-30|        2019-04-06|                    645|       Surgery|
|    2015-02-13|        2016-12-18|                    674|      Combined|
|    2016-07-29|        2017-08-27|                    394|  Chemotherapy|
|    2014-06-16|        2016-01-14|                    577|      Combined|
|    2017-12-25|        2019-03-31|                    461|  Chemotherapy|
+--------------+------------------+-----------------------+--------------+
only showing top 5 rows

 Average Treatment Duration by Treatment Type 
+--------------+--------------------------+
|treatment_type|average_treatment_duration|
+--------------+--------------------------+
|     Radiation|        458.40320462900917|
|  Chemotherapy|        458.395400919099

In [4]:
from pyspark.sql.functions import avg, col, desc

def get_highest_survival_smoking_group(df):
    # Calculate the survival rate for each smoking_status group
    survival_rate_by_smoking = df.groupBy("smoking_status") \
                                 .agg(avg(col("survived")).alias("average_survival_rate"))

    # Find the smoking_status group with the highest survival rate
    highest_survival_group = survival_rate_by_smoking.orderBy(desc("average_survival_rate")) \
                                                   .first()

    if highest_survival_group:
        return highest_survival_group['smoking_status'], highest_survival_group['average_survival_rate']
    else:
        return None, None

# Get the smoking status group with the highest survival rate
highest_smoking_group, highest_survival_rate = get_highest_survival_smoking_group(df_final)

if highest_smoking_group:
    print(f"The smoking status group with the highest survival rate is: '{highest_smoking_group}' with an average survival rate of: {highest_survival_rate:.4f}")
else:
    print("Could not determine the smoking status group with the highest survival rate.")

The smoking status group with the highest survival rate is: 'Never Smoked' with an average survival rate of: 0.2209


In [6]:
from pyspark.sql.functions import col, count, lit

def get_top_countries_stage_iv(df):
    # Calculate total patients per country
    total_patients_per_country = df.groupBy("country").agg(count("id").alias("total_patients"))

    # Calculate patients diagnosed in Stage IV per country
    stage_iv_patients_per_country = df.filter(col("cancer_stage") == "Stage IV") \
                                      .groupBy("country").agg(count("id").alias("stage_iv_patients"))

    # Join the two dataframes to calculate percentage
    country_stage_iv_percentage = total_patients_per_country.join(
        stage_iv_patients_per_country,
        on="country",
        how="left"
    ).fillna(0) # Fill with 0 for countries with no Stage IV patients

    # Calculate the percentage
    country_stage_iv_percentage = country_stage_iv_percentage.withColumn(
        "percentage_stage_iv",
        (col("stage_iv_patients") / col("total_patients")) * 100
    )

    # Order by percentage and get the top 3
    top_3_countries = country_stage_iv_percentage.orderBy(col("percentage_stage_iv").desc()).limit(3)

    return top_3_countries

# Get the top three countries
top_countries_stage_iv_df = get_top_countries_stage_iv(df_final)

print("\n Top 3 Countries with Highest Percentage of Stage IV Diagnoses ")
top_countries_stage_iv_df.show()


 Top 3 Countries with Highest Percentage of Stage IV Diagnoses 
+--------------+--------------+-----------------+-------------------+
|       country|total_patients|stage_iv_patients|percentage_stage_iv|
+--------------+--------------+-----------------+-------------------+
|        Greece|         33052|             8429|  25.50223889628464|
|       Croatia|         33138|             8426| 25.427002233085883|
|Czech Republic|         32885|             8317| 25.291166185190818|
+--------------+--------------+-----------------+-------------------+



In [7]:
from pyspark.sql.functions import col, avg, count

def analyze_filtered_patients(df):
    # Filter patients based on the specified criteria
    filtered_df = df.filter(
        (col("gender") == "Male") &
        (col("cancer_stage").isin("Stage III", "Stage IV")) &
        (col("family_history") == 1) &
        (col("smoking_status") == "Current Smoker") &
        (col("bmi") > 30) &
        (col("survived") == 1)
    )

    # Calculate the average age of these filtered patients
    average_age = filtered_df.agg(avg("age")).collect()[0][0]

    # Calculate the total number of filtered patients
    total_filtered_patients = filtered_df.count()

    # Calculate the number of filtered patients with hypertension
    patients_with_hypertension = filtered_df.filter(col("hypertension") == 1).count()

    # Calculate the percentage of these patients who had hypertension
    percentage_hypertension = (patients_with_hypertension / total_filtered_patients) * 100 if total_filtered_patients > 0 else 0

    return average_age, percentage_hypertension

# Get the results by calling the function on df_final
average_age_result, percentage_hypertension_result = analyze_filtered_patients(df_final)


print(f"Average age of filtered patients: {average_age_result:.2f} years")
print(f"Percentage of filtered patients with hypertension: {percentage_hypertension_result:.2f}%")

Average age of filtered patients: 55.18 years
Percentage of filtered patients with hypertension: 74.77%
