# Activity 6.1 Map-Reduce Basics
## Map-Reduce in Python
Since Python has built-in `map` and `reduce` functions, we first use them to demonstrate the concepts behind Map-Reduce. Then we move on to some basic experiments with the actual `map` and `reduce` functions in Spark.

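**Note:** All parts of this activity are designed to run in Python 2. For Python 3 you may need to modify some parts of the code; for example, `map` returns an iterator instead of a list, and `reduce` has to be imported from `functools`.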
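Here is a minimal sketch of how the mapping and reducing examples in this activity could be written so they run under both Python 2.7 and Python 3. It uses only the standard library. The names below mirror the ones used in the notebook cells.

```python
# Python 2.7 / Python 3 compatible versions of the examples below (a sketch).
from functools import reduce  # reduce is a builtin only in Python 2

def sqr(x):
    return x ** 2

numbers = [1, 2, 3, 4]

squared = list(map(sqr, numbers))            # list() materializes the iterator in Python 3
total = reduce(lambda x, y: x + y, numbers)  # 1 + 2 + 3 + 4
sum_of_squares = reduce(lambda x, y: x + y, map(sqr, numbers))

print(squared)         # [1, 4, 9, 16]
print(total)           # 10
print(sum_of_squares)  # 30
```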

### Mapping Example (Square of a Vector)
Suppose we have a vector (list) of numbers and we want to calculate the square of each number in that vector. Let's do it with a `for` loop first, and then without a loop, using only a simple `map` function that does the job for us.

In [1]:
# define square function:
def sqr(x): return x ** 2 

# the traditional way using loop:
numbers = [1, 2, 3, 4]
squared = []

print('numbers: ' + str(numbers)) # print the input numbers

for n in numbers: # loop
    squared.append(sqr(n)) # save the results
    print(str(n) + " X " + str(n) + " = " + str(squared[-1])) # print n and the last element of squared
    
print('squared: ' + str(squared))

numbers: [1, 2, 3, 4]
1 X 1 = 1
2 X 2 = 4
3 X 3 = 9
4 X 4 = 16
squared: [1, 4, 9, 16]


In [2]:
# then map the function onto the list
map(sqr, numbers) # note the different syntax/usage for a single input: e.g. sqr(6)

[1, 4, 9, 16]

Easy! We observe that `map` applies the `sqr` function to every element of the input list (i.e., `numbers`).
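To make "applies the function to every element" concrete, the sketch below compares `map` with an equivalent list comprehension. Both produce the results in the same order as the input. The `list(...)` call copies the result in Python 2 and materializes the iterator in Python 3.

```python
def sqr(x):
    return x ** 2

numbers = [1, 2, 3, 4]

# map(f, xs) calls f once per element, in order, like this comprehension does
via_map = list(map(sqr, numbers))
via_comprehension = [sqr(n) for n in numbers]

print(via_map)                       # [1, 4, 9, 16]
print(via_map == via_comprehension)  # True
```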

Because `map` expects a function to be passed in, it is also one of the places where `lambda` routinely appears. A `lambda` (a.k.a. anonymous function) lets us define a function on the fly without binding it to a name. This is very helpful when a function is needed in only one place in a script (further reading: http://www.secnetix.de/olli/Python/lambda_functions.hawk).
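The sketch below shows that a `lambda` builds the same kind of function object as `def`. The only difference is that it carries no name of its own. The variable `sqr_lambda` is hypothetical and exists only so the two can be compared; in practice the lambda would be passed straight to `map`.

```python
def sqr(x):
    return x ** 2

# The same function as a lambda expression, bound to a name only for comparison
sqr_lambda = lambda x: x ** 2

same = all(sqr(n) == sqr_lambda(n) for n in range(10))
print(same)                  # True
print(sqr.__name__)          # sqr
print(sqr_lambda.__name__)   # <lambda>  (it was never given a name of its own)
```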

In [3]:
map((lambda x: x ** 2), numbers) # mapping with a lambda function

[1, 4, 9, 16]

### Reducing Example (Summation)
Recall from the mapping example that `map` takes a list, applies a function to every element, and returns the results as another list. In contrast, `reduce` takes a list, repeatedly applies a two-argument function to its elements, and returns a single value (not a list). For example, we can use `reduce` to compute the sum of the elements of a list. Let's see `reduce` in action:

In [4]:
numbers = range(1,5)
reduce((lambda x, y: x + y), numbers) # reduce 1,2,3,4 : 1 + 2 + 3 + 4 = 10

10

Why 10? Because 1 + 2 + 3 + 4 = 10.

Note that in the above statement, `x` and `y` are initially the first two elements of the list `numbers`. As `reduce` moves along the list, `x` holds the running sum of the elements processed so far, while `y` is the next element of the list.
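One way to watch `x` and `y` evolve is to pass `reduce` a named function that prints each step. This is a minimal sketch; `add_and_trace` is a hypothetical helper introduced only for this illustration.

```python
from functools import reduce

def add_and_trace(x, y):
    # x is the result accumulated so far, y is the next element of the list
    print("x = %d, y = %d -> %d" % (x, y, x + y))
    return x + y

numbers = [1, 2, 3, 4]
result = reduce(add_and_trace, numbers)
# x = 1, y = 2 -> 3
# x = 3, y = 3 -> 6
# x = 6, y = 4 -> 10
print(result)  # 10
```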

Here is the equivalent with a loop:

In [5]:
x = numbers[0] # grab the first element
for y in numbers[1:]: # or: for i in range(1, len(numbers)), using numbers[i]
    x = x + y # add the next element to the running sum
    
print(x)

10


Now, let's combine `lambda`, `map`, and `reduce` to calculate the sum of the squares of the elements of a list:

In [6]:
numbers = range(1,5)
reduce((lambda x, y: x + y), map((lambda x: x ** 2), numbers)) # 1 + 4 + 9 + 16 = 30

30
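The "map, then reduce" pattern in the cell above can be wrapped in a small generic helper. `map_reduce` is a hypothetical name used only for this sketch; it is not a Python or Spark function. Any two-argument function can act as the reducer, for example the built-in `max`.

```python
from functools import reduce

def map_reduce(mapper, reducer, data):
    """Hypothetical helper: apply mapper to every element, then fold the
    mapped values into a single result with the two-argument reducer."""
    return reduce(reducer, map(mapper, data))

sum_of_squares = map_reduce(lambda x: x ** 2, lambda x, y: x + y, range(1, 5))
max_length = map_reduce(len, max, ["map", "reduce", "lambda"])

print(sum_of_squares)  # 30
print(max_length)      # 6
```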

## Spark Map-Reduce
So far we have only seen toy examples of how Map-Reduce works. Now we implement a word count example using the actual Map-Reduce functions, which run on Spark. The goal of this example is to count the occurrences of each word (not the total number of words) in the text file `spark.txt`.

**Note 1:** While the main concepts are the same, there are some minor differences between the `map` and `reduce` functions in Python and Spark. We will find out about these differences soon.

**Note 2:** This part is developed based on https://github.com/apache/spark/blob/master/examples/src/main/python/wordcount.py

### Libraries and Data
Let's start by loading the libraries:

In [7]:
from __future__ import print_function # in Python 2, makes print() a function as in Python 3
from pyspark import SparkContext, SparkConf
from operator import add

Note that a `SparkContext` is an object that represents a connection to a Spark cluster. We use this object to create Resilient Distributed Datasets (RDDs) and broadcast variables on that cluster.

Before going further, let's make sure no `SparkContext` is running in the background, since only one can be active at a time.

In [8]:
if 'sc' in globals(): # a context may already exist, e.g. one created by the pyspark shell
    sc.stop() # stop the previous context, especially if it crashed

Now, we initialize Spark and set a name for our example:

In [9]:
sc = SparkContext(conf=SparkConf(), appName='PyWordCount')

Here we load the text file `spark.txt`. The `textFile` method reads a text file into an RDD in which each line of the file is one independent element. It also splits the data into a number of partitions and distributes them.

In [10]:
inputRDD = sc.textFile("./spark.txt")
print(inputRDD)

MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2


To find the number of partitions and the size of the RDD (the total number of elements), we can simply call the `getNumPartitions` and `count` methods, respectively.

In [11]:
print('The number of partitions: ',inputRDD.getNumPartitions(),
      '\nThe total number of elements: ', inputRDD.count())

The number of partitions:  2 
The total number of elements:  7
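To picture what these two numbers mean, here is a plain-Python sketch that is not Spark itself. It models an RDD as a list of partitions, each holding some of the lines. The helper `make_partitions` and its even, contiguous split are assumptions made for illustration; Spark decides the real split from the input file.

```python
# Plain-Python model (not Spark): an RDD of lines as a list of partitions
lines = ["line 1", "line 2", "line 3", "line 4", "line 5", "line 6", "line 7"]

def make_partitions(elements, num_partitions):
    """Hypothetical helper: split elements into contiguous chunks of roughly
    equal size. Spark's real split depends on the input file and format."""
    size = max(1, -(-len(elements) // num_partitions))  # ceiling division
    return [elements[i:i + size] for i in range(0, len(elements), size)]

partitions = make_partitions(lines, 2)
num_partitions = len(partitions)                # analogue of getNumPartitions()
num_elements = sum(len(p) for p in partitions)  # analogue of count()

print(num_partitions)  # 2
print(num_elements)    # 7
```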


In [12]:
inputRDD.mapPartitions(lambda m: [1]).reduce(lambda a,b: a+b)

2

In [13]:
inputRDD.map(lambda m: len(m)).reduce(lambda a,b: a+b)

739

Now let's take a look at the file itself:

In [14]:
! cat ./spark.txt

These examples give a quick overview of the Spark API. 
Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects. 
You create a dataset from external data, then apply parallel operations to it. 
The building block of the Spark API is its RDD API. In the RDD API, there are two types of operations: transformations, which define a new dataset based on previous ones, and actions, which kick off a job to execute on a cluster.
On top of Spark’s RDD API, high level APIs are provided, e.g.DataFrame API and Machine Learning API. 
These high level APIs provide a concise way to conduct certain data operations. 
In this page, we will show examples using RDD API as well as examples using high level APIs.

### Mappings
Here we use three built-in mapping methods from Spark: `map`, `flatMap`, and `mapPartitions`. The `map` method returns a new RDD by applying a function to each element of the input RDD, so every element of the input RDD produces exactly one element of the output RDD (1-to-1 mapping).
The `flatMap` method applies a function to every element and then flattens the results, so a single input element may produce a list of output elements (1-to-N mapping). Finally, `mapPartitions` applies a function to each partition (not each element) of the RDD.
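The difference between 1-to-1 and 1-to-N mapping can be imitated in plain Python. This is an analogy, not Spark code. A list comprehension plays the role of `map`, and `itertools.chain.from_iterable` does the flattening that `flatMap` performs.

```python
from itertools import chain

lines = ["Spark is fast", "RDD API"]

# map-like: exactly one output per input (here, one list of words per line)
mapped = [line.split(' ') for line in lines]
# flatMap-like: the per-line lists are flattened into one sequence of words
flat_mapped = list(chain.from_iterable(line.split(' ') for line in lines))

print(mapped)       # [['Spark', 'is', 'fast'], ['RDD', 'API']]
print(flat_mapped)  # ['Spark', 'is', 'fast', 'RDD', 'API']
```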

Let's do some simple experiments to understand the difference between `map` and `mapPartitions` (which we will use extensively in this and other activities). Suppose we want to count the number of elements (in this example, the number of lines) in our RDD. To find this number, we map every element to the single value `1` and then sum up all these `1`s, which gives us the total number of elements:

In [15]:
inputRDD.map(lambda m: 1).reduce(lambda a,b: a+b) # same as inputRDD.count()

7

To count the number of partitions, we redo the above expression using `mapPartitions`. Here, each partition is mapped to a list containing a single `1`, and then these values are summed. Note that we have to return `[1]` instead of `1`, because the function passed to `mapPartitions` must return an iterable (e.g., a list) rather than a single value.

In [16]:
inputRDD.mapPartitions(lambda m: [1]).reduce(lambda a,b: a+b) # = inputRDD.getNumPartitions()

2
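The sketch below shows why `[1]` works and a bare `1` does not. It is a plain-Python model of `mapPartitions`, not Spark itself, and `map_partitions` and `count_partition` are hypothetical helpers. The function receives an iterator over one whole partition, and whatever it returns is iterated over, so it must be an iterable.

```python
# Plain-Python model of mapPartitions (not Spark itself)
partitions = [["a b", "c"], ["d e f"]]  # hypothetical RDD with 2 partitions

def map_partitions(func, parts):
    # Call func once per partition and concatenate what each call produces
    return [result for p in parts for result in func(iter(p))]

def count_partition(iterator):
    # Emit a single value per partition: the number of elements it holds
    yield sum(1 for _ in iterator)

sizes = map_partitions(count_partition, partitions)
ones = map_partitions(lambda it: [1], partitions)

try:
    map_partitions(lambda it: 1, partitions)
    bare_int_ok = True
except TypeError:
    bare_int_ok = False  # an int is not iterable

print(sizes)        # [2, 1]
print(sum(ones))    # 2 -> the number of partitions
print(bare_int_ok)  # False
```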

**Question 1:** How can we count the number of characters in the text file using the `map` and `reduce` functions?

Back to our main problem: we intend to calculate the frequency of each word in the document. First, we use `flatMap` to split the text into single words. The function receives one line and returns the list of words in that line, and `flatMap` flattens these lists so that each word becomes a separate element. Then we use `map` to create tuples of the form `<key, value>`, where `key` is a single word produced by `flatMap` and `value` is always `1`.

One may ask, "*why do we set all the values to 1 regardless of their keys?*", or even "*why do we need such tuples at all?*". The answer is that key-value tuples are the common way for the distributed mappers to communicate with the reducer. The logic behind this particular mapping is that every single word a mapper receives counts as one more observation, so the value for that key should be `1`. In the final stage, the reducer groups all tuples with the same key and adds up their values. Therefore, if a certain word has been seen $N$ times (no matter how many mappers we had), the reducer receives $N$ tuples with that word as the key, and the sum of $N$ `1`s is exactly $N$, the number of occurrences of that word.
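Here is a plain-Python sketch of this logic, assuming two hypothetical mappers that each emit `(word, 1)` tuples. Grouping by key stands in for Spark's shuffle, and summing each group plays the reducer.

```python
from collections import defaultdict

# Hypothetical outputs of two independent mappers, each emitting (word, 1)
mapper_1 = [("spark", 1), ("rdd", 1), ("spark", 1)]
mapper_2 = [("rdd", 1), ("spark", 1)]

# "Shuffle": collect every value under its key, whichever mapper sent it
groups = defaultdict(list)
for key, value in mapper_1 + mapper_2:
    groups[key].append(value)

# Reduce: the sum of N ones is N, the number of occurrences of the word
counts = dict((key, sum(values)) for key, values in groups.items())
print(sorted(counts.items()))  # [('rdd', 2), ('spark', 3)]
```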

Now that we know the logic behind the mapping and reducing, let's do the splitting to get the list of words:

In [17]:
words = inputRDD.flatMap(lambda x: x.split(' '))
print(words)

PythonRDD[7] at RDD at PythonRDD.scala:43


... and then generate the `<word, 1>` tuples:

In [18]:
wordsOne = words.map(lambda x: (x, 1))
print(wordsOne)

PythonRDD[8] at RDD at PythonRDD.scala:43
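Note that `split(' ')` splits on every single space character. Several lines of `spark.txt` appear to end with a trailing space, and each of these produces an empty-string "word". This is likely where an empty key such as `u''` in the counts comes from. The sketch below compares `split(' ')` with `split()`, which splits on runs of whitespace and drops empty strings. It uses the first line of the file as sample input.

```python
line = "These examples give a quick overview of the Spark API. "  # trailing space

# split(' '): a trailing (or doubled) space yields an empty-string token
print(line.split(' ')[-2:])  # ['API.', '']
# split(): splits on runs of whitespace and drops empty strings
print(line.split()[-2:])     # ['Spark', 'API.']
```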


### Reducing
As mentioned before, our reducer only needs to group the `<word, 1>` tuples by their keys and add up their values to calculate the number of times each word was observed in the text file. To do so, we use the `reduceByKey` method (instead of `reduce`, which ignores keys) and specify that values should be summed by passing `add` as its argument.

In [19]:
wordCounts = wordsOne.reduceByKey(add)
print(wordCounts)

PythonRDD[13] at RDD at PythonRDD.scala:43
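According to the Spark documentation, `reduceByKey` also merges values locally on each mapper before sending results to a reducer, much like a combiner in classic MapReduce. The plain-Python sketch below imitates that two-stage merge; `reduce_by_key` is a hypothetical helper, not the Spark API. Because values can be combined in any grouping, the function passed in should be associative and commutative, as `add` is.

```python
from operator import add

def reduce_by_key(func, partitions):
    """Hypothetical sketch of reduceByKey semantics: merge values within each
    partition first, then merge the partial results across partitions."""
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)  # one small dict per partition
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged

# Hypothetical (word, 1) pairs spread over two partitions
parts = [[("a", 1), ("of", 1), ("a", 1)], [("a", 1), ("the", 1)]]
print(sorted(reduce_by_key(add, parts).items()))  # [('a', 3), ('of', 1), ('the', 1)]
```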


### Collecting
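Now the only thing that remains is to collect the result. In fact, up to this point Spark has not computed `words`, `wordsOne`, or `wordCounts`, because `flatMap`, `map`, and `reduceByKey` are lazy transformations that only record what should be done. Calling the `collect` action makes Spark execute all the steps above and return the results.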
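Python generator expressions offer a rough analogy for this laziness, though they are not how Spark is implemented. In the sketch below, building the pipeline does no work, and only the final `list(...)` call, playing the role of `collect`, runs it. The names `to_pair`, `lazy_words`, and `lazy_pairs` are hypothetical.

```python
calls = []

def to_pair(word):
    calls.append(word)  # record every time the "map" step actually runs
    return (word, 1)

text_lines = ["a b", "a"]
lazy_words = (w for line in text_lines for w in line.split(' '))  # flatMap-like, not run yet
lazy_pairs = (to_pair(w) for w in lazy_words)                     # map-like, not run yet
print(len(calls))  # 0 -> no work done so far

result = list(lazy_pairs)  # collect-like: forces the whole pipeline to run
print(len(calls))  # 3
print(result)      # [('a', 1), ('b', 1), ('a', 1)]
```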


In [20]:
output = wordCounts.collect() 

In [21]:
# Show the first 10 entries of the result
output[:10]

[(u'', 5),
 (u'operations', 1),
 (u'page,', 1),
 (u'Java', 1),
 (u'well', 1),
 (u'is', 2),
 (u'APIs.', 1),
 (u'Machine', 1),
 (u'as', 2),
 (u'You', 1)]

As we can see, the output entries are in no particular order (e.g., alphabetical, order of occurrence, or frequency of occurrence). This is because, behind the scenes, Spark shuffles the tuples across partitions so that tuples with the same key reach the same reducer.

To see the ten most frequent words, we can sort the output by count and print the first ten:

In [22]:
sorted(output, key=lambda x: x[1], reverse = True)[0:10] # top 10 

[(u'a', 6),
 (u'', 5),
 (u'of', 5),
 (u'the', 4),
 (u'RDD', 4),
 (u'which', 3),
 (u'Spark', 3),
 (u'API.', 3),
 (u'high', 3),
 (u'examples', 3)]
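Since the order returned by `collect` is not fixed, words with equal counts (several have a count of 3 above) can come out in a different order between runs. Python's `sorted` is stable and keeps ties in their incoming order. Sorting on `(-count, word)` instead breaks ties alphabetically. The sketch below uses a few pairs taken from the output above, in an arbitrary order.

```python
# A few (word, count) pairs in arbitrary order, as collect() might return them
sample_output = [("which", 3), ("a", 6), ("Spark", 3), ("of", 5), ("", 5), ("the", 4)]

# Highest count first; ties broken alphabetically so the result is reproducible
top = sorted(sample_output, key=lambda kv: (-kv[1], kv[0]))[:4]
print(top)  # [('a', 6), ('', 5), ('of', 5), ('the', 4)]
```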

### Compact Version
Using the power of pipelining (chaining one transformation after another), we can do all the above steps in a very compact way:

In [23]:
sc.stop() # stop the previous context
sc = SparkContext(appName="PythonWordCount") # create a new context
# the following does the whole job at once:
output = (sc.textFile("./spark.txt")
          .flatMap(lambda x: x.split(' '))
          .map(lambda x: (x, 1))
          .reduceByKey(add)
          .collect())
# and the output:
sorted(output, key=lambda x: x[1], reverse=True)[0:10] # top 10

[(u'a', 6),
 (u'', 5),
 (u'of', 5),
 (u'the', 4),
 (u'RDD', 4),
 (u'which', 3),
 (u'Spark', 3),
 (u'API.', 3),
 (u'high', 3),
 (u'examples', 3)]
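As a sanity check without Spark, the same counting logic can be written in plain Python with `collections.Counter` when the data fits in memory. `word_count` is a hypothetical helper. Like the pipeline above, it splits each line on single spaces, so trailing spaces still produce empty-string tokens.

```python
from collections import Counter

def word_count(lines):
    """Hypothetical plain-Python equivalent of the Spark pipeline above:
    split each line on single spaces and count every resulting token."""
    counts = Counter()
    for line in lines:
        counts.update(line.rstrip('\n').split(' '))  # drop the newline, like textFile
    return counts

sample = ["a b a ", "b c"]  # hypothetical input; the first line ends with a space
print(sorted(word_count(sample).items()))  # [('', 1), ('a', 2), ('b', 2), ('c', 1)]
```

Applied to the real file, e.g. `word_count(io.open('./spark.txt', encoding='utf-8'))` after `import io`, it should reproduce the Spark counts, assuming the file uses `\n` line endings.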