In [69]:
import os,sys
# Put the parent of the notebook's working directory on the import path so the
# project-local `utils` and `schemas` modules imported below can be resolved.
parent_of_cwd = os.path.join(os.getcwd(), "..")
project_root = os.path.abspath(parent_of_cwd)

# Guarded so that re-running the cell does not stack duplicate entries.
if sys.path.count(project_root) == 0:
    sys.path.insert(0, project_root)

from utils import create_spark_session
from schemas import cancer_schema

### Use spark session

In [68]:
# Obtain the Spark session from the project helper in `utils`.
# NOTE(review): the string argument is presumably the Spark application name —
# confirm against `utils.create_spark_session`.
spark = create_spark_session("Cancer analysis")


### Load the dataset with Spark using the explicit `cancer_schema` (no schema inference)

In [33]:

# The CSV lives in `<parent of cwd>/datasets/`; resolving it from the working
# directory keeps the notebook free of machine-specific absolute paths.
dataset_path = os.path.abspath(os.path.join(os.getcwd(), "..", "datasets", "Cancer_Data.csv"))

# Read with the explicit project schema rather than letting Spark infer types.
# The chain is parenthesised instead of using backslash continuations: a
# trailing "\" after the last call only parses while the next line is blank,
# and would splice the following statement into the chain otherwise.
cancer_df = (
    spark.read.format("csv")
    .option("header", True)
    .schema(cancer_schema)
    .load(dataset_path)
)

# Preview a few rows without truncating wide cell values.
cancer_df.show(5, truncate=False)


+--------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+
|id      |diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave_points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave_points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave_points_worst|symmetry_worst|fractal_dimension_worst|
+--------+---------+-----------+------------+---

# check for null or missing values
### For every column in the DataFrame, count how many values are missing (either null or empty string) and show them in a table.

In [37]:
from pyspark.sql import functions as F


def missing_condition(col_name, dtype):
    """Build the boolean Column that flags a missing value in one column.

    Args:
        col_name: Name of the DataFrame column to test.
        dtype: Spark type name for that column as reported by ``DataFrame.dtypes``
            (e.g. ``"string"``, ``"double"``).

    Returns:
        A Column that is true when the value is null, an empty string (string
        columns only) or NaN (float/double columns only).
    """
    column = F.col(col_name)
    if dtype == "string":
        # An empty string is only a meaningful "missing" marker for text columns.
        return column.isNull() | (column == "")
    if dtype in ("double", "float"):
        # Comparing a numeric column with "" relies on an implicit string->number
        # cast and can never match; NaN is the numeric missing marker to look for.
        return column.isNull() | F.isnan(column)
    return column.isNull()


# One output column per input column, holding the number of missing values.
# `when` without `otherwise` yields null for non-matching rows, and `count`
# skips nulls, so only the flagged rows are counted.
missing_counts_df = cancer_df.select(
    [
        F.count(F.when(missing_condition(col_name, dtype), F.lit(1))).alias(col_name)
        for col_name, dtype in cancer_df.dtypes
    ]
)
missing_counts_df.show()


+---+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+
| id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave_points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave_points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave_points_worst|symmetry_worst|fractal_dimension_worst|
+---+---------+-----------+------------+--------------+---

# Check for duplicate rows

In [44]:
# Compare the raw row count with the count after dropping rows that are
# identical in every column; the difference is the number of duplicates.
# Nothing is removed from `cancer_df` here — this is only a check.
deduplicated_df = cancer_df.dropDuplicates()
total_rows, unique_rows = cancer_df.count(), deduplicated_df.count()
duplicate_rows = total_rows - unique_rows

print(f"Total row count is:{total_rows}")
print(f"Total unique rows:{unique_rows}")
print(f"Total duplicate rows:{duplicate_rows}")

Total row count is:569
Total unique rows:569
Total duplicate rows:0


# EDA 
### Basic structure of the dataset 


In [46]:
# Dataset shape: `count()` triggers a Spark job, while the column list is
# available from the schema without touching the data.
row_count = cancer_df.count()
column_count = len(cancer_df.columns)
print(f"Rows:{row_count},Columns:{column_count}")

# Column names, types and nullability as declared by the schema.
cancer_df.printSchema()


Rows:569,Columns:32
root
 |-- id: string (nullable = true)
 |-- diagnosis: string (nullable = true)
 |-- radius_mean: double (nullable = true)
 |-- texture_mean: double (nullable = true)
 |-- perimeter_mean: double (nullable = true)
 |-- area_mean: double (nullable = true)
 |-- smoothness_mean: double (nullable = true)
 |-- compactness_mean: double (nullable = true)
 |-- concavity_mean: double (nullable = true)
 |-- concave_points_mean: double (nullable = true)
 |-- symmetry_mean: double (nullable = true)
 |-- fractal_dimension_mean: double (nullable = true)
 |-- radius_se: double (nullable = true)
 |-- texture_se: double (nullable = true)
 |-- perimeter_se: double (nullable = true)
 |-- area_se: double (nullable = true)
 |-- smoothness_se: double (nullable = true)
 |-- compactness_se: double (nullable = true)
 |-- concavity_se: double (nullable = true)
 |-- concave_points_se: double (nullable = true)
 |-- symmetry_se: double (nullable = true)
 |-- fractal_dimension_se: double (nullabl

# Descriptive Statistics
### Summary statistics (count, mean, stddev, min, max) for every column — `describe()` also reports on the string columns `id` and `diagnosis`

In [48]:
# `describe()` with no arguments summarises every column of the DataFrame,
# so the string columns appear in the table alongside the numeric ones.
summary_stats_df = cancer_df.describe()
summary_stats_df.show()

[Stage 59:>                                                         (0 + 1) / 1]

+-------+--------------------+---------+------------------+-----------------+-----------------+-----------------+--------------------+-------------------+-------------------+--------------------+--------------------+----------------------+------------------+------------------+------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+-----------------+--------------------+-------------------+-------------------+--------------------+-------------------+-----------------------+
|summary|                  id|diagnosis|       radius_mean|     texture_mean|   perimeter_mean|        area_mean|     smoothness_mean|   compactness_mean|     concavity_mean| concave_points_mean|       symmetry_mean|fractal_dimension_mean|         radius_se|        texture_se|      perimeter_se|          area_se|       smoothness_se|      compactness_se|  

                                                                                

### Target column distribution

In [49]:
# Class balance of the target: number of rows per `diagnosis` value.
diagnosis_counts_df = cancer_df.groupBy("diagnosis").count()
diagnosis_counts_df.show()

+---------+-----+
|diagnosis|count|
+---------+-----+
|        B|  357|
|        M|  212|
+---------+-----+



# Spark MLlib imports for classification
### StringIndexer – Converts string labels (M, B) into numeric labels for ML algorithms.

### VectorAssembler – Combines multiple feature columns into a single vector column (features).

### StandardScaler – Scales each feature to unit standard deviation (and centres it to zero mean only when `withMean=True`); optional, but often helps gradient-based models converge.
### LogisticRegression, DecisionTreeClassifier, RandomForestClassifier – Different classification algorithms available in Spark MLlib.

### MulticlassClassificationEvaluator – Measures model performance (accuracy, precision, recall, F1 score).

### Pipeline – Chains all transformation and model steps together into one workflow.

In [79]:

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline




# Convert target column into numeric form, we use StringIndexer
### Convert 'diagnosis' from M/B to numeric (1/0)

In [80]:
# Encode the string target `diagnosis` as the numeric `label` column MLlib expects.
# StringIndexer's default ordering ("frequencyDesc") assigns indices by label
# frequency in whatever data it is fitted on, so the B/M -> 0/1 mapping would
# silently depend on the class balance of the training split. Alphabetical
# ordering pins the mapping explicitly: B -> 0.0, M -> 1.0.
label_indexer = StringIndexer(
    inputCol="diagnosis",
    outputCol="label",
    stringOrderType="alphabetAsc",
)

## Assemble features into a single vector
### Exclude `diagnosis` (the target) and `id` from the feature columns

In [81]:
# Every column except the `diagnosis` target and the `id` column is a model input.
excluded_cols = {"diagnosis", "id"}
feature_cols = [name for name in cancer_df.columns if name not in excluded_cols]

# Pack the selected columns into the single `features` vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")


### Choose a classification model: we start with LogisticRegression; Decision Tree and Random Forest can be tried later

In [82]:
# Logistic-regression estimator wired to the column names produced upstream:
# `features` is the VectorAssembler output and `label` is the StringIndexer output.
# All other hyperparameters are left at the library defaults.
lr = LogisticRegression(featuresCol="features",labelCol="label")

### Use Pipeline to chain all transformations and model steps in one workflow

In [83]:
# Stage order matters: the label is indexed and the features are assembled
# before the classifier, which reads the columns those two stages create.
pipeline = Pipeline(stages=[label_indexer,assembler,lr])

### split cancer_df into train_df,test_df

In [84]:
# Random split with weights 0.8 / 0.2 (train / test). Rows are assigned by
# sampling, so the resulting sizes are approximate rather than an exact 80/20.
# The seed is fixed so the sampling is not re-randomised on every run.
train_df,test_df = cancer_df.randomSplit([0.8,0.2],seed=42)

### fit the model

In [85]:
# Fit every pipeline stage on the training split only; `test_df` is not used
# here, so it stays available as held-out data.
model = pipeline.fit(train_df)

                                                                                