In [8]:
import os
import sys
import glob

# --- Environment Setup (Crucial for Jupyter/PySpark) ---
# Point SPARK_HOME at the local Spark installation; the pyspark/py4j zips below
# are resolved relative to it.
os.environ["SPARK_HOME"] = "/home/talentum/spark"

# Run PySpark workers with the same interpreter that backs this kernel.
# Hard-coding "python3" only works while the first python3 on PATH happens to be
# this interpreter (it was /usr/bin/python3 when the notebook was written); with a
# venv/conda kernel the driver and worker Python versions could otherwise diverge.
# NOTE(review): PYSPARK_DRIVER_PYTHON is consumed by the bin/pyspark launcher; inside
# an already-running kernel it presumably has no effect on the driver itself.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# Make the PySpark and Py4J sources bundled with SPARK_HOME importable when
# pyspark is not installed into this interpreter's site-packages.
spark_python_path = os.path.join(os.environ["SPARK_HOME"], "python")
pyspark_zip_path = os.path.join(spark_python_path, "lib", "pyspark.zip")

# The py4j zip carries its version in the file name, so locate it by pattern.
# glob() returns matches in arbitrary order; sort so the choice is deterministic
# if several versions are present (the lexicographically last one is used).
py4j_zip_files = sorted(glob.glob(os.path.join(spark_python_path, "lib", "py4j-*.zip")))
py4j_zip_path = py4j_zip_files[-1] if py4j_zip_files else None

# Prepend so these take precedence over any other pyspark/py4j on sys.path.
if pyspark_zip_path not in sys.path:
    sys.path.insert(0, pyspark_zip_path)
if py4j_zip_path and py4j_zip_path not in sys.path:
    sys.path.insert(0, py4j_zip_path)

# --- End of Environment Setup ---


# Import necessary PySpark modules (must come after the sys.path setup above)
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StandardScaler, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, when
from pyspark.sql.types import DoubleType

# Start (or reuse) a local SparkSession:
#   - master "local[*]" runs Spark on this machine using every available CPU core;
#   - fs.defaultFS "file:///" makes scheme-less paths resolve to the local
#     filesystem rather than whatever default the Hadoop configuration supplies.
spark = (
    SparkSession.builder
    .appName("LungCancerPreprocessing")
    .master("local[*]")
    .config("spark.hadoop.fs.defaultFS", "file:///")
    .getOrCreate()
)

# Read the raw CSV from a VM-local (non-shared) folder; the explicit file:// scheme
# pins the read to the local filesystem. header=True takes column names from the
# first row, and inferSchema=True has Spark scan the data to guess column types.
csv_file_path = "file:///home/talentum/spark_data/Lung_Cancer.csv"
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# --- Preprocessing Steps ---

# 1. Inspect the schema, a small sample, and the row count of the raw data.
print("Original Schema:")
df.printSchema()

print("First 5 rows of original data:")
df.show(5)

raw_row_count = df.count()  # triggers a full pass over the CSV
print(f"Total rows before preprocessing: {raw_row_count}")

# 2. Identify Numerical, Categorical, Date and Target Columns
# Expected role of each column in the lung-cancer CSV. Names that are missing from
# the loaded file are reported and then dropped from these lists.
numerical_cols = ['age', 'bmi', 'cholesterol_level', 'hypertension', 'asthma', 'cirrhosis', 'other_cancer']
categorical_cols = ['gender', 'country', 'cancer_stage', 'family_history', 'smoking_status', 'treatment_type']
date_cols = ['diagnosis_date', 'end_treatment_date']
target_col = 'survived'
id_col = 'id'

# existing_columns is kept as the list df.columns; later steps test membership in it.
existing_columns = df.columns

# Surface expected columns that are absent, so a renamed/missing column does not
# quietly shrink the feature set or drop the target.
missing_columns = [
    c for c in numerical_cols + categorical_cols + date_cols + [target_col, id_col]
    if c not in existing_columns
]
if missing_columns:
    print(f"WARNING: expected columns not found in the CSV and skipped: {missing_columns}")

# Ensure all identified columns actually exist in the DataFrame
numerical_cols = [c for c in numerical_cols if c in existing_columns]
categorical_cols = [c for c in categorical_cols if c in existing_columns]
date_cols = [c for c in date_cols if c in existing_columns]
if id_col not in existing_columns:
    id_col = None

print(f"\nNumerical Columns: {numerical_cols}")
print(f"Categorical Columns: {categorical_cols}")
print(f"Date Columns (excluded from this preprocessing for CSV output): {date_cols}")

df_processed = df

# 3. Handle Missing Values
# Numerical columns: cast to double, then replace nulls with the column mean.
for col_name in numerical_cols:
    df_processed = df_processed.withColumn(col_name, col(col_name).cast(DoubleType()))

# Compute every mean in ONE aggregation (a single Spark job) instead of one job per
# column. avg() ignores nulls; a column that is entirely null yields a null mean and
# is left untouched.
# NOTE(review): hypertension/asthma/cirrhosis/other_cancer are integer columns that
# look like 0/1 flags; mean imputation gives them fractional values — confirm that
# is acceptable downstream (mode imputation may be more appropriate).
if numerical_cols:
    mean_row = df_processed.agg(*[avg(col(c)).alias(c) for c in numerical_cols]).first()
    numeric_fill_values = {c: mean_row[c] for c in numerical_cols if mean_row[c] is not None}
    if numeric_fill_values:
        df_processed = df_processed.fillna(numeric_fill_values)

# Categorical columns: replace nulls with the literal label 'Unknown' in one call.
# (A string fill value only applies to string-typed columns in the subset.)
if categorical_cols:
    df_processed = df_processed.fillna('Unknown', subset=categorical_cols)

print(f"\nTotal rows after handling missing values (imputed/filled): {df_processed.count()}")


# 4. Feature Engineering / Encoding Categorical Variables
# One StringIndexer per categorical column maps each label to a numeric index stored
# in "<column>_indexed" (by default the most frequent label gets index 0.0).
# handleInvalid="keep" assigns labels unseen at fit time to an extra index instead
# of raising an error.
indexers_for_csv = [
    StringIndexer(
        inputCol=cat_col,
        outputCol=f"{cat_col}_indexed",
        handleInvalid="keep",
    )
    for cat_col in categorical_cols
]

# Fit learns each column's label vocabulary; transform appends the index columns.
pipeline_for_csv_indexed = Pipeline(stages=indexers_for_csv)
pipeline_model_for_csv_indexed = pipeline_for_csv_indexed.fit(df_processed)
df_indexed = pipeline_model_for_csv_indexed.transform(df_processed)

# 5. Select columns for the final CSV output
# Column order: id (when present), numeric features, indexed categoricals, then the
# target (when present). Raw categorical strings and date columns are left out.
final_csv_columns_indexed = (
    ([id_col] if id_col else [])
    + list(numerical_cols)
    + [f"{cat_col}_indexed" for cat_col in categorical_cols]
    + ([target_col] if target_col in existing_columns else [])
)

final_df_for_csv = df_indexed.select(*final_csv_columns_indexed)

print("\nSchema of data to be saved to CSV:")
final_df_for_csv.printSchema()
print("First 5 rows of data to be saved to CSV (with indexed categories):")
final_df_for_csv.show(5)

# Write results to a VM-local (non-shared) directory under the home folder.
output_dir_local = "/home/talentum/spark_data/preprocessed_lung_cancer_output"
output_path_vm_absolute = f"file://{output_dir_local}"  # -> file:///home/talentum/...

os.makedirs(output_dir_local, exist_ok=True)

# coalesce(1) funnels all rows into one partition so Spark writes a single
# part-*.csv inside the output directory; mode="overwrite" replaces earlier output.
(
    final_df_for_csv
    .coalesce(1)
    .write
    .csv(output_path_vm_absolute, header=True, mode="overwrite")
)

# Release the local Spark resources now that the output is on disk.
spark.stop()

print(f"\nProcessed data saved to: {output_path_vm_absolute}")
print(f"You will find the actual CSV file (e.g., part-00000-....csv) inside the '{output_dir_local}' directory.")
print("You will need to manually copy this file from your VM's home directory to your shared folder if you want to access it from Windows.")

Original Schema:
root
 |-- id: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- diagnosis_date: timestamp (nullable = true)
 |-- cancer_stage: string (nullable = true)
 |-- family_history: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- cholesterol_level: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- asthma: integer (nullable = true)
 |-- cirrhosis: integer (nullable = true)
 |-- other_cancer: integer (nullable = true)
 |-- treatment_type: string (nullable = true)
 |-- end_treatment_date: timestamp (nullable = true)
 |-- survived: integer (nullable = true)

First 5 rows of original data:
+---+----+------+-----------+-------------------+------------+--------------+--------------+----+-----------------+------------+------+---------+------------+--------------+-------------------+--------+
| id| age|gende

In [3]:
# Diagnostic: print the interpreter that backs this kernel.
# NOTE(review): this cell ran as In [3], before the main cell (In [8]) shown above it;
# the notebook was executed out of order, so re-check with Restart & Run All.
import sys

kernel_python = sys.executable
print(kernel_python)

/usr/bin/python3
