# Installing Spark, and Findspark

In [1]:
# Installing pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=343aa64d2e8b6340e1dd526e95d0b6222d0c743565a0e29bb4fd454db85a7d3e
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [5]:
# Installing findspark
!pip install -q findspark

# Importing necessary packages

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, when, datediff, lit, concat_ws, collect_set, min, max, element_at, expr
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import os
import pandas as pd
import shutil
import glob

#Initializing Spark session

In [6]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

#PySpark code

In [8]:
# Creating a SparkSession
spark = SparkSession.builder.appName("CancerDataAnalysis").getOrCreate()

# At the beginning of your script, after creating the SparkSession:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Read the CSV file
data = spark.read.csv("/content/drive/MyDrive/CareSet-Solution/Files/CancerData0.csv", header=True, inferSchema=True)

# Select needed columns
needed_columns = ['patient_ID', 'drug_date', 'drug', 'days_of_supply', 'class', 'drug_start_date', 'drug_end_date']
cancer_data = data.select(needed_columns)

# Convert date columns
cancer_data = cancer_data.withColumn("drug_date", to_date(col("drug_date"), "dd-MMM-yy"))
cancer_data = cancer_data.withColumn("drug_start_date", to_date(col("drug_start_date"), "M/d/yy"))
cancer_data = cancer_data.withColumn("drug_end_date", to_date(col("drug_end_date"), "M/d/yy"))

# Replace '.' with 0 in days_of_supply and convert to integer
cancer_data = cancer_data.withColumn("days_of_supply",
    when(col("days_of_supply") == ".", 0).otherwise(col("days_of_supply")).cast("int"))

# Handle missing start and end dates
cancer_data = cancer_data.withColumn("drug_start_date",
    when((col("days_of_supply") == 0) & col("drug_start_date").isNull(), col("drug_date"))
    .when((col("days_of_supply") != 0) & col("drug_start_date").isNull(), col("drug_date"))
    .otherwise(col("drug_start_date")))

cancer_data = cancer_data.withColumn("drug_end_date",
    when((col("days_of_supply") == 0) & col("drug_end_date").isNull(), col("drug_date"))
    .when((col("days_of_supply") != 0) & col("drug_end_date").isNull(),
          expr('date_add(drug_date, days_of_supply)'))
    .otherwise(col("drug_end_date")))

# Check and adjust drug_end_date
cancer_data = cancer_data.withColumn("drug_end_date",
    when(datediff(col("drug_end_date"), col("drug_start_date")) != col("days_of_supply"),
         expr('date_add(drug_date, days_of_supply)'))
    .otherwise(col("drug_end_date")))

# Sort the data
cancer_data = cancer_data.orderBy("patient_ID", "drug_start_date", "drug_end_date")

# Define window for lag operations
window = Window.partitionBy("patient_ID").orderBy("drug_start_date")

# Calculate gap and add lot_num
cancer_data = cancer_data.withColumn("prev_end_date", F.lag("drug_end_date").over(window))
cancer_data = cancer_data.withColumn("gap",
    when(col("prev_end_date").isNull(), 0)
    .otherwise(datediff(col("drug_start_date"), col("prev_end_date"))))

cancer_data = cancer_data.withColumn("lot_num",
    F.sum(when(col("gap") > 45, 1).otherwise(0)).over(window) + 1)

# Calculate type1, type2, flag, and type
cancer_data = cancer_data.withColumn("lot_start_date",
    F.first("drug_start_date").over(Window.partitionBy("patient_ID", "lot_num")))

cancer_data = cancer_data.withColumn("type1",
    when(col("gap") > 45, 1).otherwise(0))

cancer_data = cancer_data.withColumn("type2",
    when((datediff(col("drug_start_date"), col("lot_start_date")) <= 29) &
         (~F.lag("class").over(window).contains(col("class"))), 1)
    .otherwise(0))

cancer_data = cancer_data.withColumn("flag",
    when((col("lot_num") > 1) | (col("type2") == 1), 1).otherwise(0))

cancer_data = cancer_data.withColumn("type",
    when((col("lot_num") > 1) | (col("type2") == 1), 1).otherwise(0))

# Calculate cumulative drug names
cancer_data = cancer_data.withColumn("cumulative_drug_names",
    F.collect_set("drug").over(Window.partitionBy("patient_ID", "lot_num")
                               .orderBy("drug_start_date")
                               .rangeBetween(Window.unboundedPreceding, 0)))

cancer_data = cancer_data.withColumn("cumulative_drug_names",
    concat_ws(" + ", F.sort_array("cumulative_drug_names")))

def save_as_single_csv(df, output_path):
    # Save to a temporary directory
    temp_dir = output_path + "_temp"

    # Check if the temporary directory exists and remove it if it does
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)  # Remove the existing directory

    # Use coalesce(1) to ensure a single partition and set maxRecordsPerFile to a large number
    df.coalesce(1).write.option("header", "true").csv(temp_dir)

    # Find all CSV files in the temp directory (excluding _SUCCESS file)
    csv_files = [f for f in glob.glob(os.path.join(temp_dir, "*.csv")) if not f.endswith("_SUCCESS")]

    if csv_files:
        # Read and combine all CSV files
        combined_df = pd.concat([pd.read_csv(f) for f in csv_files])

        # Sort the combined DataFrame
        if 'patient_ID' in combined_df.columns and 'drug_start_date' in combined_df.columns:
            combined_df['drug_start_date'] = pd.to_datetime(combined_df['drug_start_date'])
            combined_df = combined_df.sort_values(['patient_ID', 'drug_start_date'])
        elif 'patient_ID' in combined_df.columns and 'line_start_date' in combined_df.columns:
            combined_df['line_start_date'] = pd.to_datetime(combined_df['line_start_date'])
            combined_df = combined_df.sort_values(['patient_ID', 'line_start_date'])

        # Write the combined DataFrame to a single CSV file
        combined_df.to_csv(output_path, index=False)

        # Remove the temporary directory
        shutil.rmtree(temp_dir)

        print(f"File saved: {output_path}")
    else:
        print("No CSV files found in the temporary directory.")

# sorting output data by date and patient id
cancer_data = cancer_data.orderBy("patient_ID", "drug_start_date")

# Save output data to a single CSV file
output_path = "output_data.csv"
save_as_single_csv(cancer_data, output_path)

# Create summary data
summary_data = cancer_data.groupBy("patient_ID", "lot_num").agg(
    F.min("drug_start_date").alias("line_start_date"),
    concat_ws(" + ", F.collect_set("drug")).alias("line_regimen"),
    concat_ws(" + ", F.collect_set("class")).alias("trgt_category_regimen")
)

summary_data = summary_data.withColumnRenamed("lot_num", "line_num")

# sorting summary data by date and patient id
summary_data = summary_data.orderBy("patient_ID", "line_start_date")

# Save summary data to a single CSV file
summary_path = "summary_details.csv"
save_as_single_csv(summary_data, summary_path)

# Show summary data
summary_data.show()

File saved: output_data.csv
File saved: summary_details.csv
+----------+--------+---------------+--------------------+---------------------+
|patient_ID|line_num|line_start_date|        line_regimen|trgt_category_regimen|
+----------+--------+---------------+--------------------+---------------------+
| PATID_001|       1|     2013-12-05|PACLITAXEL + LETR...|        CHEMO + HORMO|
| PATID_001|       2|     2014-03-30|PACLITAXEL + CARB...|                CHEMO|
| PATID_001|       3|     2014-10-25|PACLITAXEL + CARB...|                CHEMO|
| PATID_001|       4|     2015-01-30|PACLITAXEL + CARB...|                CHEMO|
| PATID_001|       5|     2015-03-19|PACLITAXEL + CARB...|                CHEMO|
| PATID_001|       6|     2015-07-14|PACLITAXEL + CARB...|                CHEMO|
| PATID_001|       7|     2015-09-08|PACLITAXEL + CARB...|                CHEMO|
| PATID_001|       8|     2015-12-29|PACLITAXEL + CARB...|                CHEMO|
| PATID_001|       9|     2016-06-14|PACLITAXEL +