In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT, DenseVector
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType, StringType, ArrayType

# from pyspark.ml.classification import XGBoostClassifier
# from pyspark.ml import Pipeline

from xgboost.spark import SparkXGBClassifier

import numpy as np

In [2]:
import os
import sys
# Set both PySpark interpreter variables to the Python executable that is
# running this kernel, so Spark does not pick up a different interpreter.
os.environ.update(
    dict.fromkeys(("PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON"), sys.executable)
)

In [3]:
# Obtain (or reuse) a SparkSession registered under the app name "XGBoost".
spark = (
    SparkSession.builder
    .appName("XGBoost")
    .getOrCreate()
)

23/12/01 10:49:50 WARN Utils: Your hostname, NN.local resolves to a loopback address: 127.0.0.1; using 192.168.0.191 instead (on interface en0)
23/12/01 10:49:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/01 10:49:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Read the CSV (first row is the header, column types are inferred) and
# expose the "sentiment" column under the name "label".
data = (
    spark.read
    .csv("../data/distilbert_imdb2.csv", sep=',', inferSchema=True, header=True)
    .withColumnRenamed("sentiment", "label")
)

                                                                                

In [5]:
def parser(x):
    """Parse a bracketed, whitespace-separated number string into floats.

    Args:
        x: Text such as ``"[0.1 -0.2 0.3]"``, or ``None``.

    Returns:
        A list of floats, or ``None`` when ``x`` is ``None`` or contains
        no numbers.

    Raises:
        ValueError: If a token cannot be converted by ``float``.
    """
    if x is None:
        return None
    # Trim surrounding whitespace first so the brackets really sit at the
    # ends of the string, then split on any whitespace run (spaces, tabs,
    # newlines) instead of on single spaces only.
    tokens = x.strip().strip('[]').split()
    values = [float(tok) for tok in tokens]
    return values or None

# Wrap parser() as a Spark UDF that returns array<float>, then apply it to
# the raw embedding text to produce the "parsed_embeddings" column.
parse_embedding_udf = udf(
    lambda raw: parser(raw),
    returnType=ArrayType(FloatType()),
)
data = data.withColumn(
    "parsed_embeddings",
    parse_embedding_udf(data["embeddings_distilbert"]),
)

In [6]:
# Remove the raw embedding text (already parsed into "parsed_embeddings")
# together with the "reviews_pre" column.
data = data.drop(*("embeddings_distilbert", "reviews_pre"))

In [7]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# UDF to convert an array column to a dense ML vector.
# parser() returns None for missing/empty embeddings, so nulls are passed
# through untouched instead of being handed to Vectors.dense.
vector_udf = udf(
    lambda a: Vectors.dense(a) if a is not None else None,
    VectorUDT(),
)
data = data.withColumn("parsed_embeddings_vector", vector_udf(data["parsed_embeddings"]))

In [8]:
# Feature columns: everything after the first column, minus the columns that
# must not be assembled, plus the dense embedding vector. Columns are picked
# by name rather than by position so that the target ("label") can never leak
# into the features and the raw array column is never assembled by accident.
_non_feature_cols = {"label", "parsed_embeddings", "parsed_embeddings_vector"}
feature_cols = [
    c for c in data.columns[1:] if c not in _non_feature_cols
] + ["parsed_embeddings_vector"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data = assembler.transform(data)

                                                                                

In [9]:
# Random 70/30 train/test split, seeded with 42.
train_data, test_data = data.randomSplit(
    weights=[0.7, 0.3],
    seed=42,
)

In [10]:
# Configure the XGBoost classifier: the columns it reads and writes, the
# evaluation metric (logloss) and the maximum tree depth (6).
xgb_settings = dict(
    features_col="features",
    label_col="label",
    prediction_col="prediction",
    eval_metric="logloss",
    max_depth=6,
)
xgboost = SparkXGBClassifier(**xgb_settings)

In [11]:
# Train the classifier on the training split.
model = xgboost.fit(dataset=train_data)

[Stage 3:>                                                          (0 + 8) / 8]

CodeCache: size=131072Kb used=25739Kb max_used=25890Kb free=105332Kb
 bounds [0x00000001089d8000, 0x000000010a358000, 0x00000001109d8000]
 total_blobs=10142 nmethods=9179 adapters=874
 compilation: disabled (not enough contiguous free space left)


2023-12-01 10:50:28,278 INFO XGBoost-PySpark: _fit Running xgboost-2.0.1 on 1 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_depth': 6, 'eval_metric': 'logloss', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
[10:50:51] task 0 got new rank 0                                    (0 + 1) / 1]
2023-12-01 10:51:25,192 INFO XGBoost-PySpark: _fit Finished xgboost training!   


In [12]:
# Apply the fitted model to the held-out test split.
predictions = model.transform(dataset=test_data)

In [14]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve on the test predictions. The arguments below are
# the evaluator's defaults, spelled out so the metric being reported is explicit.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC",
)
auc = evaluator.evaluate(dataset=predictions)
print(f"AUC: {auc}")

2023-12-01 10:51:51,138 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2023-12-01 10:51:51,745 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2023-12-01 10:51:51,749 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2023-12-01 10:51:51,763 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2023-12-01 10:51:51,765 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2023-12-01 10:51:51,896 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2023-12-01 10:51:51,897 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2023-12-01 10:51:51,904 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
                                                                                

AUC: 0.8812946012543919


In [15]:
# Persist the fitted model. overwrite() allows the notebook to be re-run
# without failing because the target path already exists.
model_path = "../models/pyspark_distilbert_XGB_model"
model.write().overwrite().save(model_path)

In [16]:
# Stop the Spark session that was created near the top of this notebook.
spark.stop()