**Average word length application**

### Write a MapReduce program using PySpark that reads any text inputs and computes the average length of all words that start with each character.
#### For any text input, the job should report the average length of words that starts with "a", "b" and so forth. 

For example, given the text input that include two sentences:
"No now is definitely not the time to call them"

"This new device is helping American cut their power bills in half"

Output would be:

[('a', 8.0),
 ('b', 5.0),
 ('c', 3.5),
 ('d', 8.0),
 ('h', 5.5),
 ('i', 2.0),
 ('n', 2.75),
 ('p', 5.0),
 ('t', 3.67)]


### ** Part 1: Creating a base RDD and pair RDDs **

#### Create a base RDD with `parallelize` and using pair RDDs to count words.

#### ** Create a base RDD **
#### We'll start by generating a base RDD by using a Python list and the `sc.parallelize` method.  Then we'll print out the type of the base RDD.

In [1]:
wordsList = ['cat', 'elephant', 'rat', 'Rout', 'Catch']
# set number of partitions to be 4.
wordsRDD = sc.parallelize(wordsList, 4)
# Print out the type of wordsRDD
print(type(wordsRDD))
print(wordsRDD.collect())

<class 'pyspark.rdd.RDD'>
['cat', 'elephant', 'rat', 'Rout', 'Catch']


#### ** Lowercase and test **
#### We apply a map() transformation to lowercase each string in wordsRDD we just created. 

In [3]:
lowerWordsRDD = wordsRDD.map(lambda x : x.lower())
lowerWordsRDD.collect()

['cat', 'elephant', 'rat', 'rout', 'catch']

#### ** Length of each word **
#### Now use `map()` and a `lambda` function to return the number of characters in each word.  We'll `collect` this result directly into a variable wordLengths.

In [5]:
wordLengths = (lowerWordsRDD
                 .map(lambda x: len(x))
                 .collect())
print(wordLengths)

[3, 8, 3, 4, 5]


#### **First character of each word **
#### Now use `map()` and a `lambda` function to return the first characters of the words.  We'll `collect` this result directly into a variable firstChar. Given a string s, you can use s[0] to obtain the first chacter of the string.

In [7]:
firstChars = (lowerWordsRDD
                 .map(lambda s: s[0])
                 .collect())
print(firstChars)

['c', 'e', 'r', 'r', 'c']


#### **charPairRDDs **
#### The next step, we want to combine previous steps. We create a new type of RDD, called charPairRDD. A charPairRDD is an RDD where each element is a pair tuple `(k, v)` where `k` is the key and `v` is the value. In this example, we will create a pair consisting of `('<first character>', <word length>)` for each word in the lowerWordsRDD.
#### We can create the charPairsRDD using the `map()` transformation with a `lambda()` function to create a new RDD.

In [9]:
charPairRDD = lowerWordsRDD.map(lambda x: (x[0], len(x)))
print(charPairRDD.collect())

[('c', 3), ('e', 8), ('r', 3), ('r', 4), ('c', 5)]


### ** Part 2: Computing average word lengths with charPairRDDs **

#### Now, let's compute the average word length giving a particular character in the RDD. There are multiple ways to perform this computation, but some are much less efficient than others.

#### Computing avg word length using `groupByKey`, which is easier to understand but not as efficient as using reduceByKey

In [11]:
charsGrouped = charPairRDD.groupByKey()
for key, value in charsGrouped.collect():
    print('{0}: {1}'.format(key, list(value)))

c: [3, 5]
r: [3, 4]
e: [8]


#### (2a.1) Compute sum of word lengths
#### For example, given the key/value pair ('c', [3,5]), where [3,5] represents the wordlengths of the two words (i.e, cat and catch) that starts with 'c', write code to output ('c', 8), where 8 is the sum of [3,5].

In [12]:
wordLengthRDD = charsGrouped.map(lambda x: (x[0], sum(x[1])))
print(wordLengthRDD.collect())

[('c', 8), ('r', 7), ('e', 8)]


#### Counting by value
####  Since we want to compute average word lengths, we need to count number of words that starts with a given charater. So for the key/value pair ('c', [3,5]), write code to output ('c', 2), where 2 is the count of items in the list [3,5]

In [14]:
wordCountRDD= charsGrouped.map(lambda x: (x[0], len(x[1])))
wordCountRDD.collect()

[('c', 2), ('r', 2), ('e', 1)]

#### After knowing how to compute word counts and word lengths, we can now write code to compute average word count. Apply the map or mapValues function with a lambda function to charsGrouped and obtain an RDD avgWordLengths that contains a list of key/value pairs. Each key represents a character, and the value the average word length of words that start with the character.

In [16]:
avgWordLengths = charsGrouped.mapValues(lambda x: sum(x)/len(x))
avgWordLengths.collect()

[('c', 4.0), ('r', 3.5), ('e', 8.0)]

#### (2b) A better approach is to start from the pair RDD and then use the [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey) transformation to create a charPairRDD. The `reduceByKey()` transformation gathers together pairs that have the same key and applies the function provided to two values at a time, iteratively reducing all of the values to a single value. `reduceByKey()` operates by applying the function first within each partition on a per-key basis and then across the partitions, allowing it to scale efficiently to large datasets.

In [18]:
morecharPairRDD = charPairRDD.mapValues(lambda x: (x,1))
morecharPairRDD.collect()

[('c', (3, 1)), ('e', (8, 1)), ('r', (3, 1)), ('r', (4, 1)), ('c', (5, 1))]

#### We will create sumCountPairRDD. Each pair in the RDD consists of  `('<first character>', (<sum of word lengths>, <word count>))` - the value of the pair is a tuple.

In [19]:
sumCountPairRDD = morecharPairRDD.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
sumCountPairRDD.collect()

[('c', (8, 2)), ('r', (7, 2)), ('e', (8, 1))]

#### We will create an RDD called avgWordLengths. Each pair in the RDD consists of `('<first character>', <avg word length>)`

In [20]:
avgWordLengths = sumCountPairRDD.mapValues(lambda x: x[0] / x[1])
avgWordLengths.collect()

[('c', 4.0), ('r', 3.5), ('e', 8.0)]

####  All together 
#### The code below performs a sequence of transformations to wordsRDD that contains a list of words to output average word lengths using reduceByKey. These transformations are included in just one statement.

In [21]:
avgWordLengthsCollected = (wordsRDD
                           .map(lambda x: x.lower())
                           .map(lambda x: (x[0], len(x)))
                           .mapValues(lambda x: (x,1))
                           .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
                           .mapValues(lambda x: x[0] / x[1])
                           .mapValues(lambda x: round(x, 2))
                           .sortByKey()
                           .collect())
print(avgWordLengthsCollected)

[('c', 4.0), ('e', 8.0), ('r', 3.5)]


### ** Apply avg word lengths to a file **

#### In this section we will finish developing our average word length application.  We'll have to build the `avgWordLength` function that can be used to deal with real world datasets.

#### ** `avgWordLength` function **
#### First, define a function for average word length.  This function should take in an RDD that is a list of words like `wordsRDD` and return a RDD that includes key/value pairs. Each key represent a lowercase character and each value represent average length of words that start with the charactors. The values should be rounded to two decimal places. The key value pairs should be sorted by key.

In [23]:
def avgWordLength(wordListRDD):
    """
    Args:
        wordListRDD (RDD of str): An RDD consisting of words.

    Returns:
        RDD of (str, int): An RDD consisting of (character, avgwordlength) tuples.
    """
    resultRDD = (wordListRDD
                 .map(lambda x: x.lower())
                 .map(lambda x: (x[0], len(x)))
                 .mapValues(lambda x: (x,1))
                 .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
                 .mapValues(lambda x: x[0] / x[1])
                 .mapValues(lambda x: round(x, 2))
                 .sortByKey()
                )
    return resultRDD
                 
print(avgWordLength(wordsRDD).collect())

[('c', 4.0), ('e', 8.0), ('r', 3.5)]


#### ** Remove punctuation ** 

In [25]:
def removePunctuation(text):
    """
    Note:
        Only spaces, letters, and numbers should be retained.  Other characters should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        text (str): A string.

    Returns:
        str: The cleaned up string.
    """
    import re
    import string
    text= text.translate(str.maketrans('', '', string.punctuation))
    return re.sub(r'[^\w\s]','',text).strip()
print(removePunctuation('Hi, you!'))
print(removePunctuation(' No under_score!'))

Hi you
No underscore


#### ** Load a text file **
#### For the next part of this lab, we will use the [Complete Works of William Shakespeare](http://www.gutenberg.org/ebooks/100) from [Project Gutenberg](http://www.gutenberg.org/wiki/Main_Page). To convert a text file into an RDD, we use the `SparkContext.textFile()` method. We also apply the recently defined `removePunctuation()` function using a `map()` transformation to strip out the punctuation.  Since the file is large we use `take(15)`, so that we only print 15 lines.

In [27]:
shakespeareRDD = (sc
                  .textFile("shakespeare.txt", 8)
                  .map(removePunctuation))
print('\n'.join(shakespeareRDD
                .zipWithIndex()  # to (line, lineNum)
                .map(lambda x: '{0}: {1}'.format(x[1], x[0]))  # to 'lineNum: line'
                .take(15))
     )

0: 
1: Project Gutenbergs The Complete Works of William Shakespeare by William
2: Shakespeare
3: 
4: This eBook is for the use of anyone anywhere in the United States and
5: most other parts of the world at no cost and with almost no restrictions
6: whatsoever  You may copy it give it away or reuse it under the terms
7: of the Project Gutenberg License included with this eBook or online at
8: wwwgutenbergorg  If you are not located in the United States youll
9: have to check the laws of the country where you are located before using
10: this ebook
11: 
12: See at the end of this file  CONTENT NOTE added in 2017
13: 
14: 


#### (3d) Apply a transformation that will split each element (i.e., sentence) of the shakespeareRDD by its spaces. For each element of the RDD, apply Python's string [split()](https://docs.python.org/2/library/string.html#string.split) function. Use flatMap(). If we do a split of an empty line, we are going to get an empty list. flatMap() will automatically remove these emtpy lists. As a result, the RDD you obtained from the funtion flatMap() does not contain any empty elements 

In [28]:
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda x: x.split())
print(shakespeareWordsRDD.top(5))

['zwounds', 'zwounds', 'zwaggered', 'zone', 'zodiacs']


#### ** Compute the average word lengths **
#### We now have an RDD that is only words.  Next, let's apply the `avgWordLength()` function to produce a list of key/value pairs with lowercase characters as the keys and average word lengths as the values. We can view the top 15 words by using the `takeOrdered()` action; however, since the elements of the RDD are pairs, we need a custom sort function that sorts using the value part of the pair.

In [30]:
avgWordLengthRDD = avgWordLength(shakespeareWordsRDD)
print(avgWordLengthRDD.collect())

[('1', 2.58), ('2', 1.63), ('3', 1.58), ('4', 1.74), ('5', 2.26), ('6', 2.42), ('7', 2.56), ('8', 2.6), ('9', 2.62), ('a', 3.28), ('b', 4.49), ('c', 6.25), ('d', 4.99), ('e', 5.51), ('f', 4.92), ('g', 5.21), ('h', 4.0), ('i', 2.21), ('j', 5.26), ('k', 4.73), ('l', 4.72), ('m', 3.97), ('n', 3.81), ('o', 2.86), ('p', 6.28), ('q', 5.9), ('r', 6.03), ('s', 4.98), ('t', 3.83), ('u', 4.6), ('v', 5.89), ('w', 4.43), ('x', 2.5), ('y', 3.53), ('z', 5.17)]


In [32]:
#Take top 15 characters with the largest average word lengths

top15CharaterWithAvgWordLengths = avgWordLengthRDD.takeOrdered(15, key = lambda x: -x[1])
print ('\n'.join(map(lambda w_c: '{0}: {1}'.format(w_c[0], w_c[1]), top15CharaterWithAvgWordLengths)))

p: 6.28
c: 6.25
r: 6.03
q: 5.9
v: 5.89
e: 5.51
j: 5.26
g: 5.21
z: 5.17
d: 4.99
s: 4.98
f: 4.92
k: 4.73
l: 4.72
u: 4.6
