# 1. tf-idf definition

In [1]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 33.2M  100 33.2M    0     0  29.3M      0  0:00:01  0:00:01 --:--:-- 29.3M


In [4]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType

# Get (or create) a Spark session that runs locally on all available cores.
spark = SparkSession.builder.master("local[*]").appName("AG news").getOrCreate()

# Read the CSV (first line is the header, column types are inferred) and, in the same
# chain, parse the `filtered` column from a JSON string into an array of strings.
agnews = (
    spark.read
    .csv("agnews_clean.csv", header=True, inferSchema=True)
    .withColumn('filtered', F.from_json('filtered', ArrayType(StringType())))
)

In [5]:
# Preview the first 5 rows. Each row holds the document id (`_c0`) and the list of
# filtered words (`filtered`); cell text longer than 30 characters is truncated.
agnews.show(5, truncate=30)

+---+------------------------------+
|_c0|                      filtered|
+---+------------------------------+
|  0|[wall, st, bears, claw, bac...|
|  1|[carlyle, looks, toward, co...|
|  2|[oil, economy, cloud, stock...|
|  3|[iraq, halts, oil, exports,...|
|  4|[oil, prices, soar, time, r...|
+---+------------------------------+
only showing top 5 rows



In [19]:
from pyspark.sql.functions import explode, size, count, log, col, countDistinct, struct, collect_list

In [20]:
# One (document id, word) row per token occurrence.
tokens = agnews.select('_c0', explode('filtered').alias('word'))

# N: number of distinct documents in the corpus.
N = agnews.select('_c0').distinct().count()

# Number of tokens in each document.
doc_length = agnews.select('_c0', size('filtered').alias('doc_len'))

# Term frequency: occurrences of the word in the document / tokens in the document.
tf = (
    tokens
    .groupBy('_c0', 'word')
    .agg(count('*').alias('term_count'))
    .join(doc_length, on='_c0')
    .withColumn('tf', col('term_count') / col('doc_len'))
)

# Document frequency: number of documents in which each word appears at least once.
df = (
    tokens
    .dropDuplicates(['_c0', 'word'])
    .groupBy('word')
    .agg(count('*').alias('doc_freq'))
)

# Inverse document frequency: log(N / doc_freq).
idf = df.withColumn('idf', log(N / col('doc_freq')))

# TF-IDF per (document, word), then gathered into one list of (word, tfidf) structs
# per document.
tfidf = (
    tf
    .join(idf, on='word')
    .withColumn('tfidf', col('tf') * col('idf'))
    .groupBy('_c0')
    .agg(collect_list(struct('word', 'tfidf')).alias('tfidf_values'))
)

# Show the first 5 documents by id, without truncating the lists.
tfidf.orderBy('_c0').show(5, truncate=False)


+---+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0|tfidf_values                                                                                                                                                                                               

# 2. SVM objective function

In [2]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/w.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/bias.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/data_for_svm.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1391  100  1391    0     0   6935      0 --:--:-- --:--:-- --:--:--  6955
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    22  100    22    0     0    119      0 --:--:-- --:--:-- --:--:--   119
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 61.9M  100 61.9M    0     0  55.8M      0  0:00:01  0:00:01 --:--:-- 55.8M


In [21]:
# This is an example to read the files. But you should consider using pyspark directly.
# *Make sure you are not assuming a header*!!

import numpy as np
from pyspark.sql.functions import udf, struct, mean
from pyspark.sql.types import IntegerType, DoubleType

import pandas as pd
# header=None: the first line of each file is read as data, not as column names.
# Files are read in the order listed and bound to data_svm, w and bias respectively.
data_svm, w, bias = (
    pd.read_csv(csv_name, header=None)
    for csv_name in ('data_for_svm.csv', 'w.csv', 'bias.csv')
)


In [27]:
# getOrCreate() returns the already-running session when there is one; otherwise it
# starts a new session named "SVM_Loss".
spark = SparkSession.builder.appName("SVM_Loss").getOrCreate()

# Target column names: 64 features x0..x63 followed by the label y.
feature_cols = [f"x{i}" for i in range(64)]
all_cols = feature_cols + ["y"]

# The file is read without a header, so Spark names the columns _c0, _c1, ...
# toDF renames all of them positionally in a single projection and raises if the file
# does not have exactly len(all_cols) columns. A loop of withColumnRenamed calls would
# add one projection per column and silently skip any column that does not exist.
df = spark.read.csv("data_for_svm.csv", header=False, inferSchema=True).toDF(*all_cols)

In [28]:
# Preview the first 5 rows of the renamed frame (columns x0..x63 followed by y).
df.show(5)

+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| x0| x1| x2| x3| x4| x5| x6| x7| x8| x9|x10|x11|x12|x13|x14|x15|x16|x17|x18|x19|x20|x21|x22|x23|x24|x25|x26|x27|x28|x29|x30|x31|x32|x33|x34|x35|x36|x37|x38|x39|x40|x41|x42|x43|x44|x45|x46|x47|x48|x49|x50|x51|x52|x53|x54|x55|x56|x57|x58|x59|x60|x61|x62|x63|  y|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| -1| -1|  1|  1| -1|  1|  1|  1|  1| -1|  1|  1| -1| -1| -1|  1| -1|  1| -1|  1| -1|  1|  1| -1|  1|  1|  1| -1|  1| -1| -1| -1|  1|  1|  1| -1|  1| -1| -1| -1| -1|  1| -1|  1| -1| -1| -1| -1| -1| -1| -1| -1| -1| 

In [32]:
# Display up to the first 5 rows of the pandas frame loaded from w.csv.
w.head(5)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,54,55,56,57,58,59,60,61,62,63
0,-0.007841,-0.004139,-0.005414,0.0033,0.002638,0.00939,-0.011182,-0.004379,-0.008683,-0.007685,...,-0.002512,0.011746,-0.006911,0.00074,0.011397,0.005465,0.005641,-0.002381,0.004438,-0.004438


In [33]:
# Display the pandas frame loaded from bias.csv.
bias

Unnamed: 0,0
0,0.00015


In [66]:
def loss_SVM(w, b, X, y, lambd=0.1):
    """Evaluate the soft-margin SVM objective on a Spark DataFrame.

    The value returned is::

        mean over rows of max(0, 1 - y * (w . x + b))  +  lambd * (w . w)

    The hinge term is built from native Spark column expressions, so it is evaluated
    by Spark itself rather than by calling a Python UDF once per row.

    Args:
        w: Weight vector (array-like). It is flattened to 1-D, and weight ``i`` is
            applied to the column named ``x{i}``.
        b: Scalar bias.
        X: Spark DataFrame containing the feature columns ``x0 .. x{len(w) - 1}`` and
            the label column. Feature and label values are assumed to be non-null.
        y: The label column, given either as a column name or as a Column.
        lambd: Weight of the ``w . w`` regularisation term. Defaults to 0.1.

    Returns:
        float: The objective value.
    """
    from functools import reduce
    from operator import add

    from pyspark.sql import functions as F

    weights = np.asarray(w, dtype=float).ravel()
    label = F.col(y) if isinstance(y, str) else y

    # score = b + w0*x0 + w1*x1 + ...; starting the fold from b keeps it valid for any len(w).
    weighted_features = (
        F.lit(float(w_i)) * F.col(f"x{i}") for i, w_i in enumerate(weights)
    )
    score = reduce(add, weighted_features, F.lit(float(b)))

    # Hinge loss of a row: max(0, 1 - y * score).
    hinge = F.greatest(F.lit(0.0), F.lit(1.0) - label * score)

    avg_loss = X.agg(F.mean(hinge)).first()[0]

    # Add the regularisation term.
    return float(avg_loss + lambd * np.dot(weights, weights))


In [67]:
def predict(w, b, X):
    """Append the SVM class prediction to a Spark DataFrame.

    For each row the score ``w . x + b`` is computed and the prediction is ``1`` when
    the score is >= 0 and ``-1`` otherwise. The score is built from native Spark column
    expressions, so it is evaluated by Spark itself rather than by calling a Python UDF
    once per row.

    Args:
        w: Weight vector (array-like). It is flattened to 1-D, and weight ``i`` is
            applied to the column named ``x{i}``.
        b: Scalar bias.
        X: Spark DataFrame containing the feature columns ``x0 .. x{len(w) - 1}``.
            Feature values are assumed to be non-null.

    Returns:
        A new DataFrame equal to ``X`` plus an integer ``prediction`` column.
    """
    from functools import reduce
    from operator import add

    from pyspark.sql import functions as F

    weights = np.asarray(w, dtype=float).ravel()

    # score = b + w0*x0 + w1*x1 + ...; starting the fold from b keeps it valid for any len(w).
    weighted_features = (
        F.lit(float(w_i)) * F.col(f"x{i}") for i, w_i in enumerate(weights)
    )
    score = reduce(add, weighted_features, F.lit(float(b)))

    # Sign of the score, with a score of exactly 0 assigned to the positive class.
    label = F.when(score >= 0, F.lit(1)).otherwise(F.lit(-1))
    return X.withColumn("prediction", label)


In [69]:
# The whole DataFrame is passed as X, and the label is identified by its column name "y".
# The weights are flattened to a 1-D array and the bias is reduced to a plain float once,
# then reused for both calls.
weights = w.values.flatten()
intercept = float(bias.values[0][0])

loss = loss_SVM(weights, intercept, df, "y")
print("Objective Value:", loss)

predicted_df = predict(weights, intercept, df)
predicted_df.select("y", "prediction").show()

Objective Value: 1.0000454245191737
+---+----------+
|  y|prediction|
+---+----------+
| -1|        -1|
|  1|        -1|
|  1|        -1|
|  1|         1|
| -1|        -1|
| -1|         1|
| -1|        -1|
|  1|        -1|
| -1|         1|
|  1|        -1|
|  1|         1|
|  1|        -1|
| -1|        -1|
|  1|        -1|
| -1|         1|
|  1|         1|
|  1|         1|
|  1|        -1|
|  1|         1|
|  1|        -1|
+---+----------+
only showing top 20 rows

