### Advanced Spark Programming

We will revisit accumulators and broadcast variables in this notebook.

---

#### Accumulators

When variables from the driver program are used inside functions passed to operations such as ``map`` or ``filter``, a copy of each variable is shipped to the tasks working on the different partitions. The tasks can use their copy, but any changes they make are not propagated back to the driver program. Ordinary variables are therefore fine when we want to add a fixed number to every element of the RDD, but not when we want to, say, count some numbers across the partitions of the RDD.

In [11]:
//Ok in the following case
val delta = 2

val rdd = sc.parallelize(1 to 10)
println("Adding delta to all values gives us " + rdd.map(_ + delta).collect.toList)

var count = 0
//Not ok for following case, where we want to count all even numbers

rdd.foreach(x => if(x % 2 == 0) count += 1)

print("Value of count is " + count)


Adding delta to all values gives us List(3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
Value of count is 0


Accumulators to the rescue.
For the above use case we need an accumulator: a shared variable that tasks across the cluster can add to, with the merged result propagated back to the driver program. In effect it behaves like a single value seen across the entire cluster. The following code demonstrates the use case of counting even numbers.


In [14]:
val countAcc = sc.accumulator(0)
rdd.foreach(x => if(x % 2 == 0) countAcc += 1)
print("Value of countAcc is " + countAcc)

Value of countAcc is 5


Note that the accumulator's value is updated only when an action is executed. For example, the following mapping function makes use of an accumulator, but its value stays at 0 until an action runs because transformations are lazy.

In [20]:
val countAcc1 = sc.accumulator(0)
val mappedRdd = rdd.map(x => {
   if(x % 2 == 0) {
      countAcc1 += 1
      x
   } else {
       -x
   }
})
println("Value of countAcc1 is " + countAcc1)
mappedRdd.count()
println("After executing an action on mappedRdd countAcc1 is " + countAcc1)
mappedRdd.count()
println("After executing an action another time on mappedRdd countAcc1 is " + countAcc1)
val cachedRdd = mappedRdd.cache()
cachedRdd.count()
println("After invoking an action on cached RDD countAcc1 is " + countAcc1)
cachedRdd.count()
println("After invoking an action another time on cached RDD countAcc1 is " + countAcc1)


Value of countAcc1 is 0
After executing an action on mappedRdd countAcc1 is 5
After executing an action another time on mappedRdd countAcc1 is 10
After invoking an action on cached RDD countAcc1 is 15
After invoking an action another time on cached RDD countAcc1 is 15



An interesting observation from the above output is that each time an action is invoked on an RDD that is not cached, the mapping is re-executed and ``countAcc1`` is incremented again. Once the RDD is cached, the mapping runs once to populate the cache, and subsequent actions on the cached RDD will not necessarily invoke the transformation again. As a result we do not see ``countAcc1`` incremented consistently.

This is an important observation, and a good reason why accumulators are not a good fit for mission-critical business logic; they should be used for debugging or other non-critical logic only. For a use case like counting the even numbers (or similar logic), we are better off using ``reduce`` or ``aggregate`` on the RDD, among many other possible ways.
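
The ``aggregate`` alternative mentioned above can be sketched as follows. This is a minimal sketch that assumes the notebook's ``sc`` is available; the names ``numbers``, ``evenCountByFilter`` and ``evenCountByAggregate`` are introduced here only for illustration. Because the count is the return value of the action itself, re-running the action gives the same answer every time.

```scala
// Assumes the notebook's SparkContext `sc`; all other names are illustrative.
val numbers = sc.parallelize(1 to 10)

// Option 1: keep only the even numbers and count them.
val evenCountByFilter = numbers.filter(_ % 2 == 0).count()

// Option 2: fold each partition into a running count, then add the
// per-partition counts together.
val evenCountByAggregate = numbers.aggregate(0)(
  (acc, x) => if (x % 2 == 0) acc + 1 else acc, // within a partition
  _ + _                                          // across partitions
)

println("Even count via filter is " + evenCountByFilter)
println("Even count via aggregate is " + evenCountByAggregate)
```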

Note that an accumulator's value can be read only in the driver program, not in the functions running in parallel across the cluster. Simply allowing the workers only to write to the accumulator makes it easy to ensure its integrity without having to propagate the state of the variable across the cluster. The following example demonstrates that the mapping function cannot read the value of the accumulator but can only write to it.

In [33]:
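val acc = sc.accumulator(0)
val mappedRdd1 = rdd.map(x => {
   if(x % 2 == 0) {
      // Writing to the accumulator from a task is allowed
      acc += 1
      x
   } else {
      // Reading the accumulator from a task throws UnsupportedOperationException
      x + acc.value
   }
})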
try {
    mappedRdd1.count()    
} catch {
    case e: org.apache.spark.SparkException => println(e.getMessage)
    case e: Throwable => throw e
}

Job aborted due to stage failure: Task 0 in stage 26.0 failed 1 times, most recent failure: Lost task 0.0 in stage 26.0 (TID 52, localhost, executor driver): java.lang.UnsupportedOperationException: Can't read accumulator value in task
	at org.apache.spark.Accumulable.value(Accumulable.scala:117)
	at $line176.$read$$iw$$iw$$iw$$iw$$anonfun$1.apply$mcII$sp(<console>:28)
	at $line176.$read$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
	at $line176.$read$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1835)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1162)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1162)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.sc

()


##### Accumulators and Fault Tolerance

We have already seen that accumulator values are not reliable when updated in a transformation: each call to an action re-triggers the transformation and increments the accumulator again. The value is not reliable even if the action is called only once. For large RDDs, a task processing a partition might fail or run very slowly, causing Spark to execute the transformation on the same chunk of data on another machine. This is how Spark provides fault tolerance and recovers from failures: it re-executes the series of transformations for the affected chunks of data on alternate machines. When such duplicate execution occurs, the accumulator is updated more than once for the same data and its value cannot be trusted.

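To use the value of an accumulator reliably, update it inside an action such as ``foreach`` and not inside a transformation. For updates performed inside actions, Spark applies each task's update only once, even if the task is restarted.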

We can also define custom accumulators, provided that the operation we intend to implement is commutative and associative.
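
As an illustration of a custom accumulator, here is a minimal sketch built on ``AccumulatorV2``, the accumulator API available from Spark 2.0 onwards (the ``sc.accumulator`` method used earlier in this notebook is deprecated in Spark 2.x). The class ``MaxAccumulator`` and the name ``maxAcc`` are hypothetical and exist only for this example. Taking a maximum is both commutative and associative, so it satisfies the requirement above. The sketch assumes the notebook's ``sc`` and that the notebook kernel can serialize classes defined in a cell.

```scala
import org.apache.spark.util.AccumulatorV2

// Hypothetical accumulator that remembers the largest Int added to it.
class MaxAccumulator extends AccumulatorV2[Int, Int] {
  private var current = Int.MinValue

  // The "zero" state is the one in which nothing has been added yet.
  override def isZero: Boolean = current == Int.MinValue
  override def copy(): MaxAccumulator = {
    val acc = new MaxAccumulator
    acc.current = current
    acc
  }
  override def reset(): Unit = current = Int.MinValue
  // Called by tasks for every element they see.
  override def add(v: Int): Unit = current = math.max(current, v)
  // Called on the driver to combine the per-task results.
  override def merge(other: AccumulatorV2[Int, Int]): Unit =
    current = math.max(current, other.value)
  override def value: Int = current
}

val maxAcc = new MaxAccumulator
sc.register(maxAcc, "maxAcc")

// Update the accumulator inside an action, as recommended above.
sc.parallelize(1 to 10).foreach(x => maxAcc.add(x))
println("Largest value seen is " + maxAcc.value)
```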


#### Broadcast Variables

Broadcast variables are read-only variables that are made available on each worker node. We will look at a couple of examples and see exactly where they can be used.

In [37]:
val rnd = scala.util.Random
val someNumbers = (for (i <- 1 to 5) yield rnd.nextInt(10)).toList
println(someNumbers)

List(3, 1, 7, 9, 4)



We generated 5 random numbers, and we now run a couple of filter operations that exclude these numbers from the output, as follows.

In [44]:
val oneToTenRdd = sc.parallelize(1 to 10)
val elevenToTwentyRDD = sc.parallelize(11 to 20)
println("Excluding someNumbers from List of numbers from 1 to 10 gives " + 
    oneToTenRdd.filter(x => !someNumbers.contains(x)).collect.toList)
    
println("Excluding someNumbers from List of numbers from 11 to 20 gives " + 
    elevenToTwentyRDD.filter(x => !someNumbers.contains(x)).collect.toList)

Excluding someNumbers from List of numbers from 1 to 10 gives List(2, 5, 6, 8, 10)
Excluding someNumbers from List of numbers from 11 to 20 gives List(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)



The above code works as expected but has some potential performance issues. Imagine that the list ``someNumbers`` is really huge, or perhaps that it is some lookup table. In that case, for each transformation, the contents of ``someNumbers`` are serialized and shipped to the worker nodes along with the tasks, which is expensive and inefficient.

In such scenarios it is ideal to broadcast these frequently read, large data structures once to all the workers in the cluster, which can then use their local copy of the read-only data structure in every transformation they execute.

The following change is all that is needed to first broadcast the list and then use it across different transformations.


In [46]:
val someNumbersBroadcast = sc.broadcast(someNumbers)
println("Excluding someNumbers from List of numbers from 1 to 10 gives " + 
    oneToTenRdd.filter(x => !someNumbersBroadcast.value.contains(x)).collect.toList)
    
println("Excluding someNumbers from List of numbers from 11 to 20 gives " + 
    elevenToTwentyRDD.filter(x => !someNumbersBroadcast.value.contains(x)).collect.toList)

Excluding someNumbers from List of numbers from 1 to 10 gives List(2, 5, 6, 8, 10)
Excluding someNumbers from List of numbers from 11 to 20 gives List(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)



As we see above, the output is identical to the version where we serialized the list for each transformation performed on the cluster on different RDDs. We first broadcast the variable we wish to read frequently and then use it in the transformations on the RDD by invoking ``value`` on the broadcast variable.

By default, Spark uses Java serialization to serialize objects, which is not very efficient. To speed up serialization we may implement our own serialization logic for our data structure by implementing the ``java.io.Externalizable`` interface, or use an alternative serialization library like Kryo, which can be enabled in Spark through the ``spark.serializer`` property. More on property setting later in another notebook.
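
For reference, switching to Kryo might look like the following sketch. The case class ``LookupEntry`` and the application name are hypothetical and only serve the example. Note that ``spark.serializer`` has to be set before the ``SparkContext`` is created, so in a notebook where ``sc`` already exists this would normally go into the kernel or cluster configuration rather than into a cell.

```scala
import org.apache.spark.SparkConf

// Hypothetical class we expect to broadcast or shuffle frequently.
case class LookupEntry(id: Int, label: String)

val conf = new SparkConf()
  .setAppName("kryo-example") // illustrative name
  // Replace the default Java serializer with Kryo.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes up front lets Kryo avoid writing full class names.
  .registerKryoClasses(Array(classOf[LookupEntry]))

println(conf.get("spark.serializer"))
```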

#### Working on per partition basis

Certain operations, like opening a database connection pool or initializing a random number generator, are things we do not want to perform per element but once per partition. Operations like ``mapPartitions`` and ``foreachPartition`` allow us to operate on a per-partition basis. Following is an example of a per-partition operation.

In [18]:
oneToTenRdd.mapPartitions{
    elems =>
        val rnd = new scala.util.Random(0)
        elems.map(x => (rnd.nextInt(10), x))        
}.collect.toList

List((0,1), (8,2), (9,3), (7,4), (5,5), (0,6), (8,7), (9,8), (7,9), (5,10))

An interesting observation we can make here is that the numbers 1 to 5 probably form one partition and 6 to 10 form another. Both of them initialize a random number generator with seed 0, and thus the elements of each partition are paired with the same generated random numbers 0, 8, 9, 7 and 5. Similarly, we have another method, ``mapPartitionsWithIndex``, which provides us with the index of each partition. As expected, we see two partitions with index 0 and 1 in the following snippet, and each partition generates the same sequence of random numbers.

In [19]:
oneToTenRdd.mapPartitionsWithIndex{
    (idx, elems) =>
        val rnd = new scala.util.Random(0)
        elems.map(x => (idx, rnd.nextInt(10), x))        
}.collect.toList

List((0,0,1), (0,8,2), (0,9,3), (0,7,4), (0,5,5), (1,0,6), (1,8,7), (1,9,8), (1,7,9), (1,5,10))
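
If identical random sequences in every partition are not what we want, one option is to derive the seed from the partition index. This is only a sketch, assuming the notebook's ``sc``; the name ``seededByPartition`` is introduced here for illustration, and the actual numbers depend on how the data is partitioned.

```scala
// Assumes the notebook's SparkContext `sc`.
val seededByPartition = sc.parallelize(1 to 10).mapPartitionsWithIndex {
  (idx, elems) =>
    // Seed with the partition index so each partition gets its own sequence
    // while the run as a whole stays reproducible for a fixed partitioning.
    val rnd = new scala.util.Random(idx)
    elems.map(x => (idx, rnd.nextInt(10), x))
}.collect.toList

println(seededByPartition)
```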


#### Pipe Operations

Another interesting operation is ``pipe``, which lets us pipe the contents of an RDD to another program and read the lines written by the external program back as a new RDD.
For example, suppose we have a shell script called ``testScript.sh`` in the current directory, as follows.

```
#!/bin/bash
while read line
do
   echo '==='${line}'==='    
done

```

We can pipe the contents of the RDD to the script, which Spark forks as an external process for each partition. The script reads the elements of the partition from STDIN, one per line, and writes its results to STDOUT. This way we can stream data between the two programs. The STDOUT of the script is then captured by Spark to create a new ``PipedRDD``. The performance of such a transformation is not great, but it becomes essential when complicated logic is already implemented in a legacy system that cannot be easily replaced.

In [20]:
println(oneToTenRdd.pipe("./testScript.sh").collect.toList)

List(===1===, ===2===, ===3===, ===4===, ===5===, ===6===, ===7===, ===8===, ===9===, ===10===)



#### Numeric RDD Operations

Numeric RDDs allow us to gather some statistics by making a single pass over the data. The following code snippet shows some of the statistics available to us.

In [32]:
println("1. Count is " + oneToTenRdd.count())
println("2. Mean is " + oneToTenRdd.mean())
println("3. Stdev is " + oneToTenRdd.stdev())
println("4. Max is " + oneToTenRdd.max())
println("5. Min is " + oneToTenRdd.min())
println("6. Variance is " + oneToTenRdd.variance())
println("7. Sample Variance is " + oneToTenRdd.sampleVariance())
println("8. Sample Stdev is " + oneToTenRdd.sampleStdev())


1. Count is 10
2. Mean is 5.5
3. Stdev is 2.8722813232690143
4. Max is 10
5. Min is 1
6. Variance is 8.25
7. Sample Variance is 9.166666666666666
8. Sample Stdev is 3.0276503540974917



All the stats above are self-explanatory. Sample Variance and Sample Stdev are computed with N - 1 in the denominator instead of N. The following code snippet computes the variance and the sample variance by hand; each standard deviation is simply the square root of the corresponding variance.

In [37]:
val mean = oneToTenRdd.mean()
val count = oneToTenRdd.count()
val nr = oneToTenRdd.map(x => (x - mean) * (x - mean)).sum()
println("Variance is " + nr / count + " Sample Variance is " + nr / (count - 1))

Variance is 8.25 Sample Variance is 9.166666666666666
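
Calling each statistic separately, as in the earlier cell, launches a separate job per call. When several statistics are needed, ``stats()`` returns a ``StatCounter`` holding all of them after a single pass over the data. A minimal sketch, assuming the notebook's ``sc`` (the name ``summary`` is illustrative):

```scala
// Assumes the notebook's SparkContext `sc`.
val summary = sc.parallelize(1 to 10).stats()

// All of these are read from the same StatCounter; no further jobs are run.
println("Count is " + summary.count)
println("Mean is " + summary.mean)
println("Variance is " + summary.variance)
println("Sample Variance is " + summary.sampleVariance)
println("Max is " + summary.max + " and Min is " + summary.min)
```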
