# Part 2

In [1]:
#open spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("A2-Part2-Pipeline").getOrCreate()

#load reviews and stopwords
DEBUG = True
if DEBUG:
    RAW_PATH = "hdfs:///user/dic25_shared/amazon-reviews/full/reviews_devset.json"
else:
    RAW_PATH = "hdfs:///user/dic25_shared/amazon-reviews/full/reviewscombined.json"
stopwordsPath = "Exercise_1/stopwords.txt"
#define structure of json for faster reading
from pyspark.sql import types as T
review_schema = T.StructType([
     T.StructField("reviewerID",      T.StringType(),  True),
     T.StructField("asin",            T.StringType(),  True),
     T.StructField("reviewerName",    T.StringType(),  True),
     T.StructField("helpful",         T.ArrayType(T.IntegerType()), True),
     T.StructField("reviewText",      T.StringType(),  True),
     T.StructField("overall",         T.FloatType(),   True),
     T.StructField("summary",         T.StringType(),  True),
     T.StructField("unixReviewTime",  T.LongType(),    True),
     T.StructField("reviewTime",      T.StringType(),  True),
     T.StructField("category",        T.StringType(),  True),
 ])
#read and select category and review
df = (
    spark.read
         .schema(review_schema)
         .json(RAW_PATH)
         .selectExpr("reviewText AS text",
                     "category")
    .na.drop(subset=["text", "category"])
)

# reading the stopwords
stopwords = spark.sparkContext.textFile(stopwordsPath).collect()


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/05/13 00:48:48 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/13 00:48:48 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/05/13 00:48:50 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


                                                                                

Build the pipeline

In [2]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    RegexTokenizer,
    StopWordsRemover,
    CountVectorizer,
    IDF,
    ChiSqSelector,
    StringIndexer
)

# 1 Tokenisation and lower-casing via RegexTokenizer
tokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="tokens",
    pattern=r"""[ \t0-9()\[\]{}.!?,;:+=\-_"'`~#@&*%€$§\\/]+""",  # delimiters
    gaps=True,                # pattern defines the split points
    toLowercase=True,
)

# 2 Stopword removal
stopper = StopWordsRemover(inputCol="tokens",stopWords = stopwords, outputCol="clean_tokens")


# 3 Vectorizing
cv = CountVectorizer(
    inputCol="clean_tokens",
    outputCol="tf",
    minDF=2,
    vocabSize=50_000, 
)

# 4 IDF weighting
idf = IDF(inputCol="tf", outputCol="tfidf")

# 5 encode the category column from string to int
encoder = StringIndexer(inputCol="category", outputCol="label")

# 6 select top 2000 terms by chi²
selector = ChiSqSelector(
    numTopFeatures=2000,
    featuresCol="tfidf",
    outputCol="chi2_features",
    labelCol="label",
)

# 7 Pipeline assembly
pipeline = Pipeline(stages=[tokenizer, stopper, cv, idf, encoder, selector])

Fit pipeline

In [3]:
# Fit pipeline
spark.conf.set("spark.sql.shuffle.partitions", "128")
df.persist() # persisting intermediate output, better for large datasets
model = pipeline.fit(df)
df.unpersist()

# Extract vocabulary & selected indices
vocab = model.stages[2].vocabulary                        # get vocabulary from Vecorizer
selected = model.stages[-1].selectedFeatures             # index of term after selector

selected_terms = [vocab[i] for i in selected]            # map indices to terms

                                                                                

25/05/13 00:50:07 WARN DAGScheduler: Broadcasting large task binary with size 1243.4 KiB
25/05/13 00:50:07 WARN DAGScheduler: Broadcasting large task binary with size 1245.5 KiB


                                                                                

25/05/13 00:50:16 WARN DAGScheduler: Broadcasting large task binary with size 1247.5 KiB


                                                                                

Output file

In [4]:
# Saves top 2000 terms to output_ds.txt (one term per line)
import pathlib, os, codecs

out_file = pathlib.Path("output_ds.txt")
out_file.write_text("\n".join(selected_terms), encoding="utf-8")
print(f"Wrote {len(selected_terms)} terms to {out_file.resolve()}")

Wrote 2000 terms to /home/e12427512/Exercise_2/src/output_ds.txt


In [5]:

# ────────────────────────────────────────────────────────────────────────
# Optional: automatic comparison with Assignment 1
# ────────────────────────────────────────────────────────────────────────
# old_terms = pathlib.Path("assignment1_terms.txt").read_text(encoding="utf-8").splitlines()
# old_set, new_set = set(old_terms), set(selected_terms)
# print("\n🔹  Terms kept in *both* assignments:", len(old_set & new_set))
# print("🔸  Terms only in Assignment 1:",      len(old_set - new_set))
# print("🔸  Terms only in Spark pipeline:",    len(new_set - old_set))
# (You can also diff the two files directly with any text-diff tool.)


In [6]:
# Step 1: Read the file and get the last line (merged vocabulary)
lines = pathlib.Path("../Exercise_1/src/output_dev.txt").read_text(encoding="utf-8").splitlines()
merged_vocab_line = lines[-1]  # This is the merged vocab line

# Step 2: Split merged vocab into terms
old_terms = merged_vocab_line.strip().split()
old_set = set(old_terms)

# Step 3: Load selected terms from Spark pipeline (output_ds.txt)
new_terms = pathlib.Path("output_ds.txt").read_text(encoding="utf-8").splitlines()
new_set = set(term.strip() for term in new_terms)

# Step 4: Compare sets
common_terms = old_set & new_set
only_in_old = old_set - new_set
only_in_new = new_set - old_set

# Step 5: Print results
print(f"Terms in BOTH assignments: {len(common_terms)}")
print(f"Terms ONLY in Assignment 1: {len(only_in_old)}")
print(f"Terms ONLY in Spark pipeline (Assignment 2): {len(only_in_new)}")
print(f"Overlap: {len(common_terms) / len(new_set) * 100:.2f}% of Spark-selected terms are also in Assignment 1")

FileNotFoundError: [Errno 2] No such file or directory: '../Exercise_1/src/output_dev.txt'

# Part 3 
// Not working yet

In [16]:
df_3 = model.transform(df).select("label","chi2_features").toDF("label","chi2_features")
display(df_3)

DataFrame[label: double, chi2_features: vector]

In [17]:
from pyspark.ml.feature import Normalizer
from pyspark.ml.classification import LinearSVC,  OneVsRest
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.ml.feature import (Tokenizer, StopWordsRemover, 
                               CountVectorizer, IDF, 
                               ChiSqSelector, Normalizer, 
                               StringIndexer)
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder
spark.sparkContext.setLogLevel("ERROR")   # WARN-Meldungen verschwinden

In [21]:
# Complete Text Processing Pipeline

# 1. Convert category to numeric index
label_indexer = StringIndexer(inputCol="category", outputCol="label_index")

# 2. Text processing pipeline
tokenizer = Tokenizer(inputCol="text", outputCol="words")
stopper = StopWordsRemover(inputCol="words", outputCol="filtered_words")
cv = CountVectorizer(inputCol="filtered_words", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="tfidf_features")
selector = ChiSqSelector(featuresCol="tfidf_features", 
                       outputCol="selected_features",
                       labelCol="label_index",
                       numTopFeatures=2000)
selector_heavy = ChiSqSelector(featuresCol="tfidf_features",
                             outputCol="selected_features_heavy",
                             labelCol="label_index",
                             numTopFeatures=500)  # Heavy filtering
normalizer = Normalizer(inputCol="selected_features", 
                      outputCol="scaled_features",
                      p=2.0)

# 3. Classifier setup
binary_svm = LinearSVC(featuresCol="scaled_features",
                     labelCol="label_index",
                     maxIter=50,
                     regParam=0.1)

ovr = OneVsRest(classifier=binary_svm,
              featuresCol="scaled_features",
              labelCol="label_index")

# 4. Full pipeline
pipeline = Pipeline(stages=[
    label_indexer,
    tokenizer,
    stopper,
    cv,
    idf,
    selector,
    normalizer,
    ovr
])

full_param_grid = (ParamGridBuilder()
    .addGrid(selector.numTopFeatures, [500, 2000])  # Compare feature sizes
    .addGrid(binary_svm.regParam, [0.01, 0.1, 1.0])
    .addGrid(binary_svm.standardization, [True, False])
    .addGrid(binary_svm.maxIter, [10, 50])
    .build())

                                                                                

In [23]:
# Complete Training Function with Cross-Validation
def run_full_pipeline():
    try:
        # Split data
        train_val, test = df.randomSplit([0.8, 0.2], seed=42)
        train, val = train_val.randomSplit([0.75, 0.25], seed=42)  # 60/20/20 split
        
        print(f"Training samples: {train.count()}")
        print(f"Test samples: {test.count()}")
        
        # Define evaluator FIRST
        evaluator = MulticlassClassificationEvaluator(
            labelCol="label_index",
            predictionCol="prediction",
            metricName="f1"
        )
        
        # Setup CrossValidator
        cv = CrossValidator(
            estimator=pipeline,
            estimatorParamMaps=full_param_grid,
            evaluator=evaluator,
            numFolds=3,
            parallelism=4,
            seed=42
        )
        
        # Train model
        print("\nRunning cross-validation...")
        cv_model = cv.fit(train)
        best_model = cv_model.bestModel
        
        # Evaluate
        predictions = best_model.transform(test)
        
        print("\nBest Model Parameters:")
        print(f"regParam: {best_model.stages[-1].getClassifier().getRegParam()}")
        print(f"maxIter: {best_model.stages[-1].getClassifier().getMaxIter()}")
        print(f"standardization: {best_model.stages[-1].getClassifier().getStandardization()}")
        
        print("\nEvaluation Metrics:")
        print(f"{'F1 Score':>12}: {evaluator.evaluate(predictions):.4f}")
        print(f"{'Accuracy':>12}: {evaluator.setMetricName('accuracy').evaluate(predictions):.4f}")
        
        # Show predictions with original labels
        label_map = {i:l for i,l in enumerate(best_model.stages[0].labels)}
        predictions = predictions.withColumn(
            "predicted_category",
            udf(lambda x: label_map[x], StringType())("prediction")
        )
        
        print("\nSample Predictions:")
        predictions.select("category", "predicted_category").show(10)
        
    except Exception as e:
        print(f"\nError during training: {str(e)}")
        raise

# Execute the pipeline
run_full_pipeline()

                                                                                

Training samples: 47490


                                                                                

Test samples: 15664

Running cross-validation...


                                                                                

KeyboardInterrupt: 

In [None]:
predictions.select("label", "prediction").show(10)