## Part 2 Datasets/DataFrames: Spark ML and Pipelines
Convert the review texts to a classic vector space representation with TF-IDF-weighted features based on the Spark DataFrame/Dataset API by building a transformation pipeline. The primary goal of this part is the preparation of the pipeline for Part 3 (see below). Note: although parts of this pipeline will be very similar to Assignment 1 or Part 1 above, do not expect to obtain identical results or have access to all intermediate outputs to compare the individual steps.

Use built-in functions for tokenization to unigrams at whitespaces, tabs, digits, and the delimiter characters ()[]{}.!?,;:+=-_"'`~#@&*%€$§\/, casefolding, stopword removal, TF-IDF calculation, and chi-square selection (using the top 2000 terms overall). Write the terms selected this way to a file output_ds.txt and compare them with the terms selected in Assignment 1. Describe your observations briefly in the submission report (see Part 3).

[Provided link for ML pipeline](https://spark.apache.org/docs/latest/ml-pipeline.html)  
[Provided link for feature extraction](https://spark.apache.org/docs/latest/ml-features.html)

## Imports

In [1]:
from datetime import datetime

from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC, LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import (
    ChiSqSelector, ChiSqSelectorModel, CountVectorizer, HashingTF, IDF,
    Normalizer, RegexTokenizer, StopWordsRemover, StringIndexer,
    Tokenizer, UnivariateFeatureSelector,
)
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [2]:
import warnings
warnings.filterwarnings(action='once')

## Initialize Spark

In [3]:
# Initialize Spark context and session
conf = SparkConf().setAppName("Part2")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
24/05/21 20:09:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/21 20:09:50 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/05/21 20:09:50 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
24/05/21 20:09:53 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
----------------------------------------
Exception occurred during process

In [4]:
spark

### Importing data
Load the stopwords file and the Amazon reviews development set.

In [5]:
# Path to the local stopwords file
stopwords_file = "stopwords.txt"

# Load stopwords into a set
with open(stopwords_file, "r", encoding="utf-8") as f:
    stopwords = set(f.read().strip().split())

# Load the Amazon reviews development set from HDFS
input_file = "hdfs:///user/dic24_shared/amazon-reviews/full/reviews_devset.json"
reviews_df = spark.read.json(input_file)

24/05/21 20:10:42 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

In [6]:
reviews_df = reviews_df.select("category", "reviewText")

# Show the DataFrame with selected columns
reviews_df.show()

+--------------------+--------------------+
|            category|          reviewText|
+--------------------+--------------------+
|Patio_Lawn_and_Garde|This was a gift f...|
|Patio_Lawn_and_Garde|This is a very ni...|
|Patio_Lawn_and_Garde|The metal base wi...|
|Patio_Lawn_and_Garde|For the most part...|
|Patio_Lawn_and_Garde|This hose is supp...|
|Patio_Lawn_and_Garde|This tool works v...|
|Patio_Lawn_and_Garde|This product is a...|
|Patio_Lawn_and_Garde|I was excited to ...|
|Patio_Lawn_and_Garde|I purchased the L...|
|Patio_Lawn_and_Garde|Never used a manu...|
|Patio_Lawn_and_Garde|Good price. Good ...|
|Patio_Lawn_and_Garde|I have owned the ...|
|Patio_Lawn_and_Garde|I had "won" a sim...|
|Patio_Lawn_and_Garde|The birds ate all...|
|Patio_Lawn_and_Garde|Bought last summe...|
|Patio_Lawn_and_Garde|I knew I had a mo...|
|Patio_Lawn_and_Garde|I was a little wo...|
|Patio_Lawn_and_Garde|I have used this ...|
|Patio_Lawn_and_Garde|I actually do not...|
|Patio_Lawn_and_Garde|Just what 

## Create RegexTokenizer

In [7]:
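# Split at whitespace (including tabs), digits, and the delimiter characters from the task description
pattern = r'\s+|\d+|[(){}\[\].!?,;:+=_"\'`~#@&*%€$§\\/\-]'
# gaps=True (default) treats the pattern as a separator; toLowercase=True (default) handles casefolding
regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern=pattern)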
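
To sanity-check the split pattern outside Spark, the same regular expression can be tried with Python's `re` module. This is only a rough sketch: it assumes that `re.split` followed by lowercasing and dropping empty strings approximates what `RegexTokenizer` does with its default settings. The `tokenize` helper and the sample sentence are made up for illustration.

```python
import re

# Same delimiter pattern as in the tokenizer cell
PATTERN = r'\s+|\d+|[(){}\[\].!?,;:+=_"\'`~#@&*%€$§\\/\-]'

def tokenize(text):
    """Lowercase the text, split at the delimiters, and drop empty strings."""
    return [tok for tok in re.split(PATTERN, text.lower()) if tok]

# Hypothetical review text
sample = "Good price. Works GREAT (2nd one I've bought)!"
tokens = tokenize(sample)
print(tokens)
```

Note that digits act as separators here, so "2nd" leaves only "nd", and the apostrophe splits "I've" into "i" and "ve".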

## Create StopWordsRemover

In [8]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words", stopWords=list(stopwords))


## Create CountVectorizer

In [9]:
#hashingTF = HashingTF(inputCol="filtered_words", outputCol="rawFeatures")
countV = CountVectorizer(inputCol="filtered_words", outputCol="rawFeatures")
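
What `CountVectorizer` produces can be pictured with a small plain-Python sketch: a vocabulary ordered by corpus-wide term frequency, and one sparse count vector per document. This is not Spark's implementation. The toy documents are invented, and the alphabetical tie-breaking is an assumption of the sketch.

```python
from collections import Counter

# Toy documents (already tokenized and filtered), invented for illustration
docs = [
    ["hose", "leaks", "hose"],
    ["great", "hose"],
    ["great", "price"],
]

# Corpus-wide term counts decide the vocabulary order (most frequent first).
# Ties are broken alphabetically here, which is an assumption of this sketch.
corpus_counts = Counter(tok for doc in docs for tok in doc)
vocabulary = sorted(corpus_counts, key=lambda t: (-corpus_counts[t], t))
index = {term: i for i, term in enumerate(vocabulary)}

def count_vector(doc):
    """Return a sparse {vocabulary index: count} mapping for one document."""
    return {index[term]: count for term, count in Counter(doc).items()}

vectors = [count_vector(doc) for doc in docs]
print(vocabulary)
print(vectors)
```

Unlike `HashingTF`, this representation keeps an explicit index-to-term mapping, which is what makes it possible to write the selected terms to a file later on.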


## Create IDF model

In [10]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
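
As a reference for what the IDF stage computes, here is a small plain-Python sketch of the smoothed IDF formula given in the Spark ML feature documentation, idf(t) = log((N + 1) / (df(t) + 1)), multiplied by the raw term counts. The toy documents and the `tfidf` helper are made up for illustration.

```python
import math

# Toy documents (already tokenized and filtered), invented for illustration
docs = [
    ["hose", "leaks", "hose"],
    ["great", "hose"],
    ["great", "price"],
]

# Document frequency: number of documents that contain each term
n_docs = len(docs)
doc_freq = {}
for doc in docs:
    for term in set(doc):
        doc_freq[term] = doc_freq.get(term, 0) + 1

# Smoothed inverse document frequency
idf_weights = {t: math.log((n_docs + 1) / (df + 1)) for t, df in doc_freq.items()}

def tfidf(doc):
    """Weight the raw term counts of one document by IDF."""
    return {t: doc.count(t) * idf_weights[t] for t in set(doc)}

weights = tfidf(docs[0])
print(weights)
```

A term that occurs in every document gets an IDF of log(1) = 0 under this formula, so it contributes nothing to the weighted vector.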


## Create StringIndexer

In [11]:
# Apply StringIndexer to convert categorical column to numerical
indexer = StringIndexer(inputCol="category", outputCol="label")


## Create Selector
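Here, we set `selectionThreshold` to 2000 so that, with `selectionMode="numTopFeatures"`, the selector keeps the top 2000 terms overall. With a categorical feature type and a categorical label type, `UnivariateFeatureSelector` scores the features with the chi-squared test.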

In [12]:
selector = UnivariateFeatureSelector(
    featuresCol="features",outputCol="selectedFeatures",
    labelCol="label",  selectionMode="numTopFeatures",
)
selector.setFeatureType("categorical").setLabelType("categorical").setSelectionThreshold(2000)

UnivariateFeatureSelector_493b1e778294
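
To make the selection criterion more concrete, the sketch below computes the chi-square statistic for a single term and a single category from a 2x2 contingency table. It is a simplification under stated assumptions: it uses binary presence/absence of a term, whereas the selector above tests the feature values it is given against all categories at once. The counts are made up.

```python
def chi_square_2x2(a, b, c, d):
    """Chi-square statistic for a 2x2 term/category table.

    a: documents in the category that contain the term
    b: documents outside the category that contain the term
    c: documents in the category without the term
    d: documents outside the category without the term
    """
    n = a + b + c + d
    denom = (a + b) * (a + c) * (b + d) * (c + d)
    if denom == 0:
        return 0.0
    return n * (a * d - b * c) ** 2 / denom

# Made-up counts: the term is over-represented in the category
dependent = chi_square_2x2(30, 10, 20, 40)
# Made-up counts: the term is spread evenly, so there is no association
independent = chi_square_2x2(25, 25, 25, 25)
print(dependent, independent)
```

Higher values indicate a stronger association between term and category, which is why the top-scoring terms are kept.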

## Create Pipeline

In [13]:
pipeline = Pipeline(stages=[regexTokenizer, remover, countV, idf, indexer, selector])

In [14]:
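# Fit the pipeline; the fitted model is needed below to read the vocabulary and the selected feature indices
model = pipeline.fit(reviews_df)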

In [15]:
#result = model.transform(reviews_df)

In [16]:
#result.select("selectedFeatures").show(truncate=False)

## Get top 2000 terms

In [17]:
# Extract vocabulary from CountVectorizer
vocabulary = model.stages[2].vocabulary

# Map selected feature indices to terms
selected_terms = [vocabulary[i] for i in model.stages[-1].selectedFeatures]

In [18]:
len(selected_terms)

2000

In [19]:
vocabulary[:30] == selected_terms[:30]

False

In [20]:
type(selected_terms)

list

In [21]:
with open("output_ds.txt", "w", encoding="utf-8") as f:
    for term in selected_terms:
        f.write(term + "\n")
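
For the comparison with Assignment 1, one simple option is to look at the set overlap of the two term lists. The sketch below assumes both lists are available in memory. The `compare_terms` helper and the short example lists are hypothetical stand-ins, not the actual outputs.

```python
def compare_terms(terms_a, terms_b):
    """Return shared terms, terms unique to each list, and the Jaccard overlap."""
    set_a, set_b = set(terms_a), set(terms_b)
    shared = set_a & set_b
    union = set_a | set_b
    jaccard = len(shared) / len(union) if union else 0.0
    return shared, set_a - set_b, set_b - set_a, jaccard

# Hypothetical stand-ins for the two term lists
part2_terms = ["hose", "garden", "battery", "novel"]
assignment1_terms = ["hose", "battery", "guitar"]

shared, only_part2, only_a1, jaccard = compare_terms(part2_terms, assignment1_terms)
print(sorted(shared), sorted(only_part2), sorted(only_a1), round(jaccard, 2))
```

In practice the two lists would be read from `output_ds.txt` and from the Assignment 1 output file, whose name is not given here.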

## Part 3

In this part, you will train a text classifier from the features extracted in Part 2. The goal is to learn a model that can predict the product category from a review's text.

To this end, extend the pipeline from Part 2 such that a Support Vector Machine classifier is trained. Since we are dealing with multi-class problems, make sure to put a strategy in place that allows binary classifiers to be applicable. Apply vector length normalization before feeding the feature vectors into the classifier (use Normalizer with L2 norm).

Follow best practices for machine learning experiment design and investigate the effects of parameter settings using the functions provided by Spark:

- Split the review data into training, validation, and test set.

- Make experiments reproducible.

- Use a grid search for parameter optimization:

  - Compare chi square overall top 2000 filtered features with another, heavier filtering with much lower dimensionality (see Spark ML documentation for options).

  - Compare different SVM settings by varying the regularization parameter (choose 3 different values), standardization of training features (2 values), and maximum number of iterations (2 values).

Use the MulticlassClassificationEvaluator to estimate performance of your trained classifiers on the test set, using F1 measure as criterion.

## Split into Train/Test/Val and set seed

In [17]:
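# Hold out 10% as the test set; validation data comes from the CrossValidator folds below.
# The fixed seed keeps the split reproducible.
(reviews_train, reviews_test) = reviews_df.randomSplit([0.9, 0.1], seed=123)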

In [18]:
reviews_train.count()

                                                                                

71031

In [19]:
reviews_test.count()

                                                                                

7798

## Extend Pipeline

In [20]:
normalizer = Normalizer(inputCol="selectedFeatures", outputCol="normalized_features")
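
The effect of vector length normalization can be shown on a small dense vector. This is a plain-Python sketch of p-norm scaling, not Spark's code. The `normalize` helper is hypothetical, and returning zero vectors unchanged is a choice made for this sketch.

```python
def normalize(vector, p=2.0):
    """Scale a dense vector to unit p-norm; zero vectors are returned unchanged."""
    norm = sum(abs(x) ** p for x in vector) ** (1.0 / p)
    if norm == 0:
        return list(vector)
    return [x / norm for x in vector]

v = [3.0, 4.0, 0.0]
l2 = normalize(v)          # divides by the Euclidean length 5
l1 = normalize(v, p=1.0)   # divides by the sum of absolute values 7
print(l2, l1)
```

After L2 normalization every review vector has length 1, so long and short reviews become comparable for the classifier.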

In [21]:
#scaler = StandardScaler(inputCol="normalized_features", outputCol="scaled_features", withMean=False)

In [22]:
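svm = LinearSVC(featuresCol="normalized_features")
# OneVsRest passes its own featuresCol (default "features") to the wrapped classifier,
# so it has to be set here for the normalized, selected features to be used
ovr = OneVsRest(classifier=svm, featuresCol="normalized_features")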
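
The one-vs-rest strategy itself is simple: each class gets its own binary problem, and the class whose binary classifier scores highest wins. The sketch below only illustrates the relabeling and the final decision. It does not train any SVM, and the labels and decision scores are made up.

```python
def binarize(labels, positive_class):
    """Relabel a multi-class problem as 'positive_class vs. the rest'."""
    return [1 if y == positive_class else 0 for y in labels]

def predict_ovr(scores_per_class):
    """Pick the class whose binary classifier gives the highest score."""
    return max(scores_per_class, key=scores_per_class.get)

# Made-up class indices for five reviews
labels = [0, 2, 1, 2, 0]
binary_problems = {c: binarize(labels, c) for c in sorted(set(labels))}
print(binary_problems)

# Made-up decision scores of the three binary classifiers for one review
predicted = predict_ovr({0: -0.4, 1: 0.1, 2: 0.7})
print(predicted)
```

With one binary model per category, training cost grows linearly with the number of categories, which is worth keeping in mind for the grid search.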

In [23]:
extended_pipeline = Pipeline(stages=[regexTokenizer, remover, countV, idf, indexer, selector, normalizer, ovr])

## Define Evaluator

In [24]:
evaluator = MulticlassClassificationEvaluator(metricName="f1")
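
The `f1` metric of the evaluator is a weighted F-measure, i.e. per-class F1 scores averaged by the share of each class among the true labels. The following plain-Python sketch shows that calculation on made-up labels. The `weighted_f1` helper is hypothetical and only meant to illustrate the metric.

```python
from collections import Counter

def weighted_f1(y_true, y_pred):
    """Per-class F1, averaged with weights equal to each class's share of true labels."""
    support = Counter(y_true)
    total = len(y_true)
    score = 0.0
    for cls, n_true in support.items():
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p == cls)
        n_pred = sum(1 for p in y_pred if p == cls)
        precision = tp / n_pred if n_pred else 0.0
        recall = tp / n_true
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        score += f1 * n_true / total
    return score

# Made-up labels and predictions
y_true = [0, 0, 1, 1, 2, 2]
y_pred = [0, 0, 1, 2, 2, 2]
result = weighted_f1(y_true, y_pred)
print(result)
```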

## Perform Grid Search using different SVM settings

In [25]:
param_grid = ParamGridBuilder() \
    .addGrid(svm.regParam, [0.1, 0.01, 0.001]) \
    .addGrid(svm.maxIter, [10, 20]) \
    .addGrid(normalizer.p, [1, 2]) \
    .build()

cross_validator = CrossValidator(estimator=extended_pipeline, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3, seed=123)
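
To get a feeling for the cost of this search, the grid can be enumerated in plain Python. The sketch mirrors the values used above. The count of pipeline fits assumes one fit per combination and fold plus one final refit of the best combination on the full training set.

```python
from itertools import product

# Same values as in the parameter grid above
grid = {
    "regParam": [0.1, 0.01, 0.001],
    "maxIter": [10, 20],
    "p": [1, 2],
}
num_folds = 3

# Cartesian product of all parameter values
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]

# One pipeline fit per combination and fold, plus the final refit of the best one
total_fits = len(combinations) * num_folds + 1

print(len(combinations), total_fits)
print(combinations[0])
```

Each of these pipeline fits trains one binary SVM per category inside `OneVsRest`, which explains the long running time of the next cell.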

In [26]:
start_time = datetime.now()
# Train the model
cv_model = cross_validator.fit(reviews_train)
end_time = datetime.now()

# Make predictions on the test set
predictions = cv_model.transform(reviews_test)

# Evaluate the model
f1_score = evaluator.evaluate(predictions)
print(f"F1 Score: {f1_score}")

24/05/21 20:14:49 WARN DAGScheduler: Broadcasting large task binary with size 1990.9 KiB
24/05/21 20:14:50 WARN DAGScheduler: Broadcasting large task binary with size 1993.0 KiB
24/05/21 20:14:54 WARN DAGScheduler: Broadcasting large task binary with size 1996.0 KiB
  self._sock = None
24/05/21 20:15:12 WARN DAGScheduler: Broadcasting large task binary with size 1994.0 KiB
24/05/21 20:15:16 WARN DAGScheduler: Broadcasting large task binary with size 1995.2 KiB
24/05/21 20:15:19 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/21 20:15:19 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
24/05/21 20:15:20 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/05/21 20:15:20 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/05/21 20:15:20 WARN DAGScheduler: Broadcasting large task binary with size 

KeyboardInterrupt: 

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/sw/venv/python39/dic24/lib/python3.9/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/sw/venv/python39/dic24/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/sw/venv/python39/dic24/lib/python3.9/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


In [None]:
print("start time:", start_time)
print("end time:", end_time)

In [None]:
# Get the best model
bestModel = cv_model.bestModel

# Show the best parameters: the grid entry with the highest average F1 across the folds
best_index = max(range(len(cv_model.avgMetrics)), key=cv_model.avgMetrics.__getitem__)
best_params = cv_model.getEstimatorParamMaps()[best_index]
print("Best Params:")
for param, value in best_params.items():
    print(f"{param.name}:", value)

24/05/21 21:31:28 WARN DAGScheduler: Broadcasting large task binary with size 2004.0 KiB

## Compare Chi Square with another filtering method