# First steps in PySpark 

In this notebook we will learn the fundamentals of functional programming, as well as the basic abstraction of a distributed object in Spark, the RDD. The notebook has been divided into two parts:

Part 1: map/reduce basics

Part 2: Work with RDD and Pair RDD abstractions 

<a href = "http://yogen.io"><img src="http://yogen.io/assets/logo.svg" alt="yogen" style="width: 200px; float: right;"/></a>

# Part 1: map/reduce basics

![Hadoop Logo](https://upload.wikimedia.org/wikipedia/commons/thumb/0/0e/Hadoop_logo.svg/220px-Hadoop_logo.svg.png)
# **Apache Hadoop (MapReduce)**

It is an open source software framework written in Java for distributed storage and distributed processing of very large data sets on computer clusters built from commodity hardware. All the modules in Hadoop are designed with a fundamental assumption that hardware failures (of individual machines, or racks of machines) are common and thus should be automatically handled in software by the framework.

The core of Apache Hadoop consists of a storage part (Hadoop Distributed File System (HDFS)) and a processing part (MapReduce). Hadoop splits files into large blocks and distributes them amongst the nodes in the cluster. To process the data, Hadoop MapReduce transfers packaged code for nodes to process in parallel, based on the data each node needs to process. This approach takes advantage of data locality — nodes manipulating the data that they have on hand — to allow the data to be processed faster and more efficiently than it would be in a more conventional supercomputer architecture that relies on a parallel file system where computation and data are connected via high-speed networking.

![caption](http://d152j5tfobgaot.cloudfront.net/wp-content/uploads/2012/07/mapreduce.png)

Since data and computation are distributed, we should avoid mutable data, that is, variables whose values change over time. Thus, in contrast to imperative programming, we shall use the functional approach (lambda calculus).

### The goal of the following exercises is to understand basic lambda calculus with Python.

### (1a) Functional programming in Python

So, what is Functional Programming? From Wikipedia: 

« …a programming paradigm that treats computation as the evaluation of mathematical functions and **avoids changing-state and mutable data**.»

It's based on lambda calculus, which consists of:
 * Function definition (declaration of expressions)
 * Function application (evaluation of those expressions)
 * Recursion (iteration)
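As a rough illustration (not a formal treatment of lambda calculus), the three ingredients map onto Python roughly like this. The names `successor` and `factorial` are made up for the example:

```python
# Function definition: an anonymous function bound to a name
successor = lambda x: x + 1

# Function application: evaluating the function on an argument
three = successor(2)

# Recursion: repetition without a loop or a mutable counter
def factorial(n):
    return 1 if n <= 1 else n * factorial(n - 1)

print(three, factorial(5))  # 3 120
```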

We have already used this in python!!! :)

Recall the typical `lambda x: x + 1` we have been using as the first argument of `map`, `reduce` and `filter`:
 * **map** maps each value in the input collection to a different value. It's just the classical mathematical function we are used to!
 * **reduce** takes two values from the input collection and returns a new value (of the same type) by applying an associative and commutative operation to them, repeating until a single value remains.
 * **filter** keeps the elements of the input collection that satisfy a certain (boolean) criterion.
 

**Mapping**

`map` is a Higher Order Function (HOF) that takes a function f and a sequence and returns a new sequence formed by applying f to each element in the original sequence. 

![map](https://cosminpupaza.files.wordpress.com/2015/10/map.png?w=505)

Very often we write lambdas, anonymous functions, to use in defining simple transformations with `map`:


In [1]:
input_list = [12, 56, 5, 2]

list(map(lambda x: x ** 2, input_list)) # map does not return a list, so we wrap it in list()

[144, 3136, 25, 4]

In [2]:
list(map(lambda string: string + "s", ["coche", "moto"]))

['coches', 'motos']

We can also use named functions, of course. Note that we pass them as arguments to `map`, not execute them. You can think of them as tools we give to `map` for it to use.

In [3]:
def plural(string):
    return string + "s"

list(map(plural, ["coche", "moto"]))

['coches', 'motos']

Consider the following very common pattern: create empty list, append transformed elements one by one:

In [4]:
input_sequence = ["coche", "moto"]
result = []

for element in input_sequence:
    result.append(plural(element))
    
result

['coches', 'motos']

We can abstract over every possible list by defining a function that takes the list as an argument:

In [5]:
def pluralize(input_sequence): 
    result = []
    for element in input_sequence:
        result.append(plural(element))
    
    return result

pluralize(["coche", "moto"])
pluralize(["mantis_shrimp", "sand_flea", "baboon"])

['mantis_shrimps', 'sand_fleas', 'baboons']

Consider this other piece of code: we are doing a very similar thing, but with a different function. A lot of the code is common between the two little programs.

Wouldn't it be convenient if we could abstract over every function, as we did over every list?

In [6]:
def initial(word):
    return word[:1]

input_sequence = ["coche", "moto"]
result = []

for element in input_sequence:
    result.append(initial(element))

result

['c', 'm']

We can, taking the function to be applied as an argument. This is because Python treats functions as first-class objects, so they can be used as arguments to other functions.

In [7]:
def process_seq(f, input_sequence):
    result = []

    for element in input_sequence:
        result.append(f(element))
    
    return result

In [8]:
vehicles = ["moto", "coche"]
absurd_animals = ["mantis_shrimp", "sand_flea", "baboon"]

process_seq(initial, absurd_animals)

['m', 's', 'b']

In [9]:
process_seq(plural, vehicles)

['motos', 'coches']

In [10]:
process_seq(lambda s: s[-1:], vehicles)

['o', 'e']

That function we just wrote, `process_seq`, is exactly what `map` does: it takes a function and applies it to every element of a sequence.

In [11]:
list(map(initial, absurd_animals))

['m', 's', 'b']

**Filtering**

`filter` encodes another very common pattern: building a sequence from some of the elements of an input sequence, deciding whether to include each based on the result of evaluating a function, often called the `predicate`, on each of the elements.


![filter](https://cosminpupaza.files.wordpress.com/2015/11/filter.png?w=405)

`filter` works like `map`, but instead of transforming each element it decides which ones to keep and which to drop. The predicate returns a boolean.

In [12]:
input_list = [17, 39, 41]

list(filter(lambda x: x > 20, input_list)) # As with map, we need list() to get a list back

[39, 41]

Again, we can use either lambdas or named functions.

In [13]:
def greater_than_20(number):
    return number > 20

list(filter(greater_than_20, input_list))

[39, 41]

In [14]:
pred = lambda x: x > 20 # This is a function literal
pred(50)

True
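In the same spirit as `process_seq` above, here is a minimal sketch of what `filter` does internally. The name `process_filter` is hypothetical, and unlike the built-in it returns a list rather than a lazy iterator:

```python
def process_filter(predicate, input_sequence):
    """Keep only the elements for which predicate(element) is truthy."""
    result = []
    for element in input_sequence:
        if predicate(element):
            result.append(element)
    return result

kept = process_filter(lambda x: x > 20, [17, 39, 41])
print(kept)  # [39, 41]
```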

#### Exercise

Write a filter to extract those words that have more than 3 letters from `collection_of_strings`.

In [15]:
collection_of_strings = ["Holi", "que", "tal", "te", "van", "las", "cosas?"]

list(filter(lambda word: len(word) > 3, collection_of_strings))

['Holi', 'cosas?']

#### Exercise

Write a filter to extract prime numbers from the following list of integers:

```python
[1, 2, 3, 12, 17, 19, 18, 23, 24, 42]
```

You will need to write a function that determines whether a single number is a prime or not.

In [16]:
input_list = [1, 2, 3, 12, 17, 19, 18, 23, 24, 42]

def isprime(number):
    # 0 and 1 are not prime by definition
    if number < 2:
        return False

    possible_factors = range(2, number)
    real_factors = filter(lambda n: number % n == 0, possible_factors)
    
    return len(list(real_factors)) == 0

list(filter(isprime, input_list))

[2, 3, 17, 19, 23]

In [17]:
list(map(isprime, input_list))

[False, True, True, False, True, True, False, True, False, False]
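Trial division only needs to check candidates up to the square root of the number. The sketch below is one possible alternative (the name `is_prime_fast` is ours, not part of the exercise); it also rejects 0 and 1, which are not prime by definition:

```python
from math import isqrt  # available from Python 3.8

def is_prime_fast(number):
    """Trial division up to the integer square root."""
    if number < 2:
        return False
    return all(number % n != 0 for n in range(2, isqrt(number) + 1))

primes = list(filter(is_prime_fast, [1, 2, 3, 12, 17, 19, 18, 23, 24, 42]))
print(primes)  # [2, 3, 17, 19, 23]
```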

In [18]:
list(map(lambda x: x ** 2, filter(lambda x: x % 2 == 0, input_list)))

[4, 144, 324, 576, 1764]

`map` and `filter` are not used explicitly very often in Python, in part because of their cumbersome syntax.

Nevertheless, there is a well-known feature in Python that is in fact just a convenient way to write `map`s and `filter`s: **list comprehensions**:

In [19]:
number_list = [1, 2, 3, 12, 17, 19, 18, 23, 24, 25, 42]

[x ** 2 for x in number_list if x % 2 == 0]

[4, 144, 324, 576, 1764]

The above list comprehension is exactly equivalent to `list(map(lambda x: x ** 2, filter(lambda x: x % 2 == 0, number_list)))`, which is a pipeline of two steps: first a `filter`, then a `map`.
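The two-step pipeline can be written out and compared against the comprehension. This sketch redefines `number_list` so it runs on its own:

```python
number_list = [1, 2, 3, 12, 17, 19, 18, 23, 24, 25, 42]

# List comprehension: filter and map in one expression
with_comprehension = [x ** 2 for x in number_list if x % 2 == 0]

# Explicit pipeline: first keep the even numbers, then square them
evens = filter(lambda x: x % 2 == 0, number_list)
with_map_filter = list(map(lambda x: x ** 2, evens))

print(with_comprehension == with_map_filter)  # True
```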

**Reducing** 

`reduce` is the third basic foundation of functional programming. Reduce uses a function, often called the `combiner`, to transform a sequence of elements of type T (that is, any type) into a single T. The combiner must take 2 Ts and return only one T.


Recall it must be associative and commutative! Think about the importance of this when parallelizing computations: partial results may be combined in any grouping and in any order.
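To see why this matters, the sketch below simulates a distributed reduce in plain Python: the data is split into two chunks (an arbitrary split chosen for illustration), each chunk is reduced on its own, and the partial results are reduced again. Addition gives the same answer either way; subtraction, which is neither associative nor commutative, does not.

```python
from functools import reduce

numbers = [1, 2, 3, 4, 5, 6]
chunks = [numbers[:3], numbers[3:]]  # pretend each chunk lives on a different node

def chunked_reduce(combiner, chunks):
    # Reduce each chunk independently, then reduce the partial results
    partials = [reduce(combiner, chunk) for chunk in chunks]
    return reduce(combiner, partials)

add = lambda x, y: x + y
subtract = lambda x, y: x - y

print(reduce(add, numbers), chunked_reduce(add, chunks))            # 21 21
print(reduce(subtract, numbers), chunked_reduce(subtract, chunks))  # -19 3
```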

![reduce](https://cosminpupaza.files.wordpress.com/2015/11/reduce.png?w=500)

In [20]:
from functools import reduce

reduce(lambda x, y: x + y, number_list)

186

In [21]:
reduce(lambda x, y: x * y, number_list)

242625196800

#### Exercise

Write a function `function_x` such that `reduce(function_x, number_list)` returns the highest number in the list. Don't use the built-in `max`!

In [22]:
number_list = [1, 2, 3, 12, 17, 50, 23, 24, 25, 42]

def function_x(x, y):
    return x if x > y else y
    
reduce(function_x, number_list)

50

## (1b) Exercise: Calculate the mean of a collection of real numbers using map/reduce
Recall:

$$\bar x = \frac{\sum_{i=1}^{N} x_i}{N} $$

It's straightforward to do this with the Python built-ins `sum()` and `len()`. However, how would you do it with map/reduce? We have already shown how to sum the elements of a list. Thus, you only have to calculate its length. For this:
 * Create another array of the same size, consisting of 1s.
 * Sum the elements of that array

#### First part

* Do a reduce to do the sum, and a different map-reduce to get the length

In [23]:
number_list = [1, 2, 3, 12, 17, 50, 23, 24, 25, 42]

In [24]:
suma = reduce(lambda x, y: x + y, number_list)
suma

199

In [25]:
longitud = reduce(lambda x, y: x + y, list(map(lambda x: 1, number_list)))
longitud

10

In [26]:
media = suma / longitud
media

19.9

That was also a pipeline of two steps. It's equivalent to:

#### Combine them in one pass

Think about what it would mean to finish one computation and only then do another in a distributed environment.

Combine both in one pass:

In [27]:
def combiner(tuple_1, tuple_2):
    
    partial_total = tuple_1[0] + tuple_2[0]
    partial_length = tuple_1[1] + tuple_2[1]
    
    return (partial_total, partial_length)

(total, count) = reduce(combiner, map(lambda n: (n, 1), number_list))
mean = total / count
mean

19.9
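One reason the single-pass version suits a cluster: the `(total, count)` pairs can be combined within each chunk of data first, and only the small partial pairs need to be merged at the end. A sketch of that idea in plain Python, with an arbitrary split into two chunks standing in for partitions:

```python
from functools import reduce

number_list = [1, 2, 3, 12, 17, 50, 23, 24, 25, 42]
partitions = [number_list[:5], number_list[5:]]  # hypothetical split across two nodes

def combiner(tuple_1, tuple_2):
    # Add totals and counts component-wise
    return (tuple_1[0] + tuple_2[0], tuple_1[1] + tuple_2[1])

# Each "node" reduces its own chunk to a single (total, count) pair
partials = [reduce(combiner, map(lambda n: (n, 1), part)) for part in partitions]

# Only the partial pairs need to be brought together and merged
total, count = reduce(combiner, partials)
print(partials, total / count)  # [(35, 5), (164, 5)] 19.9
```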

## (1c) Exercise: Calculate the standard deviation of a collection of real numbers

We'll use the usual definition of the sample variance, $\sigma_x^2$; the standard deviation is its square root:

$$\sigma_x^2 = \frac{\sum_{i=1}^{N} (x_i-\bar x)^2}{N-1}$$

For this, use the *mean* and *count* variables from the previous exercise.

With `map` and `reduce`:

In [28]:
numerator = reduce(lambda x, y: x + y, map(lambda n: (n - mean) ** 2, number_list))
numerator

2480.8999999999996

In [29]:
variance = numerator / (count - 1)
variance

275.6555555555555

In [30]:
import numpy as np
np.var(number_list, ddof = 1)

275.65555555555557

In standard idiomatic Python we'd do it like this:

In [31]:
temp = []
for n in number_list:
    sqdiff = (mean - n) ** 2
    temp.append(sqdiff)
    
sum(temp)

2480.8999999999996

#### Calculate the standard deviation in one pass.

Again, think about what it would mean to finish one computation and only then do another in a distributed environment.


If we can, we should avoid broadcasting variables all over the cluster. In this case, the following alternative definition of standard deviation can come in handy:

$$\sigma_x^2 = \frac{\sum_{i=1}^{N} (x_i-\bar x)^2}{N-1} =
\frac{\sum_{i=1}^{N} (x_i^2+{\bar x}^2-2x_i\bar x)}{N-1} =
\frac{1}{N-1}\left(\sum_i x_i^2-N\bar x^2\right)$$
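The last equality relies on the fact that $\sum_{i=1}^{N} x_i = N\bar x$. Spelled out step by step:

$$\sum_{i=1}^{N} (x_i^2+{\bar x}^2-2x_i\bar x) =
\sum_{i=1}^{N} x_i^2 + N\bar x^2 - 2\bar x\sum_{i=1}^{N} x_i =
\sum_{i=1}^{N} x_i^2 + N\bar x^2 - 2N\bar x^2 =
\sum_{i=1}^{N} x_i^2 - N\bar x^2$$

So a single pass that accumulates $N$, $\sum_i x_i$ and $\sum_i x_i^2$ is enough to compute the variance.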



In [32]:
def combiner_of_three(tuple_1, tuple_2):
    
    partial_length = tuple_1[0] + tuple_2[0]
    partial_total = tuple_1[1] + tuple_2[1]
    partial_sq = tuple_1[2] + tuple_2[2]
    
    return (partial_length, partial_total, partial_sq)

(length, total, sum_squares) = reduce(combiner_of_three, map(lambda n: (1, n, n ** 2), number_list))

varianza = 1 / (length - 1) * (sum_squares - length * (total / length) ** 2)
varianza

275.6555555555556

#### Side note

In Python 2, `map` and `filter` returned lists. In Python 3, they return lazy iterators. Like file objects, these are exhausted once you have iterated through them, so a second pass yields nothing.
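A small demonstration of that behaviour in Python 3:

```python
squares = map(lambda x: x ** 2, [1, 2, 3])

first_pass = list(squares)   # consumes the iterator
second_pass = list(squares)  # nothing left to consume

print(first_pass, second_pass)  # [1, 4, 9] []
```

If the values are needed more than once, store the result of `list(...)` in a variable and reuse that.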

## (1d) The 'word-count' problem: creating histograms
Given a set of keys in an input collection, calculate the frequency of each key. 

In order to understand better how map/reduce works, we will implement this simple calculation in several forms.

We are going to use as test case a list of elements chosen from a small set:

In [33]:
import random

random.seed(42)

# A handy way to create 100 random values. We will use them as input data for the word count
hats = [ random.choice(["panama", "sombrero", "fedora", "top hat"]) for _ in range(100) ]
hats

['panama',
 'panama',
 'fedora',
 'sombrero',
 'sombrero',
 'sombrero',
 'panama',
 'panama',
 'top hat',
 'panama',
 'panama',
 'panama',
 'sombrero',
 'sombrero',
 'panama',
 'sombrero',
 'top hat',
 'sombrero',
 'top hat',
 'fedora',
 'panama',
 'sombrero',
 'top hat',
 'fedora',
 'fedora',
 'sombrero',
 'sombrero',
 'fedora',
 'panama',
 'panama',
 'top hat',
 'panama',
 'fedora',
 'fedora',
 'fedora',
 'panama',
 'top hat',
 'panama',
 'top hat',
 'panama',
 'fedora',
 'fedora',
 'sombrero',
 'panama',
 'panama',
 'sombrero',
 'fedora',
 'panama',
 'sombrero',
 'panama',
 'top hat',
 'fedora',
 'top hat',
 'fedora',
 'sombrero',
 'fedora',
 'fedora',
 'sombrero',
 'fedora',
 'panama',
 'sombrero',
 'sombrero',
 'sombrero',
 'top hat',
 'top hat',
 'fedora',
 'sombrero',
 'fedora',
 'panama',
 'sombrero',
 'panama',
 'fedora',
 'top hat',
 'fedora',
 'panama',
 'sombrero',
 'fedora',
 'sombrero',
 'top hat',
 'top hat',
 'top hat',
 'sombrero',
 'fedora',
 'sombrero',
 'sombrero',


### (1d.1) Simple approach

 * Start with an empty dict
 * If a new key is not present in the dict, create it.
 * Otherwise, increase the frequency of the key by one.

In [34]:
result = {}

for hat in hats:
    if hat not in result:
        result[hat] = 1
    else:
        result[hat] += 1
        
result

{'fedora': 24, 'panama': 27, 'sombrero': 29, 'top hat': 20}
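For comparison, the standard library's `collections.Counter` implements this same counting pattern. The short list below is a stand-in for `hats`, used so the sketch runs on its own:

```python
from collections import Counter

sample_hats = ["panama", "fedora", "panama", "top hat", "panama"]

# Counter builds the key -> frequency mapping in one call
counts = Counter(sample_hats)
print(dict(counts))  # {'panama': 3, 'fedora': 1, 'top hat': 1}
```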

### (1d.2) Map/reduce

* Recall that *reduce* applies an operation to 2 elements of the same type, and returns another element of that type. Thus, the first thing to do is to map our collection to the type of the output. 
 
* The final result will be a dictionary of words in the vocabulary and counts. Therefore, we need to map each input word to this type.
 
* Then, we have to define a function, the combiner, that combines these dictionaries two at a time.

In [35]:
def dict_combiner(a_dict, b_dict):
    
    result = {}
    keys = set(a_dict.keys()).union(set(b_dict.keys()))
    
    for key in keys:
        result[key] = a_dict.get(key, 0) + b_dict.get(key, 0)
    
    return result

reduce(dict_combiner, map(lambda string: {string: 1}, hats))

{'fedora': 24, 'panama': 27, 'sombrero': 29, 'top hat': 20}

# Part 2: Spark. Work with RDD and Pair RDD abstractions 

![drawing](https://prateekvjoshi.files.wordpress.com/2015/10/1-main4.png)

# **Apache Spark**

Apache Spark is an open source cluster computing framework originally developed in the AMPLab at the University of California, Berkeley, and later donated to the Apache Software Foundation, where it remains today. In contrast to Hadoop's two-stage disk-based MapReduce paradigm, Spark's multi-stage in-memory primitives provide performance up to 100 times faster for certain applications.

![](http://image.slidesharecdn.com/sparkandshark-120620130508-phpapp01/95/spark-and-shark-8-728.jpg?cb=1340197567)

By allowing user programs to load data into a cluster's memory and query it repeatedly, Spark is well-suited to machine learning algorithms.
![](http://spark.apache.org/images/logistic-regression.png)

Spark comes with a number of components that provide flexibility and generality.

<img src="http://spark.apache.org/images/spark-stack.png" alt="Drawing" style="width: 500px;"/>


## In this part, we keep working on the word-count example, this time with Spark. The basic abstraction of Spark is the Resilient Distributed Dataset (RDD):

#### «RDDs are fault-tolerant, parallel data structures that let users explicitly persist intermediate results in memory, control their partitioning to optimize data placement, and manipulate them using a rich set of operators.»

 * Read only, partitioned collection of records (immutable).
 * Stores the transformations used to build a dataset (its lineage), instead of the data itself. This property ensures fault-tolerance.
 * Users can control partitioning and persistence (caching).
 * RDDs are statically typed (in the Scala API; PySpark RDDs hold ordinary Python objects).
 * … and yes, everything is written in Scala ;p. So you could benefit from learning a bit of it!
 
<img src="http://eng.trueaccord.com/wp-content/uploads/2014/10/scala-logo.png" alt="Drawing" style="width: 200px;"/>

#### We will be trying to understand this abstraction with simple examples, using the [Python API](http://spark.apache.org/docs/latest/api/python/index.html)




### **(2a) Create a base RDD: parallelize, actions and transformations**
We'll start by generating a base RDD by using a Python list and the `sc.parallelize` method.  Then we'll print out the type of the base RDD.

We use sc.parallelize to convert a standard Python collection into an RDD.

In [36]:
sc # This is created in our environment when we launch pyspark from the terminal

In [37]:
hats


In Spark, `map` is a method of the RDD type.

In [38]:
my_first_rdd = sc.parallelize(hats)
my_first_rdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480

In [39]:
hat_plurals = my_first_rdd.map(plural)
hat_plurals

PythonRDD[1] at RDD at PythonRDD.scala:48

**Nothing has actually happened!**

`parallelize` tells Spark to distribute the data, but this is not actually done until we perform some action.

Possible actions include counting, collecting, reducing, taking, etc. Take a look at the [Spark programming guide](http://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions)


In [40]:
hat_plurals.take(5) # plural is only applied to the elements needed to return 5 results. Until we ask for a result,
                    # hat_plurals does not run the function; it just keeps it stored, ready to be applied

['panamas', 'panamas', 'fedoras', 'sombreros', 'sombreros']

#### Exercise

Calculate, from the rdd generated from hats, how many hats we have whose name does **not** start with an 's'

In [41]:
# How to do it with filter, without Spark
len(list(filter(lambda x: x[0] != "s", hats)))

71

In [42]:
hat_not_s = my_first_rdd.filter(lambda hat: hat[0] != "s")
hat_not_s.count()

71

A very common way to write these chained operations in Spark is to put each on its own line. Because a newline normally ends a Python statement, we need to escape it with a backslash.

In [43]:
hat_plurals\
    .filter(lambda x: x[0] != "s")\
    .count()

71
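An alternative to backslashes is to wrap the whole chain in parentheses, since Python allows line breaks inside them. The sketch below uses plain string methods so it runs without Spark; the same layout should work for RDD method chains:

```python
hat_names = (
    "  Panama,Sombrero,Fedora  "
    .strip()       # remove surrounding whitespace
    .lower()       # normalise case
    .split(",")    # one element per hat
)
print(hat_names)  # ['panama', 'sombrero', 'fedora']
```

Parentheses also make it possible to comment each step, which a trailing backslash does not allow.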

### Actions and Transformations

There are two main types of operations in Spark:
[Actions](http://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions) and [Transformations](http://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#transformations).

**Transformations** produce an RDD. Some of the most important are `map`, `filter`, and `reduceByKey`. `sc.parallelize` and `sc.textFile` are not technically Transformations but you can think of them as such in one very important respect: they are *lazily evaluated*. That is, when we perform a Transformation, we are only describing the operation to be performed at some point in the future. Each one attaches a node to the execution graph.

**Actions** are implemented as methods on an RDD, and return an object of a type **that is not an RDD**. When we perform an Action, we give the order to execute the previously described transformations: we trigger the execution of the graph. Some of the most important are `reduce`, `collect`, `take`, `takeOrdered`, and `count`.
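The distinction can be mimicked in plain Python. The toy class below (`LazySeq` is a made-up name, and this is not how Spark is implemented) records transformations without running them and only evaluates them when an action is called:

```python
class LazySeq:
    """Toy model: transformations are recorded, actions execute them."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = steps  # the recorded "execution graph"

    # Transformations: return a new LazySeq and compute nothing
    def map(self, f):
        return LazySeq(self._data, self._steps + (("map", f),))

    def filter(self, pred):
        return LazySeq(self._data, self._steps + (("filter", pred),))

    # Actions: run the recorded steps and return a plain Python value
    def collect(self):
        result = iter(self._data)
        for kind, f in self._steps:
            result = map(f, result) if kind == "map" else filter(f, result)
        return list(result)

    def count(self):
        return len(self.collect())


calls = []  # records every element that plural is applied to

def plural(s):
    calls.append(s)
    return s + "s"

lazy = LazySeq(["panama", "sombrero"]).map(plural)
print(calls)           # []  (nothing has run yet)
print(lazy.collect())  # ['panamas', 'sombreros']
print(len(calls))      # 2
```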

In [44]:
my_file = sc.textFile('tocoto')
my_file

tocoto MapPartitionsRDD[6] at textFile at NativeMethodAccessorImpl.java:0

In [45]:
my_file.take(5)

Py4JJavaError: An error occurred while calling o66.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/dsc/Clases_Master/Material_Clase/Spark/tocoto
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


In [46]:
hat_plurals.map(lambda x: whatever(x))

PythonRDD[7] at RDD at PythonRDD.scala:48

In [47]:
hat_plurals.map(lambda x: whatever(x)).collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 5, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-47-4acb0762661e>", line 1, in <lambda>
NameError: name 'whatever' is not defined

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-47-4acb0762661e>", line 1, in <lambda>
NameError: name 'whatever' is not defined

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


### **(2b) Persisting and the RDD lineage**

So far, we have seen that Spark RDDs are *lazily evaluated*, i.e. nothing is actually done until an action is performed. The RDD remembers the set of transformations to be applied: this is known as its *lineage*. It has the important consequence of making Spark RDDs *fault tolerant* automatically.

![](http://images.slideplayer.com/14/4499833/slides/slide_10.jpg) 

It might be interesting to store some intermediate results, though: perhaps because we want to apply several different transformations starting from that point, or because we are going to apply an iterative computation (as is customary in machine learning algorithms). For this, Spark has [several ways of persisting](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence)

In [48]:
from pyspark import StorageLevel

# Some of the possible storage levels
StorageLevel.DISK_ONLY
StorageLevel.MEMORY_AND_DISK_2
StorageLevel.MEMORY_ONLY_SER

StorageLevel(False, True, False, False, 1)

In [49]:
hat_plurals.cache()

PythonRDD[1] at RDD at PythonRDD.scala:48

In [50]:
hat_plurals.map(lambda x: 2).sum()

200

In [51]:
hat_plurals.getStorageLevel()

StorageLevel(False, True, False, False, 1)

In [52]:
# The storage level cannot be changed directly like this:
# hat_plurals.persist(StorageLevel.MEMORY_AND_DISK_2)

# We have to unpersist first
hat_plurals.unpersist()
hat_plurals.persist(StorageLevel.MEMORY_AND_DISK_2)

PythonRDD[1] at RDD at PythonRDD.scala:48

In [53]:
hat_plurals.getStorageLevel()

StorageLevel(True, True, False, False, 2)

When we process large datasets, some computations are expensive. We persist their results so that we can reuse them instead of computing them again. That is what `StorageLevel` is for.

StorageLevel cannot be changed after it has been set. We need to `unpersist` first.
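The cost that persisting avoids can be shown without Spark. In this sketch, `expensive` is a stand-in for a costly transformation and a counter records how often it runs; materialising the result once plays the role of `cache()` (an analogy only, not Spark's mechanism):

```python
counter = {"calls": 0}

def expensive(x):
    counter["calls"] += 1  # track how many times the function runs
    return x * 2

data = [1, 2, 3]

# Without persisting: every "action" re-runs the whole pipeline
total = sum(map(expensive, data))
count = len(list(map(expensive, data)))
calls_without_cache = counter["calls"]

# With persisting: compute once, then reuse the stored result
counter["calls"] = 0
cached = list(map(expensive, data))
total_cached = sum(cached)
count_cached = len(cached)
calls_with_cache = counter["calls"]

print(calls_without_cache, calls_with_cache)  # 6 3
```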

### **(2c) Partitioning**

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster.

To get the number of partitions of an RDD, just use `getNumPartitions()` on your RDD. You can set the number of partitions during RDD creation (with `sc.parallelize(collection, numSlices)` or `sc.textFile(path, minPartitions)`), or change it afterwards with methods like `repartition()`, `coalesce()`, etc.

In [54]:
hat_plurals.getNumPartitions()

2

`hat_plurals` has two partitions, meaning its data is split into two chunks that can be processed in parallel (this does not necessarily mean two machines). We can specify the number of partitions we want our RDD to have, and redistribute the data as we wish.

In [55]:
repartitioned = hat_plurals.repartition(5)
repartitioned.getNumPartitions()

5

We can see the partitions using [glom()](http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=glom#pyspark.RDD.glom): it returns an RDD created by coalescing all elements within each partition into a list.

This lets us see what each partition contains. Partitions do not have to be homogeneous.

In [56]:
repartitioned.glom().collect()

[['fedoras',
  'fedoras',
  'sombreros',
  'panamas',
  'panamas',
  'sombreros',
  'fedoras',
  'panamas',
  'sombreros',
  'panamas',
  'sombreros',
  'top hats',
  'panamas',
  'panamas',
  'panamas',
  'sombreros',
  'sombreros',
  'top hats',
  'panamas',
  'top hats'],
 ['panamas',
  'panamas',
  'fedoras',
  'sombreros',
  'sombreros',
  'sombreros',
  'panamas',
  'panamas',
  'top hats',
  'panamas',
  'top hats',
  'fedoras',
  'top hats',
  'fedoras',
  'sombreros',
  'fedoras',
  'fedoras',
  'sombreros',
  'fedoras',
  'panamas'],
 ['panamas',
  'panamas',
  'sombreros',
  'sombreros',
  'panamas',
  'sombreros',
  'top hats',
  'sombreros',
  'top hats',
  'fedoras',
  'sombreros',
  'sombreros',
  'sombreros',
  'top hats',
  'top hats',
  'fedoras',
  'sombreros',
  'fedoras',
  'panamas',
  'sombreros'],
 ['panamas',
  'sombreros',
  'top hats',
  'fedoras',
  'fedoras',
  'sombreros',
  'sombreros',
  'fedoras',
  'panamas',
  'panamas',
  'panamas',
  'fedoras',
  't

Partitions are one of the most powerful concepts in Spark: you can decide how to distribute your data so it can fit in memory, and more importantly, you can perform computations on each partition *before* speaking to other partitions. This can have an enormous impact on performance.
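To make the idea concrete, here is a minimal plain-Python sketch (no Spark involved) of working on each partition before combining the results. The helper `split_into_partitions` and the sample list are hypothetical; they only approximate how a collection might be cut into contiguous chunks, and do not reproduce Spark's actual slicing or scheduling.

```python
from collections import Counter


def split_into_partitions(items, num_partitions):
    """Cut a list into num_partitions contiguous chunks (hypothetical helper)."""
    size, remainder = divmod(len(items), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        # The first `remainder` chunks get one extra element.
        end = start + size + (1 if i < remainder else 0)
        partitions.append(items[start:end])
        start = end
    return partitions


hats = ["panamas", "fedoras", "sombreros", "panamas", "top hats", "fedoras", "panamas"]
partitions = split_into_partitions(hats, 3)

# Step 1: each partition is summarised locally, without looking at the others.
local_counts = [Counter(partition) for partition in partitions]

# Step 2: only the small per-partition summaries are merged.
total_counts = sum(local_counts, Counter())

print(partitions)
print(total_counts)
```

The point of the sketch is that step 2 only has to move one small summary per partition, rather than every element.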

### **(2c) Pair RDDs: *grouping* strategies in Spark**

The next step in writing our word counting program is to create a new type of RDD, called a pair RDD. A pair RDD is an RDD where each element is a pair tuple (k, v) where k is the key and v is the value. In this example, we will create a pair consisting of ('<word>', 1) for each word element in the RDD, as we did in the map/reduce version of the histogram in Python, section (1d.2).

We can create the pair RDD using the `map()` transformation with a `lambda` function to create a new RDD.

In [57]:
my_first_rdd.take(5)

['panama', 'panama', 'fedora', 'sombrero', 'sombrero']

In [58]:
pair_rdd = my_first_rdd.map(lambda word: (word, 1))
pair_rdd.take(5)

[('panama', 1), ('panama', 1), ('fedora', 1), ('sombrero', 1), ('sombrero', 1)]

### ** (2c.1) `groupByKey()` approach **
An approach you might first consider (we'll see shortly that there are better ways) is based on using the [groupByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey) transformation. As the name implies, the `groupByKey()` transformation groups all the elements of the RDD with the same key into a single list in one of the partitions. There are two problems with using `groupByKey()`:
  + The operation requires a lot of data movement to move all the values into the appropriate partitions.
  + The lists can be very large. Consider a word count of English Wikipedia: the lists for common words (e.g., the, a, etc.) would be huge and could exhaust the available memory in a worker.
 
Use `groupByKey()` to generate a pair RDD of type `('word', iterator)`. Next, sum the iterator using a `map()` transformation.  The result should be a pair RDD consisting of (word, count) pairs.

In [59]:
groups = pair_rdd.groupByKey()
groups.collect()

[('panama', <pyspark.resultiterable.ResultIterable at 0x7f4f8d20bef0>),
 ('fedora', <pyspark.resultiterable.ResultIterable at 0x7f4f8d20b7f0>),
 ('sombrero', <pyspark.resultiterable.ResultIterable at 0x7f4f8d216048>),
 ('top hat', <pyspark.resultiterable.ResultIterable at 0x7f4f8d20bf60>)]

In [60]:
groups.map(lambda tuple_kv: (tuple_kv[0], len(tuple_kv[1]))).collect()

[('panama', 27), ('fedora', 24), ('sombrero', 29), ('top hat', 20)]

### ** (2c.2)  `reduceByKey` approach **
A better approach is to start from the pair RDD and then use the [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey) transformation to create a new pair RDD. 

The `reduceByKey()` transformation gathers together pairs that have the same key and applies the function provided to two values at a time, iteratively reducing all of the values to a single value. `reduceByKey()` operates by applying the function first within each partition on a per-key basis and then across the partitions, allowing it to scale efficiently to large datasets.

![](https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/images/reduce_by.png)
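The two phases described above can be imitated in plain Python. This is only a sketch of the idea, assuming the data is already given as a list of partitions; the function names are made up for illustration and this is not how Spark implements `reduceByKey()`.

```python
from operator import add


def reduce_by_key_local(pairs, func):
    """Reduce the values of each key within a single collection of pairs."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


def reduce_by_key(partitions, func):
    """Two-phase reduce: first within each partition, then across partitions."""
    partials = [reduce_by_key_local(partition, func) for partition in partitions]
    # At most one (key, partial result) pair per key and partition reaches this step.
    merged_pairs = [item for partial in partials for item in partial.items()]
    return reduce_by_key_local(merged_pairs, func)


partitions = [
    [("panama", 1), ("fedora", 1), ("panama", 1)],
    [("fedora", 1), ("sombrero", 1), ("panama", 1)],
]
word_counts = reduce_by_key(partitions, add)
print(word_counts)
```

Because the function is applied to partial results as well as to raw values, it should be associative and commutative, as `add` is.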

In [61]:
from operator import add
pair_rdd = my_first_rdd.map(lambda word: (word, 1))
pair_rdd.reduceByKey(add).collect()

[('panama', 27), ('fedora', 24), ('sombrero', 29), ('top hat', 20)]

In [62]:
pair_rdd.reduceByKey(lambda x, y: x + y).collect()

[('panama', 27), ('fedora', 24), ('sombrero', 29), ('top hat', 20)]

In [63]:
import operator
from operator import add

help(operator)

Help on module operator:

NAME
    operator - Operator interface.

MODULE REFERENCE
    https://docs.python.org/3.6/library/operator
    
    The following documentation is automatically generated from the Python
    source files.  It may be incomplete, incorrect or include features that
    are considered implementation detail and may vary between Python
    implementations.  When in doubt, consult the module reference at the
    location listed above.

DESCRIPTION
    This module exports a set of functions implemented in C corresponding
    to the intrinsic operators of Python.  For example, operator.add(x, y)
    is equivalent to the expression x+y.  The function names are those
    used for special methods; variants without leading and trailing
    '__' are also provided for convenience.

CLASSES
    builtins.object
        attrgetter
        itemgetter
        methodcaller
    
    class attrgetter(builtins.object)
     |  attrgetter(attr, ...) --> attrgetter object
     |  
     

Example of how to chain several transformations, with a filter applied first

In [64]:
my_first_rdd\
    .filter(lambda hat: hat[0] != "s")\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda x, y: x + y)\
    .collect()

[('panama', 27), ('fedora', 24), ('top hat', 20)]

#### Summary: word count in Spark:

In [65]:
my_first_rdd\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda x, y: x + y)\
    .collect()

[('panama', 27), ('fedora', 24), ('sombrero', 29), ('top hat', 20)]

### ** (2c.3)  `combineByKey` approach: the mother of dragons **

The [combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None)](http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=combinebykey#pyspark.RDD.combineByKey) method is a generic (and powerful!) function to combine the elements for each key using a custom set of aggregation functions.

It turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C. Note that V and C can be different – for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

Users provide three functions:

  + `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
  + `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
  + `mergeCombiners`, to combine two C’s into a single one.
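As a rough illustration of how the three functions cooperate, the following plain-Python sketch imitates the contract on a list of partitions. The function `combine_by_key` and the sample data are hypothetical, and this is not Spark's implementation. Here V is a number and C is a `(sum, count)` tuple, which is enough to compute a per-key average.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Plain-Python imitation of the combineByKey contract (not Spark's code)."""
    per_partition = []
    for partition in partitions:
        combiners = {}
        for key, value in partition:
            if key in combiners:
                # Key already seen in this partition: fold the V into the C.
                combiners[key] = merge_value(combiners[key], value)
            else:
                # First time the key is seen in this partition: build a C from the V.
                combiners[key] = create_combiner(value)
        per_partition.append(combiners)

    result = {}
    for combiners in per_partition:
        for key, combiner in combiners.items():
            if key in result:
                # Same key coming from different partitions: merge the two C's.
                result[key] = merge_combiners(result[key], combiner)
            else:
                result[key] = combiner
    return result


# Made-up sample data, split into two partitions.
partitions = [
    [("a", 1), ("a", 3), ("b", 10)],
    [("a", 5), ("b", 20)],
]

sum_and_count = combine_by_key(
    partitions,
    create_combiner=lambda value: (value, 1),
    merge_value=lambda acc, value: (acc[0] + value, acc[1] + 1),
    merge_combiners=lambda x, y: (x[0] + y[0], x[1] + y[1]),
)
averages = {key: total / count for key, (total, count) in sum_and_count.items()}
print(sum_and_count)
print(averages)
```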

## (2d) Apply word count to a file

### ** (2d.1) Load a text file **
For the next part of this lab, we will use the [Complete Works of William Shakespeare](http://www.gutenberg.org/ebooks/100) from [Project Gutenberg](http://www.gutenberg.org/wiki/Main_Page). To convert a text file into an RDD, we use the `SparkContext.textFile()` method. We will also apply a cleaning function, defined in the next section, using a `map()` transformation to strip out the punctuation and change all text to lowercase. Since the file is large, we use `take()` so that we only print the first few lines.

In [66]:
!wget -v http://www.gutenberg.org/files/100/100-0.txt -O shakespeare.txt

--2018-09-14 17:07:57--  http://www.gutenberg.org/files/100/100-0.txt
Resolving www.gutenberg.org (www.gutenberg.org)... 152.19.134.47, 2610:28:3090:3000:0:bad:cafe:47
Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5852404 (5,6M) [text/plain]
Saving to: ‘shakespeare.txt’


2018-09-14 17:08:20 (246 KB/s) - ‘shakespeare.txt’ saved [5852404/5852404]



In [67]:
! head shakespeare.txt

﻿
Project Gutenberg’s The Complete Works of William Shakespeare, by William
Shakespeare

This eBook is for the use of anyone anywhere in the United States and
most other parts of the world at no cost and with almost no restrictions
whatsoever.  You may copy it, give it away or re-use it under the terms
of the Project Gutenberg License included with this eBook or online at
www.gutenberg.org.  If you are not located in the United States, you’ll
have to check the laws of the country where you are located before using


### ** (2d.2) Capitalization and punctuation **
Real world files are more complicated than the data we have been using in this lab. Some of the issues we have to address are:
  + Words should be counted independent of their capitalization (e.g., Spark and spark should be counted as the same word).
  + All punctuation should be removed.
  + Any leading or trailing spaces on a line should be removed.
 
Define a function (named `clean` in the cell below) that converts all text to lower case and removes any punctuation. Use the Python [re](https://docs.python.org/2/library/re.html) module to remove any text that is not a letter, number, or space. Reading `help(re.sub)` might be useful. Leading and trailing spaces need no special treatment here, because the `split()` call used later ignores them.

In [68]:
import re

test_line = "This eBook is for the use of anyone anywhere in the United States and..."

# [^\w ] matches any character that is neither a word character nor a space

def clean(line):
    return re.sub(r"[^\w ]", "", line).lower()
    
clean(test_line)

'this ebook is for the use of anyone anywhere in the united states and'

In [69]:
clean_lines = sc.textFile("shakespeare.txt").map(clean)
clean_lines.take(25)

['',
 'project gutenbergs the complete works of william shakespeare by william',
 'shakespeare',
 '',
 'this ebook is for the use of anyone anywhere in the united states and',
 'most other parts of the world at no cost and with almost no restrictions',
 'whatsoever  you may copy it give it away or reuse it under the terms',
 'of the project gutenberg license included with this ebook or online at',
 'wwwgutenbergorg  if you are not located in the united states youll',
 'have to check the laws of the country where you are located before using',
 'this ebook',
 '',
 'see at the end of this file  content note added in 2017 ',
 '',
 '',
 'title the complete works of william shakespeare',
 '',
 'author william shakespeare',
 '',
 'release date january 1994 ebook 100',
 'last updated june 14 2018',
 '',
 'language english',
 '',
 'character set encoding utf8']

http://www.regular-expressions.info/quickstart.html

https://regex101.com/

### ** (2d.3) Words from lines **
Before we can count the words, we have to address two issues with the format of the RDD:
  + The first issue is that we need to split each line by its spaces.
  + The second issue is that we need to filter out empty lines.
 
Apply a transformation that will split each element of the RDD by its spaces. For each element of the RDD, you should apply Python's string [split()](https://docs.python.org/3/library/stdtypes.html#str.split) function. You might think that a `map()` transformation is the way to do this, but think about what the result of the `split()` function will be.

Check out the Spark Programming Guide for an alternative transformation that helps us here.

`flatMap` is the transformation that lets us go line by line, split each one, and flatten the results into a single RDD of words.

In [70]:
words = clean_lines.flatMap(lambda line: line.split())
words.take(5)

['project', 'gutenbergs', 'the', 'complete', 'works']
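The difference between the two transformations can be imitated with plain Python lists. This is a sketch of the semantics only; the sample lines are made up and no Spark call is involved.

```python
from itertools import chain

lines = ["project gutenbergs the complete works", "", "of william shakespeare"]

# map-like: one output element per input line, so we get a list of lists.
mapped = [line.split() for line in lines]

# flatMap-like: the per-line lists are flattened into a single list of words.
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(mapped)
print(flat_mapped)
```

Note that the empty line becomes an empty list in the map-like version and contributes nothing to the flattened one.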

### ** (2d.4) Remove empty elements **
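The next step is to filter out the empty elements. Remove all entries where the word is `''`. Since `split()` without arguments never returns empty strings, this filter acts only as a safeguard here.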

In [71]:
words = words.filter(lambda word: word != "")
words

PythonRDD[48] at RDD at PythonRDD.scala:48

In [101]:
words.take(25)

['project',
 'gutenbergs',
 'the',
 'complete',
 'works',
 'of',
 'william',
 'shakespeare',
 'by',
 'william',
 'shakespeare',
 'this',
 'ebook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'in',
 'the',
 'united',
 'states',
 'and']

### (2d.5) Count the words and show the most common ones

We know the drill at this point, don't we? We map each word to a tuple and then `reduceByKey`.

We can view the top words by using the `takeOrdered()` action; however, since the elements of the RDD are pair tuples, we need a custom sort function that sorts using the value part of the pair rather than the key.

You'll notice that many of the words are common English words (known as stopwords).

Use our map/reduce and `takeOrdered()` to obtain the most common words and their counts (the cell below takes the top 50).

In [72]:
counts = words\
    .map(lambda word: (word, 1))\
    .reduceByKey(add)

In [73]:
counts.take(10)

[('project', 107),
 ('of', 18815),
 ('shakespeare', 26),
 ('this', 7185),
 ('ebook', 14),
 ('is', 9709),
 ('use', 362),
 ('anyone', 9),
 ('anywhere', 8),
 ('in', 12032)]

In [74]:
counts.takeOrdered(50, lambda tup: -tup[1])

[('the', 30002),
 ('and', 28358),
 ('i', 21867),
 ('to', 20816),
 ('of', 18815),
 ('a', 15992),
 ('you', 14437),
 ('my', 13191),
 ('in', 12032),
 ('that', 11781),
 ('is', 9709),
 ('not', 9067),
 ('with', 8518),
 ('me', 8270),
 ('for', 8187),
 ('it', 8174),
 ('his', 7572),
 ('be', 7365),
 ('this', 7185),
 ('your', 7079),
 ('he', 6804),
 ('but', 6771),
 ('have', 6268),
 ('as', 6179),
 ('thou', 5846),
 ('him', 5537),
 ('so', 5458),
 ('will', 5286),
 ('what', 4762),
 ('her', 4595),
 ('thy', 4357),
 ('all', 4216),
 ('by', 4093),
 ('no', 4076),
 ('do', 3912),
 ('shall', 3847),
 ('if', 3800),
 ('are', 3729),
 ('we', 3554),
 ('thee', 3381),
 ('on', 3316),
 ('our', 3296),
 ('lord', 3157),
 ('now', 3012),
 ('good', 2959),
 ('king', 2931),
 ('sir', 2900),
 ('from', 2899),
 ('o', 2765),
 ('at', 2743)]
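The negated key is what turns "smallest first" into "largest count first". A plain-Python analogue, using `heapq.nsmallest` on a few of the pairs shown above, behaves the same way; it is offered as an analogy for the ordering, not as a description of Spark's internals.

```python
import heapq

counts = [("project", 107), ("of", 18815), ("shakespeare", 26), ("this", 7185), ("is", 9709)]

# The n smallest elements according to the key; negating the count
# makes the pairs with the largest counts come first.
top_3 = heapq.nsmallest(3, counts, key=lambda tup: -tup[1])
print(top_3)
```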

# Practical

## ETL with airline coupon data

Load the data first: coupon data

In [75]:
!head coupon150720.csv

79062005698500,1,MAA,AUH,9W,9W,56.79,USD,1,H,H,0526,150904,OK,IAF0
79062005698500,2,AUH,CDG,9W,9W,84.34,USD,1,H,H,6120,150905,OK,IAF0
79062005924069,1,CJB,MAA,9W,9W,60.0,USD,1,H,H,2768,150721,OK,IAA0
79065668570385,1,DEL,DXB,9W,9W,160.63,USD,2,S,S,0546,150804,OK,INA0
79065668737021,1,AUH,IXE,9W,9W,152.46,USD,1,V,V,0501,150803,OK,INA0
79062006192650,1,RPR,BOM,9W,9W,68.5,USD,1,K,K,2202,150720,OK,IAE0
79062006192650,2,BOM,RPR,9W,9W,68.5,USD,1,H,H,0377,150721,OK,IAE0
79062005733853,1,DEL,DED,9W,9W,56.16,USD,1,V,V,2839,150801,OK,INA0
79062005836987,1,ATL,LGA,AA,AA,28.3,USD,1,V,V,3237,150903,OK,INB0
79062005836987,2,LGA,EWR,,,0.0,USD,1,,,VOID,,,INA0


#### Exercise

Take fields 0, 2, 3, 4, and 6 from each line of coupons

In [76]:
lines = sc.textFile("coupon150720.csv")
lines.take(5)

['79062005698500,1,MAA,AUH,9W,9W,56.79,USD,1,H,H,0526,150904,OK,IAF0',
 '79062005698500,2,AUH,CDG,9W,9W,84.34,USD,1,H,H,6120,150905,OK,IAF0',
 '79062005924069,1,CJB,MAA,9W,9W,60.0,USD,1,H,H,2768,150721,OK,IAA0',
 '79065668570385,1,DEL,DXB,9W,9W,160.63,USD,2,S,S,0546,150804,OK,INA0',
 '79065668737021,1,AUH,IXE,9W,9W,152.46,USD,1,V,V,0501,150803,OK,INA0']

In [77]:
test_line = "79062005924069,1,CJB,MAA,9W,9W,60.0,USD,1,H,H,2768,150721,OK,IAA0"

def coupon_from_line(line):
    
    fields = line.split(",")  
    return fields[0], fields[2], fields[3], fields[4], float(fields[6])
    
coupon_from_line(test_line)

('79062005924069', 'CJB', 'MAA', '9W', 60.0)

In [78]:
coupons = lines.map(coupon_from_line)
coupons.take(10)

[('79062005698500', 'MAA', 'AUH', '9W', 56.79),
 ('79062005698500', 'AUH', 'CDG', '9W', 84.34),
 ('79062005924069', 'CJB', 'MAA', '9W', 60.0),
 ('79065668570385', 'DEL', 'DXB', '9W', 160.63),
 ('79065668737021', 'AUH', 'IXE', '9W', 152.46),
 ('79062006192650', 'RPR', 'BOM', '9W', 68.5),
 ('79062006192650', 'BOM', 'RPR', '9W', 68.5),
 ('79062005733853', 'DEL', 'DED', '9W', 56.16),
 ('79062005836987', 'ATL', 'LGA', 'AA', 28.3),
 ('79062005836987', 'LGA', 'EWR', '', 0.0)]

#### Intermission: Stack traces in Spark

Stack traces in PySpark are quite convoluted because there are many layers of processing (Python, Py4J and the JVM). Look for the Python stack trace contained within the `Py4JJavaError` message.

#### Exercise

Keep only the amount. Get average, max, min and std

In [79]:
amounts = coupons.map(lambda coupon: coupon[4])
print("mean = ", amounts.mean())
print("max = ", amounts.max())
print("min = ", amounts.min())
print("std = ", amounts.stdev())

mean =  149.94532037167986
max =  6355194.0
min =  0.0
std =  9978.482086122693


In [80]:
amounts.reduce(lambda x, y: x if x > y else y)

6355194.0

In [81]:
amounts.reduce(lambda x, y: x if x < y else y)

0.0

In [82]:
amounts.cache()

PythonRDD[65] at RDD at PythonRDD.scala:48

In [83]:
count, total, sum_squares = amounts\
    .map(lambda amount: (1, amount, amount ** 2))\
    .reduce(lambda t1, t2: (t1[0] + t2[0], t1[1] + t2[1], t1[2] + t2[2]))
    
count, total, sum_squares

(1232662, 184831898.49994844, 122763999131054.75)

In [84]:
avg = total / count
avg

149.94532037164157

In [85]:
var = (sum_squares - count * (avg) ** 2) / count
var

99570104.7430839
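The cells above rely on the identity var = (Σx²)/n − mean², which is the population variance; its square root is consistent with the `stdev()` output shown earlier. Below is a small plain-Python check of that identity on the first five amounts, compared against `statistics.pvariance`. It is a sketch for intuition only. One caveat: this one-pass formula can lose precision when the mean is large relative to the spread of the data.

```python
import math
import statistics

amounts = [56.79, 84.34, 60.0, 160.63, 152.46]

# Single pass: accumulate (count, sum, sum of squares), as in the reduce above.
count, total, sum_squares = 0, 0.0, 0.0
for amount in amounts:
    count += 1
    total += amount
    sum_squares += amount ** 2

avg = total / count
var = (sum_squares - count * avg ** 2) / count  # population variance

print("avg =", avg)
print("var =", var, "reference =", statistics.pvariance(amounts))
print("std =", math.sqrt(var))
```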

We can see that, once the RDD has been cached, running the computations again takes much less time.

#### All in one go!



In [102]:
amounts.take(5)

[56.79, 84.34, 60.0, 160.63, 152.46]

In [105]:
amounts_tuples = amounts.map(lambda amount: (amount, amount, amount ** 2, amount, 1))

In [107]:
amounts_tuples.take(5)

[(56.79, 56.79, 3225.1041, 56.79, 1),
 (84.34, 84.34, 7113.235600000001, 84.34, 1),
 (60.0, 60.0, 3600.0, 60.0, 1),
 (160.63, 160.63, 25801.9969, 160.63, 1),
 (152.46, 152.46, 23244.051600000003, 152.46, 1)]

In [115]:
from operator import add

tuple_a = (56.79, 56.79, 3225.1041, 56.79, 1)
tuple_b = (84.34, 84.34, 7113.235600000001, 84.34, 1)

def combiner(tuple_a, tuple_b):
    
    # One operation per tuple position: max, min, sum of squares, sum, count
    operations = (max, min, add, add, add)
    
    return tuple(op(a, b) for a, b, op in zip(tuple_a, tuple_b, operations))
    
combiner(tuple_a, tuple_b)

(84.34, 56.79, 10338.3397, 141.13, 2)

In [118]:
amounts_tuples.reduce(combiner)

(6355194.0, 0.0, 122763999131054.75, 184831898.49994844, 1232662)

In [125]:
coupons\
    .map(lambda coupon: coupon[4])\
    .map(lambda amount: (amount, amount, amount ** 2, amount, 1))\
    .reduce(combiner)

(6355194.0, 0.0, 122763999131054.75, 184831898.49994844, 1232662)

#### Exercise

Get stats on ticket amount for all tickets with destination MAD

You will need to extract ticket amounts with destination MAD, and then calculate:

1. Total ticket amounts per origin
2. Top 10 airlines by average amount

#### Part 1

We need to filter coupons with destination Madrid, and after that group on the origin. 

In [119]:
coupons.take(5)

[('79062005698500', 'MAA', 'AUH', '9W', 56.79),
 ('79062005698500', 'AUH', 'CDG', '9W', 84.34),
 ('79062005924069', 'CJB', 'MAA', '9W', 60.0),
 ('79065668570385', 'DEL', 'DXB', '9W', 160.63),
 ('79065668737021', 'AUH', 'IXE', '9W', 152.46)]

In [120]:
coupons_dest_MAD = coupons.filter(lambda x: x[2] == "MAD")

At this point, we only need the origin, to group on, and the value, to sum

In [121]:
coupons_dest_MAD = coupons_dest_MAD.map(lambda x: (x[1], x[4]))

In [122]:
coupons_dest_MAD.take(5)

[('BRU', 21.02), ('BRU', 27.66), ('CDG', 46.97), ('CDG', 3.38), ('CDG', 26.02)]

We are now ready to reduce all amounts per origin.

In [123]:
coupons_MAD_group_orig = coupons_dest_MAD.reduceByKey(lambda x, y: x + y)

In [124]:
coupons_MAD_group_orig.takeOrdered(10, lambda tup: -tup[1])

[('CCS', 94528.68),
 ('GRU', 87192.63999999998),
 ('EZE', 81074.63999999997),
 ('BOG', 74644.45000000001),
 ('LHR', 69609.53000000003),
 ('LPA', 60483.92),
 ('MEX', 56316.73000000001),
 ('JFK', 53496.169999999984),
 ('TLV', 53436.220000000016),
 ('TFN', 50034.949999999866)]

In [93]:
coupons\
    .filter(lambda x: x[2] == "MAD")\
    .map(lambda x: (x[1], x[4]))\
    .reduceByKey(lambda x, y: x + y)\
    .takeOrdered(10, lambda tup: -tup[1])

[('CCS', 94528.68),
 ('GRU', 87192.63999999998),
 ('EZE', 81074.63999999997),
 ('BOG', 74644.45000000001),
 ('LHR', 69609.53000000003),
 ('LPA', 60483.92),
 ('MEX', 56316.73000000001),
 ('JFK', 53496.169999999984),
 ('TLV', 53436.220000000016),
 ('TFN', 50034.949999999866)]

#### Part 2

This is very similar, with two differences: we need to group on the airline, and to calculate averages we need to keep track of both the total amount on the coupons and the number of coupons.

In [94]:
coupons\
    .filter(lambda x: x[2] == "MAD")\
    .map(lambda x: (x[3], (x[4], 1)))\
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))\
    .map(lambda x: (x[0], (x[1][0], x[1][1]), x[1][0] / x[1][1]))\
    .takeOrdered(10, lambda tup: -tup[2])

[('V0', (81271.48, 15), 5418.098666666667),
 ('AC', (3703.1000000000004, 5), 740.6200000000001),
 ('KE', (8950.84, 13), 688.5261538461539),
 ('SV', (25999.189999999995, 47), 553.1742553191489),
 ('OB', (4819.54, 9), 535.5044444444444),
 ('AR', (10784.14, 21), 513.5304761904762),
 ('AV', (70680.63000000008, 157), 450.19509554140177),
 ('AM', (8373.95, 19), 440.73421052631585),
 ('C2', (795.74, 2), 397.87),
 ('LA', (33815.880000000005, 89), 379.9537078651686)]

In the cell above, `reduceByKey` reduces the (amount, count) tuples, summing each component separately.

This `reduceByKey` generates an RDD of the form (k, (v1, v2)), which the final `map` turns into (k, (v1, v2), v1 / v2).

Alternatively, we can use the `mapValues` method to process only the (v1, v2) part, ignoring the key for processing without discarding it.

In [132]:
coupons\
    .filter(lambda x: x[2] == "MAD")\
    .map(lambda x: (x[3], (x[4], 1)))\
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))\
    .mapValues(lambda t: round(t[0] / t[1], 2))\
    .takeOrdered(10, lambda t: -t[1])

[('V0', 5418.1),
 ('AC', 740.62),
 ('KE', 688.53),
 ('SV', 553.17),
 ('OB', 535.5),
 ('AR', 513.53),
 ('AV', 450.2),
 ('AM', 440.73),
 ('C2', 397.87),
 ('LA', 379.95)]

And we're ready to take our result.

#### Stretch: 

Get the totals from first origin without using the coupon number

In [95]:
coupons.take(5)

[('79062005698500', 'MAA', 'AUH', '9W', 56.79),
 ('79062005698500', 'AUH', 'CDG', '9W', 84.34),
 ('79062005924069', 'CJB', 'MAA', '9W', 60.0),
 ('79065668570385', 'DEL', 'DXB', '9W', 160.63),
 ('79065668737021', 'AUH', 'IXE', '9W', 152.46)]

We'll use groupBy this time, because we need to look at every coupon in a ticket in order to identify first origin.

In [96]:
coupons.groupBy(lambda x: x[0]).take(5)

[('79062005924069', <pyspark.resultiterable.ResultIterable at 0x7f4f8d20b6d8>),
 ('79065668737021', <pyspark.resultiterable.ResultIterable at 0x7f4f8d20b2e8>),
 ('79062005836987', <pyspark.resultiterable.ResultIterable at 0x7f4f8d1e1748>),
 ('79065668498895', <pyspark.resultiterable.ResultIterable at 0x7f4f8d20b470>),
 ('79062005563920', <pyspark.resultiterable.ResultIterable at 0x7f4f8d1e1a58>)]

A ResultIterable is a lazy collection, so we can take it and iterate over it, or turn it into a list to materialize it. This is useful to get a test case to test the function we will be writing.

In [97]:
coupons\
    .groupBy(lambda x: x[0])\
    .filter(lambda x: x[0] == '79062005698500')\
    .map(lambda x: (x[0], list(x[1])))\
    .collect()

[('79062005698500',
  [('79062005698500', 'MAA', 'AUH', '9W', 56.79),
   ('79062005698500', 'AUH', 'CDG', '9W', 84.34)])]

We are going to compare the set of origins to the set of destinations. The one that is in the first but not in the second should be the first origin.

In [98]:
coupons\
    .groupBy(lambda x: x[0])\
    .filter(lambda x: x[0] == '79062005698500')\
    .map(lambda x: list(x[1])[0][4])\
    .collect()

[56.79]

In [99]:
coupons\
    .groupBy(lambda x: x[0])\
    .filter(lambda x: x[0] == '79062005698500')\
    .mapValues(list)\
    .map(lambda x: x[1])\
    .map(lambda x: x[0:2])\
    .collect()

[[('79062005698500', 'MAA', 'AUH', '9W', 56.79),
  ('79062005698500', 'AUH', 'CDG', '9W', 84.34)]]

Now we have our values, so we are ready to get the totals and sort. Before we reduceByKey, we need to reshape our tuples so the first origin is the key.
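The set comparison described earlier (origins that never appear as destinations) can be sketched as a pair of plain-Python helpers that work on the list of coupons of a single ticket. The names `first_origin` and `ticket_total` are hypothetical, and the fallback for tickets without a unique candidate (for example, round trips) is an assumption, not something the notebook specifies.

```python
import math


def first_origin(ticket_coupons):
    """Return the origin that never appears as a destination within the ticket.

    Assumption: if there is no unique such airport (e.g. a round trip), fall
    back to the origin of the first coupon in the list.
    """
    origins = {coupon[1] for coupon in ticket_coupons}
    destinations = {coupon[2] for coupon in ticket_coupons}
    candidates = origins - destinations
    if len(candidates) == 1:
        return candidates.pop()
    return ticket_coupons[0][1]


def ticket_total(ticket_coupons):
    """Sum the amounts (field 4) of all coupons in a ticket."""
    return sum(coupon[4] for coupon in ticket_coupons)


ticket = [
    ("79062005698500", "MAA", "AUH", "9W", 56.79),
    ("79062005698500", "AUH", "CDG", "9W", 84.34),
]
print(first_origin(ticket), ticket_total(ticket))
```

In the notebook, these helpers could presumably be applied to the materialized groups, mapping each ticket to `(first_origin(coupon_list), ticket_total(coupon_list))` before a `reduceByKey(add)` and a `takeOrdered()`. That last step is untested here, and since the order of coupons within a group is not guaranteed, the fallback branch should be treated with care.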

### Further resources

https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_Cheat_Sheet_Python.pdf