In [1]:
import time
# Wall-clock start of the whole notebook run; the final cell subtracts this
# from end_12GB to report the overall elapsed time.
initial = time.time()

In [2]:
%config IPCompleter.greedy=True
import findspark
findspark.init()
from pyspark import SparkConf
from pyspark import SparkContext

In [3]:
# Spark configuration for this notebook: application name 'Word Count',
# master "local[*]" (run Spark locally inside this machine).
conf = (
    SparkConf()
    .setAppName('Word Count')
    .setMaster("local[*]")
)

# getOrCreate() hands back the already-running context when this cell is
# executed again in the same kernel. Constructing SparkContext(conf=conf) a
# second time raises "Cannot run multiple SparkContexts at once".
# Note: when a context already exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)

In [5]:
# Characters that are stripped from every line before counting words:
# ASCII punctuation, decimal digits and ASCII whitespace, in that order.
import string
remove = ''.join((string.punctuation, string.digits, string.whitespace))

##### Word Count for 124MB text file

In [6]:
# Start of the 124MB timing window; it is closed by end_124MB once the result
# file has been written.
initial_124MB = time.time()

In [7]:
# RDD with one element per line of the 124MB input file.
rdd_124MB = sc.textFile('./input/leipzig124MB.txt')
# count() is an action: it reads the file once just to obtain the line count.
num_lines_124MB = rdd_124MB.count()

In [8]:
# Normalize the text: every character listed in `remove` (punctuation, digits,
# whitespace) is replaced by a single space, the line is lower-cased, and the
# result is split on single spaces. Consecutive spaces therefore produce empty
# strings, which are filtered out in the next step.
#
# The translation table is built once here rather than inside the lambda,
# where it used to be rebuilt for every input line.
translation_table_124MB = str.maketrans(remove, ' ' * len(remove))
words_124MB = rdd_124MB.flatMap(
    lambda line: line.translate(translation_table_124MB).lower().split(" ")
)

In [9]:
# Discard the empty strings left behind by the normalization step, then count:
# emit a (word, 1) pair per token and sum the ones for each distinct word.
wordCounts_124MB = (
    words_124MB
    .filter(lambda token: len(token) > 0)
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
)

In [10]:
# collect() is the action that runs the word-count job and returns every
# (word, count) pair to the driver as a Python list.
wordCounts_124MB_list = wordCounts_124MB.collect()
# reduceByKey leaves one pair per distinct word, so the list length is the
# number of unique words.
unique_words_124MB = len(wordCounts_124MB_list)

In [11]:
# Order the (word, count) pairs by count, most frequent first.
sorted_key_value_pairs_124MB = sorted(
    wordCounts_124MB_list,
    key=lambda pair: pair[1],
    reverse=True,
)
# Text form of the ten most frequent pairs, one tuple per line.
sorted_first_10_key_value_pair_124MB = '\n'.join(
    str(pair) for pair in sorted_key_value_pairs_124MB[:10]
)

In [12]:
# Persist the ten most frequent (word, count) pairs, one per line.
# The encoding is pinned so the file does not depend on the platform's default
# locale encoding, which may be unable to represent non-ASCII words.
with open('./output_124MB.txt', 'w', encoding='utf-8') as f:
    f.write(sorted_first_10_key_value_pair_124MB)
# Close the 124MB timing window only after the result is on disk.
end_124MB = time.time()

##### Word Count for 12GB text file

In [13]:
# Start of the 12GB timing window; it is closed by end_12GB once the result
# file has been written.
initial_12GB = time.time()

In [14]:
# RDD with one element per line of the 12GB input file, reduced to 16
# partitions (a later cell explains 16 as logical processors (8) * 2).
rdd_12GB = sc.textFile('./input/leipzig12GB.txt').coalesce(16)
# count() is an action: it reads the file once just to obtain the line count.
num_lines_12GB = rdd_12GB.count()

In [15]:
# Normalize the text: every character listed in `remove` (punctuation, digits,
# whitespace) is replaced by a single space, the line is lower-cased, and the
# result is split on single spaces. Consecutive spaces therefore produce empty
# strings, which are filtered out in the next step.
#
# The translation table is built once here rather than inside the lambda,
# where it used to be rebuilt for every input line.
translation_table_12GB = str.maketrans(remove, ' ' * len(remove))
words_12GB = rdd_12GB.flatMap(
    lambda line: line.translate(translation_table_12GB).lower().split(" ")
)

In [16]:
# Discard the empty strings left behind by the normalization step, then count:
# emit a (word, 1) pair per token and sum the ones for each distinct word.
# The reduced RDD is brought to 16 partitions.
wordCounts_12GB = (
    words_12GB
    .filter(lambda token: len(token) > 0)
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
    .coalesce(16)
)

In [17]:
# wordCounts_12GB is already coalesced to 16 partitions (logical processors
# (8) * 2) where it is defined, so coalescing to 16 again here was a no-op and
# has been dropped.
# collect() is the action that runs the word-count job and returns every
# (word, count) pair to the driver as a Python list.
wordCounts_12GB_list = wordCounts_12GB.collect()
# reduceByKey leaves one pair per distinct word, so the list length is the
# number of unique words.
unique_words_12GB = len(wordCounts_12GB_list)

In [18]:
# Order the (word, count) pairs by count, most frequent first.
sorted_key_value_pairs_12GB = sorted(
    wordCounts_12GB_list,
    key=lambda pair: pair[1],
    reverse=True,
)
# Text form of the ten most frequent pairs, one tuple per line.
sorted_first_10_key_value_pair_12GB = '\n'.join(
    str(pair) for pair in sorted_key_value_pairs_12GB[:10]
)

In [19]:
# Persist the ten most frequent (word, count) pairs, one per line.
# The encoding is pinned so the file does not depend on the platform's default
# locale encoding, which may be unable to represent non-ASCII words.
with open('./output_12GB.txt', 'w', encoding='utf-8') as f:
    f.write(sorted_first_10_key_value_pair_12GB)
# Close the 12GB timing window only after the result is on disk.
end_12GB = time.time()

In [20]:
import datetime

# Summary report for the 124MB run: line count, number of distinct words, the
# ten most frequent (word, count) pairs, and the elapsed wall-clock time.
print(
    'Prints for 124 MB text file',
    end='\n--------------------------------------------------------------------\n',
)
print(f'Number of lines in leipzig124MB.txt file: {num_lines_124MB} lines.')
print(
    f'Number of occurrences of unique words in leipzig124MB.txt: {unique_words_124MB} unique words.',
    end='\n\n',
)
print(
    f'First 10 key-value pair of sorted result:\n{sorted_first_10_key_value_pair_124MB}',
    end='\n\n',
)
print(f'The processes for 124MB text file  took {datetime.timedelta(seconds=end_124MB - initial_124MB)}')

Prints for 124 MB text file
--------------------------------------------------------------------
Number of lines in leipzig124MB.txt file: 1000000 lines.
Number of occurrences of unique words in leipzig124MB.txt: 164860 unique words.

First 10 key-value pair of sorted result:
('the', 1372271)
('of', 597086)
('to', 568300)
('a', 513030)
('in', 475365)
('and', 449883)
('s', 264098)
('said', 215925)
('for', 215232)
('that', 206253)

The processes for 124MB text file  took 0:00:16.026593


In [21]:
# Summary report for the 12GB run: line count, number of distinct words, the
# ten most frequent (word, count) pairs, and the elapsed wall-clock time.
# Relies on `datetime` having been imported by the preceding report cell.
print(
    'Prints for 12 GB text file',
    end='\n--------------------------------------------------------------------\n',
)
print(f'Number of lines in leipzig12GB.txt file: {num_lines_12GB} lines.')
print(
    f'Number of occurrences of unique words in leipzig12GB.txt: {unique_words_12GB} unique words.',
    end='\n\n',
)
# No space after the newline: the previous "\n {...}" indented only the first
# result row and made this report differ from the 124MB one.
print(
    f'First 10 key-value pair of sorted result:\n{sorted_first_10_key_value_pair_12GB}',
    end='\n\n',
)
print(f'The processes for 12GB text file  took {datetime.timedelta(seconds=end_12GB - initial_12GB)}')

Prints for 12 GB text file
--------------------------------------------------------------------
Number of lines in leipzig12GB.txt file: 96000001 lines.
Number of occurrences of unique words in leipzig12GB.txt: 164861 unique words.

First 10 key-value pair of sorted result:
 ('the', 131738016)
('of', 57320256)
('to', 54556800)
('a', 49250880)
('in', 45635040)
('and', 43188768)
('s', 25353408)
('said', 20728800)
('for', 20662272)
('that', 19800288)

The processes for 12GB text file  took 0:14:30.145427


In [22]:
# Overall elapsed time: from the notebook's first cell (`initial`) to the end
# of the 12GB run (`end_12GB`); the report cells themselves are not included.
print(f"The whole process took {datetime.timedelta(seconds=end_12GB - initial)}")

The whole process took 0:14:49.801940
