# Diabetes prediction: gradient boosting model with XGBoost & Spark

## 1. Notebooks set-up

In [1]:
import pickle
import pandas as pd
from pyspark.sql import SparkSession
from sklearn.model_selection import train_test_split
from sklearn.impute import KNNImputer
from xgboost.spark import SparkXGBRegressor

spark=SparkSession\
    .builder\
    .master('spark://0.0.0.0:7077')\
    .appName('GradientBoostingClassifier')\
    .getOrCreate()

25/03/16 02:56:23 WARN Utils: Your hostname, codespaces-b9e35a resolves to a loopback address: 127.0.0.1; using 10.0.2.147 instead (on interface eth0)
25/03/16 02:56:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/16 02:56:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 2. Data preparation

### 2.1. Data loading

In [2]:
url='https://raw.githubusercontent.com/4GeeksAcademy/decision-tree-project-tutorial/main/diabetes.csv'
data_df=pd.read_csv(url)
data_df.drop_duplicates().reset_index(drop=True, inplace=True)
data_df.head()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome
0,6,148,72,35,0,33.6,0.627,50,1
1,1,85,66,29,0,26.6,0.351,31,0
2,8,183,64,0,0,23.3,0.672,32,1
3,1,89,66,23,94,28.1,0.167,21,0
4,0,137,40,35,168,43.1,2.288,33,1


### 2.2. Train-test split

In [3]:
training_df, testing_df=train_test_split(
    data_df,
    test_size=0.25,
    random_state=315
)

### 2.3. Imputation of zeros

In [4]:
imputed_features=['Insulin','SkinThickness','BloodPressure','BMI','Glucose']
knn_imputer=KNNImputer(missing_values=0.0, weights='distance')
knn_imputer.fit(training_df[imputed_features])
training_df[imputed_features]=knn_imputer.transform(training_df[imputed_features])
testing_df[imputed_features]=knn_imputer.transform(testing_df[imputed_features])

### 2.4. Convert to Spark dataframe

In [5]:
training_sdf=spark.createDataFrame(training_df)
testing_sdf=spark.createDataFrame(testing_df)

training_sdf.show()

                                                                                

+-----------+------------------+-------------+------------------+------------------+----+------------------------+---+-------+
|Pregnancies|           Glucose|BloodPressure|     SkinThickness|           Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+------------------+-------------+------------------+------------------+----+------------------------+---+-------+
|          4|             110.0|         92.0| 30.53953744302311|139.60582970412264|37.6|                   0.191| 30|      0|
|          0|             111.0|         65.0| 20.26110582322676| 145.4968462354687|24.6|                    0.66| 31|      0|
|          9|             122.0|         56.0|31.994558582756465| 122.4768621322398|33.3|                   1.114| 33|      1|
|          1|             128.0|         98.0|              41.0|              58.0|32.0|                   1.321| 33|      1|
|          1|             172.0|         68.0|              49.0|             579.0|42.4|                   0.7

## 3. XGBoost model

In [6]:
label_name = 'Outcome'

# get a list with feature column names
feature_names = [x.name for x in training_sdf.schema if x.name != label_name]

# create a xgboost pyspark regressor estimator and set device="cuda"
regressor = SparkXGBRegressor(
    features_col=feature_names,
    label_col=label_name,
    num_workers=1
)

# train and return the model
model = regressor.fit(training_sdf)

# predict on test data
predict_df = model.transform(testing_sdf)
predict_df.show()

2025-03-16 02:56:38,933 INFO XGBoost-PySpark: _fit Running xgboost-3.0.0 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
25/03/16 02:56:41 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3) (10.0.2.147 executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1231, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 912, in read_udfs
    arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(3, 0) finished unsuccessfully.
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1231, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 912, in read_udfs
    arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 529, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in read_command
    command = serializer._read_with_length(file)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 174, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 472, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'pandas'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:86)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:751)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2228)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3054)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


## End SparkSession

In [None]:
spark.stop()