# DE300 HW3 - Nicky Williams

## 1. tf-idf definition

In [7]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 33.2M  100 33.2M    0     0  42.6M      0 --:--:-- --:--:-- --:--:-- 42.5M


In [8]:
from pyspark.sql import SparkSession

# Column helpers and the element type needed to decode the word lists
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType

# Local Spark session that uses every available core (reused if one already exists)
spark = SparkSession.builder.master("local[*]").appName("AG news").getOrCreate()

# Read the cleaned AG News CSV; the `filtered` column is read in as a string
raw_agnews = spark.read.csv("agnews_clean.csv", header=True, inferSchema=True)

# Decode `filtered` from its JSON string form into an array<string> column
word_list_type = ArrayType(StringType())
agnews = raw_agnews.withColumn("filtered", F.from_json(F.col("filtered"), word_list_type))

# Each row now holds the document id and its list of filtered words
agnews.show(5, truncate=30)

+---+------------------------------+
|_c0|                      filtered|
+---+------------------------------+
|  0|[wall, st, bears, claw, bac...|
|  1|[carlyle, looks, toward, co...|
|  2|[oil, economy, cloud, stock...|
|  3|[iraq, halts, oil, exports,...|
|  4|[oil, prices, soar, time, r...|
+---+------------------------------+
only showing top 5 rows



In [3]:
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType
import math



# Adding a doc id to each row (all later steps key on doc_id).
# Spark documents monotonically_increasing_id() as unique but not consecutive, and
# as non-deterministic because it depends on partition ids. agnews is read by several
# separate actions below (collectAsMap, count) and by the later join, so it is cached
# to keep one set of ids across them. This is best effort: an evicted cache is
# recomputed from the CSV.
agnews = agnews.withColumn("doc_id", F.monotonically_increasing_id()).cache()

# Creating (doc_id, term) pairs. A row whose `filtered` is null (e.g. when from_json
# could not parse it) contributes no pairs instead of failing the whole job with
# "'NoneType' object is not iterable".
doc_term_rdd = agnews.select("doc_id", "filtered").rdd.flatMap(
    lambda row: [(row["doc_id"], term) for term in (row["filtered"] or [])]
)

# Counting the number of times each term appears in each doc: ((doc_id, term), count)
tf_counts = doc_term_rdd.map(lambda pair: (pair, 1)) \
                        .reduceByKey(lambda a, b: a + b)

# Counting the total number of terms in each document, collected to the driver
# as {doc_id: doc_len}
doc_lengths = agnews.select("doc_id", F.size("filtered").alias("doc_len")) \
                    .rdd.map(tuple) \
                    .collectAsMap()

# Computing normalized term frequency:
# tf = (# occurrences of term in doc) / (total terms in doc)
tf_rdd = tf_counts.map(lambda x: (x[0], x[1] / doc_lengths[x[0][0]]))

# Unique (term, doc_id) pairs. The keys of tf_counts are already the distinct
# (doc_id, term) pairs, so they are reused instead of running a second distinct()
# shuffle over doc_term_rdd.
term_doc_rdd = tf_counts.keys().map(lambda key: (key[1], key[0]))

# Counting the number of distinct docs each term appears in: (term, doc_frequency)
doc_freq = term_doc_rdd.map(lambda x: (x[0], 1)) \
                       .reduceByKey(lambda a, b: a + b)

# Total number of docs in the data
num_docs = agnews.count()

# Computing IDF for each term (natural log): idf = ln(num_docs / doc_frequency)
idf_rdd = doc_freq.map(lambda x: (x[0], math.log(num_docs / x[1])))

# Collecting {term: idf} to the driver; the dict is shipped to the executors
# inside the closure of the next map
idf_dict = dict(idf_rdd.collect())

# Computing TF-IDF by multiplying tf and idf for each (doc_id, term) pair:
# ((doc_id, term), tfidf)
tfidf_rdd = tf_rdd.map(lambda x: (x[0], x[1] * idf_dict.get(x[0][1], 0)))


In [4]:
# TASK 2

# Re-key one ((doc_id, term), score) record by its document: (doc_id, (term, score))
def _by_document(record):
    (doc_id, term), score = record
    return doc_id, (term, score)


# Gather every document's (term, tf-idf) pairs into a single Python list per doc_id
tfidf_grouped = (
    tfidf_rdd
    .map(_by_document)
    .groupByKey()
    .mapValues(list)
)

# One row per document: doc_id plus its list of (term, tf-idf) pairs
tfidf_df = tfidf_grouped.toDF(["doc_id", "tfidf"])

# Attach the scores to the original rows as a new "tfidf" column; the left join keeps
# documents that produced no pairs (their tfidf is null)
agnews_with_tfidf = agnews.join(tfidf_df, on="doc_id", how="left")


In [5]:
# TASK 3

# Display the full (untruncated) tf-idf lists of the five documents with the smallest doc_id
first_docs = agnews_with_tfidf.select("doc_id", "tfidf").orderBy("doc_id")
first_docs.show(n=5, truncate=False)


+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|doc_id|tfidf                                                                                                                                                                                                

## 2. SVM objective function

In [2]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/w.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/bias.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/data_for_svm.csv -O


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1391  100  1391    0     0   4504      0 --:--:-- --:--:-- --:--:--  4516
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    22  100    22    0     0     54      0 --:--:-- --:--:-- --:--:--    55
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 61.9M  100 61.9M    0     0  25.8M      0  0:00:02  0:00:02 --:--:-- 25.8M


In [3]:
# TASK 1/2/3

from pyspark.sql import SparkSession
import numpy as np
import pandas as pd

# Reuse the running Spark session if one exists (getOrCreate), otherwise build one
spark = SparkSession.builder.appName("SVM Loss").getOrCreate()

# Every CSV row is one sample: all columns except the last are features, the last is the label
data_df = spark.read.csv("data_for_svm.csv", header=False, inferSchema=True)
svm_rows = data_df.rdd
X_rdd = svm_rows.map(lambda sample: np.array(sample[:-1]))  # feature vectors (64-dim)
y_rdd = svm_rows.map(lambda sample: sample[-1])             # labels (+1 or -1)

# Model parameters: the weights flattened into a 1-D vector, the bias as a plain float
w = pd.read_csv("w.csv", header=None).to_numpy().flatten()
b = float(pd.read_csv("bias.csv", header=None).iloc[0, 0])



# TASK 2 - Defining  SVM loss function using MapReduce functions
def loss_SVM(w, b, X_rdd, y_rdd, lam=0.01):
    """Evaluate the soft-margin linear SVM objective with MapReduce.

    Computes
        lam * ||w||^2 + (1/n) * sum_i max(0, 1 - y_i * (w . x_i + b))

    Args:
        w: weight vector (NumPy array) dotted with every feature vector.
        b: scalar bias.
        X_rdd: RDD of feature vectors. It must be element-aligned with y_rdd,
            because RDD.zip pairs elements positionally.
        y_rdd: RDD of labels (expected to be +1 or -1).
        lam: regularization strength.

    Returns:
        The objective value, or None when the RDDs contain no samples.
    """
    data_rdd = X_rdd.zip(y_rdd)  # combine to (x_i, y_i)

    # TASK 1 - Designing MapReduce functions
    # Map: each sample -> (hinge loss, 1). The loss sum and the sample count then
    # come out of the same pass, instead of one scan for count() and another for reduce().
    def _hinge_and_count(sample):
        x_i, y_i = sample
        return max(0, 1 - y_i * (np.dot(w, x_i) + b)), 1

    # Reduce: element-wise sum of the pairs. fold() starts from a zero value, so an
    # empty RDD yields (0.0, 0) where reduce() would raise ValueError.
    total_hinge_loss, n = data_rdd.map(_hinge_and_count).fold(
        (0.0, 0), lambda acc, cur: (acc[0] + cur[0], acc[1] + cur[1])
    )
    if n == 0:
        return None

    # L2 regularization term
    reg_term = lam * np.linalg.norm(w) ** 2

    # SVM objective value
    return reg_term + total_hinge_loss / n


# TASK 3 - evaluating the loss on all of the data
# loss_SVM returns None for an empty dataset, and None cannot be formatted with
# :.6f, so that case is reported explicitly instead of raising TypeError.
svm_loss_value = loss_SVM(w, b, X_rdd, y_rdd)
if svm_loss_value is None:
    print("Dataset is empty")
else:
    print(f"Loss from loss_SVM(): {svm_loss_value:.6f}")


Loss from loss_SVM(): 0.999775


In [None]:
# TASK 4

# Reload the SVM samples; prediction only needs the feature columns
data_df = spark.read.csv("data_for_svm.csv", header=False, inferSchema=True)
X_rdd = data_df.rdd.map(lambda sample: np.array(sample[:-1]))  # drop the trailing label column

# Model parameters: the weights flattened into a 1-D vector, the bias as a plain float
w = pd.read_csv("w.csv", header=None).to_numpy().flatten()
b = float(pd.read_csv("bias.csv", header=None).iloc[0, 0])

# MapReduce prediction: a single map step that labels each feature vector
def predict_svm(w, b, X_rdd):
    """Return an RDD with one predicted label per feature vector in X_rdd.

    A sample gets label 1 when its decision value w . x + b is >= 0, otherwise -1
    (a decision value of exactly 0 therefore maps to 1).
    """
    def _label(features):
        decision_value = np.dot(w, features) + b
        if decision_value >= 0:
            return 1
        return -1

    return X_rdd.map(_label)

# Predict a label for every sample
predictions_rdd = predict_svm(w, b, X_rdd)

# Show the first ten predicted labels
print("First 10 predictions:")
first_ten = predictions_rdd.take(10)
print(first_ten)


First 10 predictions:
[-1, -1, -1, 1, -1, 1, -1, -1, 1, -1]


**GenAI statement:** I worked on this homework largely with the TAs, who helped me use ChatGPT when I ran into errors. I had trouble defining the SVM loss function in part 2, and ChatGPT told me to add:

```python
n = data_rdd.count()
if n == 0:
    print("Dataset is empty")
    return None
```


I also had trouble loading `w` and `b`: I hadn't added `.flatten()` for `w`, and I hadn't added the `float(...)` conversion or `.values[0][0]` for `b`:

```python
w = np.array(pd.read_csv("w.csv", header=None)).flatten()

b = float(pd.read_csv("bias.csv", header=None).values[0][0])
```


Other help came from the TAs and their GenAI sources.
