In [None]:
# Part 1) RDDs

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
import json
import re
from collections import defaultdict

# Initialise the Spark session and take the SparkContext from it.
# Building the session first (instead of calling SparkContext(...) directly)
# reuses an already-active context, so re-running this cell, or running it in a
# kernel that pre-creates `sc`, does not fail with
# "Cannot run multiple SparkContexts at once".

spark = SparkSession.builder.appName("ChiSquareRDD").getOrCreate()
sc = spark.sparkContext

# Stop-word list loader: the file holds one stop word per line.

def loading_stopwords(stopwords_file):
    """Read a stop-word file and return its entries as a set.

    Every line is stripped of surrounding whitespace; a blank line therefore
    contributes the empty string to the result.

    :param stopwords_file: path of the text file to read.
    :return: set of the stripped lines.
    """
    stopword_set = set()
    with open(stopwords_file, 'r') as handle:
        for raw_line in handle:
            stopword_set.add(raw_line.strip())
    return stopword_set

# Parse one JSON review line and turn its text into a list of filtered tokens.

def preprocessing(line, stopwords):
    """Parse a JSON-encoded review and tokenise its text.

    :param line: a JSON object (one review); must contain a ``category`` key.
        ``reviewText`` may be missing or null, in which case the review simply
        yields no tokens instead of failing the whole Spark job.
    :param stopwords: collection of lower-case terms to drop.
    :return: ``(category, words)``; ``words`` keeps duplicates and text order.
    """
    review = json.loads(line)
    category = review['category']
    # `.lower()` on None would raise and abort the job for a single bad record.
    review_text = review.get('reviewText') or ''

    tokens = re.split(r'[^\w]+', review_text.lower())
    # Keep purely alphabetic tokens of length >= 2 that are not stop words.
    # `\w` also matches digits and '_', so isalpha() drops tokens such as "mp3".
    words = [word for word in tokens
             if len(word) > 1 and word.isalpha() and word not in stopwords]
    return (category, words)

# Read the stop-word list once on the driver; the lambda below captures it and
# Spark ships it to the executors with the task closure.

stopwords_file = "src/stopwords.txt"
stopwords = loading_stopwords(stopwords_file)

# Reduced (development) subset of the Amazon reviews, one JSON object per line.
data = sc.textFile("hdfs:///user/dic24_shared/amazon-reviews/full/reviews_devset.json")
# (category, [tokens]) per review; cached because it feeds several actions.
parsed_reviews = data.map(lambda raw: preprocessing(raw, stopwords))
rdd = parsed_reviews.cache()

# Chi-square statistic for every (category, term) pair from a 2x2 contingency table.

def chi_square(category_word_counts, total_counts, category_counts, total_docs):
    """Compute the chi-square statistic for each (category, term) pair.

    For category c and term t the contingency cells are::

        A = category_word_counts[c][t]
        B = category_counts[c] - A
        C = total_counts[t] - A
        D = total_docs - (A + B + C)

    and chi2 = N * (A*D - B*C)^2 / ((A+B) * (C+D) * (A+C) * (B+D)).

    :param category_word_counts: {category: {term: count}}.
    :param total_counts: {term: count over all categories}.
    :param category_counts: {category: count}.
    :param total_docs: N, the total number of documents.
    :return: defaultdict(float) keyed by (category, term). Pairs whose
        denominator is zero are omitted, so looking them up yields 0.0.
    """
    scores = defaultdict(float)
    n = total_docs
    for cat, term_counts in category_word_counts.items():
        for term, a in term_counts.items():
            b = category_counts[cat] - a
            c = total_counts[term] - a
            d = n - (a + b + c)
            denom = (a + b) * (c + d) * (a + c) * (b + d)
            if denom == 0:
                continue
            scores[(cat, term)] = n * (a * d - b * c) ** 2 / denom
    return scores

# Contingency-table inputs for the chi-square test. Every count below is a
# DOCUMENT count: chi_square() derives B = category_count[c] - A and
# C = total_count[t] - A, which is only consistent (and non-negative) when A is
# the number of documents of category c that contain term t. Counting token
# occurrences for A would mix units with the document counts below and can
# make B negative.

# {category: {term: number of documents in that category containing term}}
category_word_count = rdd.flatMapValues(lambda words: set(words)) \
                          .map(lambda cat_word: (cat_word, 1)) \
                          .reduceByKey(lambda x, y: x + y) \
                          .map(lambda x: (x[0][0], (x[0][1], x[1]))) \
                          .groupByKey() \
                          .mapValues(dict) \
                          .collectAsMap()
# groupByKey + dict builds each category's map in a single pass, whereas merging
# dicts pairwise inside reduceByKey copies ever-growing dicts repeatedly.

# {term: number of documents containing term}
total_count = rdd.flatMap(lambda x: set(x[1])) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda x, y: x + y) \
                  .collectAsMap()

# {category: number of documents in category}
category_count = rdd.map(lambda x: (x[0], 1)) \
                     .reduceByKey(lambda x, y: x + y) \
                     .collectAsMap()

# N: total number of documents
total_docs = rdd.count()

# Number of highest-scoring terms kept per category.
TOP_TERMS_PER_CATEGORY = 75

# Chi-square value for every (category, term) pair.
chi_square_value = chi_square(category_word_count, total_count, category_count, total_docs)

# Rank each category's terms by chi-square, highest first. Ties are broken
# alphabetically so the cut-off does not depend on the partition-dependent order
# in which collectAsMap() returned the terms.
top_terms_per_category = {}
for category, word_counts in category_word_count.items():
    scored_terms = [(word, chi_square_value[(category, word)]) for word in word_counts]
    scored_terms.sort(key=lambda item: (-item[1], item[0]))
    top_terms_per_category[category] = scored_terms[:TOP_TERMS_PER_CATEGORY]

# Union of the top terms of all categories, sorted alphabetically.
top_terms = {word for terms in top_terms_per_category.values() for word, _ in terms}
joined_dictionary = sorted(top_terms)

# Report: one section per category (alphabetical), then a separator and the
# joined dictionary on a single line.
output_content = []
for category in sorted(top_terms_per_category):
    output_content.append(f"Category: {category}")
    output_content.extend(f"{word}: {score:.4f}" for word, score in top_terms_per_category[category])
    output_content.append("\n")

output_content.append("=" * 100)
output_content.append("DICTIONARY:")
output_content.append("=" * 100)
output_content.append(" ".join(joined_dictionary))

output_file = "src/output_rdd.txt"
with open(output_file, "w") as f:
    for line in output_content:
        f.write(line + "\n")

sc.stop()


In [None]:
# Part 2) Datasets/DataFrames: Spark ML and Pipelines

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, lower
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, ChiSqSelector, StringIndexer

# Initialise the Spark session and take the SparkContext from it.
# getOrCreate() reuses an already-active context (for example after re-running
# this cell), where calling SparkContext(...) directly would raise.

spark = SparkSession.builder.appName("ChiSquareDF").getOrCreate()
sc = spark.sparkContext

# Stop words for StopWordsRemover: one entry per line, surrounding whitespace stripped.

with open("src/stopwords.txt", "r") as f:
    stopwords = {entry.strip() for entry in f}

# Load the reviews and normalise their text for whitespace tokenisation.

def load_and_preprocess_data(file_path):
    """Read JSON reviews and return a ``(category, text)`` DataFrame.

    ``text`` is the lower-cased review in which every run of non-letter
    characters is collapsed into a single space, with leading/trailing spaces
    removed. ``Tokenizer`` splits on single whitespace characters, so this
    yields one token per word and avoids empty tokens between words.
    A null ``reviewText`` is treated as the empty string.

    :param file_path: path/URI readable by ``spark.read.json``.
    :return: DataFrame with columns ``category`` and ``text``.
    """
    data = spark.read.json(file_path)
    # A null review text would otherwise flow into Tokenizer as null.
    data = data.na.fill({"reviewText": ""})
    data = data.withColumn("text", lower(col("reviewText")))
    # Replace (rather than delete) punctuation and digits so "great.loved" becomes
    # "great loved" instead of the non-word "greatloved".
    data = data.withColumn("text", regexp_replace(col("text"), "[^a-z]+", " "))
    data = data.withColumn("text", regexp_replace(col("text"), "^ | $", ""))
    return data.select(col("category"), col("text"))

# Development subset of the Amazon reviews as (category, text) rows.

data = load_and_preprocess_data("hdfs:///user/dic24_shared/amazon-reviews/full/reviews_devset.json")

# Number of terms the chi-square selector keeps overall.
NUM_SELECTED_TERMS = 2000

# Pipeline: category -> numeric label; text -> tokens -> stop-word-free tokens
# -> term counts -> TF-IDF -> top NUM_SELECTED_TERMS terms by chi-square.
# CountVectorizer keeps its default (large) vocabulary size: capping it at
# NUM_SELECTED_TERMS would make the selector a no-op, because the "selected"
# terms would simply be the most frequent ones CountVectorizer already kept.
indexer = StringIndexer(inputCol="category", outputCol="label")
tokenizer = Tokenizer(inputCol="text", outputCol="words")
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words", stopWords=list(stopwords))
vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="features")
selector = ChiSqSelector(numTopFeatures=NUM_SELECTED_TERMS, featuresCol="features",
                         outputCol="selected_features", labelCol="label")

pipeline = Pipeline(stages=[indexer, tokenizer, stopwords_remover, vectorizer, idf, selector])

# Fit all stages on the data, then apply them.
model = pipeline.fit(data)
result = model.transform(data)

# Map the selector's feature indices back to vocabulary terms
# (stage 3 is the CountVectorizer model, stage 4 the IDF model, last the selector).
selected_indices = model.stages[-1].selectedFeatures
vocab = model.stages[3].vocabulary
selected_terms = [vocab[i] for i in selected_indices]

# IDF weight of each selected term: the per-term factor of its TF-IDF values.
# A selected term is a column of the TF-IDF matrix; a per-document row vector
# cannot be attributed to a single term, so it is not written here.
idf_weights = model.stages[4].idf
selected_idf = {vocab[i]: float(idf_weights[i]) for i in selected_indices}

# One "term: idf" line per selected term, alphabetically, so the list can be
# compared directly with the dictionary produced by the RDD implementation.
output_file = "src/output_ds.txt"
with open(output_file, "w") as f:
    for term in sorted(selected_terms):
        f.write(f"{term}: {selected_idf[term]:.4f}\n")


sc.stop()    # to stop the Spark context


In [None]:
# Part 3) Text Classification

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, Normalizer, StringIndexer, ChiSqSelector
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder

# Initialise the Spark session and take the SparkContext from it.
# getOrCreate() reuses an already-active context (for example after re-running
# this cell), where calling SparkContext(...) directly would raise.

spark = SparkSession.builder.appName("TextClassification").getOrCreate()
sc = spark.sparkContext

# Stop words for StopWordsRemover: one entry per line, surrounding whitespace stripped.

with open("src/stopwords.txt", "r") as f:
    stopwords = {entry.strip() for entry in f}

# Load the reviews and normalise their text for whitespace tokenisation.

def load_and_preprocess_data(file_path):
    """Read JSON reviews and return a ``(category, text)`` DataFrame.

    ``text`` is the lower-cased review in which every run of non-letter
    characters is collapsed into a single space, with leading/trailing spaces
    removed. ``Tokenizer`` splits on single whitespace characters, so this
    yields one token per word and avoids empty tokens between words.
    A null ``reviewText`` is treated as the empty string.

    :param file_path: path/URI readable by ``spark.read.json``.
    :return: DataFrame with columns ``category`` and ``text``.
    """
    data = spark.read.json(file_path)
    # A null review text would otherwise flow into Tokenizer as null.
    data = data.na.fill({"reviewText": ""})
    data = data.withColumn("text", lower(col("reviewText")))
    # Replace (rather than delete) punctuation and digits so "great.loved" becomes
    # "great loved" instead of the non-word "greatloved".
    data = data.withColumn("text", regexp_replace(col("text"), "[^a-z]+", " "))
    data = data.withColumn("text", regexp_replace(col("text"), "^ | $", ""))
    return data.select(col("category"), col("text"))

# Development subset of the Amazon reviews as (category, text) rows.

data = load_and_preprocess_data("hdfs:///user/dic24_shared/amazon-reviews/full/reviews_devset.json")

# Pipeline: category -> label index; text -> tokens -> stop-word-free tokens
# -> hashed term frequencies -> TF-IDF -> chi-square selection
# -> L2 normalisation -> one-vs-rest linear SVM.
indexer = StringIndexer(inputCol="category", outputCol="label")
tokenizer = Tokenizer(inputCol="text", outputCol="words")
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words", stopWords=list(stopwords))
hashingTF = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=2000)
idf = IDF(inputCol="raw_features", outputCol="features")
# NOTE(review): when hashingTF.numFeatures <= numTopFeatures the selector keeps
# every hashed feature, i.e. it does not reduce dimensionality for these settings.
selector = ChiSqSelector(numTopFeatures=2000, featuresCol="features", outputCol="selected_features", labelCol="label")
normalizer = Normalizer(inputCol="selected_features", outputCol="normalized_features", p=2.0)
svm = LinearSVC(featuresCol="normalized_features", labelCol="label")
# OneVsRest hands ITS OWN featuresCol/labelCol to every binary classifier it
# trains (overriding the classifier's settings), and its featuresCol defaults to
# "features". It must point at the normalised column; otherwise the SVMs are
# trained on raw TF-IDF and the selector/normalizer stages are silently ignored.
ovr_classifier = OneVsRest(classifier=svm, featuresCol="normalized_features", labelCol="label")

# Assemble all stages into one pipeline.
pipeline = Pipeline(stages=[indexer, tokenizer, stopwords_remover, hashingTF, idf, selector, normalizer, ovr_classifier])

# Split into training (70%), validation (20%) and test (10%) sets with a fixed seed.
# Training and validation data are cached: every grid point re-fits on / re-scores
# them, and without caching each pass would re-read and re-parse the JSON input.

(training_data, validation_data, test_data) = data.randomSplit([0.7, 0.2, 0.1], seed=12345)
training_data = training_data.cache()
validation_data = validation_data.cache()

# Parameter grid: number of hashed features (2 values) x SVM regularisation
# parameter (3 values) x standardisation of training features (2 values)
# x maximum number of iterations (2 values) = 24 combinations.
param_grid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [500, 2000]) \
    .addGrid(svm.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(svm.standardization, [True, False]) \
    .addGrid(svm.maxIter, [10, 20]) \
    .build()

# Evaluator using Spark's multiclass "f1" metric (labelCol "label", predictionCol "prediction").

evaluator = MulticlassClassificationEvaluator(metricName="f1")

# Manual grid search: fit one pipeline per parameter combination and keep the one
# with the best validation F1. The test set is scored only once, for the selected
# model, so it is not used for model selection.
best_f1 = float("-inf")
best_params = None
best_model = None
for params in param_grid:
    model = pipeline.copy(params).fit(training_data)
    predictions = model.transform(validation_data)
    f1_score = evaluator.evaluate(predictions)
    print(f"Parameters: {params}, F1 Score on Validation Set: {f1_score}")
    if f1_score > best_f1:
        best_f1, best_params, best_model = f1_score, params, model

if best_model is None:
    raise ValueError("param_grid is empty; no model was trained")

# Evaluate the selected model on the held-out test data.
predictions_test = best_model.transform(test_data)
f1_score_test = evaluator.evaluate(predictions_test)
print(f"F1 Score on Test Set with Parameters {best_params}: {f1_score_test}")

sc.stop()
