## Part 3) Datasets/DataFrames: Spark ML and Pipelines

This is the Jupyter Notebook for Part 3 of exercise 2.

In the first code cell we import the necessary libraries and create a SparkSession with the application name `Ex2_Part3`, running in local mode on all available cores (`local[*]`).

The second cell, which loads the review data and the stopword list, is identical to Part 2 of the exercise. See the Part 2 notebook for an explanation.

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, ChiSqSelector, RegexTokenizer, CountVectorizer, StringIndexer, Normalizer
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.linalg import Vectors
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
import json
import random
import time
import os

spark = SparkSession \
    .builder \
    .appName("Ex2_Part3") \
    .master("local[*]") \
    .getOrCreate()


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
23/05/18 15:25:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/18 15:25:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/05/18 15:25:11 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/05/18 15:25:11 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/05/18 15:25:11 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
23/05/18 15:25:1

In [2]:
input_file = "hdfs:///user/dic23_shared/amazon-reviews/full/reviews_devset.json"
data = spark.read.json(input_file)

notebook_path = os.path.abspath("")

# Load the stopwords into a list, one word per line, skipping empty lines
with open(os.path.join(notebook_path, "stopwords.txt"), "r") as stopwords_file:
    stopwords = [line.strip() for line in stopwords_file if line.strip()]

In the following cell the data is split into training and test sets, with 80% of the data going to the training set and 20% to the test set. A seed is used for reproducible random splitting.

In [3]:
# Split the data into training and test sets
training_data, test_data = data.randomSplit([0.8, 0.2], seed=1205)

The following two cells are identical to Part 2 of the exercise. See the Part 2 notebook for an explanation.

In [4]:
# Build the transformation pipeline
tokenizer = RegexTokenizer(pattern=r'[\s\d\(\)\[\]\{\}\.,!?\-,;:+\=_"\'`~#@&*%€$§\\/]+',
                           inputCol="reviewText",
                           outputCol="tokens")

In [5]:
stop_words_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_words", stopWords=stopwords)
count_vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="raw_features", minDF=5)
idf = IDF(inputCol="raw_features", outputCol="features", minDocFreq=5)
labelIndexer = StringIndexer(inputCol="category", outputCol="label")
chi_sq_selector = ChiSqSelector(numTopFeatures=2000, featuresCol="features", outputCol="selected_features", labelCol="label")

### New to part 3

This section extends the pipeline with feature-vector normalization and a Support Vector Machine (SVM) classifier, wrapped in a OneVsRest strategy for multi-class classification.

The Normalizer rescales each feature vector (one per review) to unit norm. With the parameter `p=2.0` it uses the L2 (Euclidean) norm.

The Normalizer reads the "selected_features" column produced by the ChiSqSelector and writes the result to "normalized_features".

In [6]:
normalizer = Normalizer(inputCol="selected_features", outputCol="normalized_features", p=2.0)
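
To make the effect of L2 normalization concrete, here is a minimal plain-Python sketch of the computation (no Spark required). The helper name `l2_normalize` is hypothetical, and returning zero vectors unchanged is an assumption of this sketch rather than a statement about Spark's implementation.

```python
import math

def l2_normalize(vector):
    """Scale a vector to unit Euclidean length; zero vectors are returned unchanged."""
    norm = math.sqrt(sum(x * x for x in vector))
    if norm == 0.0:
        return list(vector)
    return [x / norm for x in vector]

example = [3.0, 4.0]          # Euclidean length 5
normalized = l2_normalize(example)
print(normalized)             # [0.6, 0.8]
```

After normalization, long and short reviews produce vectors of the same length, so the classifier sees the direction of the feature vector rather than its magnitude.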

For the classification task we use a linear SVM classifier (`LinearSVC`). It tries to find the hyperplane that best separates two classes. Because `LinearSVC` only handles binary problems, it is combined with OneVsRest further below. The classifier takes the normalized features as input.

In [7]:
svm = LinearSVC(featuresCol="normalized_features")
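
As a rough illustration of what "finding the best hyperplane" means, the sketch below evaluates a hinge-loss objective with an L2 penalty for a given hyperplane. It is plain Python, not Spark code; the function names, the toy samples, and the exact scaling of the penalty term (`0.5 * reg_param`) are assumptions made for illustration.

```python
def hinge_loss(weights, bias, features, label):
    """Hinge loss for one sample; label must be +1 or -1."""
    margin = label * (sum(w * x for w, x in zip(weights, features)) + bias)
    return max(0.0, 1.0 - margin)

def objective(weights, bias, samples, reg_param):
    """Mean hinge loss over the samples plus an L2 penalty on the weights."""
    data_loss = sum(hinge_loss(weights, bias, x, y) for x, y in samples) / len(samples)
    l2_penalty = 0.5 * reg_param * sum(w * w for w in weights)
    return data_loss + l2_penalty

# Toy data (hypothetical): two samples well outside the margin, one inside it
samples = [([2.0, 0.0], 1), ([0.0, 2.0], -1), ([0.5, 0.0], 1)]
print(objective([1.0, -1.0], 0.0, samples, reg_param=0.1))
```

A larger `reg_param` penalizes large weights more strongly, which is why `regParam` is one of the values tuned in the grid search later in this notebook.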

In the next chunk of code we define OneVsRest. It is a strategy for multi-class classification. It involves training a single classifier per class, with the samples of that class as positive samples and all other samples as negatives. In this case, the binary classifier used is the SVM classifier defined earlier.

In [8]:
ovr = OneVsRest(classifier=svm, labelCol="label", featuresCol="normalized_features")
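
The one-vs-rest idea can be sketched in a few lines of plain Python: each class gets its own binary labelling, and at prediction time the class whose binary classifier reports the highest score wins. The helper names, category names, and scores below are hypothetical.

```python
def binarize_labels(labels, positive_class):
    """Relabel a multi-class problem as 'positive_class vs. everything else'."""
    return [1 if label == positive_class else 0 for label in labels]

def predict_one_vs_rest(scores_per_class):
    """Pick the class whose binary classifier is most confident."""
    return max(scores_per_class, key=scores_per_class.get)

labels = ["Book", "Toy", "Book", "Music"]
print(binarize_labels(labels, "Book"))   # [1, 0, 1, 0]

# Raw scores from three hypothetical binary classifiers for one review
print(predict_one_vs_rest({"Book": -0.2, "Toy": 1.3, "Music": 0.4}))   # Toy
```

With many product categories this means one SVM is trained per category for every parameter combination, which contributes to the long training time.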

Now we define the whole pipeline.

In [9]:
pipeline = Pipeline(stages=[tokenizer, stop_words_remover, count_vectorizer, idf, labelIndexer, chi_sq_selector, normalizer, ovr]) 

In the next cell we define a parameter grid for tuning the model with Spark's `ParamGridBuilder`.
The tuned parameters are the number of top features selected (`numTopFeatures`), the regularization parameter (`regParam`), the maximum number of iterations (`maxIter`), and whether to standardize the training features (`standardization`).
The candidate values are those given in the exercise.
`build()` creates every combination of these values, here 2 × 3 × 2 × 2 = 24 parameter maps, for the grid search.

In [10]:
# Define the parameter grid for grid search
paramGrid = ParamGridBuilder() \
    .addGrid(chi_sq_selector.numTopFeatures, [400, 2000]) \
    .addGrid(ovr.getClassifier().regParam, [0.01, 0.1, 0.4]) \
    .addGrid(ovr.getClassifier().maxIter, [7, 14]) \
    .addGrid(ovr.getClassifier().standardization, [False, True]) \
    .build()
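
The size of the grid can be checked without Spark. The sketch below enumerates the same value lists with `itertools.product`; the dictionary keys are plain strings standing in for the Spark `Param` objects.

```python
from itertools import product

# Same candidate values as in the ParamGridBuilder call above
grid = {
    "numTopFeatures": [400, 2000],
    "regParam": [0.01, 0.1, 0.4],
    "maxIter": [7, 14],
    "standardization": [False, True],
}

# One dict per combination, analogous to one parameter map
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]
print(len(combinations))   # 24
print(combinations[0])
```

Every additional value in any list multiplies the number of models to train, so the grid size is the main driver of the total tuning time.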

In the next cell we initialize a `MulticlassClassificationEvaluator` that scores a model by its F1 score. For this evaluator, the `f1` metric is the weighted F-measure: the per-class F1 scores averaged with weights proportional to class frequency.

In [11]:
# Set up the evaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="f1")
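
For intuition, a weighted F1 score can be computed by hand as follows. This is a plain-Python sketch of the metric's definition, not the evaluator's actual code; the function name and the toy labels are hypothetical.

```python
from collections import Counter

def weighted_f1(labels, predictions):
    """Per-class F1, averaged with weights proportional to each class's true count."""
    support = Counter(labels)
    total = len(labels)
    score = 0.0
    for cls, count in support.items():
        tp = sum(1 for y, p in zip(labels, predictions) if y == cls and p == cls)
        fp = sum(1 for y, p in zip(labels, predictions) if y != cls and p == cls)
        fn = count - tp
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        score += f1 * count / total
    return score

y_true = [0, 0, 0, 1, 1, 2]
y_pred = [0, 0, 1, 1, 1, 0]
print(weighted_f1(y_true, y_pred))   # close to 0.6
```

Because the weights follow class frequency, frequent categories dominate the score, and a class that is never predicted correctly (class 2 here) contributes zero.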


This cell sets up model selection with `TrainValidationSplit`. It takes the previously defined pipeline as the estimator, the parameter grid for tuning, and the evaluator for model assessment. The `trainRatio` parameter is set to 0.8, so 80% of the data passed to `fit` (here `training_data`, not the full dataset) is used to train each candidate and the remaining 20% is used for validation. The `seed` parameter makes the split reproducible across runs.

In [12]:
# Set up the train-validation split
tvs = TrainValidationSplit(estimator=pipeline, 
                           estimatorParamMaps=paramGrid, 
                           evaluator=evaluator, 
                           trainRatio=0.8, 
                           seed=1205)
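
Since the train-validation split operates on `training_data`, which is itself about 80% of the reviews, the effective shares of the full dataset can be worked out with a little arithmetic. The variable names below are hypothetical, and the shares are nominal because random splits are only approximately exact.

```python
# Fractions taken from the notebook: randomSplit([0.8, 0.2]) and trainRatio=0.8
outer_train_fraction = 0.8   # share of all reviews in training_data
train_ratio = 0.8            # share of training_data used to fit each candidate

fit_fraction = outer_train_fraction * train_ratio
validation_fraction = outer_train_fraction * (1 - train_ratio)
test_fraction = 1 - outer_train_fraction

print(f"fit:        {fit_fraction:.0%}")
print(f"validation: {validation_fraction:.0%}")
print(f"test:       {test_fraction:.0%}")
```

Each candidate is therefore fitted on roughly 64% of all reviews and validated on roughly 16%, while the 20% test set stays untouched until the final evaluation.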

Now we train our model using the defined train-validation split. 

In [13]:
# Train the model using the train-validation split
start_time = time.time()
model = tvs.fit(training_data)
end_time = time.time()
print("Training time: {} seconds".format(end_time - start_time))

23/05/18 15:25:52 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/05/18 15:25:52 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/05/18 15:25:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/05/18 15:25:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/05/18 16:12:26 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/18 16:12:43 WARN BlockManager: Asked to remove block broadcast_26562, which does not exist
23/05/18 16:12:43 WARN BlockManager: Asked to remove block broadcast_26562_piece0, which does not exist
23/05/18 16:13:08 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/18 16:13:40 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/18 16:13:55 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaN

Training time: 6664.386291503906 seconds


Now we identify and analyze the top-performing model from our training. 

First, we look up the hyperparameters of the best model, i.e. the parameter map with the highest validation metric. Then we apply this model to the held-out test data to generate predictions. Lastly, we compute and print the F1 score, weighted precision, and weighted recall of these predictions.

In [14]:
# Get the best model
best_model = model.bestModel

# Get the stages of the best model
best_model_stages = best_model.stages

# Get the list of parameter maps
param_maps = tvs.getEstimatorParamMaps()

# Get the validation metrics
validation_metrics = model.validationMetrics

# Get the index of the best model
best_model_index = validation_metrics.index(max(validation_metrics))

# Get the parameter map of the best model
best_model_params = param_maps[best_model_index]

# Print the hyperparameters of the best model
print("Hyperparameters of the best model:")
for param in best_model_params:
    print(f"{param.name}: {best_model_params[param]}")

# Get the validation metric of the best model
best_model_metric = validation_metrics[best_model_index]

# Instantiate evaluators for the different metrics
f1_evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="f1")
precision_evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="weightedPrecision")
recall_evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="weightedRecall")

# Make predictions on the held-out test data
predictions = best_model.transform(test_data)

# Get the metrics of the best model
best_model_f1 = f1_evaluator.evaluate(predictions)
best_model_precision = precision_evaluator.evaluate(predictions)
best_model_recall = recall_evaluator.evaluate(predictions)

# Print the metrics
print("\nMetrics of the best model:")
print(f"F1 score: {best_model_f1}")
print(f"Precision: {best_model_precision}")
print(f"Recall: {best_model_recall}")

Hyperparameters of the best model:
numTopFeatures: 2000
regParam: 0.01
maxIter: 14
standardization: True


23/05/18 17:16:34 WARN DAGScheduler: Broadcasting large task binary with size 1283.4 KiB
23/05/18 17:17:10 WARN DAGScheduler: Broadcasting large task binary with size 1283.4 KiB
23/05/18 17:17:21 WARN DAGScheduler: Broadcasting large task binary with size 1283.4 KiB


Metrics of the best model:
F1 score: 0.6069700943577272
Precision: 0.5923226672490524
Recall: 0.6369466800804827


                                                                                

Lastly, we print the validation performance of every model trained during the train-validation split, to get an idea of how much each hyperparameter matters.
The cell retrieves the list of parameter maps used during training and their corresponding validation metrics. It then loops through this list, printing the F1 score and the hyperparameters of each model.

In [15]:
# Get the list of hyperparameters and their corresponding validation metrics
param_maps = tvs.getEstimatorParamMaps()
validation_metrics = model.validationMetrics

# Iterate over the parameter maps and their metrics, printing both
for i, (param_map, metric) in enumerate(zip(param_maps, validation_metrics), start=1):
    print(f"Model {i} metrics:")
    print(f"Validation metric (F1 score): {metric}")
    print("Hyperparameters:")
    for param, value in param_map.items():
        print(f"  {param.name}: {value}")
    print("\n")


Model 1 metrics:
Validation metric (F1 score): 0.05181221746653008
Hyperparameters:
  numTopFeatures: 400
  regParam: 0.01
  maxIter: 7
  standardization: False


Model 2 metrics:
Validation metric (F1 score): 0.4546486697658312
Hyperparameters:
  numTopFeatures: 400
  regParam: 0.01
  maxIter: 7
  standardization: True


Model 3 metrics:
Validation metric (F1 score): 0.39225529544944576
Hyperparameters:
  numTopFeatures: 400
  regParam: 0.01
  maxIter: 14
  standardization: False


Model 4 metrics:
Validation metric (F1 score): 0.4529093251337653
Hyperparameters:
  numTopFeatures: 400
  regParam: 0.01
  maxIter: 14
  standardization: True


Model 5 metrics:
Validation metric (F1 score): 0.24522917274215697
Hyperparameters:
  numTopFeatures: 400
  regParam: 0.1
  maxIter: 7
  standardization: False


Model 6 metrics:
Validation metric (F1 score): 0.44388646368741896
Hyperparameters:
  numTopFeatures: 400
  regParam: 0.1
  maxIter: 7
  standardization: True


Model 7 metrics:
Validation
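
One way to judge the influence of a single hyperparameter is to average the validation F1 over all runs that share the same value. The sketch below does this in plain Python for the six complete entries printed above (the rest of the output is cut off); the helper name `mean_metric_by` is hypothetical.

```python
from collections import defaultdict

# Validation F1 scores copied from the six complete entries in the output above
results = [
    ({"numTopFeatures": 400, "regParam": 0.01, "maxIter": 7, "standardization": False}, 0.05181221746653008),
    ({"numTopFeatures": 400, "regParam": 0.01, "maxIter": 7, "standardization": True}, 0.4546486697658312),
    ({"numTopFeatures": 400, "regParam": 0.01, "maxIter": 14, "standardization": False}, 0.39225529544944576),
    ({"numTopFeatures": 400, "regParam": 0.01, "maxIter": 14, "standardization": True}, 0.4529093251337653),
    ({"numTopFeatures": 400, "regParam": 0.1, "maxIter": 7, "standardization": False}, 0.24522917274215697),
    ({"numTopFeatures": 400, "regParam": 0.1, "maxIter": 7, "standardization": True}, 0.44388646368741896),
]

def mean_metric_by(results, param_name):
    """Average the metric over all runs sharing the same value of one hyperparameter."""
    groups = defaultdict(list)
    for params, metric in results:
        groups[params[param_name]].append(metric)
    return {value: sum(scores) / len(scores) for value, scores in groups.items()}

print(mean_metric_by(results, "standardization"))
```

On these six runs, standardization makes a large difference, but with only part of the 24 results available this is an indication rather than a conclusion.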

In [16]:
# Stop the Spark session
spark.stop()