# 1. Setup & Configuration

In [1]:
# Import required packages and initialize Spark
import json
import logging
import time
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Part1_ChiSquare").getOrCreate()
sc = spark.sparkContext

# Set up logging for better visibility of stages
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("DIC25_Part1")

print("✅ Spark session initialized.")


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/05/13 19:06:05 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/13 19:06:05 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/05/13 19:06:05 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/05/13 19:06:05 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
25/05/13 19:06:05 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
25/05/13 19:06:05 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
25/05/13 19:06:05 WARN Utils: Service 'SparkUI' could not bind on port 4046. Attempting port 4047.
25/05/13 19:06:05 WARN Utils: Service 'SparkUI' could not bind on port 4047. Attempting port 4048.
25/05/13 19:06:05 WARN Utils: Service 'SparkUI' could not bind on port 4048. Attempting port 4049.
25/05/13 19:06:05 WARN Utils: Service 'SparkUI' could not bind on port 4049. Attempting port 4050.
25/05/13 1

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 46110)
Traceback (most recent call last):
  File "/usr/lib64/python3.9/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python3.9/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python3.9/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python3.9/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/lib/spark/python/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/usr/lib/spark/python/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/usr/lib/spark/python/pyspark/accumulators.py", line 257, in accum_updates
    num_updates = read_int(self.rfile)
  File "/usr/lib/spark/python/pyspark/serializers.py", lin

In [2]:
# Define all required paths
REVIEWS_DEVSET_PATH = "hdfs:///user/dic25_shared/amazon-reviews/full/reviews_devset.json"
STOPWORDS_PATH = "../stopwords.txt"
OUTPUT_PATH = "output_rdd.txt"

# Verify current working directory and file presence
import os
print("📁 Current working directory:", os.getcwd())
print("📂 Files in directory:")
for fn in sorted(os.listdir(".")):
    print("  ", fn)

📁 Current working directory: /home/e12433762/DIC2025_Ex2/src
📂 Files in directory:
   .ipynb_checkpoints
   Assignment_2_pt1.ipynb
   output


## Load Data and Stopwords

Load the Amazon reviews development set from HDFS and stopword list from a local file. Only reviews with both category and reviewText fields are kept.


In [3]:
# Load stopwords into a Python set
def load_stopwords(path):
    with open(path, encoding="utf-8") as f:
        return set(w.strip() for w in f if w.strip())

stopwords = load_stopwords(STOPWORDS_PATH)
log.info("📌 Loaded %d stopwords.", len(stopwords))

INFO:DIC25_Part1:📌 Loaded 591 stopwords.


In [4]:
# Load the reviews as an RDD of (category, reviewText)
def get_reviews_rdd(path: str):
    df = (
        spark.read
             .json(path)
             .select("category", "reviewText")
             .na.drop(subset=["category", "reviewText"])
    )
    return df.rdd.map(lambda row: (row["category"], row["reviewText"]))

# Timer: Load reviews and cache for reuse
start = time.time()
reviews = get_reviews_rdd(REVIEWS_DEVSET_PATH).cache()
count = reviews.count()
log.info("📈 Loaded %d reviews in %.2f seconds.", count, time.time() - start)

INFO:DIC25_Part1:📈 Loaded 78829 reviews in 18.10 seconds.                      


# 2.Preprocessing Functions & RDD Creation

## Preprocessing and Tokenization

Reviews are tokenized using the specified delimiters, lowercased, and filtered for stopwords and one-character terms.


In [5]:
# Preprocessing: tokenization, lowercasing, stopword removal, length filtering
import re

DELIMITERS = (
    r'[\s\t\d\(\)\[\]\{\}\.!?,;:+=\-_"]|\'|`|~|#|@|&|%|\*|\\/|\u20AC|\$|\u00A7'
)
token_split_re = re.compile(DELIMITERS)

def preprocess_record(record):
    category, text = record
    tokens = token_split_re.split(text.lower())
    filtered = [t for t in tokens if t and t not in stopwords and len(t) > 1]
    return category, filtered

# Apply preprocessing and cache result
start = time.time()
cat_tokens = reviews.map(preprocess_record).cache()
cat_tokens_count = cat_tokens.count()
log.info("🧹 Preprocessed %d records in %.2f seconds.", cat_tokens_count, time.time() - start)

# Quick sample to verify
for cat, toks in cat_tokens.take(3):
    log.info("📖 Category: %s → %d tokens", cat, len(toks))

INFO:DIC25_Part1:🧹 Preprocessed 78829 records in 3.19 seconds.                 
INFO:DIC25_Part1:📖 Category: Patio_Lawn_and_Garde → 31 tokens
INFO:DIC25_Part1:📖 Category: Patio_Lawn_and_Garde → 34 tokens
INFO:DIC25_Part1:📖 Category: Patio_Lawn_and_Garde → 32 tokens


# 3. Token‐Category Counts with Spark

## Step: Compute A (term present in category)
Emit ((term, category), 1) once per document where term occurs. This gives A: documents in category where the term appears.


In [6]:
# Count in how many documents each token appears per category (document-level frequency)

def map_record_to_doc_flags(records):
    """
    For each document (category, tokens), emit ((token, category), 1)
    only once per document. This ensures document-level statistics for chi-square.

    Input: iterator of (category, tokens)
    Output: ((token, category), 1) for each unique token in the document
    """
    for category, tokens in records:
        unique_tokens = set(tokens)  # Remove duplicates within the document
        for token in unique_tokens:
            yield ((token, category), 1)

# Apply the transformation and reduce by key
start = time.time()
token_cat_doc_counts = (
    cat_tokens.mapPartitions(map_record_to_doc_flags)
              .reduceByKey(lambda a, b: a + b)  # Sum document counts per (token, category)
              .cache()
)

log.info("📄 Counted document-level (token, category) pairs in %.2f seconds.", time.time() - start)
log.info("Sample document-level token-category counts: %s", token_cat_doc_counts.take(3))


INFO:DIC25_Part1:📄 Counted document-level (token, category) pairs in 0.14 seconds.
INFO:DIC25_Part1:Sample document-level token-category counts: [(('studio', 'CDs_and_Vinyl'), 92), (('warm', 'CDs_and_Vinyl'), 42), (('stafford', 'CDs_and_Vinyl'), 1)]


# 4. Count in how many documents each token appears (global document frequency)

## Step: Compute A + B (term present anywhere)
Map ((term, category), count) → (term, count) and aggregate to get total doc count per term.


In [7]:
# Compute document frequency per token: number of documents the token appeared in (across all categories)

start = time.time()
token_doc_freqs = (
    token_cat_doc_counts  # use document-level counts!
    .map(lambda x: (x[0][0], x[1]))  # From ((token, category), count) to (token, doc_count)
    .reduceByKey(lambda a, b: a + b)  # Total document count per token
    .cache()
)

log.info("📊 Computed document frequency per token in %.2f seconds.", time.time() - start)
log.info("Sample token document frequencies: %s", token_doc_freqs.take(3))

INFO:DIC25_Part1:📊 Computed document frequency per token in 0.08 seconds.
INFO:DIC25_Part1:Sample token document frequencies: [('insight', 429), ('things', 3563), ('open', 1305)]


# 5. Count number of documents in each category (category document frequency)

## Step: Compute category and total document counts
This prepares:

Total docs in a category (C + A)

Total number of documents (A+B+C+D)

These are used to derive C and D.


In [8]:
# Compute number of documents per category (df_c), and total document count

start = time.time()
docs_per_cat = (
    cat_tokens
    .map(lambda x: (x[0], 1))
    .reduceByKey(lambda a, b: a + b)
    .cache()
)

total_docs = docs_per_cat.map(lambda x: x[1]).sum()
log.info("📁 Category document counts: %s", docs_per_cat.collect())
log.info("🧮 Total number of documents: %d", total_docs)
log.info("⏱️ Category doc count done in %.2f seconds.", time.time() - start)


INFO:DIC25_Part1:📁 Category document counts: [('Kindle_Store', 3205), ('Electronic', 7825), ('Movies_and_TV', 4607), ('Tools_and_Home_Improvement', 1926), ('Grocery_and_Gourmet_Food', 1297), ('Apps_for_Android', 2638), ('Book', 22507), ('Toys_and_Game', 2253), ('Office_Product', 1243), ('Digital_Music', 836), ('Automotive', 1374), ('Beauty', 2023), ('Patio_Lawn_and_Garde', 994), ('Sports_and_Outdoor', 3269), ('Musical_Instrument', 500), ('CDs_and_Vinyl', 3749), ('Clothing_Shoes_and_Jewelry', 5749), ('Home_and_Kitche', 4254), ('Cell_Phones_and_Accessorie', 3447), ('Pet_Supplie', 1235), ('Baby', 916), ('Health_and_Personal_Care', 2982)]
INFO:DIC25_Part1:🧮 Total number of documents: 78829
INFO:DIC25_Part1:⏱️ Category doc count done in 1.14 seconds.


# 6. Chi-square calculation using document-level frequencies

## Step: Compute Chi-square Scores from A, B, C, D

This step constructs a 2×2 contingency table for each (term, category) pair and calculates the corresponding chi-square score.

### The contingency table is based on:

A: Count of documents containing the term and belonging to the category

B: Count of documents containing the term but not belonging to the category

C: Count of documents not containing the term but belonging to the category

D: Count of documents neither containing the term nor belonging to the category

### These values are derived as follows:

A: From (term, category) document frequency

B: Term total across all documents minus A

C: Category total minus A

D: Total documents − A − B − C

The chi-square statistic is then computed for each pair.

In [9]:
# 6. Chi-square calculation using broadcasted document-level counts (normalized version)

def compute_chi2_top_terms(token_cat_doc_counts_rdd, 
                           category_doc_counts_rdd, 
                           token_doc_freqs_rdd, 
                           top_k=75):
    """
    Computes top-K tokens by chi-square score per category using a stable and normalized formula.

    Inputs:
      token_cat_doc_counts_rdd: RDD[((token, category), A)]
      category_doc_counts_rdd:   RDD[(category, df_c)]
      token_doc_freqs_rdd:       RDD[(token, df_t)]
      top_k: number of top terms per category to select

    Returns:
      RDD[(category, [(token, chi2)])] with top-k terms per category
    """

    # 1. Collect small side-tables and broadcast
    category_doc_map = category_doc_counts_rdd.collectAsMap()
    token_doc_map = token_doc_freqs_rdd.collectAsMap()
    total_docs = sum(category_doc_map.values())

    bc_cat_docs = sc.broadcast(category_doc_map)
    bc_token_docs = sc.broadcast(token_doc_map)

    # 2. Compute chi-square with normalized denominator
    def compute_chi2(kv):
        (token, category), A = kv
        N_i = bc_cat_docs.value.get(category, 0)     # df_c
        n_j = bc_token_docs.value.get(token, 0)      # df_t
        N = total_docs

        B = N_i - A
        C = n_j - A
        D = N - A - B - C

        num = (A * D - B * C) ** 2
        den = (A + B) * (C + D) * (A + C) * (B + D)
        chi2 = (N * num / den) if den != 0 else 0.0

        return (category, (token, chi2))

    chi2_rdd = token_cat_doc_counts_rdd.map(compute_chi2)

    # 3. Group by category and take top-K
    top_rdd = (
        chi2_rdd
        .groupByKey()
        .mapValues(lambda seq: sorted(seq, key=lambda x: -x[1])[:top_k])
        .cache()
    )

    return top_rdd

# Run updated version
start = time.time()

chi_scores = compute_chi2_top_terms(
    token_cat_doc_counts_rdd=token_cat_doc_counts,
    category_doc_counts_rdd=docs_per_cat,
    token_doc_freqs_rdd=token_doc_freqs,
    top_k=75
)

log.info("✅ Normalized chi-square calculation done in %.2f seconds.", time.time() - start)
print("Sample χ² top-75 terms for a few categories:")
for category, term_list in chi_scores.take(5):
    print(f"{category} → {term_list}")

INFO:DIC25_Part1:✅ Normalized chi-square calculation done in 0.83 seconds.


Sample χ² top-75 terms for a few categories:


[Stage 19:>                                                         (0 + 2) / 2]

Apps_for_Android → [('games', 3086.222198300114), ('play', 2166.658247313346), ('graphics', 1541.6385306131185), ('kindle', 1474.0208809190092), ('addictive', 1311.905562727777), ('challenging', 1038.1284558527927), ('coins', 1015.5899629723237), ('addicting', 990.8441134974868), ('fire', 961.4880765608062), ('levels', 828.0306568656633), ('playing', 693.7937006696752), ('ads', 651.3749934344062), ('puzzles', 583.3218622383833), ('apps', 548.7810653104153), ('free', 502.34201866039217), ('bingo', 409.2358492981346), ('mahjong', 322.00891943980963), ('download', 308.794407744245), ('faotd', 288.8577201586641), ('facebook', 282.51705437029005), ('downloaded', 262.77022492215735), ('hints', 242.61029019440056), ('android', 213.37008082383736), ('solitaire', 211.6429957838186), ('gameplay', 198.5123356770461), ('unlock', 190.27341706776008), ('freezes', 189.67737127837006), ('played', 180.78646373662968), ('deleted', 179.2243589462116), ('bought', 174.4587211734982), ('flappy', 173.3058369

                                                                                

# 8. Merge chi-squared outputs

**Why we don’t need a separate “merge χ² outputs” step in this implementation**

In this notebook, the true final output is written in the very next cell using pure Python:

1. We call `chi2_top75.collect()` (see the “Collect and sort…” cell) to build `cat_lines_sorted`.  
2. We then open `output/output_rdd.txt` and write each category line plus the merged dictionary in one go with a simple `open(..., "w")` loop.

Because that Python block produces the exact `output_rdd.txt` we need, a preceding Spark-based cleanup/merging `coalesce(1).saveAsTextFile("output/merged_chi2_output.txt")` would be redundant.

# 9. Final Formatting: Sorting, Token Dictionary, and Output File

In [10]:
# 1) Collect and sort the (category, [ (term, χ²), ... ]) lines
cat_lines = chi_scores.collect()  # e.g. [("Books", [("author", 123.4), ...]), ...]

# Convert to string format: "term1:score1 term2:score2 ..."
cat_lines = [
    (cat, " ".join(f"{tok}:{score:.4f}" for tok, score in toks))
    for cat, toks in cat_lines
]

# Sort categories alphabetically
cat_lines_sorted = sorted(cat_lines, key=lambda kv: kv[0])

# 2) Compute the set of all unique tokens from the formatted term strings
unique_terms = {
    term_score.split(":", 1)[0]
    for _, term_list in cat_lines_sorted
    for term_score in term_list.split()
}
unique_terms_sorted = sorted(unique_terms)

# 3) Write the output in required format
import os
os.makedirs("output", exist_ok=True)

with open("output/output_rdd.txt", "w", encoding="utf-8") as fout:
    for category, term_list in cat_lines_sorted:
        fout.write(f"{category} {term_list}\n")
    fout.write(" ".join(unique_terms_sorted) + "\n")

print("✅ Wrote final formatted output to output/output_rdd.txt")


✅ Wrote final formatted output to output/output_rdd.txt
