In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder \
    .appName("LogisticRegression with PySpark MLlib") \
    .getOrCreate()

24/09/27 15:19:25 WARN Utils: Your hostname, AI-CJB-LAP-459 resolves to a loopback address: 127.0.1.1; using 192.168.1.164 instead (on interface wlp0s20f3)
24/09/27 15:19:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/27 15:19:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [23]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType

In [36]:
schema = StructType([
    StructField("id", StringType(), True),
    StructField("diagnosis", StringType(), True),
    StructField("radius_mean", FloatType(), True),
    StructField("texture_mean", FloatType(), True),
    StructField("perimeter_mean", FloatType(), True),
    StructField("area_mean", FloatType(), True),
    StructField("smoothness_mean", FloatType(), True),
    StructField("compactness_mean", FloatType(), True),
    StructField("concavity_mean", FloatType(), True),
    StructField("concave points_mean", FloatType(), True),
    StructField("symmetry_mean", FloatType(), True),
    StructField("fractal_dimension_mean", FloatType(), True),
    StructField("radius_se", FloatType(), True),
    StructField("texture_se", FloatType(), True),
    StructField("perimeter_se", FloatType(), True),
    StructField("area_se", FloatType(), True),
    StructField("smoothness_se", FloatType(), True),
    StructField("compactness_se", FloatType(), True),
    StructField("concavity_se", FloatType(), True),
    StructField("concave points_se", FloatType(), True),
    StructField("symmetry_se", FloatType(), True),
    StructField("fractal_dimension_se", FloatType(), True),
    StructField("radius_worst", FloatType(), True),
    StructField("texture_worst", FloatType(), True),
    StructField("perimeter_worst", FloatType(), True),
    StructField("area_worst", FloatType(), True),
    StructField("smoothness_worst", FloatType(), True),
    StructField("compactness_worst", FloatType(), True),
    StructField("concavity_worst", FloatType(), True),
    StructField("concave points_worst", FloatType(), True),
    StructField("symmetry_worst", FloatType(), True),
    StructField("fractal_dimension_worst", FloatType(), True),
    StructField("_c32 ",StringType(),True)
])


In [21]:
import requests
import tempfile
from io import StringIO

In [43]:
df=spark.read.csv("data.csv",header=True,inferSchema=True)
# df.show(5)
df=df.drop("_c32")
df.show(5)

+--------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+
|      id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|
+--------+---------+-----------+------------+---

In [51]:
# Rename the columns for better readability
columns = ['id', 'diagnosis'] + [f'feature_{i}' for i in range(1, 31)]
data = df.toDF(*columns)
# data.show()
# 
#Map 'M' (malignant) to 1 and 'B' (benign) to 0
data = data.withColumn("label", (data["diagnosis"] == "M").cast("integer")).drop("diagnosis")

feature_columns = [f'feature_{i}' for i in range(1, 25)]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

data = assembler.transform(data)

train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

In [52]:
logistic_regression = LogisticRegression(featuresCol="features", labelCol="label")
model = logistic_regression.fit(train_data)

24/09/27 17:01:11 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


In [53]:
coefficients = model.coefficients
intercept = model.intercept

print("Coefficients: ", coefficients)
print("Intercept: {:.3f}".format(intercept))

Coefficients:  [-2.0108645643741054,-0.33157920470462404,-0.7534866037350904,0.01838471570516052,70.84419799144752,-152.3553523507126,104.52830085756855,88.86169536342123,61.07570550895772,819.0310909016275,28.221271198662986,-6.976363503263637,-6.722500793225637,0.48279541725268227,261.5296572323381,77.51812188280694,-119.2654382087256,640.1628849202443,114.26164350307137,-2178.1532266520385,-1.411484355635807,1.2113816280674292,0.9059950938956373,0.02056295264439967]
Intercept: -95.705


In [54]:
predictions = model.transform(test_data)

# AUC-ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")
auc = evaluator.evaluate(predictions)

# Accuracy, Precision, and Recall
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})

print(f"AUC-ROC: {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

AUC-ROC: 0.9989
Accuracy: 0.9651
Precision: 0.9653
Recall: 0.9651
