# Homework 3 - Ally Hayden

# 1. TF-IDF Definition

### Initialize the dataset

In [3]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 33.2M  100 33.2M    0     0  22.9M      0  0:00:01  0:00:01 --:--:-- 22.9M


In [4]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .appName("AG news")
         .getOrCreate()
        )

agnews = spark.read.csv("agnews_clean.csv", inferSchema=True, header=True)

# turning the second column from a string to an array
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType
agnews = agnews.withColumn('filtered', F.from_json('filtered', ArrayType(StringType())))

In [5]:
# each row contains the document id and a list of filtered words
agnews.show(5, truncate=30)

+---+------------------------------+
|_c0|                      filtered|
+---+------------------------------+
|  0|[wall, st, bears, claw, bac...|
|  1|[carlyle, looks, toward, co...|
|  2|[oil, economy, cloud, stock...|
|  3|[iraq, halts, oil, exports,...|
|  4|[oil, prices, soar, time, r...|
+---+------------------------------+
only showing top 5 rows



### Explode the words (make each word a separate row)

In [6]:
from pyspark.sql.functions import explode, col

tokens = agnews.select(col("_c0").alias("id"), explode(col("filtered")).alias("term"))
tokens.show(5)

+---+-----+
| id| term|
+---+-----+
|  0| wall|
|  0|   st|
|  0|bears|
|  0| claw|
|  0| back|
+---+-----+
only showing top 5 rows



### Count Term Frequencies per Document (Numerator)

In [7]:
from pyspark.sql.functions import count

tf_counts = tokens.groupBy("id", "term").count().withColumnRenamed("count", "tf_raw")
tf_counts.show(5)


+---+----------+------+
| id|      term|tf_raw|
+---+----------+------+
| 10|    stocks|     1|
| 21|    nation|     1|
| 36|      news|     2|
| 44|     salem|     1|
| 48|government|     1|
+---+----------+------+
only showing top 5 rows



### Count Total Terms per Document (Denominator)

In [8]:
doc_lengths = tokens.groupBy("id").count().withColumnRenamed("count", "total_terms")
doc_lengths.show(5)

+---+-----------+
| id|total_terms|
+---+-----------+
|148|         32|
|463|         24|
|471|         24|
|496|         31|
|833|         13|
+---+-----------+
only showing top 5 rows



### Compute Normalized TF

In [9]:
from pyspark.sql.functions import col

tf = tf_counts.join(doc_lengths, on="id")
tf = tf.withColumn("tf", col("tf_raw") / col("total_terms"))
tf.select("id", "term", "tf").show(5)


+----+---------+--------------------+
|  id|     term|                  tf|
+----+---------+--------------------+
| 833|      hit| 0.07692307692307693|
|1088|following| 0.05555555555555555|
|1959|      ups| 0.06896551724137931|
|1959|    block|0.034482758620689655|
|6397|   friday|0.037037037037037035|
+----+---------+--------------------+
only showing top 5 rows



### Compute Inverse Document Frequency (IDF)

In [10]:
# count document frequency
from pyspark.sql.functions import countDistinct

df_counts = tokens.select("id", "term").distinct().groupBy("term").agg(countDistinct("id").alias("df"))
df_counts.show(5)


+-------------+----+
|         term|  df|
+-------------+----+
|        still|2281|
|      acidity|   2|
|       online|2444|
|precautionary|   6|
|       harder|  82|
+-------------+----+
only showing top 5 rows



In [11]:
# get total number of documents
num_docs = agnews.select("_c0").distinct().count()
print("Total documents:", num_docs)


Total documents: 127600


In [12]:
# idf = log(N / df)
from pyspark.sql.functions import log, lit

idf = df_counts.withColumn("idf", log(lit(num_docs) / col("df")))
idf.show(5)

+-------------+----+------------------+
|         term|  df|               idf|
+-------------+----+------------------+
|        still|2281|4.0242864276084385|
|      acidity|   2| 11.06350846933288|
|       online|2444|3.9552643296013406|
|precautionary|   6| 9.964896180664772|
|       harder|  82| 7.349936402628574|
+-------------+----+------------------+
only showing top 5 rows



### Compute TF-IDF by joining TF and IDF

In [13]:
tfidf = tf.join(idf, on="term")
tfidf = tfidf.withColumn("tfidf", col("tf") * col("idf"))

# filter for first 5 documents
tfidf.filter(col("id") < 5).select("id", "term", "tfidf").show(truncate=False)


+---+-----------+-------------------+
|id |term       |tfidf              |
+---+-----------+-------------------+
|4  |posing     |0.2589223867776184 |
|3  |pipeline   |0.4720829409342409 |
|0  |wall       |0.5115985326511431 |
|2  |doldrums   |0.3770252270329423 |
|2  |stocks     |0.14976769101715193|
|4  |us         |0.1669859687392097 |
|4  |present    |0.22209684830286883|
|2  |stock      |0.17879168082328206|
|3  |exports    |0.2146590164054526 |
|1  |industry   |0.15043731768548949|
|3  |iraq       |0.23809526243476142|
|1  |aerospace  |0.2581171817448437 |
|2  |prices     |0.14472559202114177|
|4  |prices     |0.23156094723382684|
|1  |toward     |0.1898997183872362 |
|0  |cynics     |0.563734318747707  |
|3  |authorities|0.18159366801541998|
|4  |menace     |0.5747440955975784 |
|1  |carlyle    |0.7168306746824437 |
|4  |records    |0.19759033440942064|
+---+-----------+-------------------+
only showing top 20 rows



### (Optional) Group TF-IDF values back per document

In [14]:
from pyspark.sql.functions import struct, collect_list

# Group TF-IDF scores into a list per document
grouped = tfidf.select("id", "term", "tfidf") \
               .withColumn("term_tfidf", struct("term", "tfidf")) \
               .groupBy("id").agg(collect_list("term_tfidf").alias("tfidf_scores"))

grouped.show(5, truncate=100)


+---+----------------------------------------------------------------------------------------------------+
| id|                                                                                        tfidf_scores|
+---+----------------------------------------------------------------------------------------------------+
|  1|[{investment, 0.1890771769001148}, {commercial, 0.2057832028092643}, {reputation, 0.2578098186776...|
|  3|[{exports, 0.2146590164054526}, {infrastructure, 0.22959926718225876}, {reuters, 0.15913296762843...|
|  5|[{positive, 0.18127557126337487}, {46, 0.2067185029184427}, {o, 0.1405921241478995}, {end, 0.1131...|
|  6|[{98, 0.24380014644675033}, {billion, 0.12463394966614495}, {money, 0.32032556569959436}, {thursd...|
|  9|[{wall, 0.48467229409055657}, {green, 0.27256812064061997}, {ultra, 0.3908380162950787}, {new, 0....|
+---+----------------------------------------------------------------------------------------------------+
only showing top 5 rows



# 2. SVM Objective Function

### Initializing the dataset (no header)

In [15]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/w.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/bias.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/data_for_svm.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1391  100  1391    0     0  12232      0 --:--:-- --:--:-- --:--:-- 12309
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    22  100    22    0     0    243      0 --:--:-- --:--:-- --:--:--   244
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 61.9M  100 61.9M    0     0  25.5M      0  0:00:02  0:00:02 --:--:-- 25.5M


In [16]:
from pyspark.sql.types import StructType, StructField, DoubleType

svm_schema = StructType(
    [StructField(f"f{i}", DoubleType(), True) for i in range(64)] +
    [StructField("label", DoubleType(), True)]
)

svm_df = spark.read.csv("data_for_svm.csv", schema=svm_schema, header=False)

svm_df.show(5)


+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+
|  f0|  f1|  f2|  f3|  f4|  f5|  f6|  f7|  f8|  f9| f10| f11| f12| f13| f14|f15| f16| f17| f18| f19| f20| f21| f22| f23|f24| f25| f26| f27| f28| f29| f30| f31| f32| f33| f34| f35| f36| f37| f38| f39| f40| f41| f42| f43| f44| f45| f46| f47| f48| f49| f50| f51| f52| f53| f54| f55| f56| f57| f58| f59| f60| f61| f62| f63|label|
+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+
|-1.0|-1.0| 1.0| 1.0|-

### Load Weights and Bias

In [17]:
import pandas as pd

w = pd.read_csv("w.csv", header=None).values.flatten()
bias = pd.read_csv("bias.csv", header=None).values[0][0]

print("w shape:", w.shape)   # should be (64,)
print("bias:", bias)


w shape: (64,)
bias: 0.0001495661647902


### Compute Hinge Loss per Row

In [18]:
# defining/applying UDF
import numpy as np
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType

w_broadcast = spark.sparkContext.broadcast(w)
bias_broadcast = spark.sparkContext.broadcast(bias)

# hing loss function
def compute_hinge_loss(*row):
    x = np.array(row[:-1])   # all 64 features
    y = row[-1]              # the label
    margin = y * (np.dot(w_broadcast.value, x) + bias_broadcast.value)
    return float(max(0.0, 1 - margin))

hinge_udf = udf(compute_hinge_loss, DoubleType())


In [19]:
# applying UDF to the dataframe
columns = [f"f{i}" for i in range(64)] + ["label"]

svm_df = svm_df.withColumn("hinge_loss", hinge_udf(*[col(c) for c in columns]))

svm_df.select("hinge_loss").show(5)


+------------------+
|        hinge_loss|
+------------------+
|0.9493088124624688|
|1.0629362913461953|
|1.0707972344482695|
|0.9978622601633201|
|0.9916031666951637|
+------------------+
only showing top 5 rows



### Compute SVM Objective Function

In [20]:
# compute avg hinge loss
from pyspark.sql.functions import avg

avg_hinge_loss = svm_df.select(avg("hinge_loss")).first()[0]
print("Average hinge loss:", avg_hinge_loss)


Average hinge loss: 0.9997237624117761


In [21]:
# regularization
λ = 0.1

l2_norm_squared = float(np.dot(w, w))
reg_term = λ * l2_norm_squared
print("Regularization term:", reg_term)


Regularization term: 0.00032166210739758135


In [22]:
# final loss result
svm_loss = reg_term + avg_hinge_loss
print("Final SVM Loss:", svm_loss)


Final SVM Loss: 1.0000454245191737


# AI Disclosure

Questions asked to ChatGPT:

- TF-IDF Section
 - Why am I getting an 'unresolved column' error when I reference 'id' in my DataFrame?
 - How to use the .filter() function in PySpark.

- SVM Loss Section
 - Why is my UDF for hinge loss giving me a type error?
 - How should I choose the lambda value for SVM regularization?