# Learning Apache Spark · PySpark

Apache Spark is an open-source cluster-computing framework. Spark Core is the foundation of the overall project. It is exposed through APIs for several languages, including Java, Scala and Python. In this book we will present the Python API, called PySpark.

![Spark context](../images/slide1.png "Spark context")

## 0 · Spark's Workflow
Once the driver triggers an **action** on an RDD (both concepts are explained later), the following happens:
* Spark submits a **job**, which is made up of basic work units called **tasks** (*one per RDD partition*).
* **Tasks** that can run in parallel, one per partition, are grouped into a **stage** (*the physical unit of execution*).
* A **job** consists of one or more **stages**, organized as a DAG (*Directed Acyclic Graph*).
![Jobs](../images/slide2.png)
![Stages](../images/slide3.png)

## 1 · Open the Spark Context
First of all, we have to create a **SparkContext** object, which is the ***entry point*** to the Spark cluster. The driver hosts the SparkContext.
We can set the application's configuration through a **SparkConf** object. For the moment we are going to set just two parameters:  
1. **AppName**: the application name
2. **Master**: the cluster's master URL  
 * ***'local'***: Run Spark locally with a single worker thread, i.e. no parallelism at all.
 * ***'local[\*]'***: Run Spark locally with as many worker threads as logical cores on our machine.

In [2]:
import pyspark

conf = pyspark.SparkConf().setAppName('MiPrimeraSparkApp').setMaster('local[*]')  # Create the configuration
sc = pyspark.SparkContext(conf=conf)  # Open the Spark context

#### Spark WebUI
We have just started the session locally. Let's open the Spark web UI, which is served on port 4040 by default (http://localhost:4040 when running locally). In this interface we can navigate through:
* Jobs
* Stages
* Environment
* Executors  
  
We could change the web UI port with the *spark.ui.port* parameter, e.g. ***conf.set("spark.ui.port", "8080")*** on the SparkConf object, before the SparkContext is created.
![SparkwebUI](../images/SparkwebUI.png)

## 2 · RDDs: Resilient Distributed Datasets
The basic data structure in Spark is the RDD, **Resilient Distributed Dataset**. RDDs are data collections that can be operated on in parallel. Collections are split into **partitions** and distributed among the **worker nodes**. We can create RDDs from data collections using different methods.
### Internal Structures
1. **.parallelize(c, numSlices=None):** It distributes a Python collection, such as a ***list*** or a ***tuple***
2. **.range(start, end=None, step=1, numSlices=None):** It creates an ***integer RDD*** containing the elements from start (inclusive) to end (exclusive), increased by step. If it is called with a single argument, that argument is interpreted as end and start is set to 0

In [3]:
data = ['Pepa','1812-03-19',20,True]

distributedData = sc.parallelize(data)
print(distributedData.collect())

wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 5)
# numSlices=5 sets the number of partitions of the RDD
print(wordsRDD.collect())

wordsRDD.getNumPartitions()

['Pepa', '1812-03-19', 20, True]
['cat', 'elephant', 'rat', 'rat', 'cat']


5
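
To make the role of `numSlices` more concrete, the following plain-Python sketch (no Spark needed) splits a list into contiguous slices, roughly the way a collection is divided into partitions. The helper `slice_collection` is hypothetical and the even, contiguous split is an assumption made for illustration; how Spark actually assigns elements to partitions is an implementation detail.

```python
def slice_collection(items, num_slices):
    """Split items into num_slices contiguous chunks of near-equal size.

    Hypothetical helper for illustration only; it is not part of PySpark.
    """
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


words = ['cat', 'elephant', 'rat', 'rat', 'cat']
partitions = slice_collection(words, 2)
# Each inner list plays the role of one partition (one task per partition).
print(partitions)
```

On a real RDD, `wordsRDD.glom().collect()` returns the elements grouped by partition, which is a convenient way to inspect the actual layout.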

In [3]:
secuencia = sc.range(2,12,2)
secuencia.collect()

[2, 4, 6, 8, 10]

### External Structures
1. **.textFile(name, minPartitions=None, use_unicode=True):** It creates a ***string RDD*** from a ***text file***, with one element per line
2. **.pickleFile(name, minPartitions=None):** It loads an RDD previously saved using ***.saveAsPickleFile()***
3. **.hadoopFile(...):** It reads a file through a Hadoop InputFormat, with arbitrary key and value classes
  
#### Recap. Data formats.
1. **Text files:** 
    * Delimited, human-readable text files
    * Inefficient to read
    * No built-in compression (a file compressed as a whole, e.g. with gzip, cannot be split across tasks)
2. **SequenceFile:** 
    * Binary data structure for key-value pair datasets
    * *Row-based*
3. **Apache Hadoop ecosystem formats**
    1. **Avro:** 
        * *Row-based*, binary and compact data format
        * Schema defined in JSON and embedded in the file 
    2. **Parquet:** 
        * *Column-based*, binary data format
        * Schema embedded in the file
        * Structure based on row groups (50 MB < row group < 1 GB) and column chunks
        * It supports compression
  
![Data structure](../images/avroparquet.png)

In [4]:
Shakespeare = sc.textFile("../data/shakespeare.txt",1) #minPartitions=1
Shakespeare.take(5)

['The Project Gutenberg EBook of The Complete Works of William Shakespeare, by',
 'William Shakespeare',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or']

In [5]:
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))  # Pair RDDs are explained later on.

rdd.saveAsSequenceFile("../data/sequencefile")
rddseqFile = sc.sequenceFile("../data/sequencefile")

print(rddseqFile.collect())

[(2, 'aa'), (1, 'a'), (3, 'aaa')]


In [6]:
distributedData.saveAsPickleFile("../data/picklefile", 5)  # The second argument (5) is batchSize: how many objects are pickled together per batch.
secuenciaRecuperada = sc.pickleFile("../data/picklefile",3)

secuenciaRecuperada.collect()

['1812-03-19', 20, 'Pepa', True]

In [7]:
# Delete the directories created above.
import shutil
shutil.rmtree("../data/picklefile",True)
shutil.rmtree("../data/sequencefile",True)

##### Reading Apache Hadoop formats
**Avro** and **Parquet** are structured data files. For this reason, it is more convenient to read and modify them with the Spark SQL module. We will focus on Spark SQL during the next session.

### RDD Operations
RDDs support two types of operations:
1. **Transformations:** 
    * A **new RDD** is created from an existing one
    * Transformations are ***lazy***: computations are not executed when they are submitted 
    * Instead, RDDs ***remember*** the transformations applied to them and run them only when an action needs the result
2. **Actions:** They **return a result** to the driver after computations are done.  
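
The difference between the two kinds of operations can be sketched without Spark. The toy class below is a hypothetical stand-in for an RDD (`LazyDataset` and `double` are names invented for this illustration): its transformations only record what should be done, and the work happens when the `collect()` action is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them on collect()."""

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)  # the remembered transformations

    def map(self, func):
        # Transformation: nothing is computed, a new dataset is returned.
        return LazyDataset(self._data, self._steps + (("map", func),))

    def filter(self, pred):
        # Transformation: again, only the step is recorded.
        return LazyDataset(self._data, self._steps + (("filter", pred),))

    def collect(self):
        # Action: replay every remembered step and return the result.
        result = self._data
        for kind, func in self._steps:
            if kind == "map":
                result = [func(x) for x in result]
            else:
                result = [x for x in result if func(x)]
        return result


calls = []

def double(x):
    calls.append(x)  # side effect used to observe when the work happens
    return 2 * x

lazy = LazyDataset([1, 2, 3]).map(double).filter(lambda x: x > 2)
print(len(calls))      # double() has not been called yet
print(lazy.collect())  # the action triggers the computation
```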

#### Passing functions to Spark
Functions can be passed to Spark in three different ways:
1. **lambda** expressions
2. Local functions declared with **def**
3. Functions imported from a **module**

In [11]:
# 1. Lambda expressions
pluralLambdaRDD = wordsRDD.map(lambda x: x + 's')
print (pluralLambdaRDD.collect())

['cats', 'elephants', 'rats', 'rats', 'cats']


In [13]:
# 2. local functions
def makePlural(word):
    return word + 's'

print(makePlural('cat'))
wordsRDD.map(makePlural).collect()

cats


['cats', 'elephants', 'rats', 'rats', 'cats']

In [14]:
def makeSingular(word):
    return word[:-1]

pluralLambdaRDD.map(makeSingular).collect()

['cat', 'elephant', 'rat', 'rat', 'cat']

In [15]:
# A multi-line local function: it counts the words in a line of text.
def SepararPalabras(s):
    words = s.split(" ")
    return len(words)

ShakWord = Shakespeare.map(SepararPalabras)
ShakWord.take(10)

[12, 2, 1, 14, 13, 11, 7, 1, 11, 11]

In [16]:
# 3. Functions from modules
from operator import neg,is_

NegativeWords = ShakWord.map(neg)
NegativeWords.take(10)

[-12, -2, -1, -14, -13, -11, -7, -1, -11, -11]

**is_(a, b)** is a function with ***two arguments*** which returns True when a and b are the same object (`a is b`). For small integers such as -1 this coincides with equality in CPython, but `operator.eq` is the safer choice when values have to be compared.  
This function cannot be passed directly to .map(), which expects a function with a single argument. We therefore wrap the call in a new function that fixes the second argument and takes just one.

In [17]:
def Is_(b):
    def _is_(dataline):
        return is_(dataline,b)
    return _is_

IsEqualToOne = NegativeWords.map(Is_(-1))
IsEqualToOne.take(11)

[False, False, True, False, False, False, False, True, False, False, False]
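
The standard library offers a shorter way to fix one argument of a two-argument function. The sketch below is a plain-Python illustration (no Spark needed) that uses `functools.partial`, and `operator.eq` rather than `is_`, so that values are compared instead of object identities; the sample list is made up.

```python
from functools import partial
from operator import eq

# partial(eq, -1) is a one-argument function equivalent to: lambda x: eq(-1, x)
is_equal_to_minus_one = partial(eq, -1)

sample = [-12, -2, -1, -14]  # made-up values for illustration
print(list(map(is_equal_to_minus_one, sample)))
```

With an RDD, the same object could be passed as `NegativeWords.map(partial(eq, -1))`.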

In [18]:
pluralLengths = (pluralLambdaRDD.map(lambda x: len(x)).collect())
print( pluralLengths)

[4, 9, 4, 4, 4]


## 3 · Pair RDDs: Key-Value Pairs
Pair RDDs are formed by key-value pair objects. In Python, they are built by parallelizing **tuples**.

In [15]:
listaDeTuples = [(1,2),(2,3),(2,1)]
unPairRDD = sc.parallelize(listaDeTuples)
unPairRDD.collect()

[(1, 2), (2, 3), (2, 1)]

In [16]:
print(unPairRDD.values().collect()) # Show pairRDD values
print(unPairRDD.keys().collect()) # Show pairRDD keys

[2, 3, 1]
[1, 2, 2]


### Transformations
Common transformations supported by Spark.
##### map

In [21]:
pairs = Shakespeare.map(lambda s: (s, 1))

##### reduceByKey

In [22]:
counts = pairs.reduceByKey(lambda a, b: a + b)
# reduceByKey: merges the values that share the same key by applying the given function to them.
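
What `reduceByKey` computes can be sketched in plain Python. The helper `reduce_by_key` below is hypothetical and runs on a single machine; in Spark the merge happens first within each partition and then across partitions, which is why the function should be associative and commutative.

```python
def reduce_by_key(pairs, func):
    """Merge the values of each key with func (plain-Python stand-in)."""
    merged = {}
    for key, value in pairs:
        if key in merged:
            merged[key] = func(merged[key], value)
        else:
            merged[key] = value
    return list(merged.items())


toy_lines = ['to be', 'or not', 'to be']      # made-up input lines
toy_pairs = [(line, 1) for line in toy_lines]  # same shape as (line, 1) pairs
toy_counts = reduce_by_key(toy_pairs, lambda a, b: a + b)
print(toy_counts)
```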

##### sortByKey

In [20]:
CountsOrdered = counts.sortByKey()

##### flatMap

In [21]:
shakespeareWords = Shakespeare.flatMap(lambda x: x.split(' '))
# Like .map(), but each input element can produce zero or more output elements, which are flattened into a single RDD.
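
The contrast with `.map()` is easy to see on a small list. This is a plain-Python sketch with made-up input lines; `itertools.chain.from_iterable` plays the role of the flattening step.

```python
from itertools import chain

toy_lines = ['to be or', 'not to be']  # made-up input lines

# map: exactly one output element per input element (here, one list per line)
mapped = [line.split(' ') for line in toy_lines]

# flatMap: the per-line lists are flattened into a single sequence of words
flat_mapped = list(chain.from_iterable(line.split(' ') for line in toy_lines))

print(mapped)
print(flat_mapped)
```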

##### union, intersection and distinct

In [22]:
conjunto1RDD = sc.parallelize([1, 2, 3])
conjunto2RDD = sc.parallelize([2, 3, 4, 5])

print(conjunto1RDD.intersection(conjunto2RDD).collect())

print(conjunto1RDD.union(conjunto2RDD).collect())

print(conjunto1RDD.union(conjunto2RDD).distinct().collect())

[2, 3]
[1, 2, 3, 2, 3, 4, 5]
[1, 2, 3, 4, 5]


##### filter

In [42]:
conjunto3RDD = sc.parallelize([8,7,5,4,1,1,7,2,5])
conjunto3filtrado = conjunto3RDD.filter(lambda x:x>1)

### Actions
##### first

In [24]:
# pairs = Shakespeare.map(lambda s: (s, 1))
pairs.first()

('The Project Gutenberg EBook of The Complete Works of William Shakespeare, by',
 1)

##### take

In [80]:
# counts = pairs.reduceByKey(lambda a, b: a + b)
for key, value in counts.take(8):
    print ('{}: {}'.format(key, str(value)))

: 9679
    May be call'd ransom, let it come. Sufficeth: 1
    God, it holds yet.: 1
  STEPHANO. Here; swear then how thou escap'dst.: 1
    She is delivered, lord; she is delivered.: 1
    I fear our purpose is discovered.: 1
    Nay, and you shall hear some.  [To BRUTUS] Will you be gone?: 1
  PAROLLES. It is to be recovered. But that the merit of service is: 1


In [72]:
# CountsOrdered = counts.sortByKey()
CountsOrdered.take(10)

[('', 9679),
 ("                                                          'Goneril.'", 1),
 ('                                                          Exeunt', 45),
 ('                                                          Exit GURNEY', 1),
 ('                                                          Guns heard.', 1),
 ("                                                          HAMLET.'", 1),
 ('                                                          Music.', 1),
 ('                                                          [Dies]', 3),
 ('                                                         Exeunt LORDS', 1),
 ('                                                         Exeunt.', 56)]

##### takeSample

In [23]:
counts.takeSample(False, 10)  # First argument: sample with replacement (True) or without (False); second argument: sample size

[('    eloquence in a sugar touch of them than in the tongues of the', 1),
 ('  JULIA. You mistake; the musician likes me not.', 1),
 ('  THIRD SOLDIER. It signs well, does it not?', 1),
 ('    State-statues only.', 1),
 ('            him over to a gaoler. Exeunt omnes', 1),
 ("  OTHELLO. That's he that was Othello. Here I am.", 1),
 ("  FLEANCE. I take't 'tis later, sir.", 1),
 ("  SLY. Third, or fourth, or fifth borough, I'll answer him by law.", 1),
 ("  BOTH. Name them, my lord; let's know them.", 1),
 ('    O, had it been a stranger, not my child,', 1)]

##### collect

In [85]:
# shakespeareWords = Shakespeare.flatMap(lambda x: x.split(' '))
shakespeareWords.collect()

['The',
 'Project',
 'Gutenberg',
 'EBook',
 'of',
 'The',
 'Complete',
 'Works',
 'of',
 'William',
 'Shakespeare,',
 'by',
 'William',
 'Shakespeare',
 '',
 'This',
 'eBook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'at',
 'no',
 'cost',
 'and',
 'with',
 'almost',
 'no',
 'restrictions',
 'whatsoever.',
 '',
 'You',
 'may',
 'copy',
 'it,',
 'give',
 'it',
 'away',
 'or',
 're-use',
 'it',
 'under',
 'the',
 'terms',
 'of',
 'the',
 'Project',
 'Gutenberg',
 'License',
 'included',
 'with',
 'this',
 'eBook',
 'or',
 'online',
 'at',
 'www.gutenberg.org',
 '',
 '**',
 'This',
 'is',
 'a',
 'COPYRIGHTED',
 'Project',
 'Gutenberg',
 'eBook,',
 'Details',
 'Below',
 '**',
 '**',
 'This',
 'is',
 'a',
 'COPYRIGHTED',
 'Project',
 'Gutenberg',
 'eBook,',
 'Details',
 'Below',
 '**',
 '**',
 'This',
 'is',
 'a',
 'COPYRIGHTED',
 'Project',
 'Gutenberg',
 'eBook,',
 'Details',
 'Below',
 '**',
 '**',
 'This',
 'is',
 'a',
 'COPYRIGHTED',
 'Project',
 'Gutenberg',
 ...

##### count

In [86]:
shakespeareWords.count()

1410735

##### reduce

In [30]:
# conjunto2RDD = sc.parallelize([2, 3, 4, 5])
conjunto2RDD.reduce(lambda a, n: a + n)

14

##### takeOrdered

In [47]:
# conjunto3RDD = sc.parallelize([8,7,5,4,1,1,7,2,5])
# conjunto3filtrado = conjunto3RDD.filter(lambda x:x>1)
conjunto3filtrado.takeOrdered(5)

[2, 4, 5, 5, 7]

### Writing RDDs into external files
1. **saveAsTextFile(path)**
2. **saveAsSequenceFile(path)** (Previously explained)
3. **saveAsObjectFile(path)** (Only for Java and Scala)

In [55]:
shakespeareWords.saveAsTextFile("../data/PalabrasShakespeare")

In [58]:
# Delete the directory created above.
import shutil
shutil.rmtree("../data/PalabrasShakespeare",True)

## 4 · RDD Persistence
One of the most important capabilities in Spark is **persisting (or caching)** a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.  

You can mark an RDD to be persisted using the **persist()** or **cache()** methods on it. In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory as serialized objects (to save space), or replicate it across nodes. These levels are set by passing a StorageLevel object to persist(). 
The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY. Note that in Python stored objects are always serialized with pickle, so it does not matter whether a serialized level is chosen.
For more information, you can visit https://spark.apache.org/docs/latest/rdd-programming-guide.html.  

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.
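
The effect of caching can be illustrated without a cluster. The toy class below is a hypothetical stand-in for an RDD (`ToyRDD` and `expensive` are names invented for this sketch): without caching every action recomputes the data from scratch, while a cached dataset is computed by the first action only. It is a simplified single-machine model, not Spark's implementation.

```python
class ToyRDD:
    """Toy stand-in for an RDD whose contents are recomputed unless cached."""

    def __init__(self, compute):
        self._compute = compute   # function that rebuilds the data from scratch
        self._cached = None
        self._use_cache = False

    def cache(self):
        self._use_cache = True    # only marks the dataset; nothing is computed yet
        return self

    def unpersist(self):
        self._use_cache = False
        self._cached = None
        return self

    def _materialize(self):
        if self._use_cache and self._cached is not None:
            return self._cached
        data = self._compute()
        if self._use_cache:
            self._cached = data
        return data

    def count(self):
        return len(self._materialize())

    def sum(self):
        return sum(self._materialize())


runs = []

def expensive():
    runs.append(1)                # record every recomputation
    return [x * x for x in range(5)]

plain = ToyRDD(expensive)
plain.count()
plain.sum()
runs_without_cache = len(runs)    # one computation per action

runs.clear()
cached = ToyRDD(expensive).cache()
cached.count()
cached.sum()
runs_with_cache = len(runs)       # only the first action computes

print(runs_without_cache, runs_with_cache)
```

In PySpark the corresponding calls would be `rdd.cache()`, or `rdd.persist(StorageLevel.MEMORY_AND_DISK)` with `from pyspark import StorageLevel` to pick another level, and `rdd.unpersist()` to release the data.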

## 5 · Shared Variables · Cluster Computing
To execute jobs, Spark **breaks up the processing of RDD operations into tasks**, each of which is **executed by an executor**. Prior to execution, Spark computes the task’s closure. The ***closure is those variables and methods which must be visible for the executor to perform its computations on the RDD***. This closure is serialized and sent to each executor.  

The variables within the closure sent to each executor are **copies**. They **are no longer the variables on the driver node**, so updates made to them on the remote machines are not propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: **broadcast variables** and **accumulators**.

In [23]:
counter = 0
data = [1,5,7,8,9,10,2,41,5]
rdd = sc.parallelize(data)

# Wrong: the counter will not be updated!
def increment_counter(x):
    global counter
    counter += x

rdd.foreach(increment_counter)
print("Counter value: ", counter)

Counter value:  0
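
Why the driver's counter stays at 0 can be mimicked in plain Python. The sketch below is only an analogy: it uses `pickle` to imitate the serialization of the closure, and the names `driver_state` and `executor_state` are invented for the illustration.

```python
import pickle

driver_state = {'counter': 0}          # lives on the driver

# Serializing and deserializing mimics shipping the closure to an executor.
executor_state = pickle.loads(pickle.dumps(driver_state))

for x in [1, 5, 7]:                    # made-up data processed "remotely"
    executor_state['counter'] += x

print(driver_state['counter'])         # the driver's copy is untouched
print(executor_state['counter'])       # only the shipped copy changed
```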


#### Broadcast
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. Their main features are:
* **Immutable**: they cannot be changed once created.
* They must **fit in memory** on a single machine, so they should not be very large.
* **Distributed to the cluster**: every node receives a copy.
  
A typical example is a reference table that is compared with other data several times during the application's execution.

In [106]:
Var = sc.parallelize([(1,"bye"), (2,"adiós"), (3,"ciao")]).collectAsMap()
broadcastVar = sc.broadcast(Var)
broadcastVar.value
# Note that broadcast variables have no collect() method: the value is not
# partitioned across the machines of the cluster, so it is read through .value

{1: 'bye', 2: 'adiós', 3: 'ciao'}

#### Accumulators
Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.
  
Integer and float accumulator examples.

In [10]:
accum = sc.accumulator(0)
accum

Accumulator<id=3, value=0>

In [13]:
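sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
# A fresh accumulator ends at 10 here. Re-running the cell keeps adding to the same
# accumulator, which is consistent with the 30 shown below (three runs).
accum.value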

30

Programmers can add support for new types by subclassing the ***AccumulatorParam*** class.
This class has two methods:
1. **zero**: It provides a "zero value" to the new accumulator type.
2. **addInPlace**: It indicates the way two values are added.
  
Vector-type accumulator example.

In [2]:
# Default AccumulatorParams are used for integers and floating-point numbers if you do not provide one. 
# For other types, a custom AccumulatorParam can be used:
# accumulator(value, accum_param=None)

from pyspark.accumulators import AccumulatorParam

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        # "Zero" vector with the same length as the initial value
        return [0.0] * len(value)
    def addInPlace(self, val1, val2):
        # Element-wise sum, accumulated into val1
        for i in range(len(val1)):
            val1[i] += val2[i]
        return val1

# Then, create an Accumulator of this type:
# vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
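
The merge logic of such a class can be checked without a cluster. The sketch below is an assumption-laden stand-in: `VectorParamSketch` is a hypothetical class that has the same two methods but does not inherit from PySpark's AccumulatorParam, and the partial vectors are made up. It folds the updates in the same way an accumulator would combine them.

```python
from functools import reduce


class VectorParamSketch:
    """Same two methods as the AccumulatorParam subclass, without the PySpark base class."""

    def zero(self, value):
        # "Zero" vector with the same length as the given value
        return [0.0] * len(value)

    def addInPlace(self, val1, val2):
        # Element-wise sum, accumulated into val1
        for i in range(len(val1)):
            val1[i] += val2[i]
        return val1


param = VectorParamSketch()
# Made-up partial results, as if three tasks had each produced one vector.
task_updates = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
total = reduce(param.addInPlace, task_updates, param.zero(task_updates[0]))
print(total)
```

Starting the fold from `zero(...)` keeps the input vectors untouched, since `addInPlace` mutates its first argument.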

For accumulator updates performed inside ***actions*** only, Spark guarantees that **each task’s update to the accumulator will only be applied once**, i.e. restarted tasks will not update the value. In ***transformations***, users should be aware that **each task’s update may be applied more than once if tasks or job stages are re-executed**.

Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, **their value is only updated once that RDD is computed as part of an action**. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map().

In [16]:
distributedData = sc.parallelize([2,85,6,4,8,9,6,1,0])
accum = sc.accumulator(0)
def g(x):
    accum.add(x)
    return x * 2  # any transformation of x would do here
distributedData.map(g)
print(accum)
# accum is still 0 because no action has been triggered.

0


## 6 · Application Deployment to a cluster
The spark-submit script in Spark’s bin directory is used to launch applications on a cluster. It can use all of Spark’s supported cluster managers through a uniform interface so you don’t have to configure your application especially for each one.  

If your code depends on other projects, you will need to package them alongside your application in order to distribute the code to a Spark cluster. To do this, use the **--py-files** argument of spark-submit to add .py, .zip or .egg files to be distributed with your application. If you depend on multiple Python files we recommend packaging them into a .zip or .egg.
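
Building such an archive takes only the standard library. The sketch below is one possible approach, not a required workflow: the helper `build_py_files_zip`, the module names and the archive name `deps.zip` are all hypothetical, and the demo writes to a temporary directory so that it has no side effects.

```python
import os
import tempfile
import zipfile


def build_py_files_zip(source_dir, zip_path):
    """Pack every .py file under source_dir into zip_path, keeping relative paths."""
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(source_dir):
            for name in sorted(files):
                if name.endswith('.py'):
                    full_path = os.path.join(root, name)
                    archive.write(full_path, os.path.relpath(full_path, source_dir))
    return zip_path


# Demo with two made-up helper modules in a temporary directory.
with tempfile.TemporaryDirectory() as workdir:
    src = os.path.join(workdir, 'helpers')
    os.makedirs(src)
    for module in ('cleaning.py', 'metrics.py'):
        with open(os.path.join(src, module), 'w') as handle:
            handle.write('# placeholder module\n')
    bundle = build_py_files_zip(src, os.path.join(workdir, 'deps.zip'))
    with zipfile.ZipFile(bundle) as archive:
        packed = sorted(archive.namelist())

print(packed)
```

The resulting archive could then be passed to spark-submit as `--py-files deps.zip`.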
#### Deployment examples

In [None]:
# Run application locally on 8 cores
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[8] \
  /path/to/examples.jar \
  100

# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

# Run on a YARN cluster (use --deploy-mode client for client mode)
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar \
  1000

# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
  --master spark://207.184.161.138:7077 \
  examples/src/main/python/pi.py \
  1000

# Run on a Mesos cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master mesos://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  http://path/to/examples.jar \
  1000

# Run on a Kubernetes cluster in cluster deploy mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master k8s://xx.yy.zz.ww:443 \
  --deploy-mode cluster \
  --executor-memory 20G \
  --num-executors 50 \
  http://path/to/examples.jar \
  1000

## 7 · Stop Spark Context

In [80]:
sc.stop()