# Word Count MapReduce Program in PySpark

Objective:
Use PySpark to read an input file and count the frequency of each unique keyword.

In [1]:
# Echo the session object as the cell output.
# NOTE(review): `spark` is not created in this notebook — presumably the kernel/shell pre-defines it; confirm.
spark

### Create spark context

In [2]:
# Take the SparkContext from the session; `sc` is used below for textFile() and parallelize().
sc = spark.sparkContext

### Read the input file

In [3]:
# Placeholder location of the input text; replace with the real path before running.
input_path = "Filepath\\Text1.txt"

# Load the file as an RDD with one element per line, then preview the first five lines.
lines = sc.textFile(input_path)
lines.take(5)

['The Project Gutenberg EBook of Pride and Prejudice, by Jane Austen',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included']

### Split every line into its individual words

In [4]:
# Split each line on single spaces and flatten the pieces into one RDD of tokens.
words = lines.flatMap(lambda line: line.split(" "))
words.take(5)

['The', 'Project', 'Gutenberg', 'EBook', 'of']

###  Define function cleandata for following:

1. Convert all words to lowercase

2. Remove all double quotes 

3. Special characters {",", ".", ";", "?"} are to be ignored and removed
   from the END of the words, e.g. "copy." becomes "copy"

In [5]:
def cleandata(x, trailing_chars=",.;?"):
    """Normalise a raw token so that equal words produce equal keys.

    Steps, in order:
      1. strip surrounding whitespace and convert to lowercase;
      2. remove every double-quote character anywhere in the token;
      3. remove *all* characters listed in ``trailing_chars`` from the end
         of the token.

    The previous implementation dropped at most one trailing character, so
    tokens such as ``so...`` or ``done.",`` kept punctuation and were counted
    as different words from ``so`` / ``done``.

    Args:
        x: the raw token (a string).
        trailing_chars: characters to strip from the end of the token;
            defaults to comma, period, semicolon and question mark.

    Returns:
        The cleaned token; may be the empty string.
    """
    # Quotes are removed first so punctuation hidden behind a closing quote
    # (e.g. `word."`) becomes trailing and is stripped as well.
    cleaned = x.strip().lower().replace('"', '')
    return cleaned.rstrip(trailing_chars)


### Apply function cleandata on words rdd and return clean_words rdd after cleaning

In [6]:
# Run every token through cleandata; the function is passed directly, no wrapper lambda needed.
clean_words = words.map(cleandata)
clean_words.take(5)

['the', 'project', 'gutenberg', 'ebook', 'of']

### Create final_words rdd to ignore any word of length less than 3 from clean_words rdd

In [7]:
# Keep only tokens with at least three characters (this also drops empty strings).
final_words = clean_words.filter(lambda word: len(word) >= 3)
final_words.take(5)

['the', 'project', 'gutenberg', 'ebook', 'pride']

### Generate key value pairs with each word as key and 1 as value

In [8]:
# Emit a (word, 1) pair per occurrence so the counts can be summed per key.
pairs = final_words.map(lambda word: (word, 1))
pairs.take(5)

[('the', 1), ('project', 1), ('gutenberg', 1), ('ebook', 1), ('pride', 1)]

### Create freq rdd from pairs rdd with the `reduceByKey` method to get (word, frequency) key-value pairs

In [9]:
# Sum the 1s belonging to each word to obtain (word, frequency) pairs.
freq = pairs.reduceByKey(lambda left, right: left + right)
freq.take(5)

[('project', 83),
 ('gutenberg', 24),
 ('ebook', 10),
 ('pride', 45),
 ('jane', 252)]

### Create a list of 10 given words and their frequencies using `lookup`

In [10]:
wordlist = [
    "advantage",
    "book",
    "mistake",
    "dancing",
    "gutenberg",
    "astonishment",
    "hill",
    "yesterday",
    "the",
    "fox",
]

# Pair every requested word with the result of freq.lookup(word): a list of values
# for that key, which is empty when the word never occurs (see "fox" in the output).
lst = [(word, freq.lookup(word)) for word in wordlist]
lst

[('advantage', [33]),
 ('book', [14]),
 ('mistake', [6]),
 ('dancing', [20]),
 ('gutenberg', [24]),
 ('astonishment', [30]),
 ('hill', [9]),
 ('yesterday', [12]),
 ('the', [4480]),
 ('fox', [])]

In [11]:
### Create word_filtered_rdd to put the list of words and their frequencies back into an RDD

In [11]:
# Turn the driver-side list back into an RDD with a single partition.
word_filtered_rdd = sc.parallelize(lst, numSlices=1)
word_filtered_rdd.collect()

[('advantage', [33]),
 ('book', [14]),
 ('mistake', [6]),
 ('dancing', [20]),
 ('gutenberg', [24]),
 ('astonishment', [30]),
 ('hill', [9]),
 ('yesterday', [12]),
 ('the', [4480]),
 ('fox', [])]

### Define a function println() that returns the word and its frequency as a string in the required format

In [12]:
def println(x):
    """Format a (word, counts) pair as "<word> <count>".

    Args:
        x: an indexable pair whose first item is the word and whose second
            item is a list of counts (possibly empty).

    Returns:
        The word, a single space, then the first count in the list, or
        "0" when the list is empty. Nothing is printed; the string is
        returned to the caller.
    """
    word = x[0]
    counts = x[1]
    # An empty list means the word was never seen, so report zero.
    count_text = "0" if len(counts) == 0 else str(counts[0])
    return str(word) + " " + count_text

### Print the words

In [13]:
# Convert each (word, counts) pair to its "<word> <count>" line; println is passed directly.
final_rdd = word_filtered_rdd.map(println)
final_rdd.collect()

['advantage 33',
 'book 14',
 'mistake 6',
 'dancing 20',
 'gutenberg 24',
 'astonishment 30',
 'hill 9',
 'yesterday 12',
 'the 4480',
 'fox 0']

## Save the output

In [None]:
# Write the formatted lines out. The backslash is doubled so the literal no longer
# relies on the invalid escape sequence "\F" (a DeprecationWarning / SyntaxWarning in
# recent Python versions); the resulting path string is unchanged and now matches the
# escaping style used when reading the input file.
# NOTE(review): per the PySpark docs, saveAsTextFile writes its output under the given
# path rather than as a single file — confirm that is the intended layout.
final_rdd.saveAsTextFile("File path\\FileName.txt")