# Exercise 2 - Spark
## Data Intensive Computing - SS2023

Federico Ambrogi, e1449911@student.tuwien.ac.a

Maryna Gutruf, e11901838@student.tuwien.ac.at

Emily Jacob, e12143768@student.tuwien.ac.a

## Part 1: RDDs


The first task is to reproduce the results from Exercise 1.

In particular we will use the following RDDs transformations:


- pyspark.RDD.reduceByKey => Merge the values for each key using an associative and commutative reduce function.

- pyspark.RDD.flatMap => Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

- pyspark.RDD.flatMapValues => Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD’s partitioning. 



In [5]:
# in case Spark context were defined before when running cells multiple times
try:
    sc.stop()
except:
    pass

In [3]:
# Task 0
# here we produce the file categories.txt to be used later for chi square calculation

from pyspark import SparkContext, SparkConf
import json

# Create a SparkContext
conf = SparkConf().setAppName("CategoriesCounter")
sc = SparkContext(conf=conf)

# Load the input file from HDFS
input_file = sc.textFile("hdfs:///user/dic23_shared/amazon-reviews/full/reviews_devset.json")

# Here we read the list of stopwords that must be ignored from the review text stored as global variable
stop_words = open('stopwords.txt').readlines()
stop_words = [w.replace('\n', '') for w in open('stopwords.txt').readlines()]

# Parse each line as JSON
data = input_file.map(json.loads)

# Map operation: Extract category as key and emit count of 1
category_counts = data.map(lambda x: (x["category"], 1))

# Reduce operation: Sum the counts for each category
category_counts = category_counts.reduceByKey(lambda x, y: x + y)

# Filter out stopwords
category_counts = category_counts.filter(lambda x: x[0] not in stop_words)

# Collect the results
results = category_counts.collect()

# Print the results and save it to file "categories.txt"
f = open('categories.txt', 'w')
for result in results:
    print(f'"{result[0]}"\t{result[1]}')
    f.write(f'"{result[0]}"\t{result[1]}'+'\n')
    
f.close()

# Stop Spark
sc.stop()

23/06/04 23:09:20 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/06/04 23:09:20 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/06/04 23:09:20 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/06/04 23:09:20 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
23/06/04 23:09:20 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
23/06/04 23:09:20 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
23/06/04 23:09:20 WARN Utils: Service 'SparkUI' could not bind on port 4046. Attempting port 4047.
23/06/04 23:09:20 WARN Utils: Service 'SparkUI' could not bind on port 4047. Attempting port 4048.
23/06/04 23:09:20 WARN Utils: Service 'SparkUI' could not bind on port 4048. Attempting port 4049.
23/06/04 23:09:20 WARN Utils: Service 'SparkUI' could not bind on port 4049. Attempting port 4050.
23/06/04 2

"Apps_for_Android"	2638
"Book"	22507
"Toys_and_Game"	2253
"Office_Product"	1243
"Digital_Music"	836
"Automotive"	1374
"Beauty"	2023
"Kindle_Store"	3205
"Electronic"	7825
"Movies_and_TV"	4607
"Tools_and_Home_Improvement"	1926
"Grocery_and_Gourmet_Food"	1297
"Musical_Instrument"	500
"CDs_and_Vinyl"	3749
"Clothing_Shoes_and_Jewelry"	5749
"Home_and_Kitche"	4254
"Cell_Phones_and_Accessorie"	3447
"Pet_Supplie"	1235
"Baby"	916
"Health_and_Personal_Care"	2982
"Patio_Lawn_and_Garde"	994
"Sports_and_Outdoor"	3269


In [6]:
from pyspark import SparkContext, SparkConf
import re
import json

# Create a SparkContext
conf = SparkConf().setAppName("ChiSquareCalculation")
sc = SparkContext(conf=conf)


# Load the input file from HDFS 
input_file = sc.textFile("hdfs:///user/dic23_shared/amazon-reviews/full/reviews_devset.json")


# Here we read the list of stopwords that must be ignored from the review text stored as global variable
stop_words = open('stopwords.txt').readlines()
stop_words = [w.replace('\n', '') for w in open('stopwords.txt').readlines() ]

# Function to preprocess the review text and extract terms
def preprocess_review(line):
    """ Tokenize on whitespaces, tabs, digits, and specified characters
    Case fold and remove empty tokens
    Filter out tokens consisting of only one character and those that are
    part of the stopwords """
        
    data = json.loads(line)
    review_text = data["reviewText"]
    category = data["category"]
    
    # Simplify word tokens, unigram preprocessing
    review_words_list = re.split('[^a-zA-Z<>^|]+', review_text)  # splitting words by spaces, characters etc.
    review_words_list = [f.lower() for f in review_words_list] # lower case letters to make items uniform
    review_words_list = [f for f in review_words_list if len(f) > 1 ] # exclude terms with only one letter
    review_words_list = [str(w) for w in review_words_list if w not in stop_words ] # Filter stop words
    review_words_list = list(set(review_words_list)) # Remove duplicated words
    
    return [(category,word) for word in review_words_list]

#Preprocess the review text and extract terms
terms_rdd = input_file.flatMap(preprocess_review)

#Output after the flatMap
#dataColl=term_freq_rdd.collect()  ### To print content of an RDD 
#for row in dataColl:
#    print(row )
#('Patio_Lawn_and_Garde', 'insight')
#('Patio_Lawn_and_Garde', 'things')
#('Patio_Lawn_and_Garde', 'open')


#Calculate term frequency per category
term_freq_rdd = terms_rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(lambda x, y: x + y)

#Output after reducer
#(('Patio_Lawn_and_Garde', 'insight'), 1)
#(('Patio_Lawn_and_Garde', 'things'), 31)
#(('Patio_Lawn_and_Garde', 'open'), 14)
#(('Patio_Lawn_and_Garde', 'raichlen'), 1)


23/06/04 18:51:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/06/04 18:51:18 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/06/04 18:51:18 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/06/04 18:51:18 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
23/06/04 18:51:18 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
23/06/04 18:51:18 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
23/06/04 18:51:18 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [9]:
#Calculate term frequency per term across all categories
term_freq_all_rdd = term_freq_rdd.map(lambda x: (x[0][0], [(x[0][1], x[1])])) \
                                .reduceByKey(lambda x, y: x + y) \
                                .mapValues(lambda x: {term: freq for term, freq in x})

'''
Output after the reducer
"Patio_Lawn_and_Garde"	{"interpret":1,"yum":1,"provided":8,"simple":17,"gift":21,"easy":142,
'''

# Join the term frequency per category and term frequency per RDDs
term_freq_mapped_rdd = term_freq_all_rdd.flatMap(lambda x: [(term, [x[0], freq]) for term, freq in x[1].items()])

term_freq_grouped_rdd = term_freq_mapped_rdd.groupByKey()
term_freq_grouped_mapped_rdd = term_freq_grouped_rdd.map(lambda x: (x[0], list(x[1])))

'''Output after mapping
abandonment	[['Book', 20], ['Digital_Music', 1], ['CDs_and_Vinyl', 1], ['Kindle_Store', 3], ['Movies_and_TV', 2]]
abandons	[['Book', 3], ['Movies_and_TV', 1]]
abaord	[['Movies_and_TV', 1]]
'''

""" Loading the precomputed dictionary of total number of reviews per category.
Will be used for chi-square computation.
This file was created for Exercise 1 """
def load_cat_dict():
    cat = open( 'categories.txt', 'r').readlines()
    cats = [eval(l.split('\t')[0].replace("'",'')) for l in cat]
    num = [int(l.split('\t')[1].replace('\n','')) for l in cat]
    cat_dic = dict(zip(cats,num))
    cat_dic['N'] = sum(cat_dic.values())
    
    return cat_dic 

# storing as global variable
cat_dict = load_cat_dict()
N = sum(cat_dict.values()) # Extracting the sum of all categories items


def test(data):
    chi_dict = {}
    term, freqs = data
    term_freq_all = sum(freq[1] for freq in freqs)

    for cat_freq in freqs:
        category = cat_freq[0]
        A = cat_freq[1]
        B = term_freq_all - A
        #B = sum(freq[1] for freq in cat_freq) - A
        C = cat_dict[category] - A
        D = N - A - B - C
        if (A + C) * (B + D) * (A + B) * (C + D)==0:
            chi = 0.0
        else:
            chi = (N * ((A * D) - (B * C)) ** 2) / ((A + C) * (B + D) * (A + B) * (C + D))
        chi_dict[category] = (term, chi)
        
    return chi_dict

# Calculate chi-square values for each term in each category
chi_square_dict_rdd = term_freq_grouped_mapped_rdd.map(test)

# Sort the terms within each category by their chi-square values in descending order
sorted_chi_square_rdd = chi_square_dict_rdd.flatMap(lambda x: x.items()) \
                                          .sortBy(lambda x: (x[0], -x[1][1]), ascending=True)

# Group the sorted terms by category
#grouped_sorted_chi_square_rdd = sorted_chi_square_rdd.groupByKey()

# Select the top 75 terms per category.
top_75_rdd = sorted_chi_square_rdd.groupByKey().flatMapValues(lambda x: list(x)[:75])

# Take the top 75 elements from each line of the sorted RDD
top_75 = top_75_rdd.mapValues(lambda x: list(x)[:75])

# Group the terms by category
grouped_75terms_rdd = top_75.groupByKey()

for category_terms in grouped_75terms_rdd.collect():
    category = category_terms[0]
    terms = category_terms[1]
    #print(f"<{category}>", end="")
    #for term in terms:
    #    print(term, end=" ")
    #print()  # Print a newline after each category
    
# Merge the selected terms into a single string separated by spaces
merged_terms = top_75_rdd.map(lambda x: x[1][0]).distinct().sortBy(lambda x: x).collect()
merged_terms_line = ' '.join(merged_terms)

# Convert floats to strings in merged_terms list
merged_terms = [str(term) for term in merged_terms]

# Merge the terms into a single line, space-separated and ordered alphabetically
merged_terms_line = ' '.join(sorted(merged_terms))
# Print the merged dictionary
print(merged_terms_line[:10])

# Stop the SparkContext
sc.stop()

                                                                                

<Apps_for_Android>['games', 6583.942889917388] ['play', 5002.358815587745] ['kindle', 3380.168965841006] ['graphics', 3181.0699703841055] ['addictive', 2736.5880135953485] ['challenging', 2242.4750226267697] ['fire', 2184.354041916016] ['coins', 2079.641082376314] ['addicting', 2060.046070090142] ['levels', 1833.1218038205284] ['playing', 1704.6523808018524] ['free', 1362.6215979108608] ['ads', 1347.6288905747865] ['puzzles', 1263.0978589113051] ['apps', 1205.2094574661078] ['bingo', 847.4482701753977] ['download', 720.9586465763484] ['mahjong', 658.6773654310135] ['love', 642.4812472048427] ['facebook', 617.2863122995981] ['downloaded', 611.5605887014297] ['faotd', 587.6794287320041] ['hints', 541.6073614675568] ['awesome', 520.9957275705385] ['played', 508.5249120519665] ['android', 484.89374199391636] ['puzzle', 457.86524000753394] ['solitaire', 437.6003777184345] ['gameplay', 413.6092743512669] ['freezes', 405.34935027455583] ['unlock', 401.17193290583026] ['tablet', 400.6131154059

## Part 2: Datasets/DataFrames Spark ML and Pipelines 

For this Task 2 we introduce Spark pipeline.

first of all we take advantage of the standard tokenization and text processing functionalities implemented in PySpark (such as  RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, ...) and most important the chi-square selector. Since we are intrested in the top 2000 terms, we set the parameter "numTopFeatures=2000" accordingly. 

The pipeline is structured as follows:

- Tokenization using RegexTokenizer

- Removal of stop wrods using StopWordsRemover

- CountVectorizer to convert tokens into term frequency vectors

- TF-IDF calculation

- Define the string indexer for the category column

- Define the ChiSqSelector to select the top 2000 features



Important functions:

pyspark.ml.feature.ChiSqSelector => Chi-Squared feature selection, which selects categorical features to use for predicting a categorical label.

pyspark.ml.PipelineA => simple pipeline, which acts as an estimator. A Pipeline consists of a sequence of stages, each of which is either an Estimator or a Transformer.


In [11]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer, Normalizer, ChiSqSelector
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml import Pipeline


# Create a New SparkSession
spark = SparkSession.builder \
    .appName("TF-IDF Pipeline") \
    .getOrCreate()

json_path = "hdfs:///user/dic23_shared/amazon-reviews/full/reviews_devset.json"
df1 = spark.read.json(json_path)
df = df1.sample(withReplacement=False, fraction=0.1, seed=123)

# Tokenization
tokenizer = RegexTokenizer(minTokenLength = 2,
                           inputCol="reviewText", 
                           outputCol="tokens", 
                           pattern=r'\s+|\d+|[(){}\[\].!?,;:+=\-_"\'`~#@&*%€$§\\/]')

# StopwordsRemover
stopwords = StopWordsRemover.loadDefaultStopWords("english")
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens", stopWords=stopwords)

# CountVectorizer to convert tokens into term frequency vectors
count_vectorizer = CountVectorizer(inputCol="filtered_tokens", outputCol="raw_features")

# TF-IDF calculation
idf = IDF(inputCol="raw_features", 
          outputCol="features")

# Define the string indexer for the category column
indexer = StringIndexer(inputCol="category", outputCol="label")

# Define the ChiSqSelector to select the top 2000 features
selector = ChiSqSelector(numTopFeatures=2000, 
                         featuresCol="features", 
                         outputCol="selected_features")

# Defining the Pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, count_vectorizer, idf, indexer, selector])
pipeline_model = pipeline.fit(df)

# After transforming the data
transformed_data = pipeline_model.transform(df)

# Select the selected_features column
selected_features = transformed_data.select("selected_features")

# Convert the selected features column to a list
terms_list = selected_features.collect()[0]["selected_features"]

# Get vocabulary list from CountVectorizer model
vocabulary = pipeline_model.stages[2].vocabulary

# Get the CountVectorizerModel from the pipeline
count_vectorizer_model = pipeline_model.stages[2]

# Get the vocabulary list
vocabulary = count_vectorizer_model.vocabulary

# Select the indices of the selected features
selected_indices = pipeline_model.stages[-1].selectedFeatures

# Retrieve the selected terms from the vocabulary
selected_terms = sorted([vocabulary[i] for i in selected_indices])

# Write the selected terms to the output file
with open("output_ds.txt", "w") as file:
    for term in selected_terms:
        file.write(term + "\n")

                                                                                

## Part 3: Train the text classifier using SVM

Here we will use the Support Machine Classifier for a classification task: we will try to extract the category of the item bssed on the review text.


The process can be schematically summarised as follows:

- the dataset is split into training ($60\%$) , validation ($20\%$) and test ($20\%$) sets
A random seed is set to a fixed value in order to ensure reproducibility

- input features are normalized using $L2$ normalization (function "Normalizer")

- the SVM classifier is called, using the "LinearSVC" function and it is encapsulated in the "OneVsRest" function to allow for multiclass classification. 

- two pipeline are introduced: the first one uses the chi square as the main feature selector, the second one based on the varance of the feature. As a threshold, the value 8.0 was used.

- A grid search was used in order to change the parameters: we chose the values [0.01,0.1 and 1.0] for the regularisation parameter, the standardization of features [True, False] and maximum number of iterations allowed [10,20].




In [10]:
# Split the data into training, validation, and test sets
(training_data, validation_data, test_data) = transformed_data.randomSplit([0.6, 0.2, 0.2], seed=123)

# Define L2 Normalizer
normalizer = Normalizer(inputCol="selected_features", outputCol="normalized_features", p=2.0)

# Define SVM classifier
svm = LinearSVC(featuresCol="normalized_features", labelCol="label")

# Encapsulate the SVM classifer inside a OneVsRest to allow for multiclass classification
one_vs_rest = OneVsRest(featuresCol="normalized_features", labelCol="label", classifier=svm)

# Define additional feature selection technique, VarianceThresholdSelector
# Feature selector that removes all low-variance features. 
# Features with a (sample) variance not greater than the threshold will be removed. 

pca_selector = VarianceThresholdSelector(varianceThreshold=8.0, 
                                         featuresCol="features", 
                                         outputCol="selectedFeatures")

# Define the first pipeline estimator comprised of chi_square_selector as the feature selector
pipeline_chi_square = Pipeline(stages=[normalizer, one_vs_rest])

# Define the second line estimator comprised of VarianceThresholdSelector as the feature selector
pipeline_pca = Pipeline(stages=[pca_selector, normalizer, one_vs_rest])

# Define the parameter grid for grid search
param_grid = (ParamGridBuilder()
    .addGrid(svm.regParam, [0.01, 0.1, 1.0])
    .addGrid(svm.standardization, [True, False])
    .addGrid(svm.maxIter, [10, 20])
    .build())

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

# Perform grid search on the first pipeline of chi-square selector
cv_chi = CrossValidator(estimator=pipeline_chi_square, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds = 3, parallelism = 2.0)
model_chi = cv_chi.fit(training_data)

predictions_chi = model_chi.transform(test_data)
f1_score_chi = evaluator.evaluate(predictions_chi)
print("Chi-Square Selector: F1 score on test set: {}".format(f1_score_chi))

# Perform grid search on the first pipeline of VarianceThresholdSelector
cv_pca = CrossValidator(estimator=pipeline_pca, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds = 3, parallelism = 2.0)
model_pca = cv_pca.fit(training_data)

predictions_pca = model_pca.transform(test_data)
f1_score_pca = evaluator.evaluate(predictions_pca)
print("Variance Threshold Selector: F1 score on test set: {}".format(f1_score_pca))

# Stop Spark
spark.stop()

23/06/04 19:05:04 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/06/04 19:05:04 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/06/04 19:05:04 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/06/04 19:05:04 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
23/06/04 19:05:04 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
23/06/04 19:05:04 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
23/06/04 19:05:04 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
23/06/04 19:06:16 WARN DAGScheduler: Broadcasting large task binary with size 1007.7 KiB
23/06/04 19:06:16 WARN DAGScheduler: Broadcasting large task binary with size 1007.7 KiB
23/06/04 19:06:16 WARN DAGScheduler: Broadcasting large task binary with size 1008.8 KiB