In [1]:
from pyspark import SparkContext
import json
import re
from collections import defaultdict

sc = SparkContext(appName="ChiSquare")

SLF4J: Class path contains multiple SLF4J bindings.

In [2]:
with open("stopwords.txt", "r") as f:
    STOPWORDS = set(line.strip() for line in f)

# Part 1) RDDs

In [3]:
def map_category(line):
    """
    The function parses the input line and extracts the category of the review.
    Input:
        line: json representing the review
    Output:
        category
    """
    review = json.loads(line)
    return review["category"]

def map_words(line, stopwords):
    """
    The function parses the input line and extracts the category and text from the review.
    Then it carries out the tokenization, case folding and stopword filtering on the review text.
    Finally, it yields a tuple per every unique word extracted, holding the information regarding the category
    Input:
        line: line from the text file containing a json representing a review
        stopwords
    Output:
        key: word
        value: (category, 1)
    """
    regex = re.compile(r"[\s\t\d\[\]\{\}().!?,;:+=\-_\"'`~#@&*%€$§\\/]+")
    review = json.loads(line)
    cat = review["category"]
    word_list = map(lambda word: word.lower(), regex.split(review["reviewText"]))
    unique_words = {word for word in word_list if word and word not in stopwords}
    return [(word, (cat, 1)) for word in unique_words]

def reducer_words(values):
   """
    Aggregates word counts per category and calculates the total occurrences of each word.
    Input:
        values: a list of tuples where each tuple contains a category and a count (e.g., (category, 1))
    Output:
        A list of tuples, each containing:
            - cat: category
            - all_occur: the total occurrences of the word across all categories
            - c: the count of the word within that category
    """
    all_occur = 0
    cat_counts = {}
    for cat, c in values:
        all_occur += c
        cat_counts[cat] = cat_counts.get(cat, 0) + c
    return [(cat, all_occur, c) for cat, c in cat_counts.items()]

def calculate_chi_squared(n, cat_count, all_occur, a):
    """
    Calculates the chi_squared score for category and word based on the provided metrics.
    Input:
        n: total number of reviews
        cat_count: total number of reviews in category
        all_occur: total number of occurrences of word
        a: number of reviews in given category which contain the given word
    Return:
        -1 if the calculation encounters division by zero
        float chi_squared score otherwise
    """
    c = cat_count - a
    b = all_occur - a
    d = n - cat_count - b
    denom = (a + b) * (a + c) * (b + d) * (c + d)
    if denom == 0:
        return -1
    return n * (a * d - b * c) ** 2 / denom

def reducer_scores(values, n, cat_count):
    """
    Reduces intermediate word data to Chi-Square scores and selects the top 75 terms per category.
    Input:
        values: a list of tuples containing word data (word, all_occurrences, count in category)
        n: total number of reviews
        cat_count: total number of reviews in the category
    Output:
        A sorted list of tuples containing Chi-Square scores and words, limited to the top 75.
    """
    scores = []
    for word, all_occur, a in values:
        score = calculate_chi_squared(n, cat_count, all_occur, a)
        scores.append((score, word))
    return sorted(scores, reverse=True)[:75]

def run_chi_squared(input_file, output_file):
    """
    Executes the Chi-Square calculation on the input data, using the helper functions defined above, and writes the results to the output file.
    Input:
        input_file: path to the input file containing review data
        output_file: path to the output file where results will be written
    """
    stopwords_broad = sc.broadcast(STOPWORDS)

    reviews = sc.textFile(input_file)

    cat_counts = reviews.map(map_category).countByValue()
    n = sum(cat_counts.values())

    first_step = reviews.flatMap(lambda x: map_words(x, stopwords_broad.value)).groupByKey().flatMapValues(reducer_words).map(lambda x: (x[1][0], (x[0], x[1][1], x[1][2]))).cache()
    second_step = first_step.groupByKey().map(lambda x: (x[0], reducer_scores(x[1], n, cat_counts[x[0]]))).cache()

    with open(output_file, "w") as f:
        for category, scores in sorted(second_step.collect()):
            chi_sq = [f"{word}:{score}" for score, word in scores]
            f.write(f"<{category}> {' '.join(chi_sq)}\n")

        all_words = sorted(set(second_step.flatMap(lambda x: x[1]).map(lambda x: x[1]).collect()))
        f.write(f"{' '.join(all_words)}\n")

In [4]:
%%time
run_chi_squared("reviews_devset.json","output_rdd.txt")

                                                                                

CPU times: user 180 ms, sys: 47.8 ms, total: 227 ms
Wall time: 34.5 s


In [None]:
%%time
run_chi_squared("/user/dic24_shared/amazon-reviews/full/reviewscombined.json","output_rdd_fulldata.txt")

# Part 2) Datasets/DataFrames: Spark ML and Pipelines

## DataFrame Creation

In this section we create the dataframe from the reviews file, which will contain two columns `reviewText` and `category` extracted from the  review JSON.

In [5]:
FILE = "reviews_devset.json"

In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TextClassification").getOrCreate()

In [7]:
reviews = spark.read.json(FILE)
df = reviews.select("reviewText", "category")

                                                                                

In [8]:
df.printSchema()

root
 |-- reviewText: string (nullable = true)
 |-- category: string (nullable = true)



In [9]:
df.take(1)

[Row(reviewText="This was a gift for my other husband.  He's making us things from it all the time and we love the food.  Directions are simple, easy to read and interpret, and fun to make.  We all love different kinds of cuisine and Raichlen provides recipes from everywhere along the barbecue trail as he calls it. Get it and just open a page.  Have at it.  You'll love the food and it has provided us with an insight into the culture that produced it. It's all about broadening horizons.  Yum!!", category='Patio_Lawn_and_Garde')]

## ML Pipeline

In the following section we set up the transformation pipeline.

In [3]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, ChiSqSelector, StringIndexer, CountVectorizer
from pyspark.ml import Pipeline

In [11]:
tokenizer = RegexTokenizer(inputCol="reviewText", outputCol="allWords", pattern=r"[\s\t\d\[\]\{\}().!?,;:+=\-_\"'`~#@&*%€$§\\/]+") # converts to lowercase and then tokenizes
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="words", stopWords=list(STOPWORDS))
cv = CountVectorizer(inputCol=remover.getOutputCol(), outputCol="rawFeatures")
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex") # category has to be numeric
selector = ChiSqSelector(numTopFeatures=2000, featuresCol=cv.getOutputCol(),
                         outputCol="selectedFeatures", labelCol=indexer.getOutputCol())

pipeline = Pipeline(stages=[tokenizer, remover, cv, indexer, selector])

model = pipeline.fit(df)

24/05/26 15:23:17 WARN DAGScheduler: Broadcasting large task binary with size 1074.0 KiB
24/05/26 15:23:18 WARN DAGScheduler: Broadcasting large task binary with size 1076.1 KiB
24/05/26 15:23:28 WARN DAGScheduler: Broadcasting large task binary with size 1078.2 KiB
                                                                                

In [12]:
model.transform(df).show()

24/05/26 15:24:10 WARN DAGScheduler: Broadcasting large task binary with size 1078.9 KiB


+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+
|          reviewText|            category|            allWords|               words|         rawFeatures|categoryIndex|    selectedFeatures|
+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+
|This was a gift f...|Patio_Lawn_and_Garde|[this, was, a, gi...|[gift, husband, m...|(96130,[2,3,7,8,3...|         18.0|(2000,[2,3,7,8,35...|
|This is a very ni...|Patio_Lawn_and_Garde|[this, is, a, ver...|[nice, spreader, ...|(96130,[0,1,3,21,...|         18.0|(2000,[0,1,3,21,3...|
|The metal base wi...|Patio_Lawn_and_Garde|[the, metal, base...|[metal, base, hos...|(96130,[4,10,29,1...|         18.0|(2000,[4,10,174,3...|
|For the most part...|Patio_Lawn_and_Garde|[for, the, most, ...|[part, works, pre...|(96130,[1,3,4,9,1...|         18.0|(2000,[1,3,4,9,10...|
|This 

                                                                                

## Top 2000 Terms

By fitting the pipeline on all the data we can extract the top 2000 terms according to the chi-squared feature selector.

In [13]:
top_features = model.stages[-1].selectedFeatures
vocabulary = model.stages[-3].vocabulary[:(max(top_features) + 1)]

In [14]:

top_terms_model = [vocabulary[i] for i in top_features]

In [15]:
top_terms_model[:20]

['great',
 'good',
 'love',
 'time',
 'work',
 'recommend',
 'back',
 'easy',
 'make',
 'bought',
 'made',
 'find',
 'buy',
 'price',
 'put',
 'reading',
 'quality',
 'people',
 'works',
 'quot']

In [18]:
with open("output_ds.txt", "w") as f:
    f.write(" ".join(top_terms_model))

## Comparison With Assignement 1

We performed the chi-square calculation in the first assignement to calculate the top 75 terms in each review category. We now want ot compare these results.

In [None]:
top_terms_1 = []
with open("output_1.txt") as f:
    lines = f.readlines()
    top_terms_1 = lines[-1].strip().split(" ")

In [None]:
len(top_terms_1)

1464

In the first assignemnt we ended up with 1464 terms in the vocabulary.

In [None]:
extra = [word for word in top_terms_model if word not in top_terms_1]
missing = [word for word in top_terms_1 if word not in top_terms_model]

In [None]:
len(extra)

1249

In [None]:
len(missing)

713

Based solely on the number of extra words, and the number of missing words we can conclud that these two results differ greatly.

# PART 3

### Import libraries

In [4]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, StringIndexer, ChiSqSelector, Normalizer
from pyspark.ml import PipelineModel, Pipeline
from pyspark.ml.feature import PCA
import difflib

spark = SparkSession.builder.appName("TextClassification_Pt2").getOrCreate()

df = spark.read.json("reviews_devset.json").select("reviewText", "category")
df.printSchema()

spark = SparkSession.builder \
    .appName("TextClassification") \
    .getOrCreate()

root
 |-- reviewText: string (nullable = true)
 |-- category: string (nullable = true)



# Split the data

In [5]:
# Split the data into training (70%) and test (30%) sets
train_df, test_df = df.randomSplit([0.7, 0.3], seed=42)

## Extend the pipeline and Train the model 

In [6]:
tokenizer = RegexTokenizer(inputCol="reviewText", outputCol="allWords", pattern=r"[\s\t\d\[\]\{\}().!?,;:+=\-_\"'`~#@&*%€$§\\/]+") # converts to lowercase and then tokenizes
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="words")
cv = CountVectorizer(inputCol=remover.getOutputCol(), outputCol="rawFeatures")
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex") # category has to be numeric
selector = ChiSqSelector(numTopFeatures=2000, featuresCol=cv.getOutputCol(), outputCol="selectedFeatures", labelCol=indexer.getOutputCol())
normalizer = Normalizer(inputCol=selector.getOutputCol(), outputCol="normFeatures")
svc = LinearSVC(maxIter=10, regParam=0.1)
ovr = OneVsRest(classifier=svc, featuresCol="normFeatures", labelCol="categoryIndex")

pipeline = Pipeline(stages=[tokenizer, remover, cv, indexer, selector, normalizer, ovr])

# Fit the model
# model = pipeline.fit(train_df)

### Grid Search for Parameter Optimization¶

In [12]:
paramGrid = ParamGridBuilder() \
    .addGrid(svc.regParam, [0.01, 0.1, 0.6]) \
    .addGrid(svc.maxIter, [3, 5]) \
    .build()

evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="f1")

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

In [None]:
model1 = crossval.fit(train_df)

### Save the model 

In [None]:
model1.bestModel.write().overwrite().save("model_devset")

24/05/25 16:57:43 WARN TaskSetManager: Stage 6832 contains a task of very large size (1487 KiB). The maximum recommended task size is 1000 KiB.


### Load the saved model 

In [7]:
model_path = "model_devset"
loaded_model = PipelineModel.load(model_path)

### Transform train and test set 

In [8]:
loaded_model.transform(train_df).show()

+--------------------+------------------+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+----------+
|          reviewText|          category|            allWords|               words|         rawFeatures|categoryIndex|    selectedFeatures|        normFeatures|       rawPrediction|prediction|
+--------------------+------------------+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+----------+
|                    |              Book|                  []|                  []|       (80126,[],[])|          0.0|        (2000,[],[])|        (2000,[],[])|[-0.9507807344758...|       0.0|
|                    |              Book|                  []|                  []|       (80126,[],[])|          0.0|        (2000,[],[])|        (2000,[],[])|[-0.9507807344758...|       0.0|
|                    |     CDs_and_

In [9]:
predictions = loaded_model.transform(test_df)

In [10]:
predictions.show()

+--------------------+------------------+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+----------+
|          reviewText|          category|            allWords|               words|         rawFeatures|categoryIndex|    selectedFeatures|        normFeatures|       rawPrediction|prediction|
+--------------------+------------------+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+----------+
|                    |              Book|                  []|                  []|       (80126,[],[])|          0.0|        (2000,[],[])|        (2000,[],[])|[-0.9507807344758...|       0.0|
|                    |Sports_and_Outdoor|                  []|                  []|       (80126,[],[])|          7.0|        (2000,[],[])|        (2000,[],[])|[-0.9507807344758...|       0.0|
|                    |     Toys_and

### Create PCA 

In [26]:
pca = PCA(k=100, inputCol= selector.getOutputCol(), outputCol="pca_features")

### Extend the pipeline using pca 

In [27]:
pipeline = Pipeline(stages=[tokenizer, remover, cv, indexer, selector, normalizer, pca, ovr])

In [28]:
pca_model = pipeline.fit(train_df)

24/05/26 15:34:26 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
24/05/26 15:36:28 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

### Save the pca model and transform train and test sets 

In [30]:
pca_model.write().overwrite().save("pca_model_devset")

24/05/26 15:44:11 WARN TaskSetManager: Stage 1304 contains a task of very large size (1504 KiB). The maximum recommended task size is 1000 KiB.
24/05/26 15:44:26 WARN TaskSetManager: Stage 1317 contains a task of very large size (1603 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Load the saved pca model

In [37]:
model_path = "pca_model_devset"
loaded_model_pca = PipelineModel.load(model_path)

                                                                                

In [38]:
loaded_model_pca.transform(train_df).show()

24/05/26 15:57:47 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB
[Stage 1644:>                                                       (0 + 1) / 1]

+--------------------+----------------+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
|          reviewText|        category|            allWords|               words|         rawFeatures|categoryIndex|    selectedFeatures|        normFeatures|        pca_features|       rawPrediction|prediction|
+--------------------+----------------+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
|                    |            Book|                  []|                  []|       (80870,[],[])|          0.0|        (2000,[],[])|        (2000,[],[])|[0.0,0.0,0.0,0.0,...|[-0.8677515927506...|       0.0|
|"A River Runs Thr...|            Book|[a, river, runs, ...|[river, runs, rem...|(80870,[1,2,4,12,...|          0.0|(2000,[1,2,4,11,1...|(2000,[1,2,4,11

                                                                                

In [39]:
pca_predictions = loaded_model_pca.transform(test_df).show()

24/05/26 15:58:25 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB
[Stage 1645:>                                                       (0 + 1) / 1]

+--------------------+--------+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
|          reviewText|category|            allWords|               words|         rawFeatures|categoryIndex|    selectedFeatures|        normFeatures|        pca_features|       rawPrediction|prediction|
+--------------------+--------+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
|"Canada" is the f...|    Book|[canada, is, the,...|[canada, first, r...|(80870,[0,5,12,15...|          0.0|(2000,[0,5,11,14,...|(2000,[0,5,11,14,...|[-2.6607313311248...|[2.67063565264003...|       0.0|
|"Flee sexual immo...|    Book|[flee, sexual, im...|[flee, sexual, im...|(80870,[0,1,3,12,...|          0.0|(2000,[0,1,3,11,1...|(2000,[0,1,3,11,1...|[-5.0690469242637...|[1.4894937357

                                                                                

## Compare chi square overall top 2000 filtered features with another, heavier filtering with much less dimensionality

By fitting the training data in the into the pipeline model using pca (dimention reduction method)

In [40]:
chi_selector_model = loaded_model_pca.stages[4]
selected_features = chi_selector_model.selectedFeatures
count_vectorizer_model = loaded_model_pca.stages[2]

vocabulary = count_vectorizer_model.vocabulary
top_terms_pca = [vocabulary[index] for index in selected_features]
top_terms_pca[:20]

['book',
 'one',
 'great',
 'like',
 'good',
 'read',
 'well',
 'love',
 'time',
 'really',
 'much',
 'story',
 'also',
 'use',
 'first',
 'even',
 'product',
 'way',
 'work',
 'new']

In [41]:
with open("output_pca_model.txt", "w") as f:
    f.write(" ".join(top_terms_pca))

In [42]:
with open('output_ds.txt', 'r') as file1, open('output_pca_model.txt', 'r') as file2:
    file1_words = set(file1.read().split())
    file2_words = set(file2.read().split())

common_words = file1_words & file2_words
unique_to_file1 = file1_words - file2_words
unique_to_file2 = file2_words - file1_words

print(f'Number of words common in both files: {len(common_words)}')
print(f'Number of words unique to file1: {len(unique_to_file1)}')
print(f'Number of words unique to file2: {len(unique_to_file2)}')

print("unique words in output_ds:", unique_to_file1)
print("unique words in output_pca_model:", unique_to_file2)



Number of words common in both files: 1560
Number of words unique to file1: 440
Number of words unique to file2: 440
unique words in output_ds: {'thicker', 'format', 'finish', 'place', 'sized', 'deeper', 'defective', 'personalities', 'relief', 'dreams', 'waiting', 'explanations', 'alpha', 'amusing', 'wild', 'discovery', 'lighter', 'wooden', 'dust', 'day', 'curious', 'meeting', 'doubt', 'cartoon', 'beautifully', 'paranormal', 'useless', 'escape', 'killer', 'thing', 'de', 'excitement', 'editing', 'perfection', 'lift', 'politics', 'addicted', 'bold', 'shared', 'workouts', 'formula', 'blow', 'minds', 'city', 'job', 'lover', 'finally', 'classy', 'finding', 'ancient', 'media', 'facebook', 'surface', 'illustrated', 'helping', 'brain', 'joy', 'uncomfortable', 'explaining', 'foam', 'xbox', 'draws', 'secondary', 'beginner', 'helped', 'pretty', 'arm', 'driven', 'rocks', 'included', 'bar', 'blue', 'engaged', 'check', 'modem', 'wax', 'crisp', 'yoga', 'likes', 'colorful', 'money', 'roku', 'importanc

# F1 Score 

In [15]:
f1_score = evaluator.evaluate(predictions)

In [16]:
print(f1_score)

0.6614756080207185
