# Apache Spark Tutorial

University of Liège                                                      
Joeri Hermans (joeri.hermans@doct.ulg.ac.be)                         

This notebook will guide the students of "Large-Scale Distributed Systems" through their initial steps with [Apache Spark](https://spark.apache.org).

## Downloading, unpacking and moving

In [14]:
# Download Apache Spark 2.2.0 (pre-built for Hadoop 2.7); remove any previous download first.
!rm -r spark-2.2.0-bin-hadoop2.7.tgz
!wget -q http://apache.tt.co.kr/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz

rm: cannot remove 'spark-2.2.0-bin-hadoop2.7': No such file or directory


In [20]:
# Unpack the framework.
!rm -r apache-spark
!tar -xzf spark-2.2.0-bin-hadoop2.7.tgz
!mv spark-2.2.0-bin-hadoop2.7 apache-spark
!ls -al

rm: cannot remove 'apache-spark': No such file or directory
tar (child): spark-2.2.0-bin-hadoop2.7.tgz: Cannot open: No such file or directory
tar (child): Error is not recoverable: exiting now
tar: Child returned status 2
tar: Error is not recoverable: exiting now
total 20
drwxr-xr-x  4 joeri joeri 4096 Nov 15 01:42 .
drwxr-xr-x 10 joeri joeri 4096 Nov 15 00:36 ..
drwxr-xr-x 12 joeri joeri 4096 Jul  1 01:09 apache-spark
drwxr-xr-x  2 joeri joeri 4096 Nov 15 00:48 .ipynb_checkpoints
-rw-r--r--  1 joeri joeri 2623 Nov 15 01:41 spark-tutorial.ipynb


In [23]:
# Move it to the desired location. Make sure the destination (/opt) is writable.
!rm -rf /opt/apache-spark
!mv apache-spark /opt/

In [25]:
# To use the libraries, make sure they are included in $PATH and $PYTHONPATH.
# Note: every `!` command runs in its own subshell, so the exports below do not
# persist in the notebook session. Add these lines (without the leading `!`) to
# your .bashrc or .zshrc, depending on your shell, and restart Jupyter.
!export SPARK_HOME=/opt/apache-spark
!export PYTHONPATH="$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"
!export PATH="$SPARK_HOME/bin:$PATH"
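
If editing the shell profile is not convenient, the same variables can be set for the current Python process only. The following is a minimal sketch, assuming Spark was moved to `/opt/apache-spark` as above; the helper name `configure_spark_env` is hypothetical and not part of Spark. It has to run before `import pyspark`.

```python
import glob
import os
import sys


def configure_spark_env(spark_home="/opt/apache-spark"):
    """Expose a Spark installation to the current Python process (hypothetical helper)."""
    os.environ["SPARK_HOME"] = spark_home
    python_dir = os.path.join(spark_home, "python")
    # The py4j archive name depends on the Spark release, so look it up
    # instead of hard-coding the version. The list is empty if Spark is absent.
    py4j_zips = sorted(glob.glob(os.path.join(python_dir, "lib", "py4j-*-src.zip")))
    new_paths = [python_dir] + py4j_zips
    # Insert in reverse so that python_dir ends up first on sys.path.
    for path in reversed(new_paths):
        if path not in sys.path:
            sys.path.insert(0, path)
    # Make the Spark command-line tools (spark-submit, pyspark) reachable.
    bin_dir = os.path.join(spark_home, "bin")
    os.environ["PATH"] = bin_dir + os.pathsep + os.environ.get("PATH", "")
    return new_paths


added = configure_spark_env()
print(added)
```

Unlike the `.bashrc` approach, these settings only last for the lifetime of the notebook kernel.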

## Apache Spark

Test your installation by importing the Python module. If you did everything correctly, this should not produce any errors.

In [27]:
import pyspark

from pyspark import SparkContext
from pyspark import SparkConf

In the following sections we give a basic introduction to most of the concepts in Spark, and how (not) to use them. If you wish to implement something specific, you can always refer to the [documentation](https://spark.apache.org/docs/latest/quick-start.html).

As you saw in the lecture, Apache Spark distinguishes two places where computation resides. The first is the *driver*, the process responsible for issuing tasks to the cluster; it is also used for *collecting* the results of the computation. The second is the set of *executors*, which actually *execute* the tasks issued by the driver. Furthermore, an executor is also responsible for maintaining the state of the computation. For instance, if you issue a set of tasks from the driver process and then go for a coffee, the (intermediate) results of the computation are not lost as long as the driver process is still alive.
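
To make the division of labour concrete, here is a plain-Python analogy; it does not use the Spark API. A "driver" function splits the data into partitions, hands one task per partition to a pool of workers (standing in for executors), and collects the results. The names `square_partition` and `run_job` are made up for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def square_partition(partition):
    """Task run by a worker: process one partition of the data."""
    return [x * x for x in partition]


def run_job(data, num_workers=2):
    """'Driver' side: split the data, issue tasks, and collect the results."""
    # Split the data into one partition per worker (round-robin).
    partitions = [data[i::num_workers] for i in range(num_workers)]
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # Each worker processes one partition; map() keeps partition order.
        results = list(pool.map(square_partition, partitions))
    # Collect: flatten the per-partition results back at the driver.
    return [value for part in results for value in part]


collected = run_job(list(range(6)))
print(sorted(collected))  # [0, 1, 4, 9, 16, 25]
```

In Spark the workers live in separate processes, usually on other machines, and the partitions stay on the executors until the driver explicitly collects them.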

Depending on how the cluster is configured (stand-alone, YARN, Mesos, ...), a different master address has to be specified. The master allocates the resources you requested and assigns them to the requestor. In general, you have to specify four things before starting your Spark application: the `application_name`; the `master_address` or master mode (e.g., yarn-client, in which case you ask YARN to schedule your resources); `num_executors`, the number of executors you wish to use; and finally `num_processes`, the number of child processes (you can view them as threads) that run within a single executor. This last option is advantageous because the child processes within a single executor share the same memory. As a result, they can exchange data very quickly instead of reading it from a different node over the network.

In [28]:
# Define the name of your application.
application_name = "Large-Scale Distributed Systems Spark Tutorial"

# Define the master address, for local mode this is local[*].
master = "local[*]"
# Number of executors.
num_executors = 2
# Number of child processes per executor.
num_processes = 2

Using these options, we can construct a `SparkConf` object, which tells the cluster how to allocate the resources and specifies the behaviour of the cluster. The configuration below uses the variables defined above, together with the amount of memory you require per executor. In general, Spark thrives on the availability of RAM: the more data can be kept in memory, the faster it can be processed.

Another option you can specify is a custom `Serializer`. The serializer is responsible for converting in-memory (heap) objects into a byte representation, for example when data has to be written to some kind of persistent medium because it no longer fits in memory. How objects are serialized differs between implementations. By default, Spark uses the Java serializer, which in some cases might not be sufficient because it fails to encode dependencies. That is why using the Kryo serializer is commonly considered good practice. It is also faster than the Java serializer.

In [None]:
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
# Configuration values are strings, so convert the integers explicitly
# (backticks are not valid syntax in Python 3).
conf.set("spark.executor.cores", str(num_processes))
conf.set("spark.executor.instances", str(num_executors))
conf.set("spark.executor.memory", "4g")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

The list of options you can specify is long, and which ones give your processing application good throughput depends on the application.
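
Because the useful options vary per application, it can help to gather them in one place before building the configuration. The sketch below is one possible way to do that in plain Python; `build_spark_settings` is a hypothetical helper, not a Spark function, and the default of `"4g"` simply mirrors the value used above.

```python
def build_spark_settings(app_name, master, num_executors, cores_per_executor,
                         executor_memory="4g", extra=None):
    """Return Spark configuration entries as a dict of string keys and values."""
    if num_executors < 1 or cores_per_executor < 1:
        raise ValueError("executors and cores must be positive integers")
    settings = {
        "spark.app.name": app_name,
        "spark.master": master,
        "spark.executor.instances": str(num_executors),
        "spark.executor.cores": str(cores_per_executor),
        "spark.executor.memory": executor_memory,
    }
    # Application-specific options override or extend the defaults above.
    settings.update(extra or {})
    return settings


settings = build_spark_settings(
    "Large-Scale Distributed Systems Spark Tutorial", "local[*]", 2, 2,
    extra={"spark.serializer": "org.apache.spark.serializer.KryoSerializer"},
)
for key in sorted(settings):
    print(key, "=", settings[key])
```

With PySpark importable, the resulting pairs could then be applied in one call with `SparkConf().setAll(settings.items())`.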