<h1>Optimization / parallelization</h1>

A Spark application consists of a *driver* program that runs the user's main function and executes *parallel* operations on a cluster. Spark provides two main abstractions:

- Resilient distributed datasets (RDDs): a dataset is divided into partitions, which can be operated on in parallel.
- Shared variables: by default, each task receives its own copy of the variables used in a function. Spark also provides two kinds of shared variables: **broadcast** variables, which cache a read-only value in memory on all nodes, and **accumulators**, which can only be added to.

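We first declare the Spark version, add the dependencies and import the classes we need. The Spark session itself is created in the broadcast variables example below.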

In [8]:
val sparkVersion = "2.0.1"
val scalaVersion = scala.util.Properties.versionNumberString

sparkVersion: String = "2.0.1"
scalaVersion: String = "2.11.8"

In [9]:
classpath.add(
    "org.apache.spark" %% "spark-yarn" % sparkVersion,
    "org.apache.spark" %% "spark-mllib" % sparkVersion
)

0 new artifact(s)




In [10]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.util.MLUtils

import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.util.MLUtils

<h2>Understanding closures</h2>

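Spark behaves differently in local and cluster mode, and the main difference concerns variables captured in a closure. Before running a task, Spark serializes the closure, together with the variables it references, and sends a copy to each executor. In cluster mode, updates made by a task therefore change only the executor's copy, and the driver's variable is left untouched. In local mode, the driver and the tasks may share a JVM, so the same code can appear to work. Do not rely on a variable being updated (or not updated) from inside a closure; use an accumulator when you need a global aggregate.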
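
The copy semantics behind this can be illustrated without a cluster. The sketch below is not Spark code: it only mimics, using plain Java serialization, the way captured state reaches an executor as a copy. The names <tt>ClosureCopyDemo</tt> and <tt>shipCopy</tt> are hypothetical and chosen for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ClosureCopyDemo {
  // Round-trip the state through Java serialization, roughly what happens to
  // the variables captured by a closure when a task is sent to an executor.
  def shipCopy(state: Array[Int]): Array[Int] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(state)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[Array[Int]] finally in.close()
  }

  // Returns (counter seen by the "driver", counter seen by the "executor").
  def run(): (Int, Int) = {
    val driverCounter = Array(0)                  // one-element array used as a mutable counter
    val executorCounter = shipCopy(driverCounter) // the "task" works on a copy
    Seq(1, 2, 3, 4).foreach(x => executorCounter(0) += x)
    (driverCounter(0), executorCounter(0))
  }
}

println(ClosureCopyDemo.run()) // prints (0,10)
```

The "driver" still sees 0 because only the deserialized copy was updated, which is the surprise that cluster mode can produce.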

<h3>Broadcast variables</h3>

To avoid shipping a copy of a variable with every task, a read-only variable can be cached on each machine. This is useful for large datasets that are needed by multiple tasks. Data broadcast this way is cached in serialized form and deserialized before each task runs.

Broadcast variables are created from a variable $v$ by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around $v$, and its value can be accessed by calling the *value* method.

In [11]:
val sparkSession = SparkSession.builder
  .master("local[1]")
  .appName("Parallelization")
  .getOrCreate()

val sc = sparkSession.sparkContext
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value

sparkSession: SparkSession = org.apache.spark.sql.SparkSession@32b719ed
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@77668bb4
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(2)
res10_3: Array[Int] = Array(1, 2, 3)
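
In practice a broadcast variable is read inside a task through its *value* method. The following is a minimal sketch, assuming Spark 2.x is on the classpath and a local master is acceptable. The lookup table and the names <tt>BroadcastLookupDemo</tt> and <tt>countryNames</tt> are invented for illustration.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastLookupDemo {
  def run(): Array[String] = {
    val spark = SparkSession.builder
      .master("local[2]")
      .appName("BroadcastLookupDemo")
      .getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical lookup table, sent to each executor once rather than with every task.
    val countryNames = sc.broadcast(Map("DE" -> "Germany", "FR" -> "France"))

    // Tasks only read the broadcast value; they must not modify it.
    sc.parallelize(Seq("DE", "FR", "XX"))
      .map(code => countryNames.value.getOrElse(code, "unknown"))
      .collect()
  }
}

println(BroadcastLookupDemo.run().mkString(", ")) // prints Germany, France, unknown
```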

<h3>Accumulators</h3>

[Accumulators](https://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/Accumulator.html) are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in <tt>MapReduce</tt>) or sums. 

You can create a numeric accumulator by calling *SparkContext.longAccumulator()* or *SparkContext.doubleAccumulator()* to accumulate Long or Double values respectively; both start at zero. (Users can also create accumulators of other types by subclassing <tt>AccumulatorV2</tt>.) Tasks running on the cluster can then add to the accumulator using <tt>add</tt>, but they cannot read its value. Only the driver program can read it, using <tt>value</tt>.

In [12]:
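object Acc {
  def example(): Unit = {
    // longAccumulator starts at 0 (sc.accumulator is deprecated since Spark 2.0)
    val accum = sc.longAccumulator
    // foreach is an action, so each element is added exactly once
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
    println(accum.value)
  }
}

Acc.example()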

10


defined object Acc

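Note that Spark guarantees that each task's update is applied to an accumulator exactly once only when the update happens inside an *action*. Inside lazy transformations (such as *map* on an RDD), an update is not executed until an action forces the computation, and it may be applied more than once if a task or stage is re-executed. Be aware that the cell below maps over a plain Scala <tt>Array</tt>, which is evaluated eagerly on the driver, so the accumulator already holds 15 when it is printed. With an RDD and no action, it would still be 0.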

In [13]:
val data = Array(1, 2, 3, 4, 5)
val accum2 = sc.longAccumulator
data.map { x => accum2.add(x); x }
println(accum2.value)

15


data: Array[Int] = Array(1, 2, 3, 4, 5)
accum2: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 92, name: None, value: 15)
res12_2: Array[Int] = Array(1, 2, 3, 4, 5)
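
To see the lazy behaviour itself, the accumulator update has to sit inside a transformation on an RDD. The following is a minimal sketch, assuming Spark 2.x on the classpath and a local master; the object name <tt>LazyAccumulatorDemo</tt> is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object LazyAccumulatorDemo {
  // Returns the accumulator value (before, after) the first action.
  def run(): (Long, Long) = {
    val spark = SparkSession.builder
      .master("local[2]")
      .appName("LazyAccumulatorDemo")
      .getOrCreate()
    val sc = spark.sparkContext

    val accum = sc.longAccumulator
    // map on an RDD is a lazy transformation: nothing is executed yet.
    val mapped = sc.parallelize(Array(1, 2, 3, 4, 5)).map { x => accum.add(x); x }
    val before: Long = accum.value // still 0, because no action has run

    mapped.count()                 // an action forces the map to run
    val after: Long = accum.value  // 15 once the tasks have completed

    (before, after)
  }
}

println(LazyAccumulatorDemo.run()) // prints (0,15)
```

Running a second action on <tt>mapped</tt> without caching it would recompute the map and add the values again, which is one way the unexpected results mentioned above can arise.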

<h2>Sequential vs parallel operations</h2>

Some examples of operations carried out in parallel are below. They use Scala's parallel collections (created with <tt>.par</tt>) rather than Spark. Note that these are executed on small collections for illustration purposes only - in general, you will want to run these on large collections.

<h3>map</h3>

Using a parallel map to transform a collection of <tt>String</tt> to all-uppercase:

In [14]:
val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames.map(_.toUpperCase)

lastNames: collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin)
res13_1: collection.parallel.immutable.ParSeq[String] = ParVector(SMITH, JONES, FRANKENSTEIN, BACH, JACKSON, RODIN)

<h3>fold</h3>

Summing via fold on a <tt>ParArray</tt>:

In [15]:
val parArray = (1 to 10000).toArray.par   
parArray.fold(0)(_ + _)

parArray: collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, ... (output truncated)

<h3>filter</h3>

Using a parallel filter to select the last names whose first letter is “J” or later in the alphabet:

In [16]:
val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames.filter(_.head >= 'J')

lastNames: collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin)
res15_1: collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Jackson, Rodin)

Can you run the operations above both sequentially and in parallel on a bigger collection (for example, a list of the numbers from 0 to 10000) and carry out timing experiments to compare the speed of execution?
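
One possible starting point for such timing experiments is a small helper that measures wall-clock time. This is only a sketch: the names <tt>TimingDemo</tt> and <tt>timed</tt> are hypothetical, and only the sequential baseline is shown, so the parallel variant is left for you to wrap in the same way.

```scala
object TimingDemo {
  // Runs `block` once and returns its result together with the elapsed
  // wall-clock time in milliseconds.
  def timed[A](block: => A): (A, Double) = {
    val start = System.nanoTime()
    val result = block
    (result, (System.nanoTime() - start) / 1e6)
  }

  // Sequential baseline: sum the numbers 0 to 10000 with fold.
  def run(): (Int, Double) = {
    val numbers = (0 to 10000).toList
    // To compare, wrap numbers.par.fold(0)(_ + _) in timed(...) in the same way.
    timed(numbers.fold(0)(_ + _))
  }
}

val (sum, millis) = TimingDemo.run()
println(s"sequential fold: sum = $sum, took $millis ms")
```

A single measurement is noisy, partly because of JIT warm-up, so it is usually worth repeating each run several times and comparing averages.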

<h2>Parallelized collections</h2>

A parallelized collection is created by calling **parallelize** on an existing collection. The elements of the collection are copied to form a distributed dataset (an RDD) that can be operated on in parallel.

In [17]:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

data: Array[Int] = Array(1, 2, 3, 4, 5)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at Main.scala:30

The number of *partitions* can be set manually by passing it as a second argument to <tt>parallelize</tt>:

    sc.parallelize(data, 10)

Otherwise, Spark tries to set the number of partitions automatically based on the cluster. As a rule of thumb, you typically want 2-4 partitions for every CPU in the cluster.
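
The effect of the second argument can be checked with <tt>.partitions.size</tt>. The following is a minimal sketch, assuming Spark 2.x on the classpath and a local master; the object name <tt>PartitionCountDemo</tt> is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object PartitionCountDemo {
  // Returns (default partition count, Spark's default parallelism, explicit partition count).
  def run(): (Int, Int, Int) = {
    val spark = SparkSession.builder
      .master("local[2]")
      .appName("PartitionCountDemo")
      .getOrCreate()
    val sc = spark.sparkContext

    val data = Array(1, 2, 3, 4, 5)
    // Without a second argument, parallelize uses sc.defaultParallelism.
    val defaultCount = sc.parallelize(data).partitions.size
    // With a second argument, the RDD gets exactly that many partitions,
    // even if some of them end up empty.
    val explicitCount = sc.parallelize(data, 10).partitions.size

    (defaultCount, sc.defaultParallelism, explicitCount)
  }
}

println(PartitionCountDemo.run())
```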

<h3>$\pi$ Estimation</h3>

Spark can also be used for compute-intensive tasks. This code estimates $\pi$ by "throwing darts": we pick random points in the unit square ((0, 0) to (1, 1)) and count how many fall inside the unit circle centred at the origin. Only a quarter of that circle lies within the square, so the fraction of hits should be $\pi / 4$, and we multiply it by 4 to get our estimate.
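
The estimator follows from comparing areas. The short derivation below assumes that the points are independent and uniformly distributed on the unit square; $N$ (the number of samples), $C$ (the number of hits) and $p$ are symbols introduced here for illustration.

$$
p = P(x^2 + y^2 < 1) = \frac{\text{area of quarter disc}}{\text{area of unit square}} = \frac{\pi / 4}{1},
\qquad
\hat{\pi} = \frac{4C}{N},
\qquad
\operatorname{sd}(\hat{\pi}) = 4 \sqrt{\frac{p (1 - p)}{N}}
$$

For $N = 10^7$ the standard deviation is about $5 \times 10^{-4}$, so an estimate is typically within about $10^{-3}$ of $\pi$.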

In [18]:
val NUM_SAMPLES = 10000000

val count = sc.parallelize(1 to NUM_SAMPLES).map{i =>
    val x = Math.random()
    val y = Math.random()
    if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)

println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)

Pi is roughly 3.142068


NUM_SAMPLES: Int = 10000000
count: Int = 7855170

The $\pi$ estimation example is particularly interesting (and it appears in the exercises below!).

<h2>Exercises</h2>

<h3>Exercise 1</h3>

Create the <tt>accum2</tt> program from the <b>Accumulators</b> section as a standalone program on the HPC to see whether / when it yields unexpected results.

<h3>Exercise 2</h3>

You have run the $\pi$ estimation program above as a standalone program when learning to use the HPC and have seen it fail to scale. If we point you at the class <tt>XORShiftRandom</tt>, can you modify the program so that it does scale? (Carry out timing experiments to show that it now works as expected.)

<h3>Exercise 3</h3>

We introduced **transformations** and **actions** in Notebook 2 and extended our knowledge of them in this notebook. The main advantage of Spark is parallelization. In this exercise, use the [bagofwords Enron .gz](https://archive.ics.uci.edu/ml/datasets/Bag+of+Words) dataset: read the file in as a <tt>.gz</tt> file, strip off the first 3 lines, and use <tt>.partitions.size</tt>, for example

    distData.partitions.size

to investigate whether and how the number of partitions changes after running transformations and actions. Investigate the effect of at least 10 of [Spark's most common transformations](http://spark.apache.org/docs/latest/programming-guide.html#transformations) and actions (make sure that your list includes <tt>reduceByKey</tt> and <tt>groupByKey</tt>).