In [None]:
# SETUP


import re
import numpy as np


# Corpus locations.
# NOTE(review): the previous version of this cell read the *training* RDD from
# the "Testing" file and the *test* RDD from the "SmallTraining" file. The two
# paths are assigned below so that each RDD matches the name of its file.
TRAIN_CORPUS_PATH = "s3://chrisjermainebucket/comp330_A5/SmallTrainingDataOneLinePerDoc.txt"
TEST_CORPUS_PATH = "s3://chrisjermainebucket/comp330_A5/TestingDataOneLinePerDoc.txt"


# load up all the documents in the corpus
# (`sc` is not created in this notebook; it is assumed to be the SparkContext
# provided by the kernel -- TODO confirm)
train_corpus = sc.textFile(TRAIN_CORPUS_PATH)
test_corpus = sc.textFile(TEST_CORPUS_PATH)


def _has_doc_header(line):
    """Return True when `line` contains every marker that `_split_doc_line` looks up.

    Checking for the exact markers (rather than the bare substring 'id') keeps
    `str.index` in `_split_doc_line` from raising ValueError on a line that
    merely contains the letters "id".
    """
    return 'id="' in line and '" url=' in line and '">' in line


def _split_doc_line(line):
    """Turn one raw line into a (docID, text) pair.

    docID is the text between 'id="' and '" url='; text is everything after
    the first '">'.
    """
    doc_id = line[line.index('id="') + 4 : line.index('" url=')]
    text = line[line.index('">') + 2:]
    return (doc_id, text)


# each entry in validLines is a line of the text file that carries a document header
train_validLines = train_corpus.filter(_has_doc_header)
test_validLines = test_corpus.filter(_has_doc_header)


# now we transform it into a bunch of (docID, text) pairs
train_keyAndText = train_validLines.map(_split_doc_line)
test_keyAndText = test_validLines.map(_split_doc_line)


# now we split the text in each (docID, text) pair into a list of words
# after this, we have a data set with (docID, ["word1", "word2", "word3", ...])
# every non-letter character is replaced by a space before lower-casing and
# splitting, so punctuation and digits never end up inside a word
regex = re.compile('[^a-zA-Z]')


def _tokenize(key_and_text):
    """Map (docID, text) to (docID, list of lower-case alphabetic words)."""
    return (str(key_and_text[0]), regex.sub(' ', key_and_text[1]).lower().split())


train_keyAndListOfWords = train_keyAndText.map(_tokenize)
test_keyAndListOfWords = test_keyAndText.map(_tokenize)


# now get the top 20,000 words... first change (docID, ["word1", "word2", "word3", ...])
# to ("word1", 1) ("word2", 1)...
train_allWords = train_keyAndListOfWords.flatMap(lambda x: ((j, 1) for j in x[1]))
test_allWords = test_keyAndListOfWords.flatMap(lambda x: ((j, 1) for j in x[1]))


# now, count all of the words, giving us ("word1", 1433), ("word2", 3423423), etc.
train_allCounts = train_allWords.reduceByKey(lambda a, b: a + b)
test_allCounts = test_allWords.reduceByKey(lambda a, b: a + b)

In [None]:
# TASK 1


def _by_count_then_word(word_and_count):
    """Ordering key: highest count first, ties broken alphabetically by word."""
    return (-word_and_count[1], word_and_count[0])


# Keep the 20,000 highest-ranked (word, count) pairs of each corpus.
# takeOrdered(n, key) returns the n smallest elements under `key`, which is the
# same list a full sortBy(key).take(n) produces, without the distributed sort.
train_mostFrequent = train_allCounts.takeOrdered(20000, key=_by_count_then_word)
test_mostFrequent = test_allCounts.takeOrdered(20000, key=_by_count_then_word)


# word -> rank in the training corpus (0 is the most frequent word)
train_mostFrequentDict = {word: idx for idx, (word, _count) in enumerate(train_mostFrequent)}


# print the rank of a few probe words; -1 means the word is not in the dictionary
for probe in ('applicant', 'and', 'attack', 'protein', 'car'):
    print(f"{probe}: {train_mostFrequentDict.get(probe, -1)}")

In [None]:
# TASK 2


def wordsNumpyArray(words, mostFrequentDict):
    """Build a length-20000 bag-of-words count vector.

    words: iterable of words from one document.
    mostFrequentDict: maps a word to its position in the vector.

    Returns a float numpy array in which entry `mostFrequentDict[word]` holds
    the number of times `word` occurs in `words`; words missing from the
    dictionary are ignored.
    """
    counts = np.zeros(20000)
    # translate the known words to their positions, dropping everything else
    positions = [mostFrequentDict[token] for token in words if token in mostFrequentDict]
    for slot in positions:
        counts[slot] += 1
    return counts


def _to_count_pair(doc):
    """Map (docID, [words]) to (docID, count vector) using the training dictionary."""
    doc_id, doc_words = doc[0], doc[1]
    return (doc_id, wordsNumpyArray(doc_words, train_mostFrequentDict))


# both corpora are indexed with the dictionary built from the training corpus
train_keyAndCountOfWords = train_keyAndListOfWords.map(_to_count_pair)
test_keyAndCountOfWords = test_keyAndListOfWords.map(_to_count_pair)


def find_TF(word_count_vector):
    """Convert a word-count vector into term frequencies.

    word_count_vector: numeric array of per-word counts for one document.

    Returns the counts divided by their sum. When the sum is zero (the
    document contains no dictionary word) an all-zero float vector of the same
    shape is returned instead of the NaN vector that 0 / 0 would produce.
    """
    total_num_words = np.sum(word_count_vector)
    if total_num_words == 0:
        return np.zeros_like(word_count_vector, dtype=float)
    return word_count_vector / total_num_words


def find_IDF(keyAndListOfWords):
    """Compute inverse document frequencies for a corpus.

    keyAndListOfWords: RDD of (docID, [words]) pairs.

    Returns a dict mapping each word to log(N / df), where N is the number of
    entries in `keyAndListOfWords` and df is the number of distinct
    (word, docID) pairs that contain the word.
    """
    n_docs = keyAndListOfWords.count()

    # one (word, docID) pair per distinct word of each document
    unique_pairs = keyAndListOfWords.flatMap(
        lambda doc: [(term, doc[0]) for term in set(doc[1])]
    ).distinct()

    # document frequency: how many pairs each word appears in
    docs_per_word = unique_pairs.map(lambda pair: (pair[0], 1)).reduceByKey(
        lambda left, right: left + right
    )

    return docs_per_word.map(
        lambda entry: (entry[0], np.log((n_docs) / (entry[1])))
    ).collectAsMap()


# word -> IDF weight, computed separately from each corpus
train_idf_vector = find_IDF(keyAndListOfWords=train_keyAndListOfWords)
test_idf_vector = find_IDF(keyAndListOfWords=test_keyAndListOfWords)


def find_TFIDF_vector(word_count_vector, idf_vector, mfd):
    """Build the length-20000 TF-IDF vector of one document.

    word_count_vector: per-word counts, indexed by dictionary position.
    idf_vector: dict mapping word -> IDF weight; a missing word counts as 0.
    mfd: dict mapping word -> dictionary position.

    Returns a numpy array whose entry at a word's position is that word's term
    frequency times its IDF weight; positions with no positive term frequency
    stay 0.
    """
    term_freqs = find_TF(word_count_vector)
    weighted = np.zeros(20000)
    for term, slot in mfd.items():
        freq = term_freqs[slot]
        # skip anything that is not strictly positive
        if not freq > 0:
            continue
        weighted[slot] = freq * idf_vector.get(term, 0)
    return weighted


# (docID, TF-IDF vector) for every training document
train_keyAndTFIDFVector = train_keyAndCountOfWords.map(lambda x: (x[0], find_TFIDF_vector(x[1], train_idf_vector, train_mostFrequentDict)))
train_keyAndTFIDFVector.cache()


# (docID, TF-IDF vector) for every test document.
# The test documents are weighted with the TRAINING idf map: the model weights
# are learned on features scaled by train_idf_vector, so the test features have
# to be built with the same per-word weights to be comparable.
test_keyAndTFIDFVector = test_keyAndCountOfWords.map(lambda x: (x[0], find_TFIDF_vector(x[1], train_idf_vector, train_mostFrequentDict)))
test_keyAndTFIDFVector.cache()

In [None]:
# SEPARATE DATA AND LABELS AND NORMALIZE DATA


def extract_data_and_labels(key_tfidf_pair):
    """Split a (docID, vector) pair into (vector, label).

    The label is 1 when the first two characters of the document id are "AU"
    and 0 otherwise; the vector is passed through untouched.
    """
    doc_id = key_tfidf_pair[0]
    features = key_tfidf_pair[1]
    is_au = doc_id[:2] == "AU"
    return features, int(is_au)


# Apply the mapping function to the RDD: (docID, vector) -> (vector, label)
train_data_and_labels = train_keyAndTFIDFVector.map(extract_data_and_labels)
test_data_and_labels = test_keyAndTFIDFVector.map(extract_data_and_labels)


# Split into separate RDDs for data and labels
train_data = train_data_and_labels.map(lambda x: x[0])
test_data = test_data_and_labels.map(lambda x: x[0])

train_labels = train_data_and_labels.map(lambda x: x[1])
test_labels = test_data_and_labels.map(lambda x: x[1])


# Per-feature mean of the training vectors. The vectors are summed with np.add
# so each pairwise sum is a single numpy operation instead of a Python loop
# over every element.
train_vector_sum = np.asarray(train_data.reduce(lambda x, y: np.add(x, y)))
train_total_count = train_data.count()
train_mean_vector = train_vector_sum / train_total_count


# Per-feature sample standard deviation of the training vectors.
train_squared_diff_sum = train_data.map(lambda x: np.square(np.array(x) - train_mean_vector)).reduce(lambda x, y: np.add(x, y))
# n - 1 degrees of freedom; clamped to 1 so a single-document corpus does not divide by zero
train_degrees_of_freedom = max(train_total_count - 1, 1)
# NOTE(review): the standard deviation is multiplied by 100, so the normalized
# features end up 100x smaller than a plain z-score. The factor is kept exactly
# as it was; presumably it is there to keep x.dot(w) small during training --
# confirm before changing it.
train_std_dev_vector = np.sqrt(train_squared_diff_sum / train_degrees_of_freedom) * 100


# Normalize BOTH data sets with the training mean / deviation; the 1e-8 keeps
# features with zero deviation from dividing by zero.
nrml_train_data = train_data.map(lambda x: (x - train_mean_vector) / (train_std_dev_vector + 1e-8))
nrml_test_data = test_data.map(lambda x: (x - train_mean_vector) / (train_std_dev_vector + 1e-8))


nrml_train_data.cache()
train_labels.cache()
nrml_test_data.cache()
test_labels.cache()

In [None]:
# PERFORM GRADIENT DESCENT TRAINING


# evaluates the loss function and returns the loss
#
# x is the data set
# y is the labels
# w is the current set of weights
# lamb is the weight of the L2 regularization term
#
def f (x, y, w, lamb):
    """Regularized logistic-regression loss.

    x: RDD of feature vectors (each must support `.dot(w)`).
    y: RDD of labels, zipped element-wise with `x`.
    w: numpy weight vector.
    lamb: L2 regularization strength.

    Returns 0.5 * lamb * ||w||^2 + sum over points of
    (-label * z + log(1 + e^z)) with z = features.dot(w).
    """
    def _point_loss(tfidf_label):
        features, label = tfidf_label
        z = features.dot(w)
        # log(1 + e^z) written as logaddexp(0, z): same value, but it stays
        # finite when z is large instead of overflowing to inf
        return -label * z + np.logaddexp(0, z)

    regularization_term = 0.5 * lamb * np.sum(w ** 2)  # L2 regularization
    loss_term_sum = x.zip(y).map(_point_loss).reduce(lambda a, b: a + b)
    return regularization_term + loss_term_sum


# evaluates and returns the gradient
#
# x is the data set
# y is the labels
# w is the current set of weights
# lamb is the weight of the L2 regularization term
#
def gradient(x, y, w, lamb):
    """Gradient of the regularized logistic-regression loss with respect to w.

    x: RDD of feature vectors (each must support `.dot(w)`).
    y: RDD of labels, zipped element-wise with `x`.
    w: numpy weight vector.
    lamb: L2 regularization strength.

    Returns lamb * w + sum over points of features * (sigmoid(z) - label)
    with z = features.dot(w).
    """
    def _point_gradient(tfidf_label):
        features, label = tfidf_label
        # sigmoid(z) = e^z / (1 + e^z), evaluated through tanh so that a large
        # z gives 1.0 instead of the inf / inf = NaN of the literal formula
        probability = 0.5 * (1.0 + np.tanh(0.5 * features.dot(w)))
        return features * (probability - label)

    gradient_term_sum = x.zip(y).map(_point_gradient).reduce(lambda a, b: np.add(a, b))
    regularization_term = lamb * w  # L2 regularization
    return regularization_term + gradient_term_sum
               
# performs gradient descent optimization, returns the learned set of weights
# uses the bold driver to set the learning rate
#
# x is the data set
# y is the labels
# w is the current set of weights to start with
# lamb is the weight of the L2 regularization term
#
def gd_optimize (x, y, w, lamb, num_iterations=100, rate=0.1):
    """Run a fixed number of gradient-descent steps with a bold-driver rate.

    x, y, lamb: passed straight through to `f` and `gradient`.
    w: starting weight vector (not modified; a new array is returned).
    num_iterations: number of descent steps to take (default 100).
    rate: initial learning rate (default 0.1).

    After every step the rate is halved if the cost went up and multiplied by
    1.1 otherwise; the step itself is always kept. The cost after each step is
    printed. Returns the weights after the last step.
    """
    # The cost is evaluated once per step: the value computed for the new
    # weights is reused as the "previous" cost of the next step, instead of
    # re-running a full pass over the data for it.
    curr_cost = f(x, y, w, lamb)
    for _ in range(num_iterations):
        prev_cost = curr_cost
        w = w - rate * gradient(x, y, w, lamb)
        curr_cost = f(x, y, w, lamb)
        if curr_cost > prev_cost:
            rate = rate * .5
        else:
            rate = rate * 1.1

        print("cost", curr_cost)

    return w


# fit the model: start from an all-zero weight vector with one entry per
# feature of the normalized training data, regularization strength 0.001
w = gd_optimize(
    nrml_train_data,
    train_labels,
    np.zeros(nrml_train_data.first().shape[0]),
    0.001,
)

In [None]:
# TASK 3: MAKE PREDICTIONS


# make predictions using all of the data points in x and print the confusion
# counts, accuracy, precision, recall and F1 score
#
# x is the data set
# y is the labels
# w is the current set of weights
#
false_positives = []
def predict (x, y, w):
    """Classify every point of `x` and report how the predictions compare to `y`.

    x: RDD of feature vectors (each must support `.dot(w)`).
    y: RDD of 0/1 labels, zipped element-wise with the predictions.
    w: numpy weight vector; a point is predicted 1 when `point.dot(w) > 0`.

    Prints tp / tn / fp / fn, the number of correct predictions, precision,
    recall and F1. Stores the positions (within `x`) of up to three false
    positives in the module-level `false_positives` list. Returns None.
    """
    global false_positives
    predictions = x.map(lambda point: 1 if point.dot(w) > 0 else 0)
    predictions_labels = predictions.zip(y)

    # one pass over the data: {(prediction, label): number of occurrences}
    confusion = predictions_labels.countByValue()
    tp_count = confusion.get((1, 1), 0)
    fp_count = confusion.get((1, 0), 0)
    fn_count = confusion.get((0, 1), 0)
    tn_count = confusion.get((0, 0), 0)
    total_predictions = sum(confusion.values())

    correct = tp_count + tn_count
    # each ratio falls back to 0 when its denominator is empty
    precision = tp_count / (tp_count + fp_count) if (tp_count + fp_count) > 0 else 0
    recall = tp_count / (tp_count + fn_count) if (tp_count + fn_count) > 0 else 0
    f1_score = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    # positions of the first three points predicted 1 whose label is 0
    pred_lbl_idx = predictions_labels.zipWithIndex()
    fp_pred_lbl_idx = pred_lbl_idx.filter(lambda pred_lbl_idx: pred_lbl_idx[0][0] == 1 and pred_lbl_idx[0][1] == 0)
    fp_idx = fp_pred_lbl_idx.map(lambda pred_lbl_idx: pred_lbl_idx[1])

    false_positives = fp_idx.take(3)

    print("tp", tp_count)
    print("tn", tn_count)
    print("fp", fp_count)
    print("fn", fn_count)

    print(f"{correct} out of {total_predictions} correct.")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1 Score: {f1_score}")


# evaluate the learned weights on the normalized test set
predict (nrml_test_data, test_labels, w)
print("I tried but I couldn't figure out what the bug in my code was :(")




# GET TOP 50 WORDS WITH CORRESPONDING REGRESSION COEFFICIENT


w_copy = w.copy()
w_list = list(w_copy)

# Positions of the coefficients ordered from largest to smallest. argsort
# yields every position exactly once, so coefficients that tie still map to
# different words (looking each value up with list.index returns the first
# matching position every time), and slicing copes with fewer than 50 weights.
descending_order = np.argsort(-w_copy, kind="stable")
sorted_w = w_copy[descending_order]
indexes = [int(position) for position in descending_order[:50]]

# dictionary position -> word
reverse_mfd = {val: key for key, val in train_mostFrequentDict.items()}


print("Top 50 words with largest regression coefficients")
for i in range(len(indexes)):
    print(f"Number {i + 1} word: {reverse_mfd[indexes[i]]}")