In [1]:
from pyspark import SparkContext, SparkConf

In [2]:
import json
import os
import math

In [3]:
# Spark configuration: application name 'word count', master 'local[3]'
# (local mode with three worker threads).
conf = SparkConf().setMaster('local[3]').setAppName('word count')
# The single SparkContext used by every later cell.
sc = SparkContext(conf=conf)

In [4]:
# Read the stop-word file as an RDD with one element per line.
stoplist = sc.textFile('./data/stopwords.txt')
# Pull the lines to the driver, de-duplicate them into a set for O(1)
# membership tests, and broadcast that set so tasks can read it via `.value`.
stopWordSet = set(stoplist.collect())
broadcastStopList = sc.broadcast(stopWordSet)

In [5]:
# Characters treated as punctuation when trimming the edges of a token,
# broadcast as a set so tasks can test membership via `punctuations.value`.
punctuations = sc.broadcast({'.', ',', ':', ';', "'", '!', '?'})

In [6]:
# Directory holding the input documents.
inputFileDir = './data'
# File names of the documents; a document's position in this list is the
# index of its slot in every per-word count vector built later.
inputFiles = [
    '4300-0.txt',
    'pg19033.txt',
    'pg36.txt',
    'pg514.txt',
    'pg1497.txt',
    'pg3207.txt',
    'pg42671.txt',
    'pg6130.txt',
]
# Accumulates one (word -> count vector) RDD per document.
rdds = list()
# Number of documents, i.e. the length of every count vector.
N = len(inputFiles)

In [7]:
def _stripEdgePunctuation(word):
    """Remove at most one punctuation character from each end of ``word``.

    A character counts as punctuation when it is in ``punctuations.value``.
    The result may be the empty string (e.g. for the token ``'.'``).
    """
    marks = punctuations.value
    if len(word) > 0 and word[0] in marks:
        word = word[1:]
    if len(word) > 0 and word[-1] in marks:
        word = word[:-1]
    return word


def _addCountVectors(a, b):
    """Element-wise sum of two length-``N`` count vectors."""
    return [a[k] + b[k] for k in range(N)]


# Build, for every document, an RDD of (lower-cased word, count vector) where
# the vector has N slots and only slot i (this document's index) is non-zero.
for i, inputFile in enumerate(inputFiles):
    # One-hot vector marking "one occurrence in document i".
    increment = [0] * N
    increment[i] = 1
    lines = sc.textFile(os.path.join(inputFileDir, inputFile))
    # Whitespace tokenisation.
    words = lines.flatMap(lambda line: line.split())
    # Trim one punctuation character per side and normalise case once, so the
    # stop-word test and the reduce key use the same string.
    filteredPunctuation = words.map(_stripEdgePunctuation).map(lambda word: word.lower())
    # Drop stop words, and also drop tokens that became empty after trimming
    # (a token such as '.' or '?!' would otherwise be counted as the word '').
    filteredWords = filteredPunctuation.filter(
        lambda word: len(word) > 0 and word not in broadcastStopList.value
    )
    # `vec=increment` binds this iteration's vector to the lambda explicitly
    # instead of relying on when Spark happens to serialise the closure.
    counts = filteredWords.map(lambda word, vec=increment: (word, vec)).reduceByKey(_addCountVectors)
    rdds.append(counts)

In [8]:
def _sumVectors(left, right):
    """Add two count vectors slot by slot over the first ``N`` positions."""
    return [left[k] + right[k] for k in range(N)]


# Concatenate the first N per-document RDDs into one RDD of
# (word, count vector) pairs ...
unionRdd = rdds[0]
for docIndex in range(1, N):
    unionRdd = unionRdd.union(rdds[docIndex])
# ... then merge the vectors of identical words, giving one row per word with
# that word's count in each document.
matrixRdd = unionRdd.reduceByKey(_sumVectors)

In [9]:
def _tfidfRow(entry):
    """Turn a ``(word, counts)`` row into ``(word, tf-idf scores)``.

    ``counts`` holds the word's raw count in each of the ``N`` documents.
    Each count is multiplied by ``log(N / df)``, where ``df`` is the number
    of documents in which the count is positive.
    """
    word, counts = entry
    # Document frequency and the log factor are the same for every slot of
    # the row, so compute them once rather than once per slot.
    docFreq = sum(1 for c in counts if c > 0)
    # float(N) keeps this a true division even on an interpreter where
    # int / int floors (Python 2); on Python 3 the value is unchanged.
    idf = math.log(float(N) / docFreq)
    return (word, [c * idf for c in counts])


# One row per word: (word, [tf-idf score in document 0, ..., document N-1]).
tfidf = matrixRdd.map(_tfidfRow)

In [None]:
# Collect, for every document, up to TOP_K words whose tf-idf score in that
# document exceeds MIN_SCORE, highest score first.
TOP_K = 5
MIN_SCORE = 2
# Maps word -> score. The dict is keyed by word only, so a word selected for
# more than one document keeps the score from the last such document.
output = {}
for i in range(N):
    top5 = tfidf.filter(lambda x: x[1][i] > MIN_SCORE).takeOrdered(TOP_K, key=lambda x: -x[1][i])
    # Iterate over what was actually returned: takeOrdered yields fewer than
    # TOP_K rows when not enough words pass the filter, and indexing a fixed
    # range(5) would then raise IndexError.
    for word, scores in top5:
        output[word] = scores[i]

In [None]:
# Write the word -> score mapping to 'sp4.json' in the current working
# directory, replacing any existing file of that name.
with open('sp4.json', 'w') as jsonFile:
    json.dump(output, fp=jsonFile)