# First steps in PySpark 

In this notebook we will learn the fundamentals of functional programming, as well as the basic abstraction of a distributed object in Spark, the RDD. The notebook has been divided into two parts:

Part 1: map/reduce basics

Part 2: Work with RDD and Pair RDD abstractions 




<img src="../assets/yogen-logo.png" alt="yogen" style="width: 200px; float: right;"/>

# Part 1: map/reduce basics

![Hadoop Logo](https://upload.wikimedia.org/wikipedia/commons/thumb/0/0e/Hadoop_logo.svg/220px-Hadoop_logo.svg.png)
# **Apache Hadoop (MapReduce)**

It is an open source software framework written in Java for distributed storage and distributed processing of very large data sets on computer clusters built from commodity hardware. All the modules in Hadoop are designed with a fundamental assumption that hardware failures (of individual machines, or racks of machines) are common and thus should be automatically handled in software by the framework.

The core of Apache Hadoop consists of a storage part (Hadoop Distributed File System (HDFS)) and a processing part (MapReduce). Hadoop splits files into large blocks and distributes them amongst the nodes in the cluster. To process the data, Hadoop MapReduce transfers packaged code for nodes to process in parallel, based on the data each node needs to process. This approach takes advantage of data locality — nodes manipulating the data that they have on hand — to allow the data to be processed faster and more efficiently than it would be in a more conventional supercomputer architecture that relies on a parallel file system where computation and data are connected via high-speed networking.

![caption](http://d152j5tfobgaot.cloudfront.net/wp-content/uploads/2012/07/mapreduce.png)

Since data and computation are distributed, we should avoid the use of variables, i.e. mutable data. Thus, in contrast to imperative programming, we shall use the functional approach (lambda calculus).

In [1]:
def make_tuples(word): 
    return (word, 1)

In [2]:
def word_reducer(tuple_1, tuple_2):
    return (tuple_1[0], tuple_1[1] + tuple_2[1])
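
To see how these two helpers fit together, here is a minimal sketch. The definitions are repeated so that the snippet runs on its own, and the input list `words` is made up for illustration. Note that `word_reducer` keeps the key of its first argument, so it only makes sense when all the tuples share the same key.

```python
from functools import reduce

def make_tuples(word):
    return (word, 1)

def word_reducer(tuple_1, tuple_2):
    return (tuple_1[0], tuple_1[1] + tuple_2[1])

# Hypothetical input: a single key repeated three times.
words = ['spark', 'spark', 'spark']

pairs = map(make_tuples, words)      # ('spark', 1), ('spark', 1), ('spark', 1)
total = reduce(word_reducer, pairs)  # fold the tuples two at a time
print(total)                         # ('spark', 3)
```

Neither function modifies its arguments: each call returns a new tuple, which is the property that makes this style safe to distribute.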

### The goal of the following exercises is to understand basic lambda calculus with Python.

### (1a) Functional programming in Python

So, what is Functional Programming? From Wikipedia: 

« …a programming paradigm that treats computation as the evaluation of mathematical functions and **avoids changing-state and mutable data**.»

It's based upon lambda calculus, which consists of:
 * Function definition (declaration of expressions)
 * Function application (evaluation of those expressions)
 * Recursion (iteration)

We have already used this in python!!! :)

Recall the typical `lambda x: x+1` we have been using as the first argument of map, reduce and filter:
 * **map** maps each value in the input collection to a different value. It's just the classical mathematical function we are used to!
 * **reduce** takes two values from the input collection and returns a new value (of the same type) by applying an associative and commutative operation to them.
 * **filter** keeps the elements of the input collection that satisfy a certain (boolean) criterion.
 

**Mapping**

![map](https://cosminpupaza.files.wordpress.com/2015/10/map.png?w=505)

In [3]:
collection = range(10)

list(map(lambda x: x + 10, collection))

[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

In [4]:
list(map(lambda x: x ** 2, collection))

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [5]:
collection_of_strings = ['Hola', 'amijo', 'que', 'tal', 'estas']

initials = list(map(lambda word: word[0], collection_of_strings))
initials

['H', 'a', 'q', 't', 'e']

In [6]:
def initial(word):
    return word[0]

initials = list(map(initial, collection_of_strings))
initials

['H', 'a', 'q', 't', 'e']

In [7]:
list(map(lambda word: len(word), collection_of_strings))

[4, 5, 3, 3, 5]

**Filtering**
![filter](https://cosminpupaza.files.wordpress.com/2015/11/filter.png?w=405)

In [8]:
collection

list(filter(lambda x: x%2 ==0, collection))

[0, 2, 4, 6, 8]

#### Exercise

Write a filter to extract those words that have more than 3 letters from `collection_of_strings`.

In [9]:
list(filter(lambda word: len(word)>3, collection_of_strings))

['Hola', 'amijo', 'estas']

**Reducing** Recall that the operation must be associative and commutative! Think about the importance of this when parallelizing computations.

![](https://cosminpupaza.files.wordpress.com/2015/11/reduce.png?w=500)

In [10]:
from functools import reduce


reduce(lambda x, y: x + y, collection)

45

In [11]:
reduce(lambda x, y: x - y, collection)

-45

In [12]:
reduce(lambda x, y: x - y, sorted(collection, reverse=True))

-27
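
The subtraction results above depend on the order of the elements. The following sketch shows why that matters once the work is split: it reduces each chunk on its own and then reduces the partial results, which is roughly what a distributed reduce has to do. The helper `chunked_reduce` and the choice of three chunks are assumptions made for illustration only.

```python
from functools import reduce

def chunked_reduce(func, data, n_chunks):
    """Reduce each chunk independently, then reduce the partial results."""
    data = list(data)
    size = -(-len(data) // n_chunks)  # ceiling division
    chunks = [data[i:i + size] for i in range(0, len(data), size)]
    partials = [reduce(func, chunk) for chunk in chunks]
    return reduce(func, partials)

collection = range(10)
add = lambda x, y: x + y
sub = lambda x, y: x - y

# Addition: same answer whether we reduce in one go or chunk by chunk.
print(reduce(add, collection), chunked_reduce(add, collection, 3))  # 45 45

# Subtraction: the chunked answer differs from the sequential one.
print(reduce(sub, collection), chunked_reduce(sub, collection, 3))  # -45 9
```

With addition the grouping does not matter; with subtraction the result changes as soon as the data is split, so it is not a safe reducer for parallel work.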

## (1b) Exercise: Calculate the mean of a collection of real numbers using map/reduce
Recall:

$$\bar x = \frac{\sum_{i=1}^{N} x_i}{N} $$

It's straightforward to do this with the Python built-in functions sum() and len(). However, how would you do that with map/reduce? We have already shown how to sum the elements of an array. Thus, you have to calculate the length of the array. For this:
 * Create another array of the same size, consisting of 1s.
 * Sum the elements of that array

#### First part

* Do a reduce to do the sum, and a different map-reduce to get the length

In [13]:
sum_ = reduce(lambda x, y: x + y, collection)
len_ = reduce(lambda x, y: x + y, map(lambda x: 1, collection))
sum_ / len_


4.5

#### Combine them in one pass

In [14]:
def f(element):
    return (element, 1)
    

In [15]:
list(map(f, collection))

[(0, 1),
 (1, 1),
 (2, 1),
 (3, 1),
 (4, 1),
 (5, 1),
 (6, 1),
 (7, 1),
 (8, 1),
 (9, 1)]

In [16]:
def g(tuple_1, tuple_2):
    partial_sum = tuple_1[0] + tuple_2[0]
    partial_count = tuple_1[1] + tuple_2[1]
    
    return (partial_sum, partial_count)
    
sum_, len_ = reduce(g, map(f, collection))

sum_ / len_

4.5

## (1c) Exercise: Calculate the standard deviation of a collection of real numbers
Recall the sample variance, whose square root is the standard deviation:

$$\sigma_x^2 = \frac{\sum_{i=1}^{N} (x_i-\bar x)^2}{N-1}$$

For this, use the *mean* and *count* values from the previous exercise.

In [17]:
import math

sum_, len_ = reduce(g, map(f, collection))
average = sum_ / len_

square_differences = map(lambda x: (x - average)**2, collection)
sum_square_differences = reduce(lambda x, y: x + y, square_differences)

math.sqrt(sum_square_differences / (len_ - 1))

3.0276503540974917

In [18]:
import numpy as np

np.std(collection, ddof=1)

3.0276503540974917

In one pass, expanding the square gives:

$$\sigma_x^2 = \frac{\sum_{i=1}^{N} (x_i-\bar x)^2}{N-1} =
\frac{\sum_{i=1}^{N} (x_i^2+{\bar x}^2-2x_i\bar x)}{N-1} =
\frac{1}{N-1}\left(\sum_i x_i^2-N\bar x^2\right)$$


## (1c.bis) Exercise: all at once! 
For the std calculation, we have obtained separately the sum of the elements, the length and the sum of the squared elements. That is, we have swept the array three times! Can you do it in a two-step process (one map, one reduce)? Do you think it might matter at some point?
 * Hint: recall that reduce takes two arguments of the same type, and returns another value of that type. So, instead of using numbers as the elements of our array, use tuples!!

In [19]:
def make_tuple(element):
    return (element ** 2, element, 1)
    
def combine_tuples(tuple_1, tuple_2):
    el_0 = tuple_1[0] + tuple_2[0]
    el_1 = tuple_1[1] + tuple_2[1]
    el_2 = tuple_1[2] + tuple_2[2]
    
    return (el_0, el_1, el_2)

sum_squares, sum_, len_ = reduce(combine_tuples, 
                                 map(make_tuple, 
                                     collection)
                                )

In [20]:
math.sqrt((sum_squares - len_ * (sum_ / len_) ** 2 )/ (len_ - 1))

3.0276503540974917

In [21]:
np.std(collection, ddof=1)

3.0276503540974917
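
The shortcut formula subtracts two quantities that can be large and nearly equal, so it may lose precision in floating point. A possible alternative, sketched here under the assumption that we are free to choose what each tuple carries, is to reduce `(count, mean, M2)` triples, where `M2` is the running sum of squared deviations from the mean. The merge rule is the pairwise update usually attributed to Chan, Golub and LeVeque; the names `to_stats` and `merge_stats` are ours.

```python
import math
from functools import reduce

def to_stats(x):
    # A single element: count 1, mean x, no spread yet.
    return (1, float(x), 0.0)

def merge_stats(a, b):
    # Combine the statistics of two disjoint groups of elements.
    n_a, mean_a, m2_a = a
    n_b, mean_b, m2_b = b
    n = n_a + n_b
    delta = mean_b - mean_a
    mean = mean_a + delta * n_b / n
    m2 = m2_a + m2_b + delta ** 2 * n_a * n_b / n
    return (n, mean, m2)

collection = range(10)
n, mean, m2 = reduce(merge_stats, map(to_stats, collection))
std = math.sqrt(m2 / (n - 1))  # sample standard deviation (ddof=1)
print(mean, std)
```

It is still one map and one reduce, and the result should agree with `np.std(collection, ddof=1)` up to rounding.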

## (1d) The 'word-count' problem: creating histograms
Given a set of keys in an input collection, calculate the frequency of each key. 

In order to understand better how map/reduce works, we will implement this simple calculation in several forms.

For simplicity, we are going to create a list of numbers between 1 and 9, that can be repeated a (random) number of times.

In [22]:
import random

random.seed(42)
a = [random.randint(1, 9) for _ in range(500)]
a

[2,
 1,
 5,
 4,
 4,
 3,
 2,
 9,
 2,
 7,
 1,
 1,
 2,
 4,
 4,
 9,
 1,
 9,
 4,
 9,
 7,
 4,
 8,
 5,
 1,
 3,
 7,
 6,
 5,
 3,
 4,
 6,
 2,
 2,
 7,
 2,
 6,
 6,
 5,
 1,
 8,
 9,
 2,
 7,
 2,
 9,
 5,
 6,
 4,
 2,
 1,
 4,
 5,
 2,
 4,
 2,
 7,
 5,
 8,
 6,
 3,
 6,
 6,
 4,
 5,
 2,
 3,
 9,
 4,
 3,
 8,
 7,
 5,
 9,
 4,
 6,
 1,
 4,
 1,
 6,
 7,
 5,
 2,
 4,
 6,
 4,
 8,
 7,
 8,
 3,
 5,
 3,
 4,
 9,
 9,
 5,
 7,
 7,
 6,
 4,
 3,
 9,
 8,
 2,
 1,
 2,
 3,
 3,
 7,
 2,
 7,
 7,
 8,
 9,
 5,
 9,
 1,
 2,
 9,
 5,
 6,
 2,
 5,
 7,
 3,
 8,
 1,
 5,
 9,
 3,
 9,
 2,
 5,
 9,
 4,
 3,
 6,
 3,
 9,
 9,
 1,
 6,
 8,
 1,
 2,
 6,
 5,
 4,
 1,
 4,
 2,
 2,
 8,
 2,
 9,
 3,
 3,
 8,
 9,
 3,
 5,
 9,
 7,
 4,
 9,
 4,
 5,
 7,
 6,
 8,
 9,
 8,
 2,
 4,
 4,
 2,
 6,
 1,
 9,
 4,
 4,
 1,
 2,
 1,
 4,
 2,
 1,
 6,
 2,
 9,
 4,
 5,
 8,
 4,
 9,
 3,
 8,
 4,
 8,
 7,
 4,
 2,
 2,
 7,
 6,
 7,
 7,
 8,
 1,
 2,
 1,
 7,
 6,
 2,
 4,
 4,
 4,
 9,
 8,
 3,
 7,
 3,
 5,
 8,
 4,
 2,
 8,
 9,
 2,
 1,
 9,
 1,
 2,
 4,
 3,
 7,
 8,
 8,
 4,
 7,
 1,
 3,
 7,
 1,
 7,
 5,
 8,
 5,
 7,
 9,


### (1d.1) Simple approach

 * Start with an empty dict
 * If a new key is not present in the dict, create it.
 * Otherwise, increase the frequency of the key by one.

In [23]:
def word_count(input_list):
    
    result_dict = {}
    
    for element in input_list:
        if element not in result_dict:
            result_dict[element] = 1
        else:
            result_dict[element] += 1
            
    return result_dict
            
word_count(a)

{1: 52, 2: 64, 3: 49, 4: 70, 5: 55, 6: 47, 7: 56, 8: 44, 9: 63}

We need to create the key in the dictionary before we can increment its count, because:

In [24]:
result_dict = {}
result_dict['tocoto'] += 1

KeyError: 'tocoto'
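
For reference, the standard library offers a few ways around this `KeyError`. The sketch below uses a made-up three-word list and shows `dict.get` with a default, `collections.defaultdict` and `collections.Counter`; all three produce the same counts as the explicit `if`/`else` above.

```python
from collections import Counter, defaultdict

words = ['tocoto', 'holi', 'tocoto']  # made-up input

# 1. dict.get returns a default when the key is missing.
counts_get = {}
for word in words:
    counts_get[word] = counts_get.get(word, 0) + 1

# 2. defaultdict(int) creates missing keys with value 0 on first access.
counts_default = defaultdict(int)
for word in words:
    counts_default[word] += 1

# 3. Counter does the whole loop for us.
counts_counter = Counter(words)

print(counts_get)  # {'tocoto': 2, 'holi': 1}
```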

### (1d.2) Map/reduce

 * Recall that *reduce* applies an operation to 2 elements of the same type, and returns another element of that type. Thus, the first thing to do is to map our collection to the type of the output. We cannot use dicts, as dict(list) removes duplicated keys. We will use lists of tuples instead.
 * Then, we have to define a method in the reducer that combines keys. There are two steps:
   * Obtain the keys in the left list
   * Then, check that the key in the second list already exists in the first one

In [25]:
def word_count_mr(input_list):
    tuples = map(lambda word: [(word, 1)], input_list)
    
    return reduce(combine_keys, tuples)
    
    
def combine_keys(left, right):
    
    # Copy input for purity
    left = left.copy()
    
    # need to check if this is the first time we see the word
    already_seen_words = list(map(lambda t: t[0], left))
    this_word = right[0][0]
    
    if this_word not in already_seen_words:
        left = left + right
    else:
        position = already_seen_words.index(this_word)
        left[position] = (this_word, left[position][1] + 1)
    
    return left
    
    

In [26]:
intermediate = [('tocoto', 3), ('holi', 2)]
tocoto = [('tocoto', 1)]
guapi = [('guapi', 1)]

assert(combine_keys(intermediate, tocoto) == [('tocoto', 4), ('holi', 2)])
assert(combine_keys(intermediate, guapi) == [('tocoto', 3), ('holi', 2), ('guapi', 1)])

In [27]:
random.seed(42)
stupid_words = [random.choice(['holi', 'guapi', 'tocoto', 'chachi', 'lerelele']) for _ in range(500)]
word_count(stupid_words)

{'chachi': 88, 'guapi': 106, 'holi': 107, 'lerelele': 101, 'tocoto': 98}

In [28]:
word_count_mr(stupid_words)

[('holi', 107),
 ('tocoto', 98),
 ('guapi', 106),
 ('lerelele', 101),
 ('chachi', 88)]

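Note the difference with the previous method, based on dictionaries: now the keys are not displayed in sorted order!

But where did we sort the keys in *word_count*? We didn't, and neither did Python: a *dictionary* is a hash table, not a sorted structure. The sorted display above most likely comes from the way the notebook pretty-prints dictionaries; `print` shows the keys in insertion order, as you can see below. What the hash table does give us is fast key lookup. See the difference in time...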

In [29]:
dict(word_count_mr(stupid_words)) == word_count(stupid_words)

True

In [30]:
print(word_count_mr(stupid_words))
print(word_count(stupid_words))

%timeit word_count_mr(stupid_words)
%timeit word_count(stupid_words)

[('holi', 107), ('tocoto', 98), ('guapi', 106), ('lerelele', 101), ('chachi', 88)]
{'holi': 107, 'tocoto': 98, 'guapi': 106, 'lerelele': 101, 'chachi': 88}
821 µs ± 7.51 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
38.3 µs ± 531 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)


### (1d.3) Map/reduce with pre-sorting

As shown, the fast key lookup of a dictionary actually speeds up the process.

However, our *combine_keys* method is creating an array of keys and checking whether a new key is already present in every step. This can be avoided by sorting the initial list first.

In [31]:
def word_count_mr_sorted(input_list):
    
    sorted_list = sorted(input_list)
    tuples = map(lambda word: [(word, 1)], sorted_list)
    
    return reduce(combine_sorted_keys, tuples)
    
    
def combine_sorted_keys(left, right):
    
    # Copy input for purity
    left = left.copy()
    
    # need to check if this is the first time we see the word
    this_word = right[0][0]
    last_word = left[-1][0]
    
    already_seen = this_word == last_word
    
    if not already_seen:
        left = left + right
    else:
        counts = left[-1][1]
        left[-1] = (this_word, counts + 1)
        
    return left
    
    

Now, computing times get closer. Still, our map/reduce methods are slower than the dictionary-based loop, since they work on lists of tuples and copy them at every step...

In [32]:
print(word_count_mr(stupid_words))
print(word_count(stupid_words))
print(word_count_mr_sorted(stupid_words))

%timeit word_count_mr(stupid_words)
%timeit word_count(stupid_words)
%timeit word_count_mr_sorted(stupid_words)

[('holi', 107), ('tocoto', 98), ('guapi', 106), ('lerelele', 101), ('chachi', 88)]
{'holi': 107, 'tocoto': 98, 'guapi': 106, 'lerelele': 101, 'chachi': 88}
[('chachi', 88), ('guapi', 106), ('holi', 107), ('lerelele', 101), ('tocoto', 98)]
818 µs ± 2.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
38.6 µs ± 642 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
353 µs ± 5.17 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
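
The two map/reduce versions above carry a list of tuples through the reduce. A variant worth considering, sketched here as an assumption rather than as part of the original exercise, is to map each word to a one-entry dict and let the reducer merge dicts. The reduce still takes and returns values of a single type, and the merge does not depend on the order in which partial results arrive (apart from the order of the keys). The names `to_dict` and `merge_counts` are ours.

```python
from functools import reduce

def to_dict(word):
    return {word: 1}

def merge_counts(left, right):
    merged = dict(left)  # copy, so that the inputs are not mutated
    for key, value in right.items():
        merged[key] = merged.get(key, 0) + value
    return merged

words = ['holi', 'tocoto', 'holi', 'guapi', 'holi']  # made-up input
counts = reduce(merge_counts, map(to_dict, words))
print(counts)  # {'holi': 3, 'tocoto': 1, 'guapi': 1}
```

Copying the accumulated dict at every step keeps the function pure but is costly; this is a sketch of the idea, not a tuned implementation.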


# Part 2: Spark. Work with RDD and Pair RDD abstractions 

![drawing](https://prateekvjoshi.files.wordpress.com/2015/10/1-main4.png)

# **Apache Spark**

Apache Spark is an open source cluster computing framework originally developed in the AMPLab at the University of California, Berkeley, and later donated to the Apache Software Foundation, where it remains today. In contrast to Hadoop's two-stage disk-based MapReduce paradigm, Spark's multi-stage in-memory primitives provide performance up to 100 times faster for certain applications.

![](http://image.slidesharecdn.com/sparkandshark-120620130508-phpapp01/95/spark-and-shark-8-728.jpg?cb=1340197567)

By allowing user programs to load data into a cluster's memory and query it repeatedly, Spark is well-suited to machine learning algorithms.
![](http://spark.apache.org/images/logistic-regression.png)

Spark comes with a number of components that provide flexibility and generality.

<img src="http://spark.apache.org/images/spark-stack.png" alt="Drawing" style="width: 500px;"/>


## In this part, we keep on working on the word-count example, this time with Spark. The basic abstraction of Spark is the Resilient Distributed Dataset (RDD):

#### «RDDs are fault-tolerant, parallel data structures that let users explicitly persist intermediate results in memory, control their partitioning to optimize data placement, and manipulate them using a rich set of operators.»

 * Read only, partitioned collection of records (immutable).
 * Stores the transformations used to build a dataset (its lineage), instead of the data itself. This property ensures fault-tolerance.
 * Users can control partitioning and persistence (caching).
 * RDDs are statically typed in the Scala API (PySpark RDDs hold ordinary, dynamically typed Python objects).
 * … and yes, the core of Spark is written in Scala ;p. So you'd better learn a little bit of it!
 
<img src="http://eng.trueaccord.com/wp-content/uploads/2014/10/scala-logo.png" alt="Drawing" style="width: 200px;"/>

#### We will be trying to understand this abstraction with simple examples, using the [Python API](http://spark.apache.org/docs/latest/api/python/index.html)




### **(2a) Create a base RDD: parallelize, actions and transformations**
We'll start by generating a base RDD from a Python list with the `sc.parallelize` method, which converts a standard Python collection into an RDD. Here `sc` is the `SparkContext`, assumed to be already available in the session (the PySpark shell creates it for you). Then we'll print out the type of the base RDD.

In [33]:
import random

random.seed(42)
stupid_words = [random.choice(['holi', 
                               'guapi', 
                               'tocoto', 
                               'chachi', 
                               'lerelele']) for _ in range(500)]

In [34]:
wordsRDD = sc.parallelize(stupid_words)
type(wordsRDD)

pyspark.rdd.RDD

In [35]:
wordsRDD[0]

TypeError: 'RDD' object does not support indexing

**Nothing has actually happened!**

`parallelize` tells Spark to distribute the data, but this is not actually done until we perform some action.

Possible actions include counting, collecting, reducing, taking, etc. Take a look at the [Spark programming guide](http://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions)


In [36]:
res = wordsRDD.take(2)
res, type(res)

(['holi', 'holi'], list)

Apart from actions, we can apply transformations to an RDD. Spark won't do anything until an action is performed.

In [37]:
pluralsRDD = wordsRDD.map(lambda word: word + 's')

For instance, we can obtain the length of each word

In [38]:
lengthsRDD = wordsRDD.map(lambda word: len(word))
lengthsRDD.take(5)

[4, 4, 6, 5, 5]

In [39]:
wordsRDD.filter(lambda word: len(word) > 4).take(10)

['tocoto',
 'guapi',
 'guapi',
 'guapi',
 'lerelele',
 'lerelele',
 'chachi',
 'guapi',
 'guapi',
 'lerelele']

In [40]:
plurals = pluralsRDD.collect()

type(plurals)

list

### **(2b) Persisting and the RDD lineage**

So far, we have seen that Spark RDDs are *lazily evaluated*, i.e. nothing is actually done until an action is performed. The RDD remembers the set of transformations to be applied: this is known as its *lineage*. It has the important consequence of making Spark RDDs *fault tolerant* automatically, since a lost partition can be recomputed from its lineage.

![](http://images.slideplayer.com/14/4499833/slides/slide_10.jpg) 
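
To make laziness and lineage more concrete, here is a toy, single-machine imitation written in plain Python. It is not how Spark is implemented, and `ToyRDD` and `noisy_len` are names invented for this sketch: transformations only record a step, and the action `collect()` replays all recorded steps over the source data.

```python
class ToyRDD:
    """Toy, single-machine stand-in for an RDD. Hypothetical; not Spark's API."""

    def __init__(self, source, lineage=()):
        self._source = source      # the original data
        self._lineage = lineage    # recorded (kind, function) steps

    def map(self, func):
        # Record the step; do not run it.
        return ToyRDD(self._source, self._lineage + (('map', func),))

    def filter(self, func):
        return ToyRDD(self._source, self._lineage + (('filter', func),))

    def collect(self):
        # The "action": replay the whole lineage over the source data.
        data = iter(self._source)
        for kind, func in self._lineage:
            data = map(func, data) if kind == 'map' else filter(func, data)
        return list(data)


calls = []

def noisy_len(word):
    calls.append(word)  # side effect, only to observe when the function runs
    return len(word)

words = ToyRDD(['holi', 'tocoto', 'guapi'])
long_lengths = words.map(noisy_len).filter(lambda n: n > 4)

calls_before_action = len(calls)  # 0: only the lineage has been recorded
result = long_lengths.collect()   # [6, 5]
calls_after_action = len(calls)   # 3: the map ran once per element
```

Calling `collect()` a second time replays the lineage and runs `noisy_len` again for every element, which is the kind of repeated work that persisting is meant to avoid.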

It might be interesting to store some intermediate results, though: perhaps because we want to apply several different transformations starting from that point, or because we are going to apply an iterative computation (as is customary in machine learning algorithms). For this, Spark has [several ways of persisting](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence)

In [41]:
sc

In [42]:
wordsRDD.first()

'holi'

In [43]:
wordsRDD.cache()

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175

In [44]:
lengthsRDD = wordsRDD.map(lambda word: len(word))
lengthsRDD.take(3)

[4, 4, 6]

In [45]:
wordsRDD.getStorageLevel()

StorageLevel(False, True, False, False, 1)

In [46]:
wordsRDD.getStorageLevel?

In [47]:
from pyspark import StorageLevel

StorageLevel.MEMORY_ONLY

StorageLevel(False, True, False, False, 1)

In [48]:
wordsRDD.unpersist()

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175

In [49]:
wordsRDD.persist(StorageLevel.MEMORY_AND_DISK_2)

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175

In [50]:
wordsRDD.first()

'holi'

In [51]:
wordsRDD.getStorageLevel()

StorageLevel(True, True, False, False, 2)

In [52]:
# wordsRDD.cache() is a synonym for:

# wordsRDD.persist(StorageLevel.MEMORY_ONLY)

### **(2c) Partitioning**

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster.

To get the number of partitions of an RDD, just use `getNumPartitions()` on your RDD. You can set the number of partitions during RDD creation (with `parallelize(collection, numSlices)` or `textFile(path, minPartitions)`), or change it afterwards with methods like `repartition()`, `coalesce()`, etc.

In [53]:
wordsRDD.getNumPartitions()

8

In [54]:
otherRDD = sc.parallelize(stupid_words, 12)

otherRDD.getNumPartitions()

12

In [55]:
repartitionedRDD = otherRDD.repartition(5)
repartitionedRDD.getNumPartitions(), otherRDD.getNumPartitions()

(5, 12)

We can see the partitions using [glom()](http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=glom#pyspark.RDD.glom): it returns an RDD created by coalescing all elements within each partition into a list.

In [56]:
random.seed(42)
smallRDD = sc.parallelize([random.choice(['Cafe', 'Te']) for _ in range(20)])

In [57]:
smallRDD.getNumPartitions()

8

In [58]:
smallRDD.glom().collect()

[['Cafe', 'Cafe'],
 ['Te', 'Cafe'],
 ['Cafe', 'Cafe'],
 ['Cafe', 'Cafe', 'Te', 'Cafe'],
 ['Cafe', 'Cafe'],
 ['Cafe', 'Cafe'],
 ['Cafe', 'Cafe'],
 ['Te', 'Cafe', 'Te', 'Te']]

In [59]:
smallRDD.repartition(3).glom().collect()

[[],
 ['Cafe',
  'Cafe',
  'Cafe',
  'Cafe',
  'Cafe',
  'Cafe',
  'Cafe',
  'Cafe',
  'Te',
  'Cafe',
  'Te',
  'Te'],
 ['Te', 'Cafe', 'Cafe', 'Cafe', 'Te', 'Cafe', 'Cafe', 'Cafe']]

Partitions are one of the most powerful concepts in Spark: you can decide how to distribute your data so it can fit in memory, and more importantly, you can perform computations on each partition *before* speaking to other partitions. This can have an enormous impact on performance. The next cell uses `coalesce()`, which merges existing partitions and, unlike `repartition()`, avoids a full shuffle by default.

In [60]:
smallRDD.coalesce(3).glom().collect()

[['Cafe', 'Cafe', 'Te', 'Cafe'],
 ['Cafe', 'Cafe', 'Cafe', 'Cafe', 'Te', 'Cafe', 'Cafe', 'Cafe'],
 ['Cafe', 'Cafe', 'Cafe', 'Cafe', 'Te', 'Cafe', 'Te', 'Te']]

### **(2c) Pair RDDs: *grouping* strategies in Spark**

The next step in writing our word counting program is to create a new type of RDD, called a pair RDD. A pair RDD is an RDD where each element is a pair tuple (k, v) where k is the key and v is the value. In this example, we will create a pair consisting of ('<word>', 1) for each word element in the RDD, as we did in the map/reduce version of the histogram in Python, section (1d.2).

We can create the pair RDD using the `map()` transformation with a lambda function.

In [61]:
pairRDD = wordsRDD.map(lambda word: (word, 1))

pairRDD.take(6)

[('holi', 1),
 ('holi', 1),
 ('tocoto', 1),
 ('guapi', 1),
 ('guapi', 1),
 ('guapi', 1)]

### **(2c.1) `groupByKey()` approach**
An approach you might first consider (we'll see shortly that there are better ways) is based on using the [groupByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey) transformation. As the name implies, the `groupByKey()` transformation groups all the elements of the RDD with the same key into a single list in one of the partitions. There are two problems with using `groupByKey()`:
  + The operation requires a lot of data movement to move all the values into the appropriate partitions.
  + The lists can be very large. Consider a word count of English Wikipedia: the lists for common words (e.g., the, a, etc.) would be huge and could exhaust the available memory in a worker.
 
Use `groupByKey()` to generate a pair RDD of type `('word', iterator)`. Next, sum the iterator using a `map()` transformation.  The result should be a pair RDD consisting of (word, count) pairs.

In [62]:
groupedRDD = pairRDD.groupByKey()

In [63]:
groupedRDD.collect()

[('lerelele', <pyspark.resultiterable.ResultIterable at 0x7f8f7305ad30>),
 ('holi', <pyspark.resultiterable.ResultIterable at 0x7f8f7305ac50>),
 ('chachi', <pyspark.resultiterable.ResultIterable at 0x7f8f7305ada0>),
 ('guapi', <pyspark.resultiterable.ResultIterable at 0x7f8f7305ae10>),
 ('tocoto', <pyspark.resultiterable.ResultIterable at 0x7f8f7305ae80>)]

In [64]:
countsRDD = groupedRDD.map(lambda tupl: (tupl[0], len(tupl[1])))
countsRDD.collect()

[('lerelele', 101),
 ('holi', 107),
 ('chachi', 88),
 ('guapi', 106),
 ('tocoto', 98)]

In [65]:
countsRDD = groupedRDD.mapValues(lambda it: len(it))
countsRDD.collect()

[('lerelele', 101),
 ('holi', 107),
 ('chachi', 88),
 ('guapi', 106),
 ('tocoto', 98)]

### **(2c.2) `reduceByKey` approach**
A better approach is to start from the pair RDD and then use the [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey) transformation to create a new pair RDD. 

The `reduceByKey()` transformation gathers together pairs that have the same key and applies the function provided to two values at a time, iteratively reducing all of the values to a single value. `reduceByKey()` operates by applying the function first within each partition on a per-key basis and then across the partitions, allowing it to scale efficiently to large datasets.

![](https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/images/reduce_by.png)
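
The "first within each partition, then across partitions" idea can be imitated in plain Python. The sketch below is an illustration under our own assumptions, not Spark code: it takes the eight partitions printed by `glom()` in the partitioning section, combines the `(word, 1)` pairs locally, and only then merges the partial results. The helper names `local_combine` and `merge_partials` are invented here.

```python
from functools import reduce

# The partitions shown earlier by smallRDD.glom().collect()
partitions = [
    ['Cafe', 'Cafe'], ['Te', 'Cafe'], ['Cafe', 'Cafe'],
    ['Cafe', 'Cafe', 'Te', 'Cafe'], ['Cafe', 'Cafe'], ['Cafe', 'Cafe'],
    ['Cafe', 'Cafe'], ['Te', 'Cafe', 'Te', 'Te'],
]

def local_combine(partition, func):
    """Reduce the (word, 1) pairs of one partition, key by key."""
    partial = {}
    for word in partition:
        partial[word] = func(partial[word], 1) if word in partial else 1
    return partial

def merge_partials(left, right, func):
    """Merge the per-key results coming from two partitions."""
    merged = dict(left)
    for key, value in right.items():
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

add = lambda v1, v2: v1 + v2

partials = [local_combine(p, add) for p in partitions]
counts = reduce(lambda l, r: merge_partials(l, r, add), partials)

total_records = sum(len(p) for p in partitions)    # 20 pairs before combining
records_to_move = sum(len(p) for p in partials)    # 11 pairs after combining
print(counts, total_records, records_to_move)      # {'Cafe': 15, 'Te': 5} 20 11
```

In this toy setting a `groupByKey()`-style approach would have to move all 20 pairs between partitions, whereas only 11 partial results need to travel after the local step. The gap grows with the number of repeated keys per partition.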

In [66]:
pairRDD = wordsRDD.map(lambda word: (word, 1))

In [67]:
pairRDD.reduceByKey(lambda v1, v2: v1 + v2).collect()

[('lerelele', 101),
 ('holi', 107),
 ('chachi', 88),
 ('guapi', 106),
 ('tocoto', 98)]

### **(2c.3) `combineByKey` approach: the mother of dragons**

The [combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None)](http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=combinebykey#pyspark.RDD.combineByKey) method is a generic (and powerful!) function to combine the elements for each key using a custom set of aggregation functions.

It turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C. Note that V and C can be different – for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

Users provide three functions:

 * `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
 * `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
 * `mergeCombiners`, to combine two C’s into a single one.
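
To get a feeling for how the three functions cooperate, here is a single-machine emulation in plain Python. It is a sketch of the contract described above, not Spark's implementation: `toy_combine_by_key` is a name invented here, the partitions are plain lists, and the `(word, score)` data is made up. It uses a different aggregation from the exercises below: the average value per key, with `C = (sum, count)`.

```python
def toy_combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Emulate the three-function contract on a list of partitions."""
    # Step 1: inside each partition, build one combiner per key.
    partials = []
    for partition in partitions:
        combiners = {}
        for key, value in partition:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)
            else:
                combiners[key] = create_combiner(value)
        partials.append(combiners)

    # Step 2: across partitions, merge the combiners of each key.
    result = {}
    for combiners in partials:
        for key, combiner in combiners.items():
            if key in result:
                result[key] = merge_combiners(result[key], combiner)
            else:
                result[key] = combiner
    return result

# Hypothetical data: (word, score) pairs spread over two partitions.
partitions = [
    [('holi', 1.0), ('guapi', 4.0), ('holi', 3.0)],
    [('guapi', 2.0), ('holi', 5.0)],
]

sum_counts = toy_combine_by_key(
    partitions,
    lambda v: (v, 1),                               # createCombiner: V -> C
    lambda c, v: (c[0] + v, c[1] + 1),              # mergeValue: (C, V) -> C
    lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]),  # mergeCombiners: (C, C) -> C
)
averages = {key: total / count for key, (total, count) in sum_counts.items()}
print(averages)  # {'holi': 3.0, 'guapi': 3.0}
```

Here V is a float and C is a `(sum, count)` tuple, which is the situation where `reduceByKey()` alone is not enough because the value type and the result type differ.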

In [None]:
# let's return the count and the length of words:
wordPairs = wordsRDD....
wordPairs.combineByKey(
    ...,
    ...,
    ...
).collectAsMap()

In [None]:
# let's return the count and the list of words:
wordPairs = wordsRDD.map(lambda i:(i,i))
wordPairs.combineByKey(
    ...,
    ...,
    ...
).collectAsMap()

## (2d) Apply word count to a file

### **(2d.1) Load a text file**
For the next part of this lab, we will use the [Complete Works of William Shakespeare](http://www.gutenberg.org/ebooks/100) from [Project Gutenberg](http://www.gutenberg.org/wiki/Main_Page). To convert a text file into an RDD, we use the `SparkContext.textFile()` method. Later on, we will apply a `remove_punctuation()` function using a `map()` transformation to strip out the punctuation and change all text to lowercase. Since the file is large, we use `take()` so that we only print a few lines.

In [69]:
!wget -v http://www.gutenberg.org/files/100/100-0.txt -O shakespeare.txt

--2018-03-29 12:10:26--  http://www.gutenberg.org/files/100/100-0.txt
Resolving www.gutenberg.org (www.gutenberg.org)... 152.19.134.47
Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5858792 (5,6M) [text/plain]
Saving to: ‘shakespeare.txt’


2018-03-29 12:10:44 (329 KB/s) - ‘shakespeare.txt’ saved [5858792/5858792]



In [70]:
!head shakespeare.txt


Project Gutenberg’s The Complete Works of William Shakespeare, by William
Shakespeare

This eBook is for the use of anyone anywhere in the United States and
most other parts of the world at no cost and with almost no restrictions
whatsoever.  You may copy it, give it away or re-use it under the terms
of the Project Gutenberg License included with this eBook or online at
www.gutenberg.org.  If you are not located in the United States, you’ll
have to check the laws of the country where you are located before using


In [71]:
linesRDD = sc.textFile('shakespeare.txt')

linesRDD

shakespeare.txt MapPartitionsRDD[38] at textFile at NativeMethodAccessorImpl.java:0

In [72]:
linesRDD.getNumPartitions()

2

In [73]:
linesRDD.take(10)

['',
 'Project Gutenberg’s The Complete Works of William Shakespeare, by William',
 'Shakespeare',
 '',
 'This eBook is for the use of anyone anywhere in the United States and',
 'most other parts of the world at no cost and with almost no restrictions',
 'whatsoever.  You may copy it, give it away or re-use it under the terms',
 'of the Project Gutenberg License included with this eBook or online at',
 'www.gutenberg.org.  If you are not located in the United States, you’ll',
 'have to check the laws of the country where you are located before using']

### **(2d.2) Capitalization and punctuation**
Real world files are more complicated than the data we have been using in this lab. Some of the issues we have to address are:
  + Words should be counted independent of their capitalization (e.g., Spark and spark should be counted as the same word).
  + All punctuation should be removed.
  + Any leading or trailing spaces on a line should be removed.
 
Define the function `remove_punctuation` that converts all text to lower case, removes any punctuation, and removes leading and trailing spaces.  Use the Python [re](https://docs.python.org/2/library/re.html) module to remove any text that is not a letter, number, or space. Reading `help(re.sub)` might be useful.

In [74]:
import re

In [75]:
help(re.sub)

Help on function sub in module re:

sub(pattern, repl, string, count=0, flags=0)
    Return the string obtained by replacing the leftmost
    non-overlapping occurrences of the pattern in string by the
    replacement repl.  repl can be either a string or a callable;
    if a string, backslash escapes in it are processed.  If it is
    a callable, it's passed the match object and must return
    a replacement string to be used.



In [76]:
def remove_punctuation(string):
    
    # Drop every character that is neither a word character nor a space
    clean_string = re.sub(r'[^\w ]', '', string)
    
    return clean_string.lower()
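
The function above lowercases the text and drops punctuation, but it leaves leading and trailing spaces in place. A possible variant that also strips them, as the exercise statement asks, is sketched below. The name `remove_punctuation_and_strip` is ours, and keep in mind that `\w` also matches the underscore, so underscores survive.

```python
import re

def remove_punctuation_and_strip(string):
    # Keep only word characters (letters, digits, underscore) and spaces.
    clean_string = re.sub(r'[^\w ]', '', string)
    # Lowercase, then trim whitespace at both ends.
    return clean_string.lower().strip()

print(remove_punctuation_and_strip('  Hi, you!  '))     # hi you
print(remove_punctuation_and_strip('snake_case rules.'))  # snake_case rules
```

Applying this variant to the RDD would change the outputs shown in the following cells slightly, since lines ending in a space would be trimmed.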

In [77]:
clean_linesRDD = linesRDD.map(remove_punctuation)

clean_linesRDD.take(25)

['',
 'project gutenbergs the complete works of william shakespeare by william',
 'shakespeare',
 '',
 'this ebook is for the use of anyone anywhere in the united states and',
 'most other parts of the world at no cost and with almost no restrictions',
 'whatsoever  you may copy it give it away or reuse it under the terms',
 'of the project gutenberg license included with this ebook or online at',
 'wwwgutenbergorg  if you are not located in the united states youll',
 'have to check the laws of the country where you are located before using',
 'this ebook',
 '',
 'see at the end of this file  content note added in 2017 ',
 '',
 '',
 'title the complete works of william shakespeare',
 '',
 'author william shakespeare',
 '',
 'release date january 1994 ebook 100',
 'last updated february 19 2018',
 '',
 'language english',
 '',
 'character set encoding utf8']

### **(2d.3) Words from lines**
Before we can count the words, we have to address two issues with the format of the RDD:
  + The first issue is that we need to split each line by its spaces.
  + The second issue is that we need to filter out empty lines.
 
Apply a transformation that will split each element of the RDD by its spaces. For each element of the RDD, you should apply Python's string [split()](https://docs.python.org/2/library/string.html#string.split) function. You might think that a `map()` transformation is the way to do this, but think about what the result of the `split()` function will be.

In [78]:
linelists_RDD = clean_linesRDD.map(lambda string: string.split(' '))
linelists_RDD.take(3)

[[''],
 ['project',
  'gutenbergs',
  'the',
  'complete',
  'works',
  'of',
  'william',
  'shakespeare',
  'by',
  'william'],
 ['shakespeare']]

In [79]:
# This could break the session, because it is going to collect 
# the whole corpus into the driver.

# linelists_RDD.reduce(lambda x, y: x + y)

In [99]:
shakespeare_wordsRDD = clean_linesRDD\
                                .flatMap(lambda string: string.split(' '))

### **(2d.4) Remove empty elements**
The next step is to filter out the empty elements.  Remove all entries where the word is `''`.
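
Where do the empty strings come from? A short plain-Python sketch: `split(' ')` produces an empty string for an empty line and between consecutive spaces, while `split()` with no argument splits on runs of whitespace and never returns empty strings. The sample line is shortened from the cleaned output shown earlier.

```python
line = 'whatsoever  you may copy it'   # note the double space

with_sep = line.split(' ')    # explicit separator: keeps the empty string
no_sep = line.split()         # no separator: runs of whitespace collapse
empty_line = ''.split(' ')    # an empty line still yields one empty string

# Same predicate as the filter used in this step
filtered = [word for word in with_sep if len(word) > 0]

print(with_sep, no_sep, empty_line, filtered)
```

Either approach gives the same words here; the notebook keeps `split(' ')` and removes the empty strings with an explicit `filter()`.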

In [100]:
only_shakespeare_wordsRDD = shakespeare_wordsRDD\
                            .filter(lambda word: len(word)>0)

In [101]:
only_shakespeare_wordsRDD.take(25)

['project',
 'gutenbergs',
 'the',
 'complete',
 'works',
 'of',
 'william',
 'shakespeare',
 'by',
 'william',
 'shakespeare',
 'this',
 'ebook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'in',
 'the',
 'united',
 'states',
 'and']

### (2d.5) Count the words and show the top 15

We know the drill at this point, don't we? We map each word to a `(word, 1)` tuple and then apply `reduceByKey`.

We can view the top 15 words by using the `takeOrdered()` action; however, since the elements of the RDD are pair tuples, we need a custom sort function that sorts using the value part of the pair rather than the key.

You'll notice that many of the words are common English words (known as stopwords).

Use our map/reduce pipeline and `takeOrdered()` to obtain the fifteen most common words and their counts.
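
As a mental model, the same pipeline can be sketched in plain Python. Here `Counter` plays the role of the map plus `reduceByKey` step and `heapq.nsmallest` plays the role of `takeOrdered()`: both return the smallest elements according to `key`, so negating the count puts the largest counts first. The word list is a made-up toy example.

```python
import heapq
from collections import Counter

# Hypothetical toy input, standing in for the words RDD
words = ['the', 'and', 'the', 'of', 'the', 'and', 'i']

# (word, count) pairs, the local analogue of map(...).reduceByKey(...)
word_counts = list(Counter(words).items())

# Two most frequent words: the smallest elements under the key -count
top_two = heapq.nsmallest(2, word_counts, key=lambda t: -t[1])

print(top_two)
```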

In [102]:
word_countsRDD = only_shakespeare_wordsRDD\
                    .map(lambda word: (word, 1))\
                    .reduceByKey(lambda x, y: x + y)
word_countsRDD

PythonRDD[77] at RDD at PythonRDD.scala:48

In [83]:
# By default, takeOrdered sorts in ascending order; with string keys that means lexicographically, just as sorted does in plain Python

word_countsRDD.takeOrdered(15)

[('1', 90),
 ('10', 3),
 ('100', 3),
 ('1000', 1),
 ('1000txt', 1),
 ('1000zip', 1),
 ('1004', 1),
 ('1009', 1),
 ('101', 1),
 ('1012', 1),
 ('1016', 1),
 ('102', 1),
 ('1020', 1),
 ('1024', 1),
 ('1028', 1)]
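
The digits-first result follows from how Python compares tuples and strings: tuples are compared element by element, and strings character by character, so `'1000'` sorts before `'101'`. A quick plain-Python check, using a few keys from the output above:

```python
pairs = [('101', 1), ('1', 90), ('1000', 1), ('10', 3), ('100', 3)]

# Default ordering: string keys compare character by character
ordered = sorted(pairs)

# Numeric ordering requires converting the key explicitly
by_number = sorted(pairs, key=lambda t: int(t[0]))

print(ordered)
print(by_number)
```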

In [84]:
# The key argument allows us to provide a custom sorting function

word_countsRDD.takeOrdered(15, key=lambda t: -t[1])

[('the', 29980),
 ('and', 28353),
 ('i', 21859),
 ('to', 20811),
 ('of', 18811),
 ('a', 15984),
 ('you', 14438),
 ('my', 13191),
 ('in', 12025),
 ('that', 11782),
 ('is', 9710),
 ('not', 9068),
 ('with', 8519),
 ('me', 8270),
 ('for', 8183)]

# Practical

## ETL with airline coupon data

First, load the coupon data.

In [86]:
%ls data/

coupon150720.csv  transm150720.csv


In [87]:
cps = sc.textFile('./data/coupon150720.csv').cache()

In [88]:
cps.take(5)

['79062005698500,1,MAA,AUH,9W,9W,56.79,USD,1,H,H,0526,150904,OK,IAF0',
 '79062005698500,2,AUH,CDG,9W,9W,84.34,USD,1,H,H,6120,150905,OK,IAF0',
 '79062005924069,1,CJB,MAA,9W,9W,60.0,USD,1,H,H,2768,150721,OK,IAA0',
 '79065668570385,1,DEL,DXB,9W,9W,160.63,USD,2,S,S,0546,150804,OK,INA0',
 '79065668737021,1,AUH,IXE,9W,9W,152.46,USD,1,V,V,0501,150803,OK,INA0']

#### Exercise

Take fields 0, 2, 3, 4, and 6 from each line of `cps`.

In [89]:
coupons = cps.map(lambda line: line.split(','))\
             .map(lambda tupl: (tupl[0], tupl[2], tupl[3], tupl[4], float(tupl[6])))

In [90]:
coupons.take(5)

[('79062005698500', 'MAA', 'AUH', '9W', 56.79),
 ('79062005698500', 'AUH', 'CDG', '9W', 84.34),
 ('79062005924069', 'CJB', 'MAA', '9W', 60.0),
 ('79065668570385', 'DEL', 'DXB', '9W', 160.63),
 ('79065668737021', 'AUH', 'IXE', '9W', 152.46)]
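
The two chained lambdas can also be folded into one named function, which is easier to test outside Spark. A minimal sketch, assuming no field contains a quoted comma (true for the sample rows shown); `parse_coupon` is a name introduced here for illustration only.

```python
def parse_coupon(line):
    """Split one CSV line and keep fields 0, 2, 3, 4 and 6 (the last as a float)."""
    fields = line.split(',')
    return (fields[0], fields[2], fields[3], fields[4], float(fields[6]))

# First row of the cps.take(5) output above
sample = '79062005698500,1,MAA,AUH,9W,9W,56.79,USD,1,H,H,0526,150904,OK,IAF0'
parsed = parse_coupon(sample)

print(parsed)
```

On the RDD this would be applied as `cps.map(parse_coupon)`.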

#### Exercise

Keep only the amount. Get the average, max, min and standard deviation.

In [91]:
coupons.map(lambda coupon: coupon[4]).mean()

149.94532037167986

In [92]:
coupons.map(lambda coupon: coupon[4]).meanApprox(timeout=1)

149.94532037167986

In [93]:
coupons.map(lambda coupon: coupon[4]).max()

6355194.0

In [94]:
coupons.map(lambda coupon: coupon[4]).min()

0.0

In [95]:
coupons.map(lambda coupon: coupon[4]).stdev()

9978.4820861226926
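
Note that `stdev()` on an RDD computes the population standard deviation (dividing by N), while `sampleStdev()` divides by N - 1. PySpark also provides `stats()`, which returns count, mean, stdev, max and min in a single pass. The plain-Python sketch below illustrates the two standard deviations with the `statistics` module on the five sample amounts shown earlier, so its numbers differ from the full-dataset results above.

```python
import statistics

# Amounts from the coupons.take(5) output, not the whole dataset
amounts = [56.79, 84.34, 60.0, 160.63, 152.46]

mean = statistics.mean(amounts)
population_std = statistics.pstdev(amounts)  # divides by N, like RDD.stdev()
sample_std = statistics.stdev(amounts)       # divides by N - 1, like RDD.sampleStdev()

print(mean, min(amounts), max(amounts), population_std, sample_std)
```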

#### Exercise

Get stats for all tickets with destination MAD

You will need to extract ticket amounts with destination MAD, and then calculate:

1. Total ticket amounts per origin
2. Top 10 airlines by average amount

In [96]:
madrid_coupons = coupons.filter(lambda t: t[2]=='MAD')

In [97]:
madrid_coupons.map(lambda coupon: (coupon[1], coupon[4]))\
              .reduceByKey(lambda x, y: x + y)\
              .takeOrdered(10, lambda t: -t[1])

[('CCS', 94528.68),
 ('GRU', 87192.63999999998),
 ('EZE', 81074.63999999997),
 ('BOG', 74644.45000000001),
 ('LHR', 69609.53000000003),
 ('LPA', 60483.92),
 ('MEX', 56316.73000000001),
 ('JFK', 53496.169999999984),
 ('TLV', 53436.220000000016),
 ('TFN', 50034.949999999866)]

In [98]:
madrid_coupons\
    .map(lambda coupon: (coupon[3], (coupon[4], 1)))\
    .reduceByKey(lambda t1, t2: (t1[0] + t2[0], t1[1] + t2[1]))\
    .takeOrdered(10, key=lambda t: -t[1][0] / t[1][1])

[('V0', (81271.48, 15)),
 ('AC', (3703.1000000000004, 5)),
 ('KE', (8950.84, 13)),
 ('SV', (25999.189999999995, 47)),
 ('OB', (4819.54, 9)),
 ('AR', (10784.14, 21)),
 ('AV', (70680.63000000008, 157)),
 ('AM', (8373.95, 19)),
 ('C2', (795.74, 2)),
 ('LA', (33815.880000000005, 89))]
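
The output above lists `(total, count)` pairs, ordered by their ratio, rather than the averages themselves. Averages cannot be combined directly in a reduce step, which is why the sum and the count travel together and are divided only at the end. The sketch below shows the same pattern in plain Python, with that final division added. It uses the five sample coupons shown earlier, keyed by origin airport only because those five rows all share one airline; `reduce_by_key` is a hypothetical local helper, not a Spark API.

```python
# (origin, amount) pairs from the coupons.take(5) output
pairs = [('MAA', 56.79), ('AUH', 84.34), ('CJB', 60.0),
         ('DEL', 160.63), ('AUH', 152.46)]

def add_pairs(t1, t2):
    """Combine two (sum, count) tuples; associative and commutative."""
    return (t1[0] + t2[0], t1[1] + t2[1])

def reduce_by_key(func, keyed_values):
    """Plain-Python stand-in for reduceByKey (hypothetical helper)."""
    grouped = {}
    for key, value in keyed_values:
        grouped[key] = func(grouped[key], value) if key in grouped else value
    return grouped

# Step 1: turn each amount into (amount, 1) and reduce per key
sum_counts = reduce_by_key(add_pairs, [(k, (v, 1)) for k, v in pairs])

# Step 2: divide once per key to get the average
averages = {k: total / count for k, (total, count) in sum_counts.items()}

# Step 3: order by average, largest first
top = sorted(averages.items(), key=lambda t: -t[1])

print(top)
```

In PySpark, the division in step 2 could be done with `mapValues` after `reduceByKey`, so that the result carries the average instead of the raw pair.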