In [22]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 33.2M  100 33.2M    0     0  32.4M      0  0:00:01  0:00:01 --:--:-- 32.4M


In [23]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .appName("AG news")
         .getOrCreate()
        )

agnews = spark.read.csv("agnews_clean.csv", inferSchema=True, header=True)

# turning the second column from a string to an array
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType
agnews = agnews.withColumn('filtered', F.from_json('filtered', ArrayType(StringType())))

In [24]:
# each row contains the document id and a list of filtered words
agnews.show(5, truncate=30)

+---+------------------------------+
|_c0|                      filtered|
+---+------------------------------+
|  0|[wall, st, bears, claw, bac...|
|  1|[carlyle, looks, toward, co...|
|  2|[oil, economy, cloud, stock...|
|  3|[iraq, halts, oil, exports,...|
|  4|[oil, prices, soar, time, r...|
+---+------------------------------+
only showing top 5 rows



In [25]:
agnews.show(5, truncate=False)

+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0|filtered                                                                                                                                                                                                                                          |
+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0  |[wall, st, bears, claw, back, black, reuters, reuters, short, sellers, wall, street, dwindling, band, ultra, cynics, seeing, green]                                                                                                               |
|1  

# **PART 1: TF-IDF**

**For TF**

In [37]:
#number of terms t in d
#map
def map_word_in_docs(doc_id, words):
    """ maps word to doc and associates 1 to count later"""
    return [((word, doc_id), 1) for word in words]


In [38]:
def addab(a,b):
  """ function to sum two counts"""
  return a + b

In [39]:
# for each d counts of words
#map
def map_doc_lengths(word_in_docs):
    """
    takes word t and number of times it appears in doc d and maps to doc d
    (returns doc_id and count for each word)
    """
    word, doc_id = word_in_docs[0]
    count = word_in_docs[1]
    return (doc_id, count)

#reduce
def reduce_doc_lengths(doc_id, counts):
    """
    takes mapped doc_id and counts and returns sum of counts for each doc_id for total word counts in each doc d
    """
    return (doc_id, sum(counts))

In [40]:
#tf calculation
def compute_tf(word_doc_count, doc_length_dict):
    """
    for each word and doc_id, returns tf value (count/total_words in doc).
    First splits word_doc_count into tuple and count for each word and doc_id
    """
    (word, doc_id), count = word_doc_count
    total_words = doc_length_dict[doc_id]
    return ((word, doc_id), count / total_words)

**For IDF**

In [41]:
#number of docs containing word t

def map_docs_in_word(doc_id, words):
    """
    maps each word t in d to doc d (as a list, so for each t list is of docs that have that word)
    """
    return list(set((word, doc_id) for word in words))


In [42]:
#IDF calculation
def compute_idf(word, df, num_docs):
    """
    for each word, returns idf value (log(# of d in D / #docs containing t))
    """
    import math
    return (word, math.log(num_docs / df))

**For TD-IDF Calculation**

In [43]:
#TD-IDF Calculation
def compute_tfidf(word_doc_id, tf, idf_dict):
    """
    for each word and doc_id, returns tf-idf value (tf * idf)
    """
    word, doc_id = word_doc_id
    return ((word, doc_id), tf * idf_dict[word])


**AGNews TD-IDF Measure**

In [44]:
## tf ##
agnews_rdd = agnews.select("_c0", "filtered").rdd

# dataset flattened (mapped) into word and doc pairs
word_doc_pairs = agnews_rdd.flatMap(lambda row: map_word_in_docs(row[0], row[1]))
#sum counts
word_counts = word_doc_pairs.reduceByKey(addab)
#map each doc to number of word entries in doc
doc_word_counts = word_counts.map(map_doc_lengths)
# sum for total words in doc
doc_lengths = doc_word_counts.reduceByKey(addab)
#make dictionary associating doc to word count
doc_length_dict = dict(doc_lengths.collect())
#computes tf for each doc and word pair
tf_rdd = word_counts.map(lambda x: compute_tf(x, doc_length_dict))

##idf##
#maps dataset into word and doc pairs
docs_with_word = agnews_rdd.flatMap(lambda row: map_docs_in_word(row[0], row[1]))
#adds frequency of docs that contain each word
word_doc_freq = docs_with_word.map(lambda x: (x[0], 1)).reduceByKey(addab)
#counts total number of docs in dataset
num_docs = agnews.count()
#calculates idf term
idf_rdd = word_doc_freq.map(lambda x: compute_idf(x[0], x[1], num_docs))
#puts idf value in dictionary with each word and associated idf number
idf_dict = dict(idf_rdd.collect())

##tfidf calculation##
tfidf_rdd = tf_rdd.map(lambda x: compute_tfidf(x[0], x[1], idf_dict))


#to DF
from pyspark.sql import Row
#converts return of rdd into dataframe
tfidf_rows = tfidf_rdd.map(lambda x: Row(d=x[0][1], tfidf_vector=[(x[0][0], x[1])]))
#groups by each doc ID
tfidf_grouped = tfidf_rows.reduceByKey(addab)

tfidf_df = tfidf_grouped.map(lambda x: Row(d=x[0], tfidf_vector=x[1])).toDF()
result_df = agnews.withColumnRenamed("_c0", "d").join(tfidf_df, on="d", how="left")

#returns first 5 doc id's and each word tf-idf pair per doc
result_df.select("d", "tfidf_vector").orderBy("d").show(5, truncate=False)


+---+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|d  |tfidf_vector                                                                                                                                                                                               

# **Part 2: SVM Objective Function**

In [45]:
def map_summation_term(row, w, b):
    """
    summation component of the hinge loss function (before it is summed, terms are mapped together)
    """
    from numpy import dot
    x_i, y_i = row
    margin = y_i * (dot(w, x_i) + b)
    return max(0.0, 1.0 - margin)

#function addab still works for summing over all i = 1 to n, so no additional summation function needed

In [46]:
def loss_SVM(w, b, X, y):
    """
    SVM objective function
    """
    lambd = 1
    #combines X and Y into pairs in rdd
    Xy_rdd = X.zip(y)

    n = Xy_rdd.count()
    #this isn't the summation itself, it is the mapping of each term in that component before it is summed
    summation = Xy_rdd.map(lambda row: map_summation_term(row, w, b))
    #here is where each term is added together
    total_summation = summation.reduce(addab)
    #lambda and norm of w term combined
    from numpy.linalg import norm
    reg_term = lambd * (norm(w) ** 2)

    return reg_term + (1 / n) * total_summation


In [47]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/w.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/bias.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/data_for_svm.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1391  100  1391    0     0   9567      0 --:--:-- --:--:-- --:--:--  9593
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    22  100    22    0     0    149      0 --:--:-- --:--:-- --:--:--   148
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 61.9M  100 61.9M    0     0  81.2M      0 --:--:-- --:--:-- --:--:-- 81.3M


In [48]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SVM").getOrCreate()

df = spark.read.csv("data_for_svm.csv", header=False, inferSchema=True)

In [49]:
import pandas as pd
import numpy as np

X = df.rdd.map(lambda row: [float(row[i]) for i in range(64)])
y = df.rdd.map(lambda row: int(row[64]))
w = np.array(pd.read_csv("w.csv", header=None)).flatten()
b = float(pd.read_csv("bias.csv", header=None).iloc[0, 0])

loss = loss_SVM(w, b, X, y)
print("SVM Objective Loss:", loss)

SVM Objective Loss: 1.0029686410368055


In [50]:
#SVM Classifier

def map_classifier(row, w, b):
  """ makes estimate of y by taking dot product of w and xi and returns 1 if it is positive and -1 if negative"""
  from numpy import dot
  z = (dot(w,row) + b)
  if z >= 0:
    return 1
  else:
    return -1


In [51]:
predictions_rdd = X.map(lambda x: map_classifier(x, w, b))
predictions_rdd.collect()

[-1,
 -1,
 -1,
 1,
 -1,
 1,
 -1,
 -1,
 1,
 -1,
 1,
 -1,
 -1,
 -1,
 1,
 1,
 1,
 -1,
 1,
 -1,
 1,
 1,
 1,
 -1,
 -1,
 -1,
 1,
 1,
 1,
 -1,
 -1,
 1,
 1,
 -1,
 -1,
 -1,
 -1,
 -1,
 -1,
 1,
 1,
 -1,
 -1,
 -1,
 1,
 -1,
 1,
 -1,
 1,
 -1,
 -1,
 1,
 1,
 -1,
 -1,
 -1,
 1,
 1,
 -1,
 1,
 -1,
 -1,
 1,
 1,
 -1,
 1,
 1,
 -1,
 1,
 -1,
 1,
 1,
 1,
 -1,
 1,
 1,
 1,
 -1,
 -1,
 -1,
 1,
 -1,
 -1,
 -1,
 -1,
 1,
 1,
 -1,
 -1,
 -1,
 1,
 -1,
 -1,
 1,
 1,
 1,
 -1,
 -1,
 -1,
 1,
 1,
 -1,
 1,
 -1,
 1,
 -1,
 1,
 -1,
 -1,
 -1,
 1,
 -1,
 -1,
 1,
 1,
 -1,
 1,
 1,
 -1,
 -1,
 -1,
 -1,
 -1,
 1,
 1,
 1,
 -1,
 -1,
 -1,
 -1,
 1,
 -1,
 -1,
 1,
 1,
 1,
 -1,
 -1,
 1,
 1,
 -1,
 1,
 1,
 -1,
 -1,
 -1,
 -1,
 1,
 1,
 -1,
 -1,
 1,
 1,
 1,
 1,
 -1,
 -1,
 -1,
 -1,
 -1,
 -1,
 -1,
 1,
 -1,
 -1,
 1,
 -1,
 1,
 1,
 -1,
 -1,
 1,
 1,
 -1,
 -1,
 1,
 1,
 -1,
 1,
 1,
 1,
 -1,
 1,
 -1,
 1,
 -1,
 1,
 1,
 -1,
 -1,
 1,
 -1,
 1,
 1,
 -1,
 1,
 1,
 1,
 1,
 -1,
 -1,
 -1,
 -1,
 1,
 -1,
 1,
 -1,
 -1,
 1,
 -1,
 1,
 1,
 -1,
 1,
 1,
 1,
 1,
 -1,
 -1,
 1,
 -1

In [52]:
pred_true_rdd = X.zip(y).map(lambda row: (map_classifier(row[0], w, b), row[1]))

# Count correct number of predictions
correct = pred_true_rdd.filter(lambda pair: pair[0] == pair[1]).count()
total = pred_true_rdd.count()

#accuracy is correct number of predictions divided by the number of predictions made (entries in dataset)
accuracy = correct / total
print("Accuracy:", accuracy)

Accuracy: 0.5018


# Generative AI Disclosure

In constructing rdds for the tf-idf function, ChatGPT was used to understand the .flatMap in implementing map_docs_in_word because it returns many outputs for one input which was needed as one word appears in many docs (and one doc contains many words), so it allows for this input. I asked if there was a way to associate multiple entries to the same key term. Additionally, I asked about a way for an rdd to count all the entries with the same term to group and sum them, to which it provided .reduceByKey() to count frequencies, used again for the td-idf function. The python equivalent would be .groupBy().sum().