# Exercise 2 - Text Processing and Classification using Spark

## Part 2 

In [1]:
#importing libraries

from pyspark.sql import SparkSession
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, ChiSqSelector,  StringIndexer, Normalizer
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml.stat import ChiSquareTest
from pyspark import SparkContext
from pyspark import SparkConf
import json 
from operator import add
import re
from heapq import nlargest

from pyspark.ml.classification import LinearSVC,  OneVsRest

In [2]:
#starting spark session

spark = SparkSession.builder.getOrCreate()

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
24/05/29 16:31:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/29 16:31:41 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [3]:
#we are using the review_devset from the cluster. After that we keep only the necessary columns

textDF = spark.read.json("hdfs:///user/dic24_shared/amazon-reviews/full/reviews_devset.json").createOrReplaceTempView("review")
df = spark.sql("SELECT category,reviewText FROM review")

                                                                                

In [4]:
df.show()

+--------------------+--------------------+
|            category|          reviewText|
+--------------------+--------------------+
|Patio_Lawn_and_Garde|This was a gift f...|
|Patio_Lawn_and_Garde|This is a very ni...|
|Patio_Lawn_and_Garde|The metal base wi...|
|Patio_Lawn_and_Garde|For the most part...|
|Patio_Lawn_and_Garde|This hose is supp...|
|Patio_Lawn_and_Garde|This tool works v...|
|Patio_Lawn_and_Garde|This product is a...|
|Patio_Lawn_and_Garde|I was excited to ...|
|Patio_Lawn_and_Garde|I purchased the L...|
|Patio_Lawn_and_Garde|Never used a manu...|
|Patio_Lawn_and_Garde|Good price. Good ...|
|Patio_Lawn_and_Garde|I have owned the ...|
|Patio_Lawn_and_Garde|I had "won" a sim...|
|Patio_Lawn_and_Garde|The birds ate all...|
|Patio_Lawn_and_Garde|Bought last summe...|
|Patio_Lawn_and_Garde|I knew I had a mo...|
|Patio_Lawn_and_Garde|I was a little wo...|
|Patio_Lawn_and_Garde|I have used this ...|
|Patio_Lawn_and_Garde|I actually do not...|
|Patio_Lawn_and_Garde|Just what 

In [5]:
stopwordsPath = "DIC2/stopwords.txt"
stopwords = spark.sparkContext.textFile(stopwordsPath).collect()

                                                                                

In [6]:
tokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern="\\s+|\\d+|[()\\[\\]{}.,;!?:+=\\-_\"'`~#@&*%€$§\\/]+", toLowercase=True)
remover = StopWordsRemover(inputCol="words", outputCol="filtered", caseSensitive=False)
vectorizer = CountVectorizer(inputCol="filtered", outputCol="vectorized")
idf = IDF(inputCol="vectorized", outputCol="tfidf")
encoder = StringIndexer(inputCol="category", outputCol="label")
chi2000 = ChiSqSelector(featuresCol="tfidf", labelCol="label", outputCol="selected", numTopFeatures=2000)

In [7]:
pipeline = Pipeline().setStages([tokenizer, remover, vectorizer, idf, encoder, chi2000])

In [8]:
pipelineModel = pipeline.fit(df)
transformedData = pipelineModel.transform(df)

24/05/29 16:32:31 WARN DAGScheduler: Broadcasting large task binary with size 1060.2 KiB
24/05/29 16:32:44 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/05/29 16:32:44 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/05/29 16:32:55 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
                                                                                

In [9]:
selectedFeatures = pipelineModel.stages[5].selectedFeatures
words = pipelineModel.stages[2].vocabulary

output = set()
for i in selectedFeatures:
    output.add(words[i])

sorted_output = sorted(list(output))

In [10]:
with open('output_ds.txt', 'w') as f:  
    f.write(str(re.sub(",|'|[0-9]|\[|\]|\.","", str(sorted_output))))
f.close()

## Part 3

In [11]:
df=transformedData

In [12]:
#downsampling because the dataframe is too big and we're getting too much warning about it. Also the training is superlong. To use the whole dataframe simply delete this cell

df=df.sample(fraction=0.1, seed=4242)

In [13]:
df2=df.select("label", "selected").toDF("label", "selected")

In [14]:
normalizer = Normalizer().setInputCol("selected").setOutputCol("normalized").setP(2.0)
df_norm =normalizer.transform(df2)

In [15]:
df3=df_norm.select("label", "normalized").toDF("label", "normalized")

In [16]:
df3.show()

24/05/29 16:33:51 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB


+-----+--------------------+
|label|          normalized|
+-----+--------------------+
| 18.0|(2000,[5,7,8,9,27...|
| 18.0|(2000,[2,25,45,17...|
| 18.0|(2000,[1,4,9,15,1...|
| 18.0|(2000,[3,4,8,24,2...|
| 18.0|(2000,[4,558,1265...|
| 18.0|(2000,[74,265,288...|
| 18.0|(2000,[11,28,49,3...|
| 18.0|(2000,[0,2,5,18,8...|
| 18.0|(2000,[1,3,26,57,...|
| 18.0|(2000,[10,49,202,...|
| 18.0|(2000,[1,2,6,16,3...|
| 18.0|(2000,[1,2,15,25,...|
| 18.0|(2000,[1,31,38,39...|
| 18.0|(2000,[1,3,4,20,3...|
| 18.0|(2000,[1,8,9,14,1...|
| 18.0|(2000,[3,49,77,16...|
| 18.0|(2000,[13,185,630...|
| 18.0|(2000,[14,80,166,...|
| 18.0|(2000,[9,43,49,50...|
| 18.0|(2000,[3,4,7,26,3...|
+-----+--------------------+
only showing top 20 rows



In [17]:
#splitting and making it reproducible
train,val, test = df3.randomSplit([0.8,0.1, 0.1], seed = 4242)

In [None]:
lsvc = LinearSVC(featuresCol="normalized", labelCol="label", maxIter=10)
ovr = OneVsRest(classifier=lsvc, featuresCol="normalized", labelCol="label")
ovr_model = ovr.fit(train)

In [19]:
grid= ParamGridBuilder().addGrid(lsvc.regParam, [0.001, 0.01, 0.1] ).addGrid(lsvc.standardization, [True, False] ).addGrid(lsvc.maxIter, [10, 8]).build()

In [20]:
evaluator=MulticlassClassificationEvaluator(metricName="f1")

In [21]:
cv=CrossValidator(estimator=lsvc, estimatorParamMaps=grid, evaluator=evaluator, numFolds=2)

In [None]:
cv_model=cv.fit(train)

In [None]:
best_model=cv_model.bestModel

In [None]:
predictions_test=best_model.transform(test)

In [None]:
ovr_predictions_test = ovr_model.transform(test)
ovr_f1_score = evaluator.evaluate(ovr_predictions_test)
print(f"OVR F1 Score: {ovr_f1_score}")

In [None]:
best_model_predictions_test = best_model.transform(test)
best_model_f1_score = evaluator.evaluate(best_model_predictions_test)
print(f"Best Model F1 Score: {best_model_f1_score}")