#### Part 1: TF-IDF on the AG News corpus ####

In [1]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 33.2M  100 33.2M    0     0  23.3M      0  0:00:01  0:00:01 --:--:-- 23.4M


In [5]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType

# Local Spark session using every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("AG news")
    .getOrCreate()
)

# Each CSV row holds an index column (_c0) and `filtered`, a JSON-encoded list
# of tokens; from_json turns that string into a real array<string> column.
token_array_type = ArrayType(StringType())
agnews = (
    spark.read.csv("agnews_clean.csv", inferSchema=True, header=True)
    .withColumn("filtered", F.from_json("filtered", token_array_type))
)

In [6]:
# Preview: one row per document, its index (_c0) and its list of filtered
# tokens, with each displayed cell truncated to 30 characters.
agnews.show(n=5, truncate=30)

+---+------------------------------+
|_c0|                      filtered|
+---+------------------------------+
|  0|[wall, st, bears, claw, bac...|
|  1|[carlyle, looks, toward, co...|
|  2|[oil, economy, cloud, stock...|
|  3|[iraq, halts, oil, exports,...|
|  4|[oil, prices, soar, time, r...|
+---+------------------------------+
only showing top 5 rows



In [8]:
# Give the unnamed index column a meaningful name; later cells key on "id".
agnews = agnews.withColumnRenamed(existing="_c0", new="id")

In [9]:
# Term frequency: tf(t, d) = (occurrences of t in d) / (number of tokens in d).

# One row per (document, token occurrence).
exploded = agnews.withColumn("word", F.explode("filtered"))

# Raw occurrence count of each word in each document (one row per (id, word)).
word_doc_count = exploded.groupBy("id", "word").agg(F.count("*").alias("term_count"))

# Document length read straight from the token array, so no second
# aggregation/shuffle over the exploded rows is needed. Documents with an
# empty/null array have no rows in word_doc_count and drop out of the inner
# join below. NOTE: assumes `id` is unique per document (it is the CSV index).
doc_total_words = agnews.select("id", F.size("filtered").alias("doc_len"))

# Same schema as before: id, word, tf, doc_len.
tf_df = word_doc_count.join(doc_total_words, on="id").select(
    "id",
    "word",
    (F.col("term_count") / F.col("doc_len")).alias("tf"),
    "doc_len",
)

In [10]:
# Inverse document frequency: idf(t) = ln(N / df(t)), with N the number of
# documents and df(t) the number of documents that contain t.
total_docs = agnews.select("id").distinct().count()

# word_doc_count comes from groupBy("id", "word"), so it already holds exactly
# one row per (document, word) pair; counting its rows per word yields df(t)
# without an extra distinct() shuffle.
docs_with_term = word_doc_count.groupBy("word").agg(F.count("id").alias("doc_freq"))
idf_df = docs_with_term.withColumn("idf", F.log(F.lit(total_docs) / F.col("doc_freq")))

In [11]:
# TF-IDF score of every (document, word) pair: attach each word's idf, then
# multiply it by the word's in-document tf.
tfidf_df = (
    tf_df
    .join(idf_df, on="word")
    .withColumn("tfidf", F.col("tf") * F.col("idf"))
)

In [12]:
from pyspark.sql.window import Window

N_DOCS_TO_SHOW = 5  # print every TF-IDF score of the documents with id < 5

# Rank each document's words by descending TF-IDF. Ties are broken
# alphabetically so row_number(), and therefore the printed order, is
# deterministic across runs.
windowSpec = Window.partitionBy("id").orderBy(F.col("tfidf").desc(), F.col("word"))
ranked = tfidf_df.withColumn("rank", F.row_number().over(windowSpec))

# NOTE: despite the name, this keeps *all* words of the first N documents
# (not the top-5 words per document); rows are ordered by document, then rank.
top5 = ranked.filter(F.col("id") < N_DOCS_TO_SHOW).orderBy("id", "rank")

# show(n) prints at most n rows, so size n to the real row count. A fixed 50
# cuts the listing off partway through the selected documents.
top5.select("id", "word", "tfidf").show(top5.count(), truncate=False)

+---+-------------+-------------------+
|id |word         |tfidf              |
+---+-------------+-------------------+
|0  |cynics       |0.563734318747707  |
|0  |wall         |0.5115985326511431 |
|0  |claw         |0.499114829314058  |
|0  |dwindling    |0.4572386180709258 |
|0  |sellers      |0.4468379768438066 |
|0  |ultra        |0.4125512394225831 |
|0  |seeing       |0.37743394553516213|
|0  |band         |0.3643421454792778 |
|0  |bears        |0.3372044607529448 |
|0  |black        |0.2953171727366614 |
|0  |green        |0.2877107940095433 |
|0  |short        |0.2773120373951269 |
|0  |st           |0.2584728642725166 |
|0  |reuters      |0.24754017186645658|
|0  |street       |0.24678348986493034|
|0  |back         |0.1892216338539946 |
|1  |carlyle      |0.7168306746824437 |
|1  |occasionally |0.33274321954270536|
|1  |timed        |0.324478643568105  |
|1  |bets         |0.27861293130724324|
|1  |aerospace    |0.2581171817448437 |
|1  |reputation   |0.2578098186776328 |


#### Part 2: Linear-SVM objective (mean hinge loss + L2 penalty) ####

In [13]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/w.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/bias.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/data_for_svm.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1391  100  1391    0     0   6002      0 --:--:-- --:--:-- --:--:--  6021
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    22  100    22    0     0     99      0 --:--:-- --:--:-- --:--:--    99
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 61.9M  100 61.9M    0     0  36.8M      0  0:00:01  0:00:01 --:--:-- 36.8M


In [14]:
from pyspark.sql import SparkSession

# getOrCreate() returns the already-running session when there is one.
spark = SparkSession.builder.appName("SVMData").getOrCreate()

# None of the three files has a header row, and no schema inference is
# requested, so columns are named _c0, _c1, ... and values arrive as strings.
headerless_csv = spark.read.option("header", "false")
data_svm = headerless_csv.csv("data_for_svm.csv")
w = headerless_csv.csv("w.csv")
bias = headerless_csv.csv("bias.csv")


In [20]:
def svm_loss_map(row, weights=None, intercept=None):
    """Hinge loss max(0, 1 - y * (w . x + b)) for a single example.

    The row holds the feature values followed by the label. Values may be
    strings (the CSV is read without schema inference) or numbers. The number
    of features is taken from the length of the weight vector, so the label
    is read from the column right after the features.

    Parameters
    ----------
    row : sequence (e.g. a Spark Row)
        x_1 .. x_d followed by the label y at index d.
    weights : array-like, optional
        Weight vector w; defaults to ``w_broadcast.value``.
    intercept : float, optional
        Bias b; defaults to ``b_broadcast.value``.

    Returns
    -------
    float
        The non-negative hinge loss of this example.
    """
    # Defaults keep the function usable directly in rdd.map(svm_loss_map).
    if weights is None:
        weights = w_broadcast.value
    if intercept is None:
        intercept = b_broadcast.value

    w_arr = np.asarray(weights, dtype=np.float64).ravel()
    n_features = w_arr.size
    x = np.asarray(row[:n_features], dtype=np.float64)
    y = float(row[n_features])
    score = float(np.dot(w_arr, x) + intercept)
    return max(0.0, 1.0 - y * score)


In [21]:
import numpy as np

# Pull the model parameters onto the driver as float64 NumPy values. The CSVs
# were read as strings; np.array(..., dtype=np.float64) parses them.
w_rows = w.collect()
w_vec = np.array(w_rows, dtype=np.float64).flatten()
first_bias_row = bias.collect()[0]
b_val = float(first_bias_row[0])

# Broadcast read-only copies so executor-side code (svm_loss_map) can use them.
spark_context = spark.sparkContext
w_broadcast = spark_context.broadcast(w_vec)
b_broadcast = spark_context.broadcast(b_val)


In [22]:
hinge_losses = data_svm.rdd.map(svm_loss_map)

# Sum the per-row losses and count the rows in one pass over the data.
# The partition-then-driver summation order matches rdd.reduce, so the total is
# bit-for-bit the same, but a second scan for data_svm.count() is avoided.
total_hinge_loss, n_rows = hinge_losses.aggregate(
    (0.0, 0),
    lambda acc, loss: (acc[0] + loss, acc[1] + 1),
    lambda left, right: (left[0] + right[0], left[1] + right[1]),
)
if n_rows == 0:
    raise ValueError("data_svm is empty; the mean hinge loss is undefined")
mean_hinge_loss = total_hinge_loss / n_rows


In [23]:
# Objective = mean hinge loss + lambda * ||w||^2, with lambda = 1.
lambda_val = 1  # as instructed
squared_norm_w = np.dot(w_vec, w_vec)
reg_term = lambda_val * squared_norm_w
objective = mean_hinge_loss + reg_term

In [24]:
def loss_SVM(w_df, b_df, data_df, lamb):
    """Evaluate the linear-SVM objective.

        mean_i max(0, 1 - y_i * (w . x_i + b))  +  lamb * ||w||^2

    Parameters
    ----------
    w_df : DataFrame
        Weight vector; all cells are collected to the driver and flattened
        into a 1-D float64 array.
    b_df : DataFrame
        Bias; the first cell of the first row is used.
    data_df : DataFrame
        One example per row: the feature values followed by the label y
        (presumably +/-1; not checked). The number of features equals len(w),
        so the label is read from the column right after them.
    lamb : float
        Regularisation strength.

    Returns
    -------
    numpy.float64
        The objective value.

    Raises
    ------
    ValueError
        If ``data_df`` has no rows.
    """
    w_vec = np.array(w_df.collect(), dtype=np.float64).flatten()
    b_val = float(b_df.collect()[0][0])
    n_features = w_vec.size

    # Take the SparkContext from the data itself instead of a global session.
    rows = data_df.rdd
    sc = rows.context
    w_broadcast = sc.broadcast(w_vec)
    b_broadcast = sc.broadcast(b_val)

    def mapper(row):
        x = np.asarray(row[:n_features], dtype=np.float64)
        y = float(row[n_features])
        score = float(np.dot(w_broadcast.value, x) + b_broadcast.value)
        return max(0.0, 1.0 - y * score)

    try:
        # Sum of losses and row count in a single pass over the data.
        total_hinge_loss, n_rows = rows.map(mapper).aggregate(
            (0.0, 0),
            lambda acc, loss: (acc[0] + loss, acc[1] + 1),
            lambda left, right: (left[0] + right[0], left[1] + right[1]),
        )
    finally:
        # These broadcasts are private to this call; release them so repeated
        # evaluations (e.g. inside an optimisation loop) do not pile up.
        w_broadcast.destroy()
        b_broadcast.destroy()

    if n_rows == 0:
        raise ValueError("data_df is empty; the mean hinge loss is undefined")

    mean_hinge_loss = total_hinge_loss / n_rows
    reg_term = lamb * np.dot(w_vec, w_vec)
    return mean_hinge_loss + reg_term

In [26]:
# Evaluate the full objective with lambda = 1 via the reusable helper.
objective_value = loss_SVM(
    w_df=w,
    b_df=bias,
    data_df=data_svm,
    lamb=1,
)
print("SVM Objective Value:", objective_value)


SVM Objective Value: 1.002940383485752
