# Question 1


In [None]:
# Load in the data
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 33.2M  100 33.2M    0     0  16.5M      0  0:00:02  0:00:02 --:--:-- 16.5M


In [None]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType

# Start (or reuse) a local Spark session that runs on every available core
spark = SparkSession.builder.master("local[*]").appName("AG news").getOrCreate()

# Load the CSV; Spark infers the column types and takes names from the header row
raw_agnews = spark.read.csv("agnews_clean.csv", header=True, inferSchema=True)

# 'filtered' arrives as a JSON-encoded string; parse it into array<string>
token_array_type = ArrayType(StringType())
agnews = raw_agnews.withColumn('filtered', F.from_json('filtered', token_array_type))

In [None]:
# Preview: one row per document, holding its id (_c0) and its list of filtered words
agnews.show(n=5, truncate=30)

+---+------------------------------+
|_c0|                      filtered|
+---+------------------------------+
|  0|[wall, st, bears, claw, bac...|
|  1|[carlyle, looks, toward, co...|
|  2|[oil, economy, cloud, stock...|
|  3|[iraq, halts, oil, exports,...|
|  4|[oil, prices, soar, time, r...|
+---+------------------------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import col, explode, size

# One row per (document, word) pair makes per-word aggregation straightforward
words = agnews.select(
    '_c0',
    explode(col('filtered')).alias('word'),
)

words.show(5)

+---+-----+
|_c0| word|
+---+-----+
|  0| wall|
|  0|   st|
|  0|bears|
|  0| claw|
|  0| back|
+---+-----+
only showing top 5 rows



In [None]:
# Term count: number of occurrences of each word within each document
word_counts_per_document = (
    words.groupBy('_c0', 'word')
         .count()
         .withColumnRenamed('count', 'word_count')
)

# Spot-check document 0 against its raw word list
word_counts_per_document.orderBy('_c0', ascending=True).show(18)

+---+----------+----------+
|_c0|      word|word_count|
+---+----------+----------+
|  0|        st|         1|
|  0|    cynics|         1|
|  0|     ultra|         1|
|  0|     green|         1|
|  0|    street|         1|
|  0|     bears|         1|
|  0|    seeing|         1|
|  0|      band|         1|
|  0|   sellers|         1|
|  0|      claw|         1|
|  0|      wall|         2|
|  0|   reuters|         2|
|  0|     black|         1|
|  0| dwindling|         1|
|  0|     short|         1|
|  0|      back|         1|
|  1|    toward|         1|
|  1|investment|         1|
+---+----------+----------+
only showing top 18 rows



In [None]:
# Total number of words in each document: the sum of that document's per-word counts.
# word_counts_per_document stores its counts in 'word_count' (renamed from Spark's
# default 'count'), so that is the column to sum. Summing 'count' fails on a fresh kernel.
total_words_per_document = (word_counts_per_document
                            .groupBy('_c0')
                            .sum('word_count')
                            .withColumnRenamed('sum(word_count)', 'words_in_doc'))

# Show the results and validate that it is working properly
# (document 0 has 16 distinct words, two of which appear twice, so 18 in total)
total_words_per_document.orderBy('_c0', ascending=True).show(10)

+---+------------+
|_c0|words_in_doc|
+---+------------+
|  0|          18|
|  1|          27|
|  2|          24|
|  3|          28|
|  4|          30|
|  5|          32|
|  6|          30|
|  7|          30|
|  8|          44|
|  9|          19|
+---+------------+
only showing top 10 rows



In [None]:
# Document frequency: word_counts_per_document has exactly one row per (document, word)
# pair, so counting rows per word gives the number of documents containing that word.
# The frame is left unsorted because it is only joined later; ordering is applied
# just for the display below.
documents_per_word = (
    word_counts_per_document
    .groupBy('word')
    .count()
    .withColumnRenamed('count', 'document_count')
)

# Display results, most widespread words first
documents_per_word.orderBy('document_count', ascending=False).show(10)

+-------+--------------+
|   word|document_count|
+-------+--------------+
|     39|         31734|
|   said|         20141|
|    new|         18950|
|reuters|         13750|
|     us|         10424|
|    two|          9657|
|  first|          9193|
|   year|          9116|
|     ap|          9040|
| monday|          7855|
+-------+--------------+
only showing top 10 rows



In [None]:
# Attach each document's total word count to its per-word counts (key: document id)
combined = word_counts_per_document.join(total_words_per_document, on='_c0')

# Attach each word's document frequency, so every TF-IDF input sits in one frame
combined_all = combined.join(documents_per_word, on='word')

# Check that all columns survived both joins
combined_all.orderBy('_c0', ascending=True).show(18)

+---------+---+----------+------------+--------------+
|     word|_c0|word_count|words_in_doc|document_count|
+---------+---+----------+------------+--------------+
|dwindling|  0|         1|          18|            34|
|       st|  0|         1|          18|          1217|
|     back|  0|         1|          18|          4233|
|    ultra|  0|         1|          18|            76|
|    green|  0|         1|          18|           719|
|   street|  0|         1|          18|          1502|
|    bears|  0|         1|          18|           295|
|   seeing|  0|         1|          18|           143|
|   cynics|  0|         1|          18|             5|
|  sellers|  0|         1|          18|            41|
|     band|  0|         1|          18|           181|
|     wall|  0|         2|          18|          1277|
|     claw|  0|         1|          18|            16|
|    black|  0|         1|          18|           627|
|  reuters|  0|         2|          18|         13750|
|    short

In [None]:
# Fix the column order explicitly, then switch to the RDD API for the map/reduce steps
tfidf_columns = ['_c0', 'word', 'word_count', 'words_in_doc', 'document_count']
combined_all_rdd = combined_all.select(*tfidf_columns).rdd

# Peek at a few Row objects to confirm the conversion
combined_all_rdd.take(5)

[Row(_c0=42468, word='online', word_count=1, words_in_doc=26, document_count=2444),
 Row(_c0=45307, word='online', word_count=1, words_in_doc=28, document_count=2444),
 Row(_c0=23364, word='online', word_count=1, words_in_doc=13, document_count=2444),
 Row(_c0=36538, word='still', word_count=1, words_in_doc=58, document_count=2281),
 Row(_c0=26425, word='still', word_count=1, words_in_doc=31, document_count=2281)]

In [None]:
# Corpus size N (agnews holds one row per document); used as a constant in the IDF term
num_docs = agnews.count()

# Display result
num_docs

127600

In [None]:
from math import log


def _tfidf_entry(row):
    """Map one (document, word) row to ``(doc_id, {'word': ..., 'tfidf': ...})``.

    TF  = occurrences of the word in the document / words in the document
    IDF = ln(total documents / documents containing the word)
    """
    term_frequency = row.word_count / row.words_in_doc
    inverse_document_frequency = log(num_docs / row.document_count)
    return row._c0, {'word': row.word, 'tfidf': term_frequency * inverse_document_frequency}


# Score every (document, word) pair
tfidf = combined_all_rdd.map(_tfidf_entry)

# Gather each document's word scores into one list, keyed and ordered by document id
tfidf_grouped = (tfidf
                 .groupByKey()
                 .mapValues(list)
                 .sortByKey(ascending=True))

# TF-IDF values for every word in the first five documents
tfidf_grouped.take(5)


[(0,
  [{'word': 'cynics', 'tfidf': 0.563734318747707},
   {'word': 'green', 'tfidf': 0.2877107940095433},
   {'word': 'claw', 'tfidf': 0.499114829314058},
   {'word': 'back', 'tfidf': 0.1892216338539946},
   {'word': 'dwindling', 'tfidf': 0.4572386180709258},
   {'word': 'band', 'tfidf': 0.3643421454792778},
   {'word': 'reuters', 'tfidf': 0.24754017186645658},
   {'word': 'bears', 'tfidf': 0.3372044607529448},
   {'word': 'wall', 'tfidf': 0.5115985326511431},
   {'word': 'ultra', 'tfidf': 0.4125512394225831},
   {'word': 'st', 'tfidf': 0.2584728642725166},
   {'word': 'sellers', 'tfidf': 0.4468379768438066},
   {'word': 'black', 'tfidf': 0.2953171727366614},
   {'word': 'short', 'tfidf': 0.2773120373951269},
   {'word': 'seeing', 'tfidf': 0.37743394553516213},
   {'word': 'street', 'tfidf': 0.24678348986493034}]),
 (1,
  [{'word': 'investment', 'tfidf': 0.1890771769001148},
   {'word': 'commercial', 'tfidf': 0.2057832028092643},
   {'word': 'reputation', 'tfidf': 0.2578098186776328},

# Question 2


In [None]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/w.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/bias.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/data_for_svm.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1391  100  1391    0     0   3556      0 --:--:-- --:--:-- --:--:--  3557
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    22  100    22    0     0     61      0 --:--:-- --:--:-- --:--:--    61
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 61.9M  100 61.9M    0     0  19.2M      0  0:00:03  0:00:03 --:--:-- 19.2M


In [None]:
from pyspark.sql import SparkSession

# Create the Spark session for Question 2, or reuse one that already exists
spark = (SparkSession.builder
         .appName("SVM Data Load")
         .getOrCreate())

# Underlying SparkContext, needed for RDD work and broadcast variables
sc = spark.sparkContext


In [None]:
import pandas as pd

# Labelled SVM data goes straight into Spark (the file has no header row)
data_svm = spark.read.csv("data_for_svm.csv", header=False, inferSchema=True)

# The model parameters are small files, so pandas reads them on the driver:
#   w    -> weights flattened into a 1-D array
#   bias -> the single value stored in bias.csv
w = pd.read_csv("w.csv", header=None).values.flatten()
bias = pd.read_csv("bias.csv", header=None).values.item()

# Replace both with read-only broadcast handles; Spark code reads them via `.value`
w, bias = sc.broadcast(w), sc.broadcast(bias)

In [None]:
import numpy as np

# Split data_svm into features and labels: the label is the final column and
# every column before it is a feature.
cols = len(data_svm.columns)


def _features(row):
    """All fields of a data_svm row except the last, as a NumPy vector."""
    return np.array([row[j] for j in range(cols - 1)])


def _label(row):
    """Final field of a data_svm row (the class label)."""
    return row[cols - 1]


X_rdd = data_svm.rdd.map(_features)
y_rdd = data_svm.rdd.map(_label)


In [None]:
# Define the loss function

def loss_SVM(w, b, X, y, lmb):
    """Regularised hinge loss of a linear SVM, evaluated over RDDs.

    loss = (1/n) * sum_i max(0, 1 - y_i * (w . x_i + b)) + lmb * sum_j w_j**2

    Parameters
    ----------
    w : Broadcast
        Broadcast weight vector; its ``.value`` (a NumPy array in this notebook)
        is read on the executors and on the driver.
    b : Broadcast
        Broadcast scalar bias; its ``.value`` is used.
    X : RDD
        Feature vectors, one per element.
    y : RDD
        Labels (presumably -1/+1, matching the classifier's outputs). They must
        line up element-by-element with ``X``, because the two are paired with
        ``RDD.zip``.
    lmb : float
        Weight of the squared-L2 penalty.

    Returns
    -------
    float
        The loss value. A line with the value rounded to 5 decimals is also printed.

    Raises
    ------
    ValueError
        If there are no (x, y) pairs, since the average hinge loss is undefined.
    """
    # Pair elements of X and y so each feature vector travels with its label
    xy_rdd = X.zip(y)

    def _hinge(pair):
        x_i, y_i = pair
        return max(0, 1 - y_i * (np.dot(w.value, x_i) + b.value))

    # Single pass over the data: accumulate (sum of hinge terms, row count) together.
    # fold's zero value also makes an empty RDD yield (0.0, 0) instead of crashing.
    hinge_sum, n = (xy_rdd
                    .map(lambda pair: (_hinge(pair), 1))
                    .fold((0.0, 0), lambda acc, cur: (acc[0] + cur[0], acc[1] + cur[1])))

    if n == 0:
        raise ValueError("loss_SVM needs at least one (x, y) pair; got empty data")

    # Average hinge value
    avg_hinge = hinge_sum / n

    # Squared L2 norm of the weights (computed on the driver)
    l2_norm = np.sum(w.value ** 2)

    loss = avg_hinge + lmb * l2_norm
    print("The value of the loss function is:", round(loss, 5))
    return loss



In [None]:
# Evaluate the regularised hinge loss for the provided weights, bias and data (lambda = 1)
loss_SVM(w=w, b=bias, X=X_rdd, y=y_rdd, lmb=1)


The value of the loss function is: 1.00295


In [None]:
def _svm_predict(x):
    """Classify one feature vector: +1 when w.x + b >= 0, otherwise -1."""
    score = np.dot(w.value, x) + bias.value
    return 1 if score >= 0 else -1


# Apply the classifier to every feature vector
predict_classifier = X_rdd.map(_svm_predict)

first_15 = predict_classifier.take(15)
print("The first 15 predictions using the svm classifier are:", first_15)

The first 15 predictions using the svm classifier are: [-1, -1, -1, 1, -1, 1, -1, -1, 1, -1, 1, -1, -1, -1, 1]


# Generative AI Statement

In this assignment, I used ChatGPT in the first question to help me determine how to create keys, group by them, and output the data in an easy-to-read format that shows both the TF-IDF values and the document ID when reducing. Specifically, this line of code was generated with the help of ChatGPT: `tfidf_grouped = tfidf.groupByKey().mapValues(list).sortByKey(ascending=True)`.

Additionally, I used ChatGPT in the second question to help pair corresponding x and y values in the formula. ChatGPT recommended using the `.zip` function for this purpose (`xy_rdd = X.zip(y)`). ChatGPT also recommended broadcasting my variables into a read-only format once I had read my data for w and bias (`w = sc.broadcast(w)`; `bias = sc.broadcast(bias)`). This resolved errors that I had been encountering before I broadcast the variables. For the function itself, I learned from ChatGPT that I must use `w.value` and `b.value` when referencing the weight and bias data, because this data is broadcast.