# (Slightly More) Advanced Topics in Spark
### Table of Contents

1. [Spark on EC2](#ec2)
1. [Key Terminology](#keyterminology)
1. [Understanding the Shuffle](#shuffle)
1. [Programming best practices](#programming%20practices)
1. [Resource Tuning](#resource%20tuning)

<span id="ec2"></span>
### Spark on EC2

```bash
# launch
$ ./spark-ec2 --key-pair=springfellows2015 --identity-file=/path/to/springfellows2015.pem -s 5 --instance-type=m3.xlarge --region=us-east-1 --zone=us-east-1a --copy-aws-credentials launch myclustername

# pause
$ ./spark-ec2 --region=<ec2-region> stop <cluster-name>

# restart
$ ./spark-ec2 -i <key-file> --region=<ec2-region> start <cluster-name>

# kill
$ ./spark-ec2 --region=<ec2-region> destroy <cluster-name>
```

#### Running jobs
```bash
# ssh in
$ ./spark-ec2 -k <keypair> -i <key-file> login <cluster-name>

# scp your jar (or directory if you want to package the app on the master node)
$ scp -i <key-pair-file> <jarpath> root@<master-public-dns>:/root/<path>

# run the jar
$ spark-submit --master spark://$MASTERDNS:7077 --class <class> <application-jar> [application args]
```

**Job monitoring and inspection:** You can view the status of the cluster via its web UI at `http://<master-hostname>:8080`.

### Comparison to MapReduce

In MapReduce, the highest-level unit of computation is a job. The system loads the data, applies a map function, shuffles it, applies a reduce function, and writes it back out to persistent storage. Spark has a similar job concept (although a job can consist of more stages than just a single map and reduce), but it also has a higher-level construct called an “application,” which can run multiple jobs, in sequence or in parallel.

![spark_driver](images/spark-driver.png)

<span id='keyterminology'></span>
## Key Terminology
- **Application**: This may be a single job, a sequence of jobs, a long-running service issuing new commands as needed, or an interactive exploration session. An application corresponds to an instance of the `SparkContext` class.
- **Spark Driver**: The Spark driver is the process running the `SparkContext` (which represents the application session). The driver is responsible for converting the application into a directed graph of individual steps to execute on the cluster. There is one driver per application.
- **Spark Executor**: A single JVM instance on a node that serves a single Spark application. An executor runs multiple tasks over its lifetime, and multiple tasks concurrently. A node may have several Spark executors and there are many nodes running Spark Executors for each client application.  
- **Spark Task**: A Spark Task represents a unit of work on a partition of a distributed dataset. 
- **Stage**: The job's transformations are assembled into *stages*. A stage is a collection of tasks that all execute the same code, each on a different subset of the data.
    - Stage boundaries are determined by transformations that require a *shuffle* of the full data. *Narrow* transformations like `map` and `filter` can be executed entirely within a single partition of a parent RDD. *Wide* transformations like `groupByKey` and `reduceByKey` require looking at records across partitions of a parent RDD.
- **Spark Application Master**: The Spark Application Master is responsible for negotiating resource requests made by the driver with YARN and finding a suitable set of hosts/containers in which to run the Spark applications. There is one Application Master per application. 
- **Resource tuning**: Configuring Spark to take advantage of everything the cluster has to offer.
- **Tuning parallelism**: Choosing the number of partitions, and therefore tasks, per stage. This is often the most difficult and most important factor in job performance.
- **Data representation**: On-disk (use Avro or Parquet!), and the in-memory format it takes as it's cached or moves through the system.
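
To make these terms concrete, here is a small sketch. It assumes Spark 1.3 or later on the classpath and a `local[2]` master; the object name `StageSketch` and the sample input are made up for illustration. One `SparkContext` is one application, each action launches a job, and the wide `reduceByKey` introduces a stage boundary that shows up as a `ShuffledRDD` in the lineage.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object StageSketch {
  // Narrow transformations only (flatMap, map): these stay within one stage.
  def toPairs(lines: RDD[String]): RDD[(String, Int)] =
    lines.flatMap(_.split(" ")).map(word => (word, 1))

  // Wide transformation: reduceByKey needs a shuffle, which starts a new stage.
  def wordCounts(lines: RDD[String]): RDD[(String, Int)] =
    toPairs(lines).reduceByKey(_ + _)

  def main(args: Array[String]): Unit = {
    // One SparkContext corresponds to one application.
    val sc = new SparkContext(
      new SparkConf().setAppName("StageSketch").setMaster("local[2]"))
    try {
      val lines = sc.parallelize(Seq("a b a", "b c"), 2) // 2 partitions => 2 tasks in the first stage
      val counts = wordCounts(lines)
      println(counts.toDebugString)     // lineage; the ShuffledRDD marks the stage boundary
      println(counts.count())           // action: launches a job
      counts.collect().foreach(println) // another action: launches a second job
    } finally sc.stop()
  }
}
```

The same application also appears in the web UI, where each job is broken down into its stages and tasks.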

<span id='shuffle'></span>
## Shuffles

#### What's a shuffle?
In an "all-to-all" operation, the dataset as a whole must be considered: the contents of each output record can depend on records that come from many different partitions. The "shuffle" refers to the repartitioning and aggregation of data during an all-to-all operation.

**Map task:** task writing out shuffle data  
**Reduce task:** task reading the shuffle data. The same task can first be a reduce task then be a map task on that data.

Every map task writes out data to local disk, then the reduce tasks make remote requests to fetch that data. The job of the map side of the shuffle is to write out records in such a way that all records headed for the same reduce task are grouped next to each other for easy fetching. 
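
The map-side and reduce-side roles can be sketched in plain Scala, with no Spark required. This is a conceptual model only: the names `ShuffleSketch`, `reducerFor`, `mapSideWrite`, and `reduceSideFetch` are hypothetical, and Spark itself writes shuffle files to local disk rather than holding in-memory maps.

```scala
object ShuffleSketch {
  // Pick the reduce task for a key using a non-negative hash modulo.
  def reducerFor(key: String, numReducers: Int): Int = {
    val raw = key.hashCode % numReducers
    if (raw < 0) raw + numReducers else raw
  }

  // "Map side": group one map task's records by the reduce task they are headed for,
  // so that each reducer can fetch its records as one contiguous bucket.
  def mapSideWrite(records: Seq[(String, Int)],
                   numReducers: Int): Map[Int, Seq[(String, Int)]] =
    records.groupBy { case (key, _) => reducerFor(key, numReducers) }

  // "Reduce side": fetch this reducer's bucket from every map task's output.
  def reduceSideFetch(mapOutputs: Seq[Map[Int, Seq[(String, Int)]]],
                      reducer: Int): Seq[(String, Int)] =
    mapOutputs.flatMap(_.getOrElse(reducer, Seq.empty[(String, Int)]))
}
```

Every record for a given key lands in the same bucket index on every map task, which is why a single reduce task ends up seeing all values for that key.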

**General rule of thumb:** Minimize the number of shuffles, *except when* an extra shuffle increases parallelism enough to improve overall performance.

#### Primary example: Use `reduceByKey` over `groupByKey`
- `reduceByKey`: Performs a map-side combine. I.e. data is combined *first* within each partition, so that each partition outputs at most *one value per key*, which is then sent over the network to the reduce workers.
- `groupByKey`: All the data for a given key is sent to a reduce worker, whereupon all those data are reduced. 

`groupByKey` therefore uses the network wastefully, and when many values pile up for a single key it can run into out-of-memory or out-of-disk problems. For the same reason, prefer `aggregateByKey`, `foldByKey`, and `combineByKey` over `groupByKey`. [A helpful illustration.](https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html)

- **Limitations:** `reduceByKey` requires the combined result to have exactly the same type as the input values. When the result type differs (for example, collecting values into a set), use `aggregateByKey` or `combineByKey` instead.
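
The effect of the map-side combine can be illustrated without a cluster. The following plain-Scala sketch models partitions as in-memory sequences and counts how many records would cross the network in each approach; `CombineSketch` and its methods are hypothetical names, not Spark APIs.

```scala
object CombineSketch {
  type Partition = Seq[(String, Int)]

  // groupByKey-style: every record leaves its partition unchanged.
  def shuffledWithoutCombine(partitions: Seq[Partition]): Int =
    partitions.map(_.size).sum

  // reduceByKey-style: combine within each partition first, so each
  // partition emits at most one record per key.
  def mapSideCombine(partition: Partition)(f: (Int, Int) => Int): Map[String, Int] =
    partition.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

  def shuffledWithCombine(partitions: Seq[Partition])(f: (Int, Int) => Int): Int =
    partitions.map(p => mapSideCombine(p)(f).size).sum

  // Final reduce-side merge of the per-partition partial results.
  def reduceByKey(partitions: Seq[Partition])(f: (Int, Int) => Int): Map[String, Int] =
    partitions.flatMap(p => mapSideCombine(p)(f).toSeq)
      .groupBy(_._1)
      .map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
}
```

With two partitions holding `("a",1), ("a",1), ("b",1)` and `("a",1), ("b",1), ("b",1)`, the uncombined shuffle moves 6 records while the combined one moves 4, and the gap widens as keys repeat more often within a partition.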

**Exceptions to the rule:**
- **Example 1:** Your data arrives in a few large unsplittable files. The partitioning dictated by the `InputFormat` may place a large number of records in each partition while not generating enough partitions to use all the available cores.  
In this case, calling `repartition` with a high number of partitions after loading the data lets the operations that follow use more of the cluster's CPU.
- **Example 2:** You use the `reduce` or `aggregate` action to aggregate data into the driver. With a high number of partitions, the single driver thread that merges all the partial results becomes a bottleneck. A first round of distributed aggregation (for example with `reduceByKey`, `aggregateByKey`, `treeReduce`, or `treeAggregate`) shrinks the data to fewer partitions before the driver does the final merge.
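
Both exceptions can be sketched as follows, assuming Spark 1.3 or later and a `local[2]` master. The object name `ShuffleExceptions`, the helper names, and the partition counts are illustrative choices, not recommendations.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ShuffleExceptions {
  // Example 1: spread a coarsely partitioned RDD over more partitions.
  // This costs one shuffle but lets later stages use more cores.
  def widen(rdd: RDD[Int], numPartitions: Int): RDD[Int] =
    rdd.repartition(numPartitions)

  // Example 2: merge partial results in a tree on the executors
  // instead of merging every partition's result on the driver.
  def sumInTree(rdd: RDD[Int]): Long =
    rdd.map(_.toLong).treeReduce(_ + _, depth = 2)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("ShuffleExceptions").setMaster("local[2]"))
    try {
      val coarse = sc.parallelize(1 to 1000, 2) // stands in for a few unsplittable files
      val wide = widen(coarse, 8)
      println(s"${coarse.partitions.size} -> ${wide.partitions.size} partitions")
      println(s"sum = ${sumInTree(wide)}")
    } finally sc.stop()
  }
}
```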



## Data Partitioning
**Purpose:** In a distributed program, communication is very expensive, so laying out data to minimize network traffic can greatly improve performance. Partitioning is useful when a dataset is reused multiple times in key-oriented operations.

**Definition:** Grouping elements amongst nodes based on a function of each key.  
The programmer can't control which worker node each key goes to, but can ensure that a set of keys will appear together on some node. E.g.:
1. Hash-partition an RDD into 100 partitions => Keys with the same hash value modulo 100 appear on the same node
2. Range-partition an RDD into sorted ranges of keys => Elements with keys in the same range appear on the same node
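
The two schemes can be sketched in plain Scala. `hashPartition` follows the non-negative `hashCode` modulo rule that Spark's `HashPartitioner` applies to non-null keys; `rangePartition` is a simplification that takes hand-picked boundaries, whereas Spark's `RangePartitioner` derives them by sampling the data. The object and method names are hypothetical.

```scala
object PartitionSketch {
  // Hash partitioning: partition index = non-negative (hashCode mod numPartitions).
  def hashPartition(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Range partitioning: `upperBounds` holds the sorted, inclusive upper bound of
  // every partition except the last; a key goes to the first range containing it.
  def rangePartition(key: Int, upperBounds: Seq[Int]): Int = {
    val idx = upperBounds.indexWhere(key <= _)
    if (idx >= 0) idx else upperBounds.length
  }
}
```

For instance, with 100 hash partitions the integer keys 5, 105, and 205 all map to partition 5, so they are guaranteed to be co-located even though the programmer never chose the node.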

**Operations that benefit from partitioning:** `join()`, `groupByKey()`, `reduceByKey()`, `combineByKey()`, `lookup()`, `cogroup()`, `groupWith()`

### An Example
- Consider an application that keeps a large table of user information in memory: an RDD of (UserID, UserInfo) pairs, with UserInfo containing a list of the user's subscribed topics.
- The application periodically combines this table with a smaller file representing events that happened in the past five minutes, e.g. a table of (UserID, LinkInfo) pairs for users who have clicked a link on a website in those five minutes.
- E.g. count how many times a user visited a link that was not one of their subscribed topics.

**Inefficient way:** 
- Code:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // pair-RDD operations such as join() (needed before Spark 1.3)

// We load the user info from a Hadoop SequenceFile on HDFS.
// This distributes elements of userData by the HDFS block where they are found,
// and doesn't provide Spark with any way of knowing in which partition a
// particular UserID is located.
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()

// Function called periodically to process a logfile of events in the past 5 minutes;
// we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.
def processNewLogs(logFileName: String): Unit = {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  val joined = userData.join(events) // RDD of (UserID, (UserInfo, LinkInfo)) pairs
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
```

- *Why?* `join()` doesn't know how the keys are partitioned in either dataset, so it hashes all the keys of both datasets, sends elements with the same key hash across the network to the same machine, and then joins the elements with the same key on that machine. The `userData` table is hashed and shuffled across the network on every call, even though it doesn't change.

![no_partition](images/spark_partition_inefficient.png)

**Efficient Way:**
- Use the `partitionBy()` transformation on `userData` to hash-partition it at the start of the program. `events` RDD is local to `processNewLogs()` and only used once, so we don't need to partition it. 
```scala
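import org.apache.spark.HashPartitioner

val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
                  .partitionBy(new HashPartitioner(100)) // Create 100 partitions
                  .persist() // Keep the partitioned RDD so it is not re-shuffled on each use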
```
- `join()` will now know that `userData` is hash-partitioned and take advantage of it: Spark will shuffle only the `events` RDD, sending events with each UserID to the machine that holds the corresponding hash partition of `userData`. => Much less network communication => Much faster runtime.

![partitioned](images/spark_partition_efficient.png)
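
Whether Spark knows how an RDD is partitioned can be checked through its `partitioner` field. The sketch below assumes Spark 1.3 or later (on older versions, add `import org.apache.spark.SparkContext._`) and a `local[2]` master; the object name `PartitionerCheck` and the sample data are made up for illustration.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionerCheck {
  // Returns whether a partitioner is known for:
  // (raw pairs, after partitionBy, after mapValues, after map)
  def flags(sc: SparkContext): (Boolean, Boolean, Boolean, Boolean) = {
    val raw = sc.parallelize(Seq((1, "alice"), (2, "bob"), (3, "carol")))
    val partitioned = raw.partitionBy(new HashPartitioner(4)).persist()
    (
      raw.partitioner.isDefined,                                  // no partitioner yet
      partitioned.partitioner.isDefined,                          // HashPartitioner is recorded
      partitioned.mapValues(_.toUpperCase).partitioner.isDefined, // keys untouched, so it is kept
      partitioned.map(identity).partitioner.isDefined             // keys could change, so it is dropped
    )
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("PartitionerCheck").setMaster("local[2]"))
    try println(flags(sc)) finally sc.stop()
  }
}
```

The last two flags are the practical takeaway: prefer `mapValues` over `map` on a partitioned pair RDD when the keys do not change, otherwise later joins lose the benefit of the partitioning.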

<span id='programming practices'></span>
## Programming best practices
### Serialization
- What is serialization?  
Whenever data is cached in a serialized format or transferred over the network for a shuffle, Spark needs a byte-stream representation of the RDD contents. Spark accepts a pluggable `Serializer` that defines this serialization and deserialization; by default it uses Java serialization. 
Nearly always, Spark should instead be configured to use Kryo serialization. Kryo defines a more compact format that serializes and deserializes far faster. The catch is that, to get this efficiency, Kryo requires registering any custom classes defined in the application up front. Kryo will still work without registering the classes, but the serialization will take up more space and time because the full class name must be written out with each record. Turning on Kryo and registering classes in code looks like:
```scala
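import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("MyApp")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // turn Kryo on explicitly
conf.registerKryoClasses(Array(classOf[MyCustomClass1], classOf[MyCustomClass2])) // register custom classes up front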
```

### Java 3rd-Party libraries
- Advantage of running on the JVM: the sheer volume of code that's been developed for the Java platform over the years. For any kind of data type or algorithm you need to use, it's likely that someone has already written an (open-source) Java library to solve your problem. 
- Requirements for smooth integration with Scala and Spark: 
    1. Pleasant to use for interactive data analysis.
    1. Data types implement the `Serializable` interface and/or can be easily serialized using libraries like Kryo.
    1. As few external dependencies as possible: just grab and load the JAR.
    1. Prefer Java libraries with Scala wrappers, and simple, rich APIs that don't make extensive use of Java design patterns.
    
**Example: Temporal data:** JodaTime and its Scala wrapper, NScalaTime
```scala
import com.github.nscala_time.time.Imports._
```
- These libraries are centered around immutable `DateTime` objects
```scala
// 3 pm on Oct 31 2014
val dt2 = new DateTime(2014, 10, 31, 15, 0)
// REPL output (the offset depends on the local time zone):
// dt2: org.joda.time.DateTime = 2014-10-31T15:00:00.000-07:00
```

- To convert a String date into a `DateTime` object:
```scala
import java.text.SimpleDateFormat
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val date = format.parse("2014-10-12 10:30:44")
val datetime = new DateTime(date)
```
#### `sbt assembly` 
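Instead of `sbt package`, build with `sbt assembly` (provided by the sbt-assembly plugin) so that external dependencies are bundled into a single jar along with your application.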
```bash
$ sbt assembly
[info] Loading project definition from /Users/elizachang/projects/spark_wikipedia/project
[info] Set current project to MyProject (in build file:/Users/elizachang/projects/spark_wikipedia/)
[info] Including: nscala-time_2.10-1.8.0.jar
[info] Including: joda-convert-1.2.jar
[info] Including: joda-time-2.7.jar
[info] Checking every *.class/*.jar file's SHA-1.
[info] Merging files...
[warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Strategy 'discard' was applied to a file
[warn] Strategy 'rename' was applied to 2 files
[info] SHA-1: ddea66472da84f95270c8d041d84d55a7b8aa9d9
[info] Packaging /Users/elizachang/projects/spark_wikipedia/target/scala-2.10/MyProject-assembly-1.0.jar ...
```
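
A build definition that could produce output like the above might look as follows. This is a sketch, not the project's actual build: the project name, version, `nscala-time` version, and Scala 2.10 target are read off the log, while the sbt-assembly version, the exact Scala patch version, and the Spark version are assumptions.

```scala
// project/plugins.sbt
// Assumed plugin version; use whichever release matches your sbt version.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

// build.sbt
name := "MyProject"

version := "1.0"

scalaVersion := "2.10.4" // assumed patch version of Scala 2.10

libraryDependencies ++= Seq(
  // Assumed Spark version. "provided" keeps Spark itself out of the assembly,
  // since the cluster already supplies it at runtime.
  "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
  "com.github.nscala-time" %% "nscala-time" % "1.8.0"
)
```

Marking Spark as `provided` is what keeps the assembly small: only `nscala-time` and its transitive Joda dependencies are included, matching the three `Including:` lines in the log.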



<span id='resource tuning'></span>
## Resource Tuning

### Tuning resource allocation
Two main resources: **CPU** and **memory**.  
Disk and network I/O affect performance too, but neither Spark nor YARN actively manages them.

Every Spark executor in an application has the same fixed *number of cores* and the same fixed *heap size*.  

Ways to specify the number of cores, i.e. the number of concurrent tasks an executor can run:
- `spark-submit --executor-cores 2` or `spark-submit --conf spark.executor.cores=2`
- In `$SPARK_HOME/conf/spark-defaults.conf`,
    `spark.executor.cores   2`
- In your program: 
```scala
import org.apache.spark.SparkConf
val conf = new SparkConf()
conf.set("spark.executor.cores", "2") // SparkConf values are passed as strings
```

The heap size impacts the amount of data Spark can cache, as well as the maximum sizes of the shuffle data structures used for grouping, aggregations, and joins. Ways to specify heap size:
- the `--executor-memory` flag of `spark-submit`
- the `spark.executor.memory` property, set in any of the ways shown above for `spark.executor.cores`

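**Dynamic Resource Allocation:** As of Spark 1.3.0, dynamic allocation enables a Spark application to request executors when there is a backlog of pending tasks and to free up executors when they are idle. Enable it with `spark.dynamicAllocation.enabled=true`; on YARN it also relies on the external shuffle service (`spark.shuffle.service.enabled=true`).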

### Tuning parallelism
Spark is limited in its ability to optimize the number of tasks per stage, so this is largely up to you. You want to take advantage of all the available CPU, which means running at least enough tasks to keep every core busy.

*The number of tasks in a stage is the same as the number of partitions in the last RDD in the stage.* So, to tune the number of tasks, tune your number of partitions! To get the number of partitions in an RDD from Scala: `rdd.partitions.size`

The number of partitions in an RDD is the same as the number of partitions in the RDD on which it depends, except for:
- `coalesce` : allows creation of an RDD with fewer partitions than its parent RDD
- `union` : creates an RDD with the *sum* of its parents' partitions
- `cartesian` : creates an RDD with the *product* of its parents' partitions

RDDs with no parents, e.g. those produced by `textFile` or `hadoopFile`, typically have one partition per HDFS block being read.
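
These rules can be checked directly. The sketch below assumes Spark on the classpath and a `local[2]` master; the object name `PartitionCounts` and the partition counts 4 and 3 are arbitrary choices for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PartitionCounts {
  // Partition counts of RDDs derived from a 4-partition and a 3-partition parent.
  def counts(sc: SparkContext): Map[String, Int] = {
    val a = sc.parallelize(1 to 100, 4)
    val b = sc.parallelize(1 to 100, 3)
    Map(
      "map"       -> a.map(_ + 1).partitions.size,   // same as parent: 4
      "coalesce"  -> a.coalesce(2).partitions.size,  // fewer than parent: 2
      "union"     -> a.union(b).partitions.size,     // sum: 4 + 3 = 7
      "cartesian" -> a.cartesian(b).partitions.size  // product: 4 * 3 = 12
    )
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("PartitionCounts").setMaster("local[2]"))
    try counts(sc).foreach(println) finally sc.stop()
  }
}
```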

**Memory constraints:** You want to run enough tasks so that the data destined for each task fits in the memory available to that task.

The memory available to each task is:  
`(spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) / spark.executor.cores`  
The memory fraction and safety fraction default to 0.2 and 0.8 respectively.
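
As a worked example of this formula, the plain-Scala sketch below computes the per-task budget and a lower bound on the number of tasks. The object `TaskMemory`, the 8 GiB / 4 core executor, and the 100 GiB stage input are hypothetical figures, and the bound assumes evenly sized partitions. Note that these two fraction settings belong to the memory model of the Spark 1.x releases these notes were written for.

```scala
object TaskMemory {
  // Shuffle memory available to one task, in bytes:
  // (executor memory * memoryFraction * safetyFraction) / executor cores.
  def perTaskBytes(executorMemoryBytes: Long,
                   executorCores: Int,
                   memoryFraction: Double = 0.2,
                   safetyFraction: Double = 0.8): Double =
    executorMemoryBytes * memoryFraction * safetyFraction / executorCores

  // Smallest number of tasks such that each task's share of `stageInputBytes`
  // fits in the per-task budget, assuming evenly sized partitions.
  def minTasks(stageInputBytes: Long, perTask: Double): Long =
    math.ceil(stageInputBytes / perTask).toLong

  def main(args: Array[String]): Unit = {
    val gib = 1024L * 1024 * 1024
    val budget = perTaskBytes(executorMemoryBytes = 8 * gib, executorCores = 4)
    println(f"per-task budget: ${budget / (1024 * 1024)}%.2f MiB")
    println(s"tasks needed for 100 GiB: ${minTasks(100 * gib, budget)}")
  }
}
```

With these figures each task gets 8 GiB * 0.2 * 0.8 / 4, about 327.68 MiB, so 100 GiB of in-memory data needs at least 313 tasks. In practice data often takes more space in memory than on disk, so treat the result as a floor rather than a target.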

#### Sources
- http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
- http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/
- http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/

#### Exit Tickets
1. Under what circumstances would you want to decrease parallelism?
1. Explain the ideas of stages and shuffling to someone new to Spark.
1. How does cluster resource allocation affect how you partition your data?

*Copyright &copy; 2015 The Data Incubator.  All rights reserved.*