In [None]:
'''
Critical Disclosure:
I was able to run all of this locally, but it did not work in the Docker container.
This folder contains two versions of the notebook: a Docker version and a local version.
The local version should have the results I obtained while running the functions
on my machine. Thank you for understanding. Please reach out if there is a problem,
because I had some trouble with my EC2 instance while working on this homework.
'''

In [None]:
# Install PySpark into the environment used by this notebook.
# NOTE(review): unpinned; consider `%pip install pyspark==<version>` so the
# install targets the running kernel's environment and re-runs are reproducible.
!pip install pyspark

In [None]:
# Download the cleaned AG News corpus into the working directory
# (`-O` saves it under the remote file name, agnews_clean.csv).
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv -O

In [None]:
# Sanity check before starting Spark: show which Java installation is configured
# and its version (Spark runs on the JVM).
!echo $JAVA_HOME
!java -version

In [None]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType

# Local Spark session using every available core.
spark = SparkSession.builder.master("local[*]").appName("AG news").getOrCreate()

# Load the cleaned corpus; the first row of the CSV supplies the column names
# and Spark infers the column types.
agnews = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("agnews_clean.csv")
)

# `filtered` arrives as a JSON-encoded string; parse it into array<string>.
agnews = agnews.withColumn("filtered", F.from_json(F.col("filtered"), ArrayType(StringType())))

In [None]:
# Preview: one row per document -- its id (_c0) and its parsed token list,
# with long cells cut to 30 characters.
agnews.show(n=5, truncate=30)

In [None]:
# Build an RDD of (doc_id, filtered_words) rows.
# `from_json` yields null when a row's JSON cannot be parsed, so every step
# below treats a null token list as an empty document instead of crashing.
print("Creating RDD...")
rdd = agnews.select("_c0", "filtered").rdd

# ((word, doc_id), 1) for every token occurrence -> raw term counts per document.
print("Mapping word-document pairs...")
word_doc_pairs = rdd.flatMap(
    lambda row: [((word, row["_c0"]), 1) for word in (row["filtered"] or [])]
)

print("Reducing term counts...")
term_counts = word_doc_pairs.reduceByKey(lambda x, y: x + y)

# Document lengths (number of tokens), used to normalize the term counts.
print("Calculating document lengths...")
doc_lengths = rdd.map(lambda row: (row["_c0"], len(row["filtered"] or [])))

# Collect the lengths once and broadcast them: a broadcast variable is shipped
# to each executor a single time, whereas a plain dict captured in the lambda
# is pickled into every task. NOTE(review): assumes _c0 is unique per document.
doc_lengths_dict = dict(doc_lengths.collect())
doc_lengths_bc = spark.sparkContext.broadcast(doc_lengths_dict)

# TF(word, doc) = count(word in doc) / len(doc). Any (word, doc) key present
# in term_counts comes from a doc with at least one token, so len(doc) >= 1.
print("Computing TF...")
tf = term_counts.map(lambda x: (x[0], x[1] / doc_lengths_bc.value[x[0][1]]))

# Document frequency: the number of distinct documents containing each word.
# Tokens are de-duplicated per row with set(); distinct() additionally guards
# against repeated document ids.
print("Calculating document frequency for each term...")
unique_word_doc = rdd.flatMap(
    lambda row: [(word, row["_c0"]) for word in set(row["filtered"] or [])]
)
doc_freq = unique_word_doc.distinct().map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y)

# Total number of documents (needed for IDF).
N = agnews.count()
print(f"Total documents: {N}")

In [None]:
import heapq
import math

# Document frequencies have one entry per vocabulary term, so they are
# collected to the driver and converted to IDF weights there:
# IDF(word) = log(N / DF(word)); DF >= 1 for every collected word.
print("Collecting document frequencies and computing IDF in-memory...")
doc_freq_dict = dict(doc_freq.collect())
idf_dict = {term: math.log(N / df) for term, df in doc_freq_dict.items()}

# Broadcast the vocabulary-sized IDF table so it is shipped to each executor
# once rather than pickled into every task closure.
idf_bc = spark.sparkContext.broadcast(idf_dict)

# TF-IDF(word, doc) = TF(word, doc) * IDF(word); keys stay (word, doc_id).
print("Computing TF-IDF...")
tfidf = tf.map(lambda x: (x[0], x[1] * idf_bc.value[x[0][0]]))

# Re-key by document: doc_id -> [(word, tfidf), ...].
print("Grouping TF-IDF scores by document ID...")
doc_tfidf = tfidf.map(lambda x: (x[0][1], [(x[0][0], x[1])])) \
                 .reduceByKey(lambda x, y: x + y)

# Show the 5 highest-scoring terms for the first 5 documents returned by Spark.
print("\nTF-IDF scores for first 5 documents:")
for doc_id, scores in doc_tfidf.take(5):
    print(f"Doc {doc_id}: {heapq.nlargest(5, scores, key=lambda x: x[1])}")  # top 5 terms

In [None]:
# Part 2:SVM
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/w.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/bias.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/data_for_svm.csv -O

In [None]:
# Example loader for the SVM inputs (reading them with pyspark is an alternative).
# None of these CSVs has a header row, so header=None is required -- otherwise
# the first data row would be consumed as column names.
import pandas as pd

data_svm, w, bias = (
    pd.read_csv(path, header=None)
    for path in ('data_for_svm.csv', 'w.csv', 'bias.csv')
)

In [None]:
import numpy as np

def loss_SVM(w, b, X, y, lambd=1.0):
    """
    Compute the soft-margin linear SVM loss:
    L(w, b) = lambda * ||w||^2 + (1/n) * sum_i max(0, 1 - y_i * (w^T x_i + b))

    Parameters:
    - w: weight vector; any array-like with d entries (e.g. a (d, 1) DataFrame)
    - b: bias; a scalar, or any array-like whose first entry is the bias
         (e.g. a (1, 1) DataFrame)
    - X: feature matrix (n, d). For backward compatibility, an (n, d + 1)
         matrix whose last column holds the labels (e.g. the raw data frame)
         is also accepted; that trailing column is dropped.
    - y: labels (n,), expected in {-1, +1}
    - lambd: regularization parameter

    Returns:
    - the loss as a Python float

    Raises:
    - ValueError: if X does not match w in width, X and y differ in length,
      or X contains no samples.
    """
    # Normalize inputs to numpy arrays. This works for DataFrames, Series,
    # ndarrays and plain lists/scalars alike.
    w_vec = np.asarray(w, dtype=float).ravel()
    b_val = float(np.asarray(b, dtype=float).ravel()[0])
    X_mat = np.asarray(X, dtype=float)
    if X_mat.ndim == 1:
        X_mat = X_mat.reshape(1, -1)  # a single sample
    y_vec = np.asarray(y, dtype=float).ravel()

    # The notebook passes the whole data frame (features + label column),
    # so drop exactly one trailing extra column.
    if X_mat.shape[1] == w_vec.size + 1:
        X_mat = X_mat[:, :-1]
    if X_mat.shape[1] != w_vec.size:
        raise ValueError(
            f"X has {X_mat.shape[1]} feature columns but w has {w_vec.size} entries"
        )
    if X_mat.shape[0] != y_vec.size:
        raise ValueError(f"X has {X_mat.shape[0]} rows but y has {y_vec.size} labels")
    if X_mat.shape[0] == 0:
        raise ValueError("X must contain at least one sample")

    # Hinge loss terms: max(0, 1 - y_i * (w^T x_i + b))
    margins = 1.0 - y_vec * (X_mat @ w_vec + b_val)
    hinge_loss = np.maximum(0.0, margins)

    # Regularization (squared L2 norm of w) plus mean hinge loss.
    return float(lambd * np.dot(w_vec, w_vec) + hinge_loss.mean())

In [None]:
# Evaluate the loss on the full data frame; its last column holds the labels.
labels_svm = data_svm.iloc[:, -1]
loss = loss_SVM(w=w, b=bias, X=data_svm, y=labels_svm)
print("SVM Loss:", loss)

In [None]:
def predict_SVM(w, b, X):
    """
    Predict labels using the linear SVM decision rule: sign(w^T x + b).

    Parameters:
    - w: weight vector; any array-like with d entries (e.g. a (d, 1) DataFrame)
    - b: bias; a scalar, or any array-like whose first entry is the bias
         (e.g. a (1, 1) DataFrame)
    - X: feature matrix (n, d). For backward compatibility, an (n, d + 1)
         matrix whose last column holds the labels (e.g. the raw data frame)
         is also accepted; that trailing column is dropped.

    Returns:
    - y_hat: float ndarray of shape (n,), each entry in {-1.0, +1.0}.
      A point exactly on the decision boundary (score 0) is assigned +1,
      whereas np.sign would have returned 0 for it.

    Raises:
    - ValueError: if the number of feature columns does not match w.
    """
    # Normalize inputs to numpy arrays (DataFrames, ndarrays, lists, scalars).
    w_vec = np.asarray(w, dtype=float).ravel()
    b_val = float(np.asarray(b, dtype=float).ravel()[0])
    X_mat = np.asarray(X, dtype=float)
    if X_mat.ndim == 1:
        X_mat = X_mat.reshape(1, -1)  # a single sample

    # The notebook passes the whole data frame (features + label column),
    # so drop exactly one trailing extra column.
    if X_mat.shape[1] == w_vec.size + 1:
        X_mat = X_mat[:, :-1]
    if X_mat.shape[1] != w_vec.size:
        raise ValueError(
            f"X has {X_mat.shape[1]} feature columns but w has {w_vec.size} entries"
        )

    scores = X_mat @ w_vec + b_val
    return np.where(scores >= 0.0, 1.0, -1.0)

In [None]:
# Predict on the full data frame and display the first 10 predicted labels.
y_pred = predict_SVM(w=w, b=bias, X=data_svm)
print("Predictions:\n", y_pred[:10])