In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import warnings
warnings.filterwarnings("ignore")

In [2]:
# Create a Spark session
spark = SparkSession.builder.appName("PySparkMLExample").getOrCreate()

23/11/14 08:38:48 WARN Utils: Your hostname, jordan-Lenovo-Y720-15IKB resolves to a loopback address: 127.0.1.1; using 192.168.50.141 instead (on interface enp3s0)
23/11/14 08:38:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/11/14 08:38:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/14 08:38:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/11/14 08:38:49 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
# Load data
#data = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("data.csv")
data = spark.read.csv("data.csv", header=True, inferSchema=True)
data

DataFrame[id: int, diagnosis: string, radius_mean: double, texture_mean: double, perimeter_mean: double, area_mean: double, smoothness_mean: double, compactness_mean: double, concavity_mean: double, concave points_mean: double, symmetry_mean: double, fractal_dimension_mean: double, radius_se: double, texture_se: double, perimeter_se: double, area_se: double, smoothness_se: double, compactness_se: double, concavity_se: double, concave points_se: double, symmetry_se: double, fractal_dimension_se: double, radius_worst: double, texture_worst: double, perimeter_worst: double, area_worst: double, smoothness_worst: double, compactness_worst: double, concavity_worst: double, concave points_worst: double, symmetry_worst: double, fractal_dimension_worst: double, _c32: string]

In [4]:
# performing label encoding on the target variable "diagnosis" by creating a new named "label" (now double dtype)
from pyspark.ml.feature import StringIndexer

string_indexer = StringIndexer(inputCol="diagnosis", outputCol="label")
model = string_indexer.fit(data)
data = model.transform(data)

In [5]:
data.columns

['id',
 'diagnosis',
 'radius_mean',
 'texture_mean',
 'perimeter_mean',
 'area_mean',
 'smoothness_mean',
 'compactness_mean',
 'concavity_mean',
 'concave points_mean',
 'symmetry_mean',
 'fractal_dimension_mean',
 'radius_se',
 'texture_se',
 'perimeter_se',
 'area_se',
 'smoothness_se',
 'compactness_se',
 'concavity_se',
 'concave points_se',
 'symmetry_se',
 'fractal_dimension_se',
 'radius_worst',
 'texture_worst',
 'perimeter_worst',
 'area_worst',
 'smoothness_worst',
 'compactness_worst',
 'concavity_worst',
 'concave points_worst',
 'symmetry_worst',
 'fractal_dimension_worst',
 '_c32',
 'label']

In [6]:
# changing the dtype of "label" column from double to integer.
data = data.withColumn("label", data["label"].cast("integer"))

In [7]:
# remove the columns "diagnosis", "id" and _c32
data = data.drop("diagnosis", "id", "_c32")

In [8]:
# print the 30 first values for "label" column
data.select("label").show(30)

+-----+
|label|
+-----+
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    0|
|    0|
|    0|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
+-----+
only showing top 30 rows



In [23]:
data.show(20, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                                                                                                 |label|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|[17.99,10.38,122.8,1001.0,0.1184,0.2776,0.3001,0.1471,0.2419,0.07871,1.095,0.9053,8.589,153.4,0.006399,0.04904,0.05373,0.01587,0.03003,0.006193,25.38,17.33,184.6,2019.0,0.1622,0.6656,0.7119,0.2654,0.4601,0.1189]      |1    |
|[20.57,17.77,132.9,1326.0,0.08474,0.07864,0.0869,0.07017,0.1812,0.05667,0.5435,0.7339,3.398,74.

In [10]:
data.printSchema()

root
 |-- radius_mean: double (nullable = true)
 |-- texture_mean: double (nullable = true)
 |-- perimeter_mean: double (nullable = true)
 |-- area_mean: double (nullable = true)
 |-- smoothness_mean: double (nullable = true)
 |-- compactness_mean: double (nullable = true)
 |-- concavity_mean: double (nullable = true)
 |-- concave points_mean: double (nullable = true)
 |-- symmetry_mean: double (nullable = true)
 |-- fractal_dimension_mean: double (nullable = true)
 |-- radius_se: double (nullable = true)
 |-- texture_se: double (nullable = true)
 |-- perimeter_se: double (nullable = true)
 |-- area_se: double (nullable = true)
 |-- smoothness_se: double (nullable = true)
 |-- compactness_se: double (nullable = true)
 |-- concavity_se: double (nullable = true)
 |-- concave points_se: double (nullable = true)
 |-- symmetry_se: double (nullable = true)
 |-- fractal_dimension_se: double (nullable = true)
 |-- radius_worst: double (nullable = true)
 |-- texture_worst: double (nullable = tr

In [11]:
# Prepare data
assembler = VectorAssembler(inputCols=data.columns[:-1], outputCol="features")
data = assembler.transform(data).select("features", "label")

In [12]:
data.select("features").show()

+--------------------+
|            features|
+--------------------+
|[17.99,10.38,122....|
|[20.57,17.77,132....|
|[19.69,21.25,130....|
|[11.42,20.38,77.5...|
|[20.29,14.34,135....|
|[12.45,15.7,82.57...|
|[18.25,19.98,119....|
|[13.71,20.83,90.2...|
|[13.0,21.82,87.5,...|
|[12.46,24.04,83.9...|
|[16.02,23.24,102....|
|[15.78,17.89,103....|
|[19.17,24.8,132.4...|
|[15.85,23.95,103....|
|[13.73,22.61,93.6...|
|[14.54,27.54,96.7...|
|[14.68,20.13,94.7...|
|[16.13,20.68,108....|
|[19.81,22.15,130....|
|[13.54,14.36,87.4...|
+--------------------+
only showing top 20 rows



In [13]:
data.select("label").show()

+-----+
|label|
+-----+
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    0|
+-----+
only showing top 20 rows



In [14]:
std_scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
pca = PCA(k=5, inputCol="scaled_features", outputCol="pca_features")

clf = SparkXGBClassifier(tree_method="hist",
                         device="gpu",  # "cuda" or "gpu"
                         features_col="pca_features",
                         label_col="label")

stages = [std_scaler, pca, clf]

In [15]:
# Create the pipeline
pipeline = Pipeline(stages=stages)

In [16]:
# Define parameter grid for hyperparameter tuning
param_grid = ParamGridBuilder() \
    .addGrid(clf.max_depth, [3, 5]) \
    .addGrid(clf.min_child_weight, [1, 3]) \
    .addGrid(clf.subsample, [0.6, 0.8]) \
    .build()

In [17]:
# Evaluate model
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")

In [18]:
# Define cross-validator
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5)

In [19]:
# Split data into training and test sets
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

In [20]:
# Train model
model = cv.fit(train_data)

23/11/14 08:40:27 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/11/14 08:40:27 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/11/14 08:40:27 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/11/14 08:40:27 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
2023-11-14 08:40:29,285 INFO XGBoost-PySpark: _fit Running xgboost-2.0.2 on 1 workers with
	booster params: {'device': 'gpu', 'max_depth': 3, 'min_child_weight': 1, 'objective': 'binary:logistic', 'subsample': 0.6, 'tree_method': 'hist', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2023-11-14 08:40:29,376 INFO SparkXGBClassifier: _skip_stage_level_scheduling Stage-level scheduling in xgboost requires spark version 3.4.0+
2023-11-14 08:40:31,233 INFO XGBoost-PySpark: _train_b

In [21]:
# Make predictions on the test set
predictions = model.transform(test_data)

In [22]:
# Evaluate the model
auc = evaluator.evaluate(predictions)
print(f"areaUnderROC: {auc}")

[Stage 901:>                                                        (0 + 1) / 1]

areaUnderROC: 0.9884381338742392


2023-11-14 08:43:52,619 INFO XGBoost-PySpark: predict_udf CUDF is unavailable, fallback the inference on the CPUs
                                                                                

In [24]:
# Stop the Spark session
spark.stop()