# Part 1: RDDs

Setup Spark

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ChiSquaredRDD").getOrCreate()
sc = spark.sparkContext

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
23/05/25 10:04:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/25 10:04:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/05/25 10:04:15 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/05/25 10:04:15 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/05/25 10:04:15 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
23/05/25 10:04:1

## Load reviews as RDD

In [2]:
# review_path = "hdfs:///user/dic23_shared/amazon-reviews/full/reviewscombined.json"
review_path = "hdfs:///user/dic23_shared/amazon-reviews/full/reviews_devset.json"
# review_path = "hdfs:///user/e11809642/reviews/reduced_devset.json"
input_rdd = sc.textFile(review_path)

## Obtain stopwords

Load stopwords into (local) memory (Note: file contains duplicates, so convert to set)

In [48]:
stopwords_path = 'stopwords.txt'

def load_unique_lines(filename):
    lines = set()

    with open(filename, 'r') as file:
        for line in file:
            line = line.strip()  # Remove leading/trailing whitespace and newline characters
            lines.add(line)

    return lines


stopwords = load_unique_lines(stopwords_path) 

Note: local variables like this one will be automatically broadcast to all data nodes if accessed in any RDD transformation

## Parse JSON strings, extract the category + review text

In [5]:
import json
category_review_rdd = input_rdd \
    .map(lambda json_str: json.loads(json_str)) \
    .map(lambda json_obj: (json_obj['category'], json_obj['reviewText']))

## Compute total number of documents

In [35]:
review_count = category_review_rdd.count()

                                                                                

## Compute number of documents per category

define RDD with required transformations

In [37]:
category_counts_rdd = category_review_rdd \
    .map(lambda pair: (pair[0], 1)) \
    .reduceByKey(lambda x, y: x + y)

Next, collect the number of documents per category (values of the RDD above) into a local dictionary. This dict is really small (one key-value pair for each category) and will easily fit into memory on the datanodes.

In [127]:
category_counts = dict(category_counts_rdd.collect())
#category_counts = category_counts_rdd.collect()

                                                                                

In [128]:
category_counts['Musical_Instrument']

500

## Obtain number of ocurrences of each term by category

In [97]:
# define pattern for splitting/tokenizing
import re
pattern = re.compile(r"[^a-zA-Z<>^|]+")

def map_review_data(pair):
    category, review_text = pair
    # obtain terms via tokenization followed by stopword removal
    terms = [
        t
        for t in set(token.lower() for token in pattern.split(review_text))
        if t not in stopwords and len(t) >= 2
    ]
    return [((term, category), 1) for term in terms]

def remap(pair):
    term_and_cat, count = pair
    term, cat = term_and_cat
    return term, (cat, count)

term_cat_occ_rdd = (
    category_review_rdd.flatMap(
        map_review_data
    )
    .reduceByKey(lambda x, y: x + y)
    .map(remap) # can I avoid having to do this somehow?
)

In [98]:
term_cat_occ_rdd.take(3)

                                                                                

[('mic', ('Musical_Instrument', 24)),
 ('setups', ('Musical_Instrument', 1)),
 ('tape', ('Musical_Instrument', 4))]

## Compute the number of occurrences of each term across all reviews

In [84]:
term_occ_rdd = term_cat_occ_rdd \
    .map(lambda pair: (pair[0][0], pair[1])) \
    .reduceByKey(lambda x, y: x + y)

## Calculate Chi-square

In [141]:
def calculate_chi_square(pair):
    term, term_counts_for_categories = pair
    doc_count_for_cat = dict(term_counts_for_categories) # use to retrieve no. of documents containing term for a particular category
    total_doc_count_for_term = sum(doc_count_for_cat.values()) # total number of documents containing the term
    
    term_and_cat_chi_squared = []
    
    for category, count in doc_count_for_cat.items():
        a = count # number of documents in c which contain t
        b = total_doc_count_for_term - a # number of documents not in c which contain t
        total_doc_count_for_cat = category_counts[category] # total no. of documents for current category
        c = total_doc_count_for_cat - a # number of documents in c without t
        d = review_count - a - b - c # number of documents not in c without t
        term_and_cat_chi_squared.append(
            (
                (term, category),
                review_count * (a * d - b * c) ** 2 / ((a + b) * (a + c) * (b + d) * (c + d))
            )
        )
    return term_and_cat_chi_squared

In [142]:
# quick test
cat1, cat2, cat3 = [c for c in category_counts.keys()][:3]
pair = ('term', [(cat1, 10), (cat2, 5), (cat3, 3)])
calculate_chi_square(pair)

[(('term', 'Kindle_Store'), 122.37658536801779),
 (('term', 'Electronic'), 6.4167149414309375),
 (('term', 'Movies_and_TV'), 3.8321073999754605)]

In [143]:
# Compute the chi-squared value for each unique term and category pair
# (term, category) -> chi-square
term_cat_chi_squared_rdd = (term_cat_occ_rdd
    .map(calculate_chi_square)
    .groupByKey()
)

In [144]:
term_cat_chi_squared_rdd.take(3)

23/05/25 17:57:33 WARN TaskSetManager: Lost task 0.0 in stage 136.0 (TID 190) (dn08.os.hpc.tuwien.ac.at executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 619, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 609, in process
    out_iter = func(split_index, iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 417, in func
    return f(iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2236, in combine
    merger.mergeValues(iterator)
  File "/usr/lib/spark/python/pyspark/shuffle.py", line 240, in mergeValues
    for k, v in iterator:
  File "/usr/lib/spark/python/pyspark/util.py", line 74, in wra

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage 136.0 (TID 196) (dn08.os.hpc.tuwien.ac.at executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 619, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 609, in process
    out_iter = func(split_index, iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 417, in func
    return f(iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2236, in combine
    merger.mergeValues(iterator)
  File "/usr/lib/spark/python/pyspark/shuffle.py", line 240, in mergeValues
    for k, v in iterator:
  File "/usr/lib/spark/python/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_3695275/3218419316.py", line 3, in calculate_chi_square
ValueError: dictionary update sequence element #0 has length 20; 2 is required

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:556)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:762)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:744)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:509)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 619, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 609, in process
    out_iter = func(split_index, iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2918, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 417, in func
    return f(iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2236, in combine
    merger.mergeValues(iterator)
  File "/usr/lib/spark/python/pyspark/shuffle.py", line 240, in mergeValues
    for k, v in iterator:
  File "/usr/lib/spark/python/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_3695275/3218419316.py", line 3, in calculate_chi_square
ValueError: dictionary update sequence element #0 has length 20; 2 is required

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:556)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:762)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:744)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:509)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [11]:
# Select the top 75 tokens with the highest chi-square value for each category
# (category) -> [(token, chi-square)]
chi_square_rdd = term_category_chi_squared_rdd \
    .map(lambda pair: (pair[0], sorted(pair[1], key=lambda x: x[1], reverse=True)[:75]))

NameError: name 'term_category_chi_squared_rdd' is not defined

In [None]:
# Select all unique tokens from the top 75 tokens with the highest chi-square value for each category
tokens = chi_square_rdd \
    .flatMap(lambda pair: (token for token, chi_square in pair[1])) \
    .distinct() \
    .collect()

In [None]:
# Sort the tokens in alphabetical order
tokens.sort()

In [None]:
chi_square_rdd = chi_square_rdd.sortByKey()

# Save the top 75 tokens with the highest chi-square value for each category to a file in the local file system
# in the format: "<category> term1:chi_squared1 term2:chi_squared2 ... term75:chi_squared75" for each line and append the list of tokens to the end of the file
with open("chi_squared.txt", "a") as file:
    for pair in chi_square_rdd.collect():
        file.write("<%s>" % pair[0] + " ")
        for token, chi_square in pair[1]:
            file.write("%s:%f" % (token, chi_square) + " ")
        file.write("\n")
    file.write(" ".join(tokens) + "\n")

In [None]:
sc.stop()
