In [1]:
import re
from pyspark.sql import SparkSession

In [2]:
# Create PySpark Session
spark = SparkSession.builder.appName("WordCount").getOrCreate()
# Read input file and store it as a Resilient Distributed Dataset
text = spark.sparkContext.textFile("./input.txt")
# Print first 10 lines
print(text.take(10))

['The Project Gutenberg EBook of The Complete Works of William Shakespeare, by', 'William Shakespeare', '', 'This eBook is for the use of anyone anywhere at no cost and with', 'almost no restrictions whatsoever.  You may copy it, give it away or', 're-use it under the terms of the Project Gutenberg License included', 'with this eBook or online at www.gutenberg.org', '', '** This is a COPYRIGHTED Project Gutenberg eBook, Details Below **', '**     Please follow the copyright guidelines in this file.     **']


In [3]:
def splitter(line):
    # Remove any non-words
    line = re.sub(r"^\W+|\W+$", "", line)
    # Split lines into separate words and convert all words to lowercase
    return map(str.lower, re.split(r"\W+", line))

# Create an RDD of each word in the input file
words = text.flatMap(splitter)
print(words.take(10))

['the', 'project', 'gutenberg', 'ebook', 'of', 'the', 'complete', 'works', 'of', 'william']


In [4]:
# Map Step

# Assign a count of 1 to each word
mapped_words = words.map(lambda x: (x,1))
print(mapped_words.take(10))

[('the', 1), ('project', 1), ('gutenberg', 1), ('ebook', 1), ('of', 1), ('the', 1), ('complete', 1), ('works', 1), ('of', 1), ('william', 1)]


In [5]:
# Reduce Step
from operator import add

# Aggregate the total count of each word
counts = mapped_words.reduceByKey(add)
print(counts.take(10))

[('project', 331), ('gutenberg', 326), ('ebook', 17), ('of', 18307), ('shakespeare', 272), ('', 9682), ('this', 6900), ('is', 9808), ('use', 562), ('anyone', 7)]


In [6]:
# Store word_counts as a dictionary with word as key and count as value
word_counts = {item[0] : item[1] for item in counts.collect()}

In [7]:
# Save result as JSON
import json
with open("./output/word_count.json", "w") as outfile:
    json.dump(word_counts, outfile, indent = 4)