In [2]:
!mkdir -p dataset
!curl -L https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv \
     -o dataset/agnews_clean.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 33.2M  100 33.2M    0     0  16.2M      0  0:00:02  0:00:02 --:--:-- 16.2M


In [3]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .appName("AG news")
         .getOrCreate()
        )

agnews = spark.read.csv("dataset/agnews_clean.csv", inferSchema=True, header=True)

# turning the second column from a string to an array
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType
agnews = agnews.withColumn('filtered', F.from_json('filtered', ArrayType(StringType())))

In [4]:
# each row contains the document id and a list of filtered words
agnews.show(5, truncate=30)

+---+------------------------------+
|_c0|                      filtered|
+---+------------------------------+
|  0|[wall, st, bears, claw, bac...|
|  1|[carlyle, looks, toward, co...|
|  2|[oil, economy, cloud, stock...|
|  3|[iraq, halts, oil, exports,...|
|  4|[oil, prices, soar, time, r...|
+---+------------------------------+
only showing top 5 rows



In [5]:
num_docs = agnews.count()
print("Total documents:", num_docs)

Total documents: 127600


# 1. tf-idf definition

## What do we need to calculate?

- For each $d$, the counts of $t$
- For each $d$, the counts of words,
- For each $t$, the counts of $d$ that contains $t$.
- what should be returned if we only want to know if the document contains $t$ of not.

In [6]:
# build rdd (docId, tokens)

from pyspark.sql.functions import col

doc_rdd = (
    agnews
    .select(col("_c0").alias("docId"),
            col("filtered").alias("tokens"))
    .rdd
    .map(lambda row: (row.docId, row.tokens))
)

In [7]:
# For each $d$, the counts of $t$
t_counts = (
    doc_rdd
    .flatMap(lambda doc: [((doc[0], t), 1) for t in doc[1]])
    .reduceByKey(lambda a, b: a + b)
)

t_counts.take(5)

[((0, 'claw'), 1),
 ((0, 'green'), 1),
 ((1, 'looks'), 1),
 ((1, 'investment'), 1),
 ((1, 'quietly'), 1)]

In [8]:
# For each $d$, the counts of words
word_counts = (
    doc_rdd
    .map(lambda doc: (doc[0], len(doc[1])))
)

word_counts.take(5)

[(0, 18), (1, 27), (2, 24), (3, 28), (4, 30)]

In [9]:
# For each $t$, the counts of $d$ that contains $t$

# get (term, id) from t_counts
t_docs = t_counts.map(lambda kv: (kv[0][1], kv[0][0]))

# remove duplicate term counts
distinct_t_docs = t_docs.distinct()

# count how many $d$ each term is in
d_counts = (
    distinct_t_docs
    .map(lambda td: (td[0], 1))
    .reduceByKey(lambda a, b: a + b)
)

d_counts.take(5)

[('latest', 1820),
 ('17', 1218),
 ('japan', 1948),
 ('corporate', 809),
 ('shuts', 82)]

In [10]:
# what should be returned if we only want to know if the document contains $t$ of not

# d_contains_t = RDD[((id, term), 1)]
d_contains_t = t_counts.map(lambda kv: (kv[0], 1))

d_contains_t.take(10)

[((0, 'claw'), 1),
 ((0, 'green'), 1),
 ((1, 'looks'), 1),
 ((1, 'investment'), 1),
 ((1, 'quietly'), 1),
 ((1, 'bets'), 1),
 ((2, 'cloud'), 1),
 ((2, 'outlook'), 1),
 ((2, 'doldrums'), 1),
 ((3, 'iraq'), 1)]

## Tasks
1. Design the MapReduce functions for calculating the tf-idf measure.
2. Calculate tf-idf measure for each row in the agnews_clean.csv. Save the measures in a new column.
3. Print out the tf-idf measure for the first 5 documents.

In [12]:
# compute tf
tf = (t_counts
      .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
      .join(word_counts)
      .map(lambda rec: (
          (rec[0], rec[1][0][0]),
          rec[1][0][1] / rec[1][1]
      )))

tf.take(5)

[((0, 'claw'), 0.05555555555555555),
 ((0, 'green'), 0.05555555555555555),
 ((0, 'wall'), 0.1111111111111111),
 ((0, 'cynics'), 0.05555555555555555),
 ((0, 'back'), 0.05555555555555555)]

In [14]:
# compute idf
import math

idf = (d_counts.map(lambda kv: (kv[0], math.log(num_docs / kv[1]))))

idf.take(5)

[('latest', 4.250063869821986),
 ('17', 4.651690201622984),
 ('japan', 4.1820971656903465),
 ('corporate', 5.060856732834335),
 ('shuts', 7.349936402628574)]

In [15]:
# compute tf–idf measures

# rekey tf by term to join on idf
rekeyed_tf = tf.map(lambda kv: (kv[0][1], (kv[0][0], kv[1])))  

# join on idf & multiply
tf_idf = (
    rekeyed_tf
      .join(idf)
      .map(lambda kv: (
          (kv[1][0][0], kv[0]),
          kv[1][0][1] * kv[1][1]
      ))
)

tf_idf.take(10)

[((32, 'push'), 0.232878964391475),
 ((3808, 'push'), 0.1506863887238956),
 ((10928, 'push'), 0.3013727774477912),
 ((10976, 'push'), 0.24396843888630712),
 ((11536, 'push'), 0.204933488664498),
 ((12864, 'push'), 0.3202085760382781),
 ((21536, 'push'), 0.16526894247136936),
 ((26320, 'push'), 0.170777907220415),
 ((27904, 'push'), 0.1766668005728431),
 ((27984, 'push'), 0.1506863887238956)]

In [20]:
# save measures in new column

from pyspark.sql import Row

# create (docId, term, tfidf) rows
rows = tf_idf.map(lambda kv: Row(docId=kv[0][0], term=kv[0][1], tfidf=kv[1]))
tf_idf_df = spark.createDataFrame(rows)

# join onto original dataframe
agnews_with_id = agnews.withColumn("docId", F.monotonically_increasing_id())

result = (agnews_with_id.join(tf_idf_df, on="docId", how="left"))

result.show(5)

+-----+---+--------------------+-------+-------------------+
|docId|_c0|            filtered|   term|              tfidf|
+-----+---+--------------------+-------+-------------------+
|    0|  0|[wall, st, bears,...|   wall| 0.5115985326511431|
|    0|  0|[wall, st, bears,...|reuters|0.24754017186645658|
|    0|  0|[wall, st, bears,...|  short| 0.2773120373951269|
|    0|  0|[wall, st, bears,...|  ultra| 0.4125512394225831|
|    0|  0|[wall, st, bears,...|  bears| 0.3372044607529448|
+-----+---+--------------------+-------+-------------------+
only showing top 5 rows



In [23]:
# print out the tf-idf measure for the first 5 documents
result.filter("docId < 5").select("docId", "term", "tfidf").orderBy("docId", F.desc("tfidf")).show(5)

+-----+---------+------------------+
|docId|     term|             tfidf|
+-----+---------+------------------+
|    0|   cynics| 0.563734318747707|
|    0|     wall|0.5115985326511431|
|    0|     claw| 0.499114829314058|
|    0|dwindling|0.4572386180709258|
|    0|  sellers|0.4468379768438066|
+-----+---------+------------------+
only showing top 5 rows



# 2. SVM objective function

## Tasks
1. Design the MapReduce functions required to calculate the loss function.
2. Using these functions, create a function loss_SVM(w, b, X, y) to calculate the SVM objective for a given choice of w, b with data stored in X, y.
3. You are given the following dataset data_for_svm.csv, where the first 64 columns contain X and the last column contains y. Using the weights and bias provided in w.csv and bias.csv, calculate the objective value.
4. Design the MapReduce function required to make prediction. Predict for all of the data using the provided weights and bias.

In [23]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/w.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/bias.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/data_for_svm.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1391  100  1391    0     0   4117      0 --:--:-- --:--:-- --:--:--  4115
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    22  100    22    0     0     74      0 --:--:-- --:--:-- --:--:--    74
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 61.9M  100 61.9M    0     0  18.7M      0  0:00:03  0:00:03 --:--:-- 18.7M


In [24]:
# load svm data into rdd

svm_df = spark.read.csv("data_for_svm.csv", header=False, inferSchema=True)

# convert to rdd with (features, y) rows
svm_rdd = svm_df.rdd.map(
    lambda row: ([row[i] for i in range(len(row) - 1)], row[-1])
)

svm_rdd.take(3)

[([-1,
   -1,
   1,
   1,
   -1,
   1,
   1,
   1,
   1,
   -1,
   1,
   1,
   -1,
   -1,
   -1,
   1,
   -1,
   1,
   -1,
   1,
   -1,
   1,
   1,
   -1,
   1,
   1,
   1,
   -1,
   1,
   -1,
   -1,
   -1,
   1,
   1,
   1,
   -1,
   1,
   -1,
   -1,
   -1,
   -1,
   1,
   -1,
   1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   1],
  -1),
 ([1,
   1,
   -1,
   1,
   -1,
   -1,
   1,
   1,
   -1,
   1,
   1,
   -1,
   1,
   -1,
   1,
   1,
   1,
   1,
   -1,
   1,
   -1,
   -1,
   1,
   -1,
   1,
   -1,
   -1,
   1,
   1,
   1,
   1,
   -1,
   -1,
   1,
   1,
   -1,
   -1,
   -1,
   -1,
   1,
   1,
   1,
   1,
   1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   1,
   1,
   -1],
  1),
 ([1,
   1,
   1,
   -1,
   1,
   1,
   -1,
   1,
   -1,
   1,
   -1,
   -1,
   1,
   -1,
   -1,
   1,
   -1,
   -1,
   1,
   -1,
   -1,
   -

In [26]:
# broad cast weights and bias

import pandas as pd

# read weight vector from w.csv
w = pd.read_csv("w.csv", header=None).iloc[:,0].tolist()

# read bias scalar from bias.csv
b = float(pd.read_csv("bias.csv", header=None).iloc[0,0])

# broadcast weights and biases together
wb = spark.sparkContext.broadcast((w, b))

In [27]:
# create loss_SVM(w, b, X, y) function


def loss_SVM(data_rdd, lam):

    # get total # of examples
    num_examples = data_rdd.count()

    # get broadcasted (w, b) tuple
    w_b = wb.value
    
    # calculate hinge loss for each example and sum
    # for each (x, y):
    # - margin = y( wx + b)
    # - hinge = max(0, 1 - margin)
    hinge_sum = data_rdd.map(lambda xy: max(
        0.0,
        1 - xy[1] * (sum(w_i * x_i for w_i, x_i in zip(w_b[0], xy[0])) + w_b[1]
        )
    )).reduce(lambda a, b: a + b)
    
    # compute L2 regularization term
    l2_reg = lam * sum(w_i * w_i for w_i in w_b[0])

    # return objective value (regularization + mean hinge loss)
    return l2_reg + hinge_sum / num_examples

In [28]:
# example
print("SVM objective:", loss_SVM(svm_rdd, lam=0.1))

SVM objective: 1.000001871756999


In [29]:
# design MapReduce function
def map_reduce(data_rdd):
    # get broadcasted (w, b) tuple
    w_b = wb.value

    # compute predicted label for each example
    return data_rdd.map(
        lambda xy: (
            xy[1],
            1 if (sum(w_i * x_i for w_i, x_i in zip(w_b[0], xy[0])) + w_b[1]) >= 0
              else -1
        )
    )

In [30]:
# example
predictions = map_reduce(svm_rdd)
predictions.take(10)

[(-1, 1),
 (1, -1),
 (1, -1),
 (1, -1),
 (-1, -1),
 (-1, -1),
 (-1, -1),
 (1, 1),
 (-1, -1),
 (1, -1)]