In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [2]:
sc = SparkContext(appName = "wordcount")



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/06 22:02:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/10/06 22:02:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/10/06 22:02:25 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
spark = SparkSession.Builder().getOrCreate()

# 1 and 2: load the files into RDDs, Remove empty lines, Remove punctuations

In [4]:

text_file = sc.textFile("nbs/shakespeare.txt") \
            .map( lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').lower()) \


text_file.count()

122683

In [5]:
remove_empty_lines = text_file.filter(lambda row:row!='')
remove_empty_lines.count()


113064

In [6]:
def clean_str(x):
    punc = '!"#$%&\'()*+,./:;<=>?@[\\]_`{|}~-'
    for ch in punc:
        x = x.replace(ch, '')
    return x

In [7]:
remove_punctuation = remove_empty_lines.map(clean_str)
# remove_punctuation.count()

In [26]:
verbs = sc.textFile("nbs/all_verbs.txt")\
            .map( lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').lower()) \

print(type(verbs))
print(verbs.take(10))
verbs.count()

verbs_ll = []

for element in verbs.collect():
    if element not in verbs_ll:
        verbs_ll.append(element)

print(len())

<class 'pyspark.rdd.PipelinedRDD'>
['abash', 'abashed', 'abashed', 'abashes', 'abashing', 'abate', 'abated', 'abated', 'abates', 'abating']
4029


In [16]:
verb_dict = sc.textFile("nbs/verb_dict.txt")\
            .map( lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').lower()) \


verb_dict.count()


1003

# 3. find out used verbs in the collection (shakespeare.txt) 

In [28]:
mymap = remove_punctuation.flatMap(lambda x: x.split(" "))
# print(type(mymap))
# mymap.take(5)
# for element in mymap.collect():
#     if element in verbs.collect():
#         print(element)

match_verbs = mymap.filter(lambda row:row in verbs_ll)

print(match_verbs.take(10))
match_verbs.count()

['desire', 'increase', 'rose', 'die', 'bear', 'contracted', 'own', 'eyes', 'lights', 'making']


                                                                                

148491


# 4. occurrences of all the verbs

In [88]:
counts = match_verbs.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)

print(counts.count())
counts.take(20)




2534


                                                                                

[('increase', 28),
 ('rose', 68),
 ('die', 475),
 ('bear', 543),
 ('own', 771),
 ('eyes', 690),
 ('making', 83),
 ('dig', 11),
 ('treasure', 52),
 ('say', 1685),
 ('eating', 14),
 ('praise', 182),
 ('use', 326),
 ('count', 111),
 ('make', 1632),
 ('proving', 1),
 ('look', 828),
 ('tell', 1069),
 ('is', 9137),
 ('form', 108)]

In [89]:
# verbslist = verb_dict.map(lambda x: x.split(" ")).collect()

# print('\n')
# print(len(verbslist))
# print(verbslist[0:10])
verbslist = verb_dict.map(lambda x: x.split(" "))

verbslist.take(10)

[['abash', 'abash', 'abashed', 'abashed', 'abashes', 'abashing'],
 ['abate', 'abate', 'abated', 'abated', 'abates', 'abating'],
 ['abide', 'abide', 'abode', 'abode', 'abides', 'abiding'],
 ['absorb', 'absorb', 'absorbed', 'absorbed', 'absorbs', 'absorbing'],
 ['accept', 'accept', 'accepted', 'accepted', 'accepts', 'accepting'],
 ['accompany',
  'accompany',
  'accompanied',
  'accompanied',
  'accompanies',
  'accompanying'],
 ['ache', 'ache', 'ached', 'ached', 'aches', 'aching'],
 ['achieve', 'achieve', 'achieved', 'achieved', 'achieves', 'achieving'],
 ['acquire', 'acquire', 'acquired', 'acquired', 'acquires', 'acquiring'],
 ['act', 'act', 'acted', 'acted', 'acts', 'acting']]

In [90]:
k_v = {}
# for elements in verbslist.take(5):
for elements in verbslist.collect():
    for e in elements:
        if e not in k_v:
            k_v[e] = elements[0]

# print(k_v)

def combine_verb(x):
#     print('x=', x[0])
    if x[0] in k_v:
        return k_v[x[0]], x[1]
                   
counts_combine = counts.map(combine_verb)

print(counts_combine.count() )
counts_combine.take(20)


2534


[('increase', 28),
 ('rise', 68),
 ('die', 475),
 ('bear', 543),
 ('own', 771),
 ('eye', 690),
 ('make', 83),
 ('dig', 11),
 ('treasure', 52),
 ('say', 1685),
 ('eat', 14),
 ('praise', 182),
 ('use', 326),
 ('count', 111),
 ('make', 1632),
 ('prove', 1),
 ('look', 828),
 ('tell', 1069),
 ('be', 9137),
 ('form', 108)]

In [93]:
counts_combine.reduceByKey(lambda a, b: a + b)
print(counts_combine.take(10))

[('increase', 28), ('rise', 68), ('die', 475), ('bear', 543), ('own', 771), ('eye', 690), ('make', 83), ('dig', 11), ('treasure', 52), ('say', 1685)]


In [94]:
sorted_counts = counts_combine.sortBy(lambda wordCounts: wordCounts[1], ascending=False)
# the #24 most used word in Shakespeares writings
# the first one is not a word 
i = 0
for word, count in sorted_counts.collect()[0:10]:
    print("{} : {} : {} ".format(i, word, count))
    i += 1

0 : be : 9137 
1 : be : 6867 
2 : have : 5885 
3 : do : 3753 
4 : be : 3405 
5 : come : 2519 
6 : enter : 2350 
7 : be : 2230 
8 : be : 2168 
9 : love : 2109 
