In [41]:
from pyspark import SparkContext, SparkConf

In [42]:
# Start from a default Spark configuration; getOrCreate reuses the SparkContext
# already running in this kernel (if any), so re-running this cell is harmless.
conf = SparkConf()
sc = SparkContext.getOrCreate(conf)

In [45]:
# Function to remove invalid/bad characters from words in file
def bad_char_remover(word):
    bad_chars = ['~', '!', '#', '$', '%', '^', '&', '*', '(', ')', '-', '+', '[', ']', ';', ':', '"', ',', '.', '?', '<', '>', '\\', '/', '_', '=']
    for char in word:
        if char in bad_chars:
            word = word.replace(char, "")
        if char == '\r' or char == "\n":
            word = word.replace(char," ")
    return word

In [47]:
# Load every file under the data directory as (file path, full content) pairs
path = "Data/"
rdd = sc.wholeTextFiles(path)


def _tokenize_file(path_and_content):
    """Turn one (file path, content) pair into (file path, word, 1) triples.

    Content is cleaned of bad characters and lower-cased first, so that
    e.g. "The" and "the" count as the same word.
    """
    file_path, content = path_and_content
    words = bad_char_remover(content).lower().split()
    return [(file_path, word, 1) for word in words]


cleaned_rdd = rdd.flatMap(_tokenize_file)

In [53]:
# Drop common stop words. Each element is a (file path, word, 1) triple,
# so the word to check sits at index 1.
stop_list = ["they", "she", "he", "it", "the", "as", "is", "and"]
stop_words = frozenset(stop_list)
filterd_rdd = cleaned_rdd.filter(lambda triple: triple[1] not in stop_words)

In [59]:
# Re-key each (file path, word, 1) triple as ((word, file path), 1), then sum
# the ones so each (word, file) pair carries its occurrence count in that file.
word_file_ones = filterd_rdd.map(lambda rec: ((rec[1], rec[0]), rec[2]))
output = word_file_ones.reduceByKey(lambda left, right: left + right)

[(('othello', 'file:/notebooks/Data/Data/othello'), 1)]
[(('othello', 'file:/notebooks/Data/Data/othello'), 339)]


In [60]:
# Build the inverted index: word -> [(file path, count), ...].
# Input elements are ((word, file path), count) pairs.
# NOTE(review): this cell rebinds `output` from its own previous value, so
# running it twice without re-running the previous cell will not rebuild the
# index correctly.
# groupByKey + mapValues builds each posting list once. The previous
# reduceByKey with list concatenation copied the growing list on every merge.
# Postings are sorted (highest count first, then file path) so the saved
# result is deterministic across runs.
output = (
    output
    .map(lambda entry: (entry[0][0], (entry[0][1], entry[1])))
    .groupByKey()
    .mapValues(lambda postings: sorted(postings, key=lambda posting: (-posting[1], posting[0])))
)

[('othello', [('file:/notebooks/Data/Data/othello', 339)])]
[('othello', [('file:/notebooks/Data/Data/othello', 339), ('file:/notebooks/Data/Data/Miserables.txt', 1)])]


In [62]:
# Merge everything into a single partition so Spark writes one part file under
# results/. NOTE(review): Spark typically refuses to write into an existing
# output directory, so a second Run All may need results/ removed first -
# confirm for the target filesystem.
single_partition_index = output.coalesce(1)
single_partition_index.saveAsTextFile("results/")