In [6]:
# Importing necessary libraries
import time
from pyspark import SparkConf, SparkContext

In [7]:
# Record the start time before the execution of the main code begins
start_time = time.time()

In [9]:
# Setting up Spark configuration
conf=SparkConf().setMaster("local[*]").setAppName("Word counting application with MapReduce")
sc = SparkContext.getOrCreate(conf=conf)

In [10]:
# Reading the input file into an RDD (Resilient Distributed Dataset)
rdd = sc.textFile("Data.txt")  # File Size 124 MB

In [11]:
# Normalization Process
def clean_data(x):
    # Convert text to lowercase and remove non-alphabetic characters
    cleaned_text = ''.join(char.lower() for char in x if char.isalpha() or char.isspace())
     # Split text into lines, remove empty lines, and rejoin them
    lines = cleaned_text.split('\n')
    non_empty_lines = [line for line in lines if line.strip() != '']
    cleaned_text = '\n'.join(non_empty_lines)
    return cleaned_text

In [12]:
# Applying the clean_data function to each element in the RDD
cleaned_rdd = rdd.map(clean_data)

# Counting the number of elements in the RDD
cleaned_rdd.count()

1000000

In [13]:
# Taking the first 10 elements of the RDD
cleaned_rdd.take(10)

['a rebel statement sent to lisbon from jamba said  government soldiers and  guerrillas were killed in the fighting that ended jan  it said the rebel forces sill held mavinga',
 'authorities last week issued a vacate order for a club in manhattan and closed another in the bronx',
 'at the first pan am bankruptcy hearing for example at least five airlines were represented',
 'mr neigum pokerfaced during the difficult task manages a second showing',
 'this combined with the container division talks suggests the groups bankers might be considering an orderly disposal of all assets',
 'she told the post in an interview published sunday that some of the money may have become mingled into improvements on her home that included a swimming pool a  widescreen television and renovations to her basement',
 'according to a study by the marshall institute the average nasa employees age in  was  now most of its senior and middlemanagers will be eligible to retire in five years',
 'preston tisch  is 

In [14]:
# Splitting each line into words
words=cleaned_rdd.flatMap(lambda line: line.split(" "))
# Taking the first 10 words
words.take(10)

['a',
 'rebel',
 'statement',
 'sent',
 'to',
 'lisbon',
 'from',
 'jamba',
 'said',
 '']

In [15]:
# Splitting each line into a list of words
rowlist_words=cleaned_rdd.map(lambda line: line.split(" "))
# Taking the first 10 lists of words
rowlist_words.take(10)

[['a',
  'rebel',
  'statement',
  'sent',
  'to',
  'lisbon',
  'from',
  'jamba',
  'said',
  '',
  'government',
  'soldiers',
  'and',
  '',
  'guerrillas',
  'were',
  'killed',
  'in',
  'the',
  'fighting',
  'that',
  'ended',
  'jan',
  '',
  'it',
  'said',
  'the',
  'rebel',
  'forces',
  'sill',
  'held',
  'mavinga'],
 ['authorities',
  'last',
  'week',
  'issued',
  'a',
  'vacate',
  'order',
  'for',
  'a',
  'club',
  'in',
  'manhattan',
  'and',
  'closed',
  'another',
  'in',
  'the',
  'bronx'],
 ['at',
  'the',
  'first',
  'pan',
  'am',
  'bankruptcy',
  'hearing',
  'for',
  'example',
  'at',
  'least',
  'five',
  'airlines',
  'were',
  'represented'],
 ['mr',
  'neigum',
  'pokerfaced',
  'during',
  'the',
  'difficult',
  'task',
  'manages',
  'a',
  'second',
  'showing'],
 ['this',
  'combined',
  'with',
  'the',
  'container',
  'division',
  'talks',
  'suggests',
  'the',
  'groups',
  'bankers',
  'might',
  'be',
  'considering',
  'an',
  'or

In [16]:
# Mapping each word to a tuple (word, 1) for counting
uniqueWords = words.map(lambda word : (word, 1))
# Taking the first 10 unique words
uniqueWords.take(10)

[('a', 1),
 ('rebel', 1),
 ('statement', 1),
 ('sent', 1),
 ('to', 1),
 ('lisbon', 1),
 ('from', 1),
 ('jamba', 1),
 ('said', 1),
 ('', 1)]

In [17]:
# Reducing by key to count the occurrences of each word
wordCounts = uniqueWords.reduceByKey(lambda x, y : x + y)
# Taking the first 10 word counts
wordCounts.take(10)

[('statement', 4975),
 ('sent', 3645),
 ('jamba', 7),
 ('', 472906),
 ('soldiers', 3413),
 ('guerrillas', 1950),
 ('killed', 6687),
 ('in', 473567),
 ('authorities', 5082),
 ('last', 36100)]

In [18]:
# Counting the total number of unique words
wordCounts.count() 

218218

In [19]:
# Sorting the word counts by the count value in descending order
sortedWordCounts = sorted(wordCounts.collect(), key=lambda x: x[1], reverse=True)
# Creating an RDD from the sorted word counts
sortedWordCountsRDD = sc.parallelize(sortedWordCounts)
# Taking the first 10 items from the sorted word counts RDD
sortedWordCountsRDD.take(10)

[('the', 1370263),
 ('of', 595835),
 ('to', 566298),
 ('a', 507768),
 ('in', 473567),
 ('', 472906),
 ('and', 448469),
 ('said', 215897),
 ('for', 214759),
 ('that', 202017)]

In [20]:
# These items are stored in the 'first_10_items' variable as a list of tuples.
first_10_items = sortedWordCountsRDD.take(10)

In [21]:
# Saving the first 10 items as a text file in the output directory
output_dir = "output_data/"

import shutil
shutil.rmtree(output_dir, ignore_errors=True)

sc.parallelize(first_10_items).saveAsTextFile(output_dir)

In [22]:
end_time = time.time()
# Calculating the execution time by subtracting the start time from the end time
execution_time = end_time - start_time
# Printing the program execution time in seconds
print(f"Program execution time: {execution_time} seconds")

Program execution time: 49.97883415222168 seconds
