**1. tf-idf definition**

In [None]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 33.2M  100 33.2M    0     0   134M      0 --:--:-- --:--:-- --:--:--  134M


In [None]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType

# Local-mode Spark session for the tf-idf part of the notebook.
spark = SparkSession.builder.master("local[*]").appName("AG news").getOrCreate()

# Read the CSV with a header row and let Spark infer the column types.
agnews = spark.read.csv("agnews_clean.csv", header=True, inferSchema=True)

# The second column is read as a string; parse it into an array of strings.
agnews = agnews.withColumn(
    "filtered",
    F.from_json("filtered", ArrayType(StringType())),
)

In [None]:
# Preview five rows: a document id (_c0) next to its list of filtered words.
agnews.show(n=5, truncate=30)

+---+------------------------------+
|_c0|                      filtered|
+---+------------------------------+
|  0|[wall, st, bears, claw, bac...|
|  1|[carlyle, looks, toward, co...|
|  2|[oil, economy, cloud, stock...|
|  3|[iraq, halts, oil, exports,...|
|  4|[oil, prices, soar, time, r...|
+---+------------------------------+
only showing top 5 rows



In [None]:
# Print the column types, then three rows with the word lists left untruncated.
agnews.printSchema()
agnews.show(n=3, truncate=False)

root
 |-- _c0: integer (nullable = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)

+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0|filtered                                                                                                                                                                                                                                          |
+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0  |[wall, st, bears, claw, back, black, reuters, reuters, short, sellers, wall, street, dwindling, band, ultra, cynics, se

In [None]:
# flatten a (doc id, words) row into (doc id, word) pairs
def flatten_doc(row):
    """Expand one (doc_id, words) pair into a list of (doc_id, word) tuples.

    row: 2-tuple of a document id and its iterable of words. ``words`` may be
         None (the 'filtered' column is nullable); such a document contributes
         no pairs instead of raising a TypeError.

    Returns a list with one (doc_id, word) tuple per word, in the original order.
    """
    doc_id, words = row
    if words is None:
        return []
    return [(doc_id, word) for word in words]

# Pair each document id with its word list ...
rdd = agnews.rdd.map(lambda record: (record["_c0"], record["filtered"]))
# ... then emit one (doc id, word) tuple per word.
rdd_flattened = rdd.flatMap(flatten_doc)

rdd_flattened.take(num=5)

[(0, 'wall'), (0, 'st'), (0, 'bears'), (0, 'claw'), (0, 'back')]

**Calculate Term Frequency (tf)**

In [None]:
# Raw term counts. Each element of rdd_flattened is already a (doc id, word)
# tuple, so it serves directly as the key: emit (key, 1) and sum the ones.
tf = (
    rdd_flattened
    .map(lambda doc_word: ((doc_word[0], doc_word[1]), 1))
    .reduceByKey(lambda left, right: left + right)
)


In [None]:
# (document id, total number of words in that doc), used to normalize tf.
# The word list can be None (the 'filtered' column is nullable); count such a
# document as having 0 words instead of letting len() raise a TypeError.
doc_lengths = rdd.mapValues(lambda words: len(words) if words is not None else 0)

In [None]:
# Normalize each raw count by the length of the document it came from.
tf_normalized = (
    tf
    # re-key by document: (doc_id, (word, count))
    .map(lambda entry: (entry[0][0], (entry[0][1], entry[1])))
    # attach the document length: (doc_id, ((word, count), doc_length))
    .join(doc_lengths)
    # count / doc_length -> ((doc_id, word), normalized_tf)
    .map(lambda joined: ((joined[0], joined[1][0][0]), joined[1][0][1] / joined[1][1]))
)

**Calculate Inverse Document Frequency (idf)**

In [None]:
# Document frequency: drop duplicate (doc id, word) pairs so each document
# counts a word at most once, then count the remaining pairs per word.
df = (
    rdd_flattened.distinct()
    .map(lambda doc_word: (doc_word[1], 1))
    .reduceByKey(lambda left, right: left + right)
)

In [None]:
import numpy as np

num = rdd.count()  # total number of documents

# idf(word) = ln(total documents / documents containing the word) -> (word, idf)
idf = df.map(lambda word_count: (word_count[0], np.log(num / word_count[1])))

**Calculate tf-idf**

In [None]:
# Multiply each normalized term frequency by the idf of its word.
tfidf = (
    tf_normalized
    # re-key by word: (word, (doc_id, tf))
    .map(lambda entry: (entry[0][1], (entry[0][0], entry[1])))
    # attach the idf: (word, ((doc_id, tf), idf))
    .join(idf)
    # tf * idf -> ((doc_id, word), tf-idf)
    .map(lambda joined: ((joined[1][0][0], joined[0]), joined[1][0][1] * joined[1][1]))
)


**Saving tf-idf measure in a new column**

In [None]:
# Re-key the tf-idf scores by document and gather each document's
# (word, tfidf) pairs into a list: (doc_id, [(word, tfidf), ...])
tfidf_grouped = (
    tfidf.map(lambda scored: (scored[0][0], (scored[0][1], scored[1])))
    .groupByKey()
    .mapValues(lambda pairs: list(pairs))
)

In [None]:
# Build a DataFrame from tfidf_grouped: one row per document id with an array
# of {word, tfidf} structs.
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType

schema = StructType(
    [
        StructField("_c0", IntegerType(), False),
        StructField(
            "tfidf",
            ArrayType(
                StructType(
                    [
                        StructField("word", StringType(), False),
                        StructField("tfidf", DoubleType(), False),
                    ]
                )
            ),
        ),
    ]
)

# Turn each (word, score) tuple into a dict keyed by the struct's field names;
# float() converts the score to a plain Python float.
tfidf_df_ready = tfidf_grouped.map(
    lambda doc: (
        doc[0],
        [{"word": word, "tfidf": float(score)} for word, score in doc[1]],
    )
)

# Materialize the DataFrame with the explicit schema.
tfidf_df = spark.createDataFrame(tfidf_df_ready, schema)


In [None]:
# Left-join on the document id (_c0) so every row of agnews is kept and gains
# its tf-idf list as a new column.
agnews_with_tfidf = agnews.join(other=tfidf_df, on="_c0", how="left")


**Display tf-idf for the first 5 documents**

In [None]:
# Sort by document id and print the first five rows without truncation.
agnews_with_tfidf.orderBy("_c0").show(n=5, truncate=False)

+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Stop the Spark session created for the tf-idf section; the SVM section
# below builds its own session.
spark.stop()

**2. SVM objective function**

In [None]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/w.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/bias.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/data_for_svm.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1391  100  1391    0     0   8763      0 --:--:-- --:--:-- --:--:--  8803
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    22  100    22    0     0    148      0 --:--:-- --:--:-- --:--:--   149
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 61.9M  100 61.9M    0     0  78.3M      0 --:--:-- --:--:-- --:--:-- 78.2M


In [None]:
# Imports come first: the X_rdd lambda below references `np`, so numpy must be
# in scope before any action evaluates that lambda.
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .appName("SVM Loss")
         .getOrCreate()
        )

# The CSV has no header row; in each row the last column is the label and the
# preceding columns are the features.
data_df = spark.read.csv("data_for_svm.csv", inferSchema=True, header=False)
X_rdd = data_df.rdd.map(lambda row: np.array(row[:-1]))
y_rdd = data_df.rdd.map(lambda row: row[-1])

w = pd.read_csv('w.csv', header=None).values.flatten() # 1d numpy array
b = pd.read_csv('bias.csv', header=None).values.flatten()[0] # scalar

In [None]:
def loss_SVM(w, b, X, y, lmb=1.0):
    """
    Compute the SVM objective
        lmb * (w . w) + (1/n) * sum_i max(0, 1 - y_i * (w . x_i + b)).

    w: numpy array of weights
    b: bias
    X: RDD of feature vectors
    y: RDD of labels, zipped element-for-element with X
    lmb: regularization parameter (default 1.0)

    Returns:
        the objective value (L2 term plus mean hinge loss)
    Raises:
        ValueError if the zipped RDD contains no samples
    """
    # create an RDD of (x_i, y_i) tuples
    data_rdd = X.zip(y)

    # map to compute the hinge loss of every sample
    hinge_losses = data_rdd.map(
        lambda xy: max(0.0, 1 - xy[1] * (np.dot(w, xy[0]) + b))
    )

    # accumulate (sum of hinge losses, number of samples) in one pass over the
    # data instead of a reduce() followed by a separate count()
    hinge_loss_sum, n_samples = hinge_losses.aggregate(
        (0.0, 0),
        lambda acc, value: (acc[0] + value, acc[1] + 1),
        lambda left, right: (left[0] + right[0], left[1] + right[1]),
    )
    if n_samples == 0:
        raise ValueError("cannot compute the SVM loss of an empty dataset")

    # L2 regularization term
    l2 = lmb * np.dot(w, w)

    # final loss: regularization plus mean hinge loss
    return l2 + hinge_loss_sum / n_samples


In [None]:
# Evaluate the SVM objective on the full data set with the provided weights and bias.
objective_val = loss_SVM(w=w, b=b, X=X_rdd, y=y_rdd)
print("The SVM objective value is ", objective_val)

The SVM objective value is  1.0029595550626365


**Prediction**

In [None]:
def predict_SVM(w, b, X_rdd):
    """
    Label every feature vector with the linear rule sign(w^T x + b).

    w: numpy array of weights
    b: bias
    X_rdd: RDD of feature vectors

    Returns:
        RDD of predicted labels: 1 when w^T x + b >= 0, otherwise -1
    """

    def classify(features):
        # a score of exactly zero is assigned to the positive class
        score = np.dot(w, features) + b
        if score >= 0:
            return 1
        return -1

    return X_rdd.map(classify)


In [None]:
# Predict a label for every row, collect the labels into a list, and print the first ten.
predictions_rdd = predict_SVM(w=w, b=b, X_rdd=X_rdd)
predictions = predictions_rdd.collect()
print("First 10 predictions are: ", predictions[:10])

First 10 predictions are:  [-1, -1, -1, 1, -1, 1, -1, -1, 1, -1]


In [None]:
# Stop the Spark session created for the SVM section.
spark.stop()