**HW 03**  
> In this homework, we will use MapReduce to calculate two canonical quantities in data analysis: the term frequency-inverse document frequency (tf-idf) measure, and the loss function of a support vector machine (SVM).  
> You should write your own functions to calculate these quantities instead of using functions from PySpark's MLlib library. Your code should use the RDDs and DataFrames created with PySpark.  
> While you will probably be able to use Chat to generate all of the necessary functions, I encourage you to first work through the design yourself before asking Chat.

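For reference, the tf-idf variant computed in the code cells of this notebook can be written as below. This is a sketch of one common convention (length-normalized term frequency and an unsmoothed natural-log idf); the symbols $f_{t,d}$ (number of times term $t$ occurs in document $d$) and $N$ (number of documents) are notation introduced here, not taken from the assignment text.

$$
\mathrm{tf}(t, d) = \frac{f_{t,d}}{\sum_{t'} f_{t',d}}, \qquad
\mathrm{idf}(t) = \ln \frac{N}{\left|\{d : t \in d\}\right|}, \qquad
\text{tf-idf}(t, d) = \mathrm{tf}(t, d) \cdot \mathrm{idf}(t)
$$
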
In [None]:
# run once to download the data; the file is saved to the current working directory
# !curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv -O

In [1]:
# import necessary packages
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType

import numpy as np

In [2]:
# set up pyspark:
spark = (SparkSession.builder
         .master("local[*]")
         .appName("AG news")
         .getOrCreate()
        )

sc = spark.sparkContext

# only log errors (suppresses WARN and INFO messages):
sc.setLogLevel("ERROR")

# read in csv:
agnews = spark.read.csv("agnews_clean.csv", inferSchema=True, header=True)

In [17]:
# rename the unnamed index column to 'id':
agnews = agnews.withColumnRenamed('_c0', 'id')
# parse the JSON-encoded 'filtered' string into an array of words:
agnews = agnews.withColumn('filtered', F.from_json('filtered', ArrayType(StringType())))

In [18]:
# each row contains the document id and a list of filtered words
# agnews.show(5, truncate=30)

# restrict to 10 rows; every result below is computed on this subset
agnews = agnews.limit(10)

In [None]:
# term frequency:

In [19]:
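# TF MAP:
def tf_map(row):
    """Emit ((term, doc_id), 1/len(terms)) once per token in a document."""
    doc_id = row['id']         # avoid shadowing the built-in id()
    terms = [term.lower() for term in row['filtered']]

    length = len(terms)        # each token contributes 1/length, so the per-term sum is a frequency, not a count

    for term in terms:
        yield ((term, doc_id), 1 / length)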

In [21]:
tf_data = (agnews.rdd
            .flatMap(tf_map)                      # one ((term, doc), 1/length) pair per token
            .reduceByKey(lambda a, b: a+b)        # sum the pairs that share a (term, doc) key
            .collect()
            )

# returns ((term, doc), tf)

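To sanity-check the term-frequency step without a Spark session, the `flatMap` plus `reduceByKey` pattern can be emulated in plain Python. This is a minimal sketch, not the notebook's pipeline: `toy_docs`, `tf_pairs`, and `tf` are hypothetical names, and the two tiny documents are made-up data.

```python
from collections import defaultdict

# Hypothetical toy corpus standing in for agnews rows: (doc id, filtered terms).
toy_docs = [
    (0, ["wall", "street", "wall"]),
    (1, ["oil", "prices", "street", "oil"]),
]

def tf_pairs(doc_id, terms):
    """Emit ((term, doc_id), 1/len(terms)) once per token, like tf_map."""
    terms = [t.lower() for t in terms]
    n = len(terms)
    for t in terms:
        yield (t, doc_id), 1 / n

# Emulate flatMap followed by reduceByKey(lambda a, b: a + b):
# every emitted value is added to the running total for its key.
tf = defaultdict(float)
for doc_id, terms in toy_docs:
    for key, value in tf_pairs(doc_id, terms):
        tf[key] += value
```

Because each token contributes `1/n`, the term frequencies of a single document sum to 1, which is a quick check that can also be applied to the Spark output.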

In [23]:
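# IDF MAP:
def idf_map(row):
    """Emit (term, 1) once per distinct term in a document."""
    # a set keeps each term once per document, so the reduced sum is the document frequency
    terms = set(term.lower() for term in row['filtered'])

    for term in terms:
        yield (term, 1)

# IDF(term) = log(#docs / #docs containing term)

# (term, 1) is emitted once per document that contains the term
# --> summing over each term gives the number of documents containing it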

In [24]:
num_docs = agnews.count()

idf_data = (agnews.rdd
            .flatMap(idf_map)                     # one (term, 1) pair per distinct term per document
            .reduceByKey(lambda a, b: a+b)        # document frequency: number of docs containing the term
            .mapValues(lambda x: np.log(num_docs / x))   # idf = log(#docs / document frequency)
            .collect()
            )

# returns (term, idf)

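The inverse-document-frequency step can be checked the same way in plain Python. This is a minimal sketch under the same assumptions as the code above (natural log, no smoothing); `toy_docs`, `doc_freq`, and `idf` are hypothetical names and the data is made up.

```python
import math
from collections import Counter

# Hypothetical toy corpus: (doc id, filtered terms).
toy_docs = [
    (0, ["wall", "street", "wall"]),
    (1, ["oil", "prices", "street", "oil"]),
]

num_docs = len(toy_docs)

# Count each term once per document (document frequency),
# mirroring idf_map followed by reduceByKey.
doc_freq = Counter()
for _, terms in toy_docs:
    doc_freq.update({t.lower() for t in terms})

# idf(t) = log(N / df(t)), using the natural log as np.log does.
idf = {t: math.log(num_docs / df) for t, df in doc_freq.items()}
```

A term that occurs in every document, such as `"street"` in this toy corpus, gets an idf of `log(1) = 0`, so its tf-idf is zero no matter how often it appears.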
In [46]:
# Calculate tf-idf measure for each row in the agnews_clean.csv. Save the measures in a new column.
    # tf_data --> ((term, doc), tf)
    # idf_data --> (term, idf)

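# tf_data and idf_data are plain Python lists on the driver (they were collected above),
# so turn them back into RDDs to do the join in Spark (too slow before):
tf_rdd = sc.parallelize(tf_data)  # ((term, doc), tf)
idf_rdd = sc.parallelize(idf_data)  # (term, idf)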


## JOIN DATASETS:
# create new rdd for tf with key = term
tf_by_term = tf_rdd.map(lambda x: (x[0][0], (x[0][1], x[1])))  # (term, (doc_id, tf))

# join with idf now that keys are same
tfidf_rdd = tf_by_term.join(idf_rdd)                           # (term, ((doc_id, tf), idf))

# re-key the RDD by (term, doc) and calculate tf-idf = tf * idf
tfidf_rdd = tfidf_rdd.map(lambda x: ((x[0], x[1][0][0]), x[1][0][1] * x[1][1]))  # ((term, doc), tf-idf)


## CREATE VECTORS OF TF-IDF:
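# re-key by doc so the scores can be grouped per document
tfidf_by_doc = tfidf_rdd.map(lambda x: (x[0][1], (x[0][0], x[1])))  # (doc, (term, tf-idf))

# create vectors of tf_idf values per document, sorted by doc id
# (the terms are dropped here, so each list holds scores only, in no guaranteed order)
tfidf_vectors = (tfidf_by_doc
                 .groupByKey()
                 .mapValues(lambda pairs: [tf_idf for term, tf_idf in pairs])
                 .sortBy(lambda x: x[0]))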

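The join-and-group logic can also be traced on a small example in plain Python. The sketch below is an illustration rather than the notebook's method: it keeps each term next to its score in a per-document dictionary, which is one possible way to keep the scores interpretable. `tf_data` and `idf_data` here are made-up inputs in the same shapes as above, and `idf_lookup` and `tfidf_by_doc` are hypothetical names.

```python
import math
from collections import defaultdict

# Made-up inputs in the shapes produced earlier:
# tf_data  -> ((term, doc), tf)
# idf_data -> (term, idf)
tf_data = [
    (("wall", 0), 2 / 3), (("street", 0), 1 / 3),
    (("oil", 1), 0.5), (("prices", 1), 0.25), (("street", 1), 0.25),
]
idf_data = [
    ("wall", math.log(2)), ("oil", math.log(2)),
    ("prices", math.log(2)), ("street", 0.0),
]

# Joining on the term is a dictionary lookup in plain Python.
idf_lookup = dict(idf_data)

# Group by document, keeping the term with its tf-idf score.
tfidf_by_doc = defaultdict(dict)
for (term, doc_id), tf in tf_data:
    tfidf_by_doc[doc_id][term] = tf * idf_lookup[term]
```

Each document then maps to a `{term: score}` dictionary, so a score can be traced back to the word that produced it.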
In [51]:
# print out tf-idf values for first five documents:
tfidf_vectors.take(5)

[(0,
  [0.06688737801810755,
   0.08941321735745002,
   0.17882643471490003,
   0.08941321735745002,
   0.08941321735745002,
   0.05675840264066563,
   0.08941321735745002,
   0.08941321735745002,
   0.08941321735745002,
   0.08941321735745002,
   0.08941321735745002,
   0.08941321735745002,
   0.08941321735745002,
   0.08941321735745002,
   0.08941321735745002,
   0.08941321735745002]),
 (1,
  [0.08528092937014985,
   0.08528092937014985,
   0.08528092937014985,
   0.08528092937014985,
   0.05960881157163334,
   0.03783893509377709,
   0.05960881157163334,
   0.08528092937014985,
   0.04459158534540504,
   0.1705618587402997,
   0.08528092937014985,
   0.08528092937014985,
   0.08528092937014985,
   0.08528092937014985,
   0.08528092937014985,
   0.08528092937014985,
   0.05960881157163334,
   0.08528092937014985,
   0.08528092937014985,
   0.08528092937014985,
   0.08528092937014985,
   0.08528092937014985,
   0.08528092937014985,
   0.08528092937014985,
   0.08528092937014985]),
 (2

In [84]:
# add each vector as new column in dataframe:

# create dataframe from tf_idf vectors
# (np.log yields NumPy floats, so cast the scores to plain Python floats first):
tfidf_clean = tfidf_vectors.map(lambda x: (x[0], [float(score) for score in x[1]]))

tfidf_df = tfidf_clean.map(
    lambda x: Row(id=x[0], tfidf_vector=x[1])
).toDF()

# join w/ old dataset into new dataset:
agnews_joined = agnews.join(tfidf_df, on='id', how='left')

# display columns:
print(agnews_joined.columns)

['id', 'filtered', 'tfidf_vector']
