In [5]:
from __future__ import print_function
import re
import sys
import numpy as np
from operator import add
import psutil
import time
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
# from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.linalg import Vectors
# from pyspark.ml.classification import LinearSVC

In [6]:
sc = SparkContext.getOrCreate()           # reuse the active SparkContext, or start one
sqlContext = SQLContext(sparkContext=sc)  # SQL entry point bound to that same context

In [7]:
# Build a normalized term-frequency vector (length 20,000 by default): position i
# holds the fraction of the document's dictionary words whose dictionary index is i,
# i.e. how often the i-th entry of the top-20,000-words dictionary occurs, relative
# to all dictionary words in the document.
def buildArray(listOfIndices, size=20000):
    """Return the normalized bag-of-words vector for one document.

    Parameters
    ----------
    listOfIndices : iterable of int
        Dictionary positions of the words in a document. A position appearing k
        times is counted k times. Any iterable works (e.g. the ResultIterable
        produced by ``groupByKey``).
    size : int, optional
        Length of the returned vector (the dictionary size); defaults to 20000.

    Returns
    -------
    numpy.ndarray
        Float vector of length ``size`` whose entries sum to 1, or an all-zero
        vector when ``listOfIndices`` is empty.
    """
    counts = np.zeros(size)
    for index in listOfIndices:
        counts[index] += 1

    total = counts.sum()
    if total == 0:
        # Empty document: dividing would compute 0/0 and yield an all-NaN vector.
        return counts
    return counts / total


# # Check if the predicted label is a TP, FN, FP or TN
# # Returns an array of len(4) containing binary values indicating the notation
# # Ex: [0,1,0,0] means the prediction is a False Negative (FN)
# def predictionNotation(true, pred):
#     TP, FN, FP, TN = 0, 0, 0, 0
    
#     if true == 1:
#         if pred == 1: 
#             TP = 1
#         else:
#             FN = 1
#     elif true == 0:
#         if pred == 1:
#             FP = 1
#         else:
#             TN = 1
    
#     return np.array([TP, FN, FP, TN])

# # calculate the F1 score
# def f_score(TP, FP, P):
#     # f1 = NA if there is no predicted positive
#     if TP == 0:
#         f_score = 'NaN (zero TP)'
#     else:
#         recall = TP/P
#         precision = TP/(TP+FP)
#         f_score = 2*precision*recall / (precision+recall)
    
#     return f_score

In [3]:
# Redundant: SparkContext.getOrCreate() returns the context already created by the
# setup cell near the top of the notebook, so this statement has no effect. Its
# lower execution count (In [3]) suggests it is a leftover from out-of-order
# execution and can be removed.
sc = SparkContext.getOrCreate()

21/12/22 16:58:57 WARN Utils: Your hostname, Yihuis-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.68.120 instead (on interface en0)
21/12/22 16:58:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/12/22 16:58:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Data Preprocessing

In [8]:
# Matches every character that is not an ASCII letter; such characters are
# replaced by spaces before the text is split into words.
regex = re.compile('[^a-zA-Z]')

# Load the raw corpus: one document per line of the text file.
d_corpus = sc.textFile('SmallTrainingData.txt')


def _split_id_and_text(line):
    """Extract (docID, body) from one raw corpus line.

    The docID is the text between 'id="' and '" url='. The body is everything
    after the first '">' with its last 6 characters dropped (presumably a closing
    tag such as '</doc>' -- TODO confirm against the data).
    """
    id_start = line.index('id="') + 4
    id_end = line.index('" url=')
    body = line[line.index('">') + 2:]
    return line[id_start:id_end], body[:-6]


def _to_word_list(pair):
    """Turn (docID, text) into (docID, [lower-cased alphabetic words])."""
    doc_id, text = pair
    letters_only = regex.sub(' ', text).lower()
    return str(doc_id), letters_only.split()


# (docID, text) pairs
d_keyAndText = d_corpus.map(_split_id_and_text)

# (docID, ["word1", "word2", "word3", ...]) with non-letters removed
d_keyAndListOfWords = d_keyAndText.map(_to_word_list)

# Term Frequency Matrix

In [9]:
# map (docID, ["word1", "word2", "word3", ...])
# to ("word1", 1) ("word2", 1)...
allWords = d_keyAndListOfWords\
            .flatMap(lambda x: x[1])\
            .map(lambda w: (w, 1))

# count all of the words, giving --> ("word1", 1433), ("word2", 3423423), etc.
allCounts = allWords.reduceByKey(add)

# Get the (up to) 20,000 most frequent words locally, most frequent first.
# NOTE: np.array turns the (word, count) tuples into a 2-D *string* array, so the
# counts in column 1 become strings; only column 0 (the word) is used below.
topWords = allCounts.top(20000, key=lambda x: x[1])
topWords = np.array(topWords)

# Dictionary positions 0 .. len(topWords)-1. Using len(topWords) rather than a
# hard-coded 20000 avoids an IndexError when the corpus has fewer distinct words.
topWordsK = sc.parallelize(range(len(topWords)))

# Transform (0), (1), (2), ... to ("MostCommonWord", 0) ("NextMostCommon", 1), ...
# The number is the word's position in the dictionary (and in the feature vector).
dictionary = topWordsK.map(lambda x: (topWords[x][0], x))

# Map-side join: collect the small RDD as a Python dict ...
dictionaryAsMap = dictionary.collectAsMap()

# ... and broadcast it to all worker nodes. Keep the Broadcast handle and look
# words up through .value, so each executor receives the dict once instead of it
# being serialized into every task closure.
dictionaryBroadcast = sc.broadcast(dictionaryAsMap)

# Create a RDD that has, for each (docID, ["word1", "word2", "word3", ...]),
# ("word1", docID), ("word2", docID), ...
allWordsWithDocID = d_keyAndListOfWords\
                    .flatMap(lambda x: ((word, x[0]) for word in x[1]))

# Keep only dictionary words, as (word, (dictionaryPos, docID)) pairs.
# The broadcast handle is bound as a default argument: RDDs are evaluated lazily,
# so a later cell reassigning the global name must not change this pipeline.
allDictionaryWords = allWordsWithDocID\
                        .filter(lambda x, bc=dictionaryBroadcast: x[0] in bc.value)\
                        .map(lambda x, bc=dictionaryBroadcast: (x[0], (bc.value[x[0]], x[1])))

# Drop the actual word itself to get a set of (docID, dictionaryPos) pairs
justDocAndPos = allDictionaryWords.map(lambda x: (x[1][1], x[1][0]))

# Create a set of (docID, [dictionaryPos1, dictionaryPos2, dictionaryPos3...]) pairs
allDictionaryWordsInEachDoc = justDocAndPos.groupByKey()

# Convert the dictionary positions to a normalized bag-of-words numpy array,
# giving a set of (docID, featureArray) pairs
allDocsAsNumpyArrays = allDictionaryWordsInEachDoc.mapValues(buildArray)

                                                                                

# Reduce Feature Dimension Based on Variance

In [10]:
# Feature vectors only (docID dropped). This RDD is traversed three times below
# (count, sum, squared deviations), so cache it. Otherwise every action re-runs
# the whole pipeline from the raw text file.
featureVectors = allDocsAsNumpyArrays.map(lambda x: x[1]).cache()

# Store the number of docs in the training set
numOfDocs = featureVectors.count()

# Mean of each feature: a vector of length 20k
SumOfFeatures = featureVectors.reduce(add)
meanOfFeatures = SumOfFeatures / numOfDocs

# Per-feature sum of squared deviations from the mean. The division by n is
# omitted: it scales every feature equally, so the variance ranking is unchanged.
# The mean is bound as a default argument so the lazily evaluated closure cannot
# pick up a later reassignment of the global name.
varianceOfFeatures = featureVectors\
                        .map(lambda v, mean=meanOfFeatures: (v - mean) ** 2)\
                        .reduce(add)

# The cached vectors are not needed after this point; release the memory.
featureVectors.unpersist()

# Indices of the top 10k features with the highest variance, highest first
top10kVarWordIdx = (-varianceOfFeatures).argsort()[:10000]

# Make a subset of the selected features. The index array is bound as a default
# argument so reassigning top10kVarWordIdx later (e.g. in another cell) cannot
# silently change this lazily evaluated RDD.
allDocsAsNumpyArrays_Reduced = allDocsAsNumpyArrays\
                                .mapValues(lambda v, idx=top10kVarWordIdx: v[idx])

                                                                                

In [11]:
# Peek at one document: a one-element list holding a (docID, reduced feature
# vector) pair, where the vector keeps only the 10,000 highest-variance features.
allDocsAsNumpyArrays_Reduced.take(num=1)

[('AU35',
  array([0.11425061, 0.06511057, 0.01781327, ..., 0.        , 0.        ,
         0.        ]))]