In [1]:
from pyspark import SparkContext, SparkConf

In [2]:
from operator import add

In [3]:
conf = SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate()

In [4]:
def rdd_to_dictionary(rdd):
    return sc.parallelize(rdd).collectAsMap()

In [5]:
def write_json(dictionary,filename):
    import json  

    with open(filename, "w") as outfile:  
        json.dump(dictionary, outfile) 
    

In [12]:
data_bag = spark.read.text(["data/4300-0.txt","data/pg19033.txt", "data/pg36.txt","data/pg514.txt","data/pg1497.txt","data/pg3207.txt","data/pg42671.txt","data/pg6130.txt"]).rdd.map(lambda r: r[0])

In [13]:
words_count = data_bag.flatMap(lambda x: x.split(' '))\
                .map(lambda x: x.lower()) \
                .map(lambda x: (x, 1))\
                .reduceByKey(add)\
                .filter(lambda x: x[1] > 1)\
                .filter(lambda x: len(x[0]) > 0)

In [14]:
top_40_words = words_count.takeOrdered(40, key = lambda x: -x[1])
top_40_words_dictionary = rdd_to_dictionary(top_40_words)

In [15]:
 print(top_40_words_dictionary)


{'the': 78844, 'and': 45168, 'of': 44739, 'to': 33436, 'a': 24234, 'in': 22126, 'that': 14818, 'he': 13019, 'is': 12918, 'his': 12270, 'i': 11044, 'with': 10296, 'for': 10036, 'as': 9639, 'be': 8834, 'was': 8787, 'not': 8141, 'it': 8123, 'but': 7856, 'by': 7701, 'or': 7407, 'her': 7403, 'they': 6735, 'which': 6517, 'you': 6354, 'on': 6214, 'from': 5811, 'at': 5695, 'are': 5590, 'she': 5458, 'all': 5437, 'their': 5285, 'have': 5146, 'had': 4647, 'this': 4090, 'my': 3841, 'so': 3710, 'we': 3629, 'no': 3620, 'if': 3571}


In [16]:
write_json(top_40_words_dictionary,"sp1.json")

In [17]:
stop_words_data_bag = spark.read.text("data/stopwords.txt").rdd.map(lambda r: r[0])
stop_words = stop_words_data_bag.flatMap(lambda x: x.split('\n'))\
                .map(lambda x: x.lower()).map(lambda x: (x, 1)).reduceByKey(add)
# print(stop_words.collect())

In [18]:
top_40_words_without_stop_words = words_count.subtractByKey(stop_words).takeOrdered(40, key = lambda x: -x[1])

# print(top_40_words_without_stop_words)

In [19]:
top_40_words_without_stop_words_dictionary = rdd_to_dictionary(top_40_words_without_stop_words)
print(top_40_words_without_stop_words_dictionary)

{'i': 11044, 'not': 8141, 'you': 6354, 'have': 5146, 'no': 3620, 'one': 3498, 'like': 2253, 'more': 2087, 'out': 2021, 'up': 1831, 'man': 1783, 'now': 1579, 'only': 1555, 'must': 1523, 'little': 1485, 'those': 1447, 'good': 1444, 'should': 1417, 'after': 1379, 'great': 1358, 'every': 1356, 'first': 1318, 'own': 1289, 'did': 1271, 'how': 1266, 'see': 1251, 'these': 1244, 'men': 1233, 'over': 1209, 'where': 1205, 'make': 1196, 'upon': 1188, 'nor': 1181, 'never': 1177, 'much': 1167, 'time': 1166, 'said,': 1163, 'two': 1142, 'old': 1140, 'made': 1128}


In [20]:
write_json(top_40_words_without_stop_words_dictionary,"sp2.json")

In [21]:
def trimPunctuation(word):
    if word[0] in punctuation:
        return word[1:]
    elif word[-1] in punctuation:
        return word[:-1]
    else:
        return word

In [22]:
punctuation = [".",",",":",";","'","!","?"];

In [23]:
words_without_stop_words = words_count.subtractByKey(stop_words)
# print(words_without_stop_words.collect())

In [24]:
words_without_punctuation = words_without_stop_words.map(lambda x: (trimPunctuation(x[0]), x[1]))
# print(words_without_punctuation.collect())

In [25]:
top_40_words_without_punctuation = words_without_punctuation.subtractByKey(stop_words).takeOrdered(40, key = lambda x: -x[1])
# print(top_40_words_without_punctuation)

In [26]:
top_40_words_without_punctuation_dictionary = rdd_to_dictionary(top_40_words_without_punctuation)
print(top_40_words_without_punctuation_dictionary)

{'i': 11044, 'not': 8141, 'you': 6354, 'have': 5146, 'no': 3620, 'one': 3498, 'like': 2253, 'more': 2087, 'out': 2021, 'up': 1831, 'man': 1783, 'now': 1579, 'only': 1555, 'must': 1523, 'little': 1485, 'those': 1447, 'good': 1444, 'should': 1417, 'after': 1379, 'great': 1358, 'every': 1356, 'first': 1318, 'own': 1289, 'did': 1271, 'how': 1266, 'see': 1251, 'these': 1244, 'men': 1233, 'over': 1209, 'where': 1205, 'make': 1196, 'upon': 1188, 'nor': 1181, 'never': 1177, 'much': 1167, 'time': 1166, 'two': 1142, 'old': 1140, 'made': 1128, 'most': 1114}


In [27]:
write_json(top_40_words_without_punctuation_dictionary,"sp3.json")

In [9]:
sc.stop()