# Part 1 - Word counting
---------------

For the first part of the practical assignment, we will use the MapReduce paradigm to implement a word-counting program in Apache Spark. First, some setup and data preparation. For further information on PySpark, check out the documentation [here](https://spark.apache.org/docs/latest/api/python/pyspark.html).

To run the code in a cell, select it and press Ctrl+Enter, or use the `Run` button at the top of the notebook.

In [1]:
import pyspark

sc = pyspark.SparkContext("local[*]", "PUC Big Data workshop")

We load a text file of Shakespeare's plays using PySpark's `textFile` function. It automatically splits the file into separate lines and returns an RDD with one element per line.

In [2]:
lines = sc.textFile("shakespeare.txt")

To give you an idea of the file's contents, we print a few random lines. The `takeSample` operation is very useful for examining the data you are working with. With a large amount of data it is infeasible to look at all of it, so a small random subset gives you a rough idea of what you are dealing with. Feel free to sample the results of the individual cells below to see what is happening!

In [14]:
lines.takeSample(withReplacement=False, num=10)

['  Who, but for dreaming on this fond exploit,',
 '',
 "  And cherish factions. 'Tis inferr'd to us  ",
 "SECOND MURDERER. 'Tis better, sir, than to be tedious. Let",
 "  Back'd by the power of Warwick, that false peer,",
 "  him today, I can tell them that. And there's Troilus will not",
 '  A Roman sworder and banditto slave',
 'RIANA. Say, how grows it due?',
 '  You did mistake.',
 'If by strong hand you offer to break in']
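Sampling without replacement means every element can be picked at most once. The sketch below mimics this on an ordinary Python list using the standard library's `random.sample`. It is a local analogue, not Spark code, and the stand-in lines are hypothetical. PySpark's `takeSample` also accepts an optional `seed` argument (for example `lines.takeSample(False, 10, seed=42)`) if you want the same sample on every run.

```python
import random

# Hypothetical stand-in for a handful of lines from the text file
lines = [
    "If by strong hand you offer to break in",
    "",
    "  You did mistake.",
    "  A Roman sworder and banditto slave",
    "RIANA. Say, how grows it due?",
]

# Sampling without replacement: each line can be chosen at most once,
# similar in spirit to takeSample(withReplacement=False, num=3).
rng = random.Random(42)  # fixed seed so the sample is reproducible
sample = rng.sample(lines, k=3)
print(sample)
```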

Since some lines in the file are empty, we first filter them out. They contain no words, so we don't need them! The `filter` operation keeps only the lines with a length greater than 0.

In [3]:
non_empty_lines = lines.filter(lambda line: len(line) > 0)

We will use `lambda` functions extensively when working with Spark, since they allow a short notation for simple operations. Usually, the function takes one argument and performs some action on it. For example, the function we just used takes a single line and checks whether its length is greater than 0 (i.e. whether it is non-empty).
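To make the `lambda` notation concrete, here is a small plain-Python sketch. It applies the same test twice with Python's built-in `filter`, once as an inline `lambda` and once as a named function, and both produce the same result. The sample lines are hypothetical.

```python
def is_non_empty(line):
    """Named-function equivalent of `lambda line: len(line) > 0`."""
    return len(line) > 0


# Hypothetical stand-in lines: a normal line, an empty one, a whitespace-only one
lines = ["Two households, both alike in dignity,", "", "   "]

# The built-in filter() keeps the elements for which the function returns True
with_lambda = list(filter(lambda line: len(line) > 0, lines))
with_def = list(filter(is_non_empty, lines))

print(with_lambda)  # ['Two households, both alike in dignity,', '   ']
```

Note that the whitespace-only line passes the filter, because its length is 3. This is harmless here: `"   ".strip().split()` returns an empty list, so the line contributes no words in the next step.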

Now that we have an RDD of non-empty lines, we split them into individual words. Since this is just an exercise, we simply split the lines on whitespace. This will not give a perfect split (punctuation stays attached to words), but it is good enough for the rest of the program. `flatMap` flattens the per-line lists of words, so instead of ending up with nested lists we get one long sequence of words.

In [4]:
words = non_empty_lines.flatMap(lambda line: line.strip().split())

The `strip` method removes leading and trailing whitespace from the line, and `split`, called without arguments, splits the line on runs of whitespace.
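The difference between `map` and `flatMap` can be sketched with ordinary Python lists. A `map`-style step yields one list per line, which gives nested lists. A `flatMap`-style step concatenates those lists into one flat sequence. This is a local illustration with hypothetical input, not Spark's implementation.

```python
from itertools import chain

# Hypothetical input lines
lines = ["  O Romeo, Romeo!  ", "wherefore art thou Romeo?"]


def split_line(line):
    # strip() removes leading/trailing whitespace; split() with no argument
    # splits on runs of whitespace and never yields empty strings
    return line.strip().split()


# map-style: one output element (a list of words) per input line
mapped = [split_line(line) for line in lines]

# flatMap-style: the per-line lists are concatenated into a single list
flat_mapped = list(chain.from_iterable(split_line(line) for line in lines))

print(mapped)       # [['O', 'Romeo,', 'Romeo!'], ['wherefore', 'art', 'thou', 'Romeo?']]
print(flat_mapped)  # ['O', 'Romeo,', 'Romeo!', 'wherefore', 'art', 'thou', 'Romeo?']
```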

Let's look at some of the words we ended up with!

In [5]:
words.takeSample(withReplacement=False, num=10)

['thou',
 "'Item.",
 'lay',
 'Cold',
 'her',
 'should',
 'from',
 'OFFICERS',
 'than',
 'deeps;']

We loaded the Shakespeare text, removed empty lines and split the remaining lines on whitespace using the `flatMap` function. Displayed above are 10 random words sampled from the split text. As you can see, splitting on whitespace does not separate words perfectly (punctuation stays attached, as in `'Item.` and `deeps;`), but it will do for our purposes.

------------------------

Now, let's implement a simple word count! First, we will use the `map` operation to transform each word into a (word, 1) tuple as per the slides.

In [6]:
# Replace the dots with the right code:
annotated_words = words.map(lambda word: ...)       

After annotating each word with the number 1, we perform the shuffle step using the `groupByKey` operation. This moves all (word, 1) tuples with the same key (word) to the same partition and transforms them into (word, [1, 1, ...]) tuples, with all the 1's for a single word grouped together. (Strictly speaking, PySpark returns the grouped values as an iterable object rather than a Python list, but you can iterate over it and sum it just like a list.)

In [9]:
grouped_words = annotated_words.groupByKey()
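The map and shuffle steps can be mimicked on a single machine with a dictionary of lists. The sketch below uses a few hypothetical words. It only illustrates the resulting data shape; in Spark, the grouping happens across partitions.

```python
from collections import defaultdict

# Hypothetical words
words = ["thou", "art", "thou", "Romeo"]

# Map step: annotate every word with a count of 1
annotated_words = [(word, 1) for word in words]

# Shuffle/group step: collect all values that share the same key
grouped = defaultdict(list)
for word, one in annotated_words:
    grouped[word].append(one)

print(dict(grouped))  # {'thou': [1, 1], 'art': [1], 'Romeo': [1]}
```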

To obtain the total word counts, we simply need to add up all the 1's for each word. We can do that with the `mapValues` operation, which applies a function to the value of each (key, value) tuple and leaves the key unchanged. Passing the 1's in each (word, [counts]) tuple to the `sum` function gives us (word, totalCount) tuples.

In [10]:
# Replace the dots with the right code!
# Hint: look at the builtin Python sum function:
# https://docs.python.org/3/library/functions.html#sum
word_counts = grouped_words.mapValues(...)
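As a local illustration of what a `mapValues`-style step does, the sketch below applies Python's built-in `sum` to every value of some hypothetical grouped data and keeps each key unchanged.

```python
# Hypothetical output of the grouping step: word -> list of 1's
grouped = {"thou": [1, 1], "art": [1], "Romeo": [1]}

# mapValues-style step: transform each value, keep each key as-is
word_counts = [(word, sum(ones)) for word, ones in grouped.items()]

print(word_counts)  # [('thou', 2), ('art', 1), ('Romeo', 1)]
```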

Now, to make the next steps easier, we first swap the positions of the words and their counts, so that we end up with (count, word) tuples instead of (word, count). This makes the count the key of each item, which lets us use `sortByKey` to find the most common words. (RDDs also provide a `sortBy` operation that sorts on the result of an arbitrary function, but here we stick with sorting by key.)

To access the individual values in a tuple, you can use the bracket syntax:

```python
>>> x = ("hello", 1)
>>> x[0]
'hello'
>>> x[1]
1
>>> (x[1], x[0])
(1, 'hello')
```

In [11]:
# Replace the dots with the right code:
word_counts_reordered = word_counts.map(lambda x: ...)

Now we can sort the tuples by their counts in descending order, putting the most frequent words at the beginning of the list.

In [12]:
sorted_word_counts = word_counts_reordered.sortByKey(ascending=False)
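For comparison, here is a plain-Python sketch of both ways to get the most frequent words first. One swaps the tuples and sorts them in descending order, which is analogous to `sortByKey(ascending=False)`. The other sorts the original (word, count) tuples on their count, which is the idea behind the RDD `sortBy` operation. The counts are hypothetical.

```python
# Hypothetical (word, count) pairs
word_counts = [("thou", 2), ("art", 1), ("the", 5)]

# Swap to (count, word), then sort descending on the first element
swapped = [(count, word) for word, count in word_counts]
by_key = sorted(swapped, reverse=True)

# Alternative: keep (word, count) and sort on the count via a key function
by_value = sorted(word_counts, key=lambda pair: pair[1], reverse=True)

print(by_key)    # [(5, 'the'), (2, 'thou'), (1, 'art')]
print(by_value)  # [('the', 5), ('thou', 2), ('art', 1)]
```

One subtlety: when two words have the same count, sorting the swapped tuples with `reverse=True` also orders those words in reverse alphabetical order. The key-function version keeps tied items in their original order, because Python's sort is stable.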

That's it! Let's look at the 10 most frequent words in the text.

In [13]:
for (count, word) in sorted_word_counts.take(10):
    # print the word followed by its count
    print(f"{word:<4}: {count:<6} occurrences")

the : 23178  occurrences
I   : 19540  occurrences
and : 18218  occurrences
to  : 15592  occurrences
of  : 15503  occurrences
a   : 12513  occurrences
my  : 10823  occurrences
in  : 9564   occurrences
you : 9058   occurrences
is  : 7829   occurrences


### Well done!
That's it for the first part of this assignment. After the next part of the lecture, we can continue with part 2!

# Part 2 - Averaging a list of numbers
---
In this part of the assignment, we will tackle a slightly more complicated problem: averaging a list of numbers. As in the first part, we first create the data we need. We will use a list of ten thousand random integers between 0 (inclusive) and one million (exclusive). For reference, we first calculate the average using `numpy` and then try to replicate it using Spark!

In [26]:
import numpy

numbers = numpy.random.randint(0, 1_000_000, size=10_000)
print(numbers.mean())

# Parallelize the list to use it with Spark after this cell
numbers = sc.parallelize(numbers)

499656.4287
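The reference value is simply the sum of the numbers divided by how many there are. The sketch below computes it with the standard library only. It draws its own random numbers with an arbitrarily chosen seed, so its output will not match the value printed above. `random.randrange(0, 1_000_000)` excludes the upper bound, like `numpy.random.randint(0, 1_000_000)`.

```python
import random
import statistics

rng = random.Random(0)  # arbitrary fixed seed for reproducibility
# randrange excludes the upper bound, so values lie in [0, 999_999]
numbers = [rng.randrange(0, 1_000_000) for _ in range(10_000)]

# The mean is the total divided by the number of elements
average = sum(numbers) / len(numbers)

# Cross-check against the standard library's mean
print(average, statistics.mean(numbers))
```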


Let's first try the same strategy we used for word counting. Transform the list of numbers into (number, 1) tuples using the `map` function. Then use the `groupByKey` function to collect all 1's for each number.

In [27]:
annotated_numbers = numbers.map(lambda x: (x, 1))
grouped_numbers = annotated_numbers.groupByKey()

Just like in part 1, we now have (number, [1, 1, ...]) tuples. To calculate the average of all the numbers, we sum the 1's per number to get its number of occurrences. Then we divide each number by its number of occurrences and average those results over all distinct numbers to get our answer.

In [30]:
# sum all the 1's
number_counts = grouped_numbers.mapValues(sum)
# divide each number by its count
single_averages = number_counts.map(lambda x: x[0] / x[1])
# average the per-number results over all distinct numbers
average = single_averages.sum() / single_averages.count()

print(average)

498961.2909601686
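To see concretely what these steps compute, here is a plain-Python sketch of the same sequence (count per number, divide each number by its count, average over the distinct numbers) on a tiny hypothetical list. It uses `collections.Counter` in place of `map` plus `groupByKey` plus `mapValues(sum)`.

```python
from collections import Counter

numbers = [3, 3, 9]  # hypothetical tiny input

# Count the occurrences of each distinct number
number_counts = Counter(numbers)  # 3 occurs twice, 9 once

# Divide each number by its count
single_averages = [n / c for n, c in number_counts.items()]  # [1.5, 9.0]

# Average those results over the distinct numbers
result = sum(single_averages) / len(single_averages)

true_mean = sum(numbers) / len(numbers)
print(result, true_mean)  # 5.25 5.0
```

On this input the result differs from the true mean, just as the Spark result above differs from the `numpy` reference. If every number occurred exactly once, each division would be by 1 and the two values would coincide.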
