In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
# Create the Spark session used by every cell below, or reuse one that is
# already running in this kernel.
spark = (
    SparkSession.builder
    .appName("CancerDiagnosis")
    .getOrCreate()
)

In [None]:
# Read the dataset: the first CSV row supplies the column names and Spark
# infers each column's type from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("CD_data.csv")
)

In [None]:
# Input columns fed to the VectorAssembler, grouped by the suffix of the
# column name. The order here is the order of the assembled feature vector.
# NOTE(review): 'Radius_mean' and 'Texture_mean' are capitalised unlike every
# other name in this list — confirm they match the CSV header exactly.
feature_columns = [
    # *_mean columns
    'Radius_mean', 'Texture_mean', 'perimeter_mean', 'area_mean',
    'smoothness_mean', 'compactness_mean', 'concavity_mean',
    'concave points_mean', 'symmetry_mean', 'fractal_dimension_mean',
    # *_se columns
    'radius_se', 'texture_se', 'perimeter_se', 'area_se',
    'smoothness_se', 'compactness_se', 'concavity_se',
    'concave points_se', 'symmetry_se', 'fractal_dimension_se',
    # *_worst columns
    'radius_worst', 'texture_worst', 'perimeter_worst', 'area_worst',
    'smoothness_worst', 'compactness_worst', 'concavity_worst',
    'concave points_worst', 'symmetry_worst', 'fractal_dimension_worst',
]

# Name of the column holding the target that the classifier predicts.
label_col = 'diagnosis'

In [None]:
# Replace the label column with a string-typed copy of itself, so it is text
# no matter which type schema inference picked for it.
df = df.withColumn(
    label_col,
    df[label_col].cast(StringType()),
)

In [None]:
# Pipeline stage: pack all feature columns into one vector column "features".
vector_assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features",
)

In [None]:
# Pipeline stage: encode the label column as a numeric index column "label".
label_indexer = StringIndexer(
    inputCol=label_col,
    outputCol="label",
)

In [None]:
# Pipeline stage: random-forest classifier trained on "features" to predict
# "label". The seed is pinned (same value as the train/test split) so the
# forest's random sampling is repeatable across kernel restarts; without it
# the metrics reported below can change from run to run.
rf_classifier = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    seed=42,
)

In [None]:
# Chain the stages in execution order: assemble features, index the label,
# then fit the classifier.
pipeline = Pipeline(
    stages=[
        vector_assembler,
        label_indexer,
        rf_classifier,
    ]
)

In [None]:
# Random 80/20 split into training and held-out test rows; the fixed seed
# keeps the split repeatable.
train_data, test_data = df.randomSplit(
    weights=[0.8, 0.2],
    seed=42,
)

In [None]:
# Fit every pipeline stage on the training rows only.
model = pipeline.fit(dataset=train_data)

In [None]:
# Run the fitted pipeline over the held-out rows to obtain predictions.
predictions = model.transform(dataset=test_data)

In [None]:
# Area under the ROC curve on the test predictions. This evaluator object is
# reused later as the cross-validation metric.
evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    labelCol="label",
    rawPredictionCol="rawPrediction",
)
auc = evaluator.evaluate(predictions)
print(f"Area under ROC curve: {auc}")

In [None]:
# The last pipeline stage is the fitted random forest; print its importance
# score for each input column, in feature-vector order.
feature_importances = model.stages[-1].featureImportances
print("Feature Importances:")
for i in range(len(feature_importances)):
    importance = feature_importances[i]
    print(f"Feature {feature_columns[i]}: {importance}")

In [None]:
# Weighted precision of the predicted labels on the test set.
multi_evaluator = MulticlassClassificationEvaluator(
    metricName="weightedPrecision",
    labelCol="label",
    predictionCol="prediction",
)
precision = multi_evaluator.evaluate(predictions)
print(f"Weighted Precision: {precision}")

In [None]:
# Weighted recall of the predicted labels on the test set.
multi_evaluator = MulticlassClassificationEvaluator(
    metricName="weightedRecall",
    labelCol="label",
    predictionCol="prediction",
)
recall = multi_evaluator.evaluate(predictions)
print(f"Weighted Recall: {recall}")

In [None]:
# F1 score of the predicted labels on the test set.
multi_evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol="label",
    predictionCol="prediction",
)
f1_score = multi_evaluator.evaluate(predictions)
print(f"F1-Score: {f1_score}")

In [None]:
# Hyper-parameter grid for tuning: three candidate forest sizes.
param_grid = (
    ParamGridBuilder()
    .addGrid(rf_classifier.numTrees, [10, 20, 30])
    .build()
)

In [None]:
# 3-fold cross-validation of the whole pipeline over the parameter grid,
# scored with the binary evaluator defined above. The seed is pinned (same
# value as the train/test split) so rows land in the same folds on every run
# and the model selection is repeatable.
cross_validator = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    seed=42,
)

In [None]:
# Run the cross-validated grid search on the training rows and keep the
# pipeline refit with the best-scoring parameter combination.
cv_model = cross_validator.fit(train_data)
best_model = cv_model.bestModel

# Show how each candidate scored: avgMetrics is aligned with the entries of
# the parameter grid, one mean cross-validation metric per combination.
metric_name = evaluator.getMetricName()
for grid_params, avg_metric in zip(param_grid, cv_model.avgMetrics):
    num_trees = grid_params[rf_classifier.numTrees]
    print(f"numTrees={num_trees}: mean CV {metric_name} = {avg_metric}")

# Score the selected model on the held-out test rows so the tuning step has a
# result comparable with the untuned model's metric printed earlier.
best_predictions = best_model.transform(test_data)
best_auc = evaluator.evaluate(best_predictions)
print(f"Best model {metric_name} on test set: {best_auc}")

In [None]:
# Stop the Spark session now that the analysis is finished; cells that use
# `spark` or its DataFrames cannot be re-run after this without recreating
# the session.
spark.stop()