# Parallel Programming in Scala 

## Instructors: Viktor Kuncak and Aleksandar Prokopec

> # Week 3: Data-Parallel Programming

**Author:** [Ehsan M. Kermani](https://ca.linkedin.com/in/ehsanmkermani)

The code is available [here](https://github.com/axel22/parprog-snippets/tree/master/src/main/scala/lectures/dataparallelism)

## Task Parallelism vs. Data Parallelism:

**Task parallel programming:** Form of parallelization that distributes *execution processes* across computing nodes 

* The previous lectures covered the `task` and `parallel` constructs

**Data parallel programming:** Form of parallelization that distributes *data* across computing nodes

* Different data parallel programs have different workloads (workload is a function that maps each input element to the amount of work required to process it)

* The goal of a *data-parallel scheduler* is to balance the workload efficiently across processors without any knowledge of the individual workloads

## Scala Data-Parallel Collections with `.par` method:
* In Scala, most collection operations can become data-parallel by calling the `.par` method on the collection first

In [1]:
// example: count, in parallel, the palindromes between 1 and 1000 (exclusive) that are divisible by 3
(1 until 1000).par.filter(n => n % 3 == 0).count(n => n.toString == n.toString.reverse)

res0: Int = 36

* However, operations such as `foldLeft, foldRight, reduceLeft, reduceRight, scanLeft, scanRight` do not run in parallel even when invoked on parallel collections, because each step *depends* on the previously computed result. More precisely, the signature of `foldLeft` is `def foldLeft[B](zero: B)(f: (B, A) => B): B`. The function `f: (B, A) => B` takes the accumulated result as its first argument, so every call must wait for the previous one to finish, which forces the computation to run sequentially.


* As seen in lecture 2, we can implement a parallel `reduce`, or more generally `fold` (`def fold[A](zero: A)(f: (A, A) => A): A`), using tree-like aggregation, provided that the neutral element `zero` and the binary operation `f` form a [monoid](https://en.wikipedia.org/wiki/Monoid#Definition), i.e. `f` is associative and `zero` is its identity element. (Commutativity of `f` is not required.)


* It is possible to implement foldLeft-like operations (i.e. operations whose result type can be *different* from the element type) in parallel. One such operation is `aggregate`, which takes a zero element `zero`, a sequential operation `seqOp` and a parallel combining operation `combOp`:

```scala
def aggregate[B](zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B
```
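
The way `aggregate` uses its two functions can be sketched without parallel collections at all. The helper below, `chunkedAggregate`, is a hypothetical name introduced only for illustration: it cuts a `Vector` into chunks, folds each chunk with `seqOp` (the part a parallel collection would hand to separate workers), and then merges the partial results with `combOp`. It runs sequentially and only assumes the standard library.

```scala
object AggregateSketch {
  // Hypothetical helper (not part of the Scala library): emulates the shape of
  // `aggregate` on a plain Vector. Each chunk stands in for one worker's share.
  def chunkedAggregate[A, B](xs: Vector[A], chunkSize: Int)(zero: B)(
      seqOp: (B, A) => B, combOp: (B, B) => B): B = {
    require(chunkSize > 0, "chunkSize must be positive")
    // Step 1: fold every chunk independently, starting from `zero` each time.
    val partials = xs.grouped(chunkSize).map(_.foldLeft(zero)(seqOp)).toVector
    // Step 2: merge the partial results; only `combOp` sees two values of type B.
    partials.foldLeft(zero)(combOp)
  }

  val isVowel: Set[Char] = Set('a', 'e', 'i', 'o', 'u')
  val text: Vector[Char] = "data parallel programming".toVector

  // Element type is Char, result type is Int, so a plain `fold` would not fit.
  val vowels: Int =
    chunkedAggregate(text, 4)(0)((n, c) => if (isVowel(c)) n + 1 else n, _ + _)
}
```

Because `zero` is used as the starting value of every chunk, it has to be neutral for `combOp`; otherwise the result would depend on how the input happens to be split.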


* Functions such as `sum, max, reduce, fold, count, aggregate`, etc., which return a single value when invoked on a parallel collection, are called **accessors** (in Apache Spark terminology, *actions*).

* Functions such as `map, flatMap, filter, groupBy`, etc., which return a new collection as the result, are called **transformers.**
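
The monoid requirement on `fold` mentioned above can be checked with a hand-written tree-shaped reduction. The function `treeFold` below is a hypothetical illustration, not the library implementation. It splits the input in half recursively, so the two halves could be reduced by independent tasks, although here everything runs sequentially. With an associative operation it agrees with `foldLeft`; with subtraction, which is not associative, it does not.

```scala
object TreeFoldSketch {
  // Hypothetical tree-like fold: reduce each half independently, then merge.
  def treeFold[A](xs: Vector[A], zero: A)(f: (A, A) => A): A =
    if (xs.isEmpty) zero
    else if (xs.length == 1) f(zero, xs.head)
    else {
      val (left, right) = xs.splitAt(xs.length / 2)
      // The two recursive calls do not depend on each other.
      f(treeFold(left, zero)(f), treeFold(right, zero)(f))
    }

  val nums: Vector[Int] = Vector(1, 2, 3, 4, 5, 6, 7, 8)

  // (Int, +, 0) is a monoid: grouping does not matter.
  val sumTree: Int = treeFold(nums, 0)(_ + _)
  val sumLeft: Int = nums.foldLeft(0)(_ + _)

  // Subtraction is not associative: the tree shape changes the answer.
  val diffTree: Int = treeFold(nums, 0)(_ - _)
  val diffLeft: Int = nums.foldLeft(0)(_ - _)
}
```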

## Scala Sequential and Parallel Collections Hierarchy:

<img src="http://docs.scala-lang.org/resources/images/parallel-collections-hierarchy.png" height="1500" width="1000">
     
More details are available [here](http://docs.scala-lang.org/overviews/collections/overview.html)

* For code that is agnostic about parallelism, there is a separate hierarchy of *generic* collection traits, such as `GenIterable[T]`, `GenSet[T]`, etc. Code written against these traits works with both sequential and parallel collections.

In [2]:
load.ivy("com.storm-enroute" %% "scalameter-core" % "0.6")


In [3]:
// Measure the running time of converting two sequential collections
// (a List and an Array) into parallel ones.
// See [[https://github.com/axel22/parprog-snippets/blob/master/src/main/scala/lectures/dataparallelism/Conversion.scala]]

import scala.collection._
import org.scalameter._

object Conversion {
  
  val standardConfig = config(
    Key.exec.minWarmupRuns -> 10,
    Key.exec.maxWarmupRuns -> 20,
    Key.exec.benchRuns -> 20,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  val array = Array.fill(10000000)("")
  val list = array.toList

  def main(args: Array[String]): Unit = {
    val listtime = standardConfig measure {
      list.par
    }
    println(s"list conversion time: $listtime ms")

    val arraytime = standardConfig measure {
      array.par
    }
    println(s"array conversion time: $arraytime ms")
    println(s"difference: ${listtime / arraytime}")
  }
  
}


import scala.collection._
import org.scalameter._
defined object Conversion

In [4]:
Conversion.main(Array("start"))

Starting warmup.
0. warmup run running time: 221.820565 (covNoGC: NaN, covGC: NaN)
1. warmup run running time: 533.314673 (covNoGC: NaN, covGC: 0.5834)
2. warmup run running time: 2197.968641 (covNoGC: NaN, covGC: 1.0794)
3. warmup run running time: 234.193378 (covNoGC: 0.0384, covGC: 1.1861)
4. warmup run running time: 252.429242 (covNoGC: 0.0384, covGC: 1.2413)
5. warmup run running time: 236.479716 (covNoGC: 0.0342, covGC: 1.2824)
6. warmup run running time: 276.521817 (covNoGC: 0.0342, covGC: 1.2900)
7. warmup run running time: 240.561974 (covNoGC: 0.0346, covGC: 1.3051)
8. warmup run running time: 255.199001 (covNoGC: 0.0346, covGC: 1.3072)
9. warmup run running time: 235.397556 (covNoGC: 0.0302, covGC: 1.3123)
10. warmup run running time: 238.179616 (covNoGC: 0.0280, covGC: 1.3062)
11. warmup run running time: 266.96193 (covNoGC: 0.0280, covGC: 1.3908)
12. warmup run running time: 236.163903 (covNoGC: 0.0257, covGC: 0.0605)
Steady-state detected.
Ending warmup.
measurements: 234.



## Parallelizable Collections:

* Examples (each has a sequential counterpart): `ParArray[T], ParRange, ParVector[T], immutable.ParHashSet[T], immutable.ParHashMap[K, V], mutable.ParHashSet[T], mutable.ParHashMap[K, V]`


* `ParTrieMap[K, V]`: *thread-safe* parallel map with *atomic snapshots*, the counterpart of `TrieMap[K, V]`


* For other collections, `.par` creates the closest parallel collection, e.g. a `List` is converted to a `ParVector`

## Rules Working with Parallel Collections:

### Rule 1: Avoid mutations to the same memory locations without proper synchronization

**Example: Side-effecting operations**

In [5]:
// See [[https://github.com/axel22/parprog-snippets/blob/master/src/main/scala/lectures/dataparallelism/IntersectionWrong.scala]]

import scala.collection._
import org.scalameter._

object IntersectionWrong {

  def main(args: Array[String]): Unit = {
    def intersection(a: GenSet[Int], b: GenSet[Int]): Set[Int] = {
      val result = mutable.Set[Int]() // unsynchronized mutable set shared by all workers: the culprit!
      for (x <- a) if (b contains x) result += x
      result
    }
    val seqres = intersection((0 until 1000).toSet, (0 until 1000 by 4).toSet)
    val parres = intersection((0 until 1000).par.toSet, (0 until 1000 by 4).par.toSet)
    log(s"Sequential result - ${seqres.size}")
    log(s"Parallel result   - ${parres.size}")
  }
  
}

import scala.collection._
import org.scalameter._
defined object IntersectionWrong

In [6]:
// see the disparity between sequential result and parallel
IntersectionWrong.main(Array("start"))
IntersectionWrong.main(Array("start"))
IntersectionWrong.main(Array("start"))

Sequential result - 250
Parallel result   - 249
Sequential result - 250
Parallel result   - 251
Sequential result - 250
Parallel result   - 250




### Solution 1: Synchronization

In [7]:
// See [[https://raw.githubusercontent.com/axel22/parprog-snippets/master/src/main/scala/lectures/dataparallelism/IntersectionSynchronized.scala]]

import scala.collection._
import scala.collection.convert.wrapAsScala._
import java.util.concurrent._
import org.scalameter._

object IntersectionSynchronized {

  def main(args: Array[String]): Unit = {
    def intersection(a: GenSet[Int], b: GenSet[Int]) = {
      val result = new ConcurrentSkipListSet[Int]()
      for (x <- a) if (b contains x) result += x
      result
    }
    val seqres = intersection((0 until 1000).toSet, (0 until 1000 by 4).toSet)
    val parres = intersection((0 until 1000).par.toSet, (0 until 1000 by 4).par.toSet)
    log(s"Sequential result - ${seqres.size}")
    log(s"Parallel result   - ${parres.size}")
  }
  
}

import scala.collection._
import scala.collection.convert.wrapAsScala._
import java.util.concurrent._
import org.scalameter._
defined object IntersectionSynchronized

In [8]:
// correct
IntersectionSynchronized.main(Array("start"))
IntersectionSynchronized.main(Array("start"))
IntersectionSynchronized.main(Array("start"))

Sequential result - 250
Parallel result   - 250
Sequential result - 250
Parallel result   - 250
Sequential result - 250
Parallel result   - 250




### Solution 2: Avoiding side effects

In [9]:
// See [[https://github.com/axel22/parprog-snippets/blob/master/src/main/scala/lectures/dataparallelism/IntersectionCorrect.scala]]

import scala.collection._
import org.scalameter._

object IntersectionCorrect {

  def main(args: Array[String]): Unit = {
    def intersection(a: GenSet[Int], b: GenSet[Int]): GenSet[Int] = {
      if (a.size < b.size) a.filter(b(_))
      else b.filter(a(_))
    }
    val seqres = intersection((0 until 1000).toSet, (0 until 1000 by 4).toSet)
    val parres = intersection((0 until 1000).par.toSet, (0 until 1000 by 4).par.toSet)
    log(s"Sequential result - ${seqres.size}")
    log(s"Parallel result   - ${parres.size}")
  }
  
}

import scala.collection._
import org.scalameter._
defined object IntersectionCorrect

In [10]:
IntersectionCorrect.main(Array("start"))
IntersectionCorrect.main(Array("start"))
IntersectionCorrect.main(Array("start"))

Sequential result - 250
Parallel result   - 250
Sequential result - 250
Parallel result   - 250
Sequential result - 250
Parallel result   - 250




### Rule 2: Never modify a parallel collection on which a data-parallel operation is in progress

* Never write to a collection that is concurrently traversed

* Never read from a collection that is concurrently modified

* In either case, the result can be *non-deterministic*

**Example: Concurrent modifications during traversals**

In [11]:
// See [[https://github.com/axel22/parprog-snippets/blob/master/src/main/scala/lectures/dataparallelism/ParallelGraphContraction.scala]]

import scala.collection._

object ParallelGraphContraction {

  def main(args: Array[String]): Unit = {
    val graph = mutable.Map[Int, Int]() ++= (0 until 100000).map(i => (i, i + 1))
    graph(graph.size - 1) = 0
    for ((k, v) <- graph.par) graph(k) = graph(v)
    val violation = graph.find({ case (i, v) => v != (i + 2) % graph.size })
    println(s"violation: $violation")
  }
}

import scala.collection._
defined object ParallelGraphContraction

In [12]:
// violations exist!
ParallelGraphContraction.main(Array("start"))
ParallelGraphContraction.main(Array("start"))
ParallelGraphContraction.main(Array("start"))

violation: Some((41623,41627))
violation: Some((41623,41626))
violation: Some((41623,41626))




### `TrieMap` is an *exception* to the rules above, because its `snapshot` method captures a consistent view of the current state

* `snapshot` takes $O(1)$ time
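
As a minimal sketch of what the snapshot provides, the snippet below takes a snapshot of a small `scala.collection.concurrent.TrieMap` and then keeps modifying the original. The names `live` and `frozen` are chosen here for illustration. The snapshot is unaffected by the later updates, which is the property the graph example relies on.

```scala
import scala.collection.concurrent.TrieMap

object SnapshotSketch {
  val live: TrieMap[Int, String] = TrieMap(1 -> "a", 2 -> "b")

  // Captures the state of `live` at this point; later writes do not leak into it.
  val frozen: TrieMap[Int, String] = live.snapshot()

  // Keep mutating the original map after the snapshot was taken.
  live(1) = "changed"
  live.put(3, "c")
}
```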

In [13]:
// See [[https://github.com/axel22/parprog-snippets/blob/master/src/main/scala/lectures/dataparallelism/ParallelTrieMapGraphContraction.scala]]

import scala.collection._

object ParallelTrieMapGraphContraction {

  def main(args: Array[String]): Unit = {
    val graph = concurrent.TrieMap[Int, Int]() ++= (0 until 100000).map(i => (i, i + 1))
    graph(graph.size - 1) = 0
    val previous = graph.snapshot()
    for ((k, v) <- graph.par) graph(k) = previous(v)
    val violation = graph.find({ case (i, v) => v != (i + 2) % graph.size })
    println(s"violation: $violation")
  }

}

import scala.collection._
defined object ParallelTrieMapGraphContraction

In [14]:
ParallelTrieMapGraphContraction.main(Array("start"))
ParallelTrieMapGraphContraction.main(Array("start"))
ParallelTrieMapGraphContraction.main(Array("start"))

violation: None
violation: None
violation: None




## Data-Parallel Abstractions:

* Iterators
* Splitters
* Builders
* Combiners

The following abstractions are simplified:

### Iterator:

```scala
trait Iterator[A] {
    def next: A
    def hasNext: Boolean
}

def iterator: Iterator[A] // must be implemented on every collection
```

* `next` can be called only if `hasNext` returns `true`
* after `hasNext` returns `false`, it will always return `false`
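
To make the contract concrete, here is a minimal sketch of the simplified iterator together with a `foldLeft` written only in terms of `hasNext` and `next`. The trait is renamed to `SimpleIterator` to avoid clashing with `scala.Iterator`, and `ArrayIterator` is a hypothetical implementation over an array. Both names are assumptions made for this example.

```scala
object IteratorSketch {
  // Simplified iterator from the notes, renamed to avoid shadowing scala.Iterator.
  trait SimpleIterator[A] {
    def next(): A
    def hasNext: Boolean

    // Sequential fold: `next()` is only called after `hasNext` returned true.
    def foldLeft[B](zero: B)(f: (B, A) => B): B = {
      var acc = zero
      while (hasNext) acc = f(acc, next())
      acc
    }
  }

  // Hypothetical iterator that walks an array from left to right.
  class ArrayIterator[A](xs: Array[A]) extends SimpleIterator[A] {
    private var i = 0
    def hasNext: Boolean = i < xs.length
    def next(): A = { val x = xs(i); i += 1; x }
  }
}
```

Once the array is exhausted, `hasNext` keeps returning `false`, so a second `foldLeft` on the same iterator simply returns its `zero` argument.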

### Splitter:

```scala
trait Splitter[A] extends Iterator[A] {
    def split: Seq[Splitter[A]]
    def remaining: Int
}

def splitter: Splitter[A] // must be implemented on every *parallel* collection
```

* after calling `split`, the original splitter is left in an undefined state
* the resulting splitters traverse disjoint subsets of the original splitter
* `remaining` is an estimate on the number of remaining elements
* `split` is an efficient method $O(\log n)$ or better
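
The sketch below shows how a splitter can drive a parallel `fold`: below a size threshold the elements are folded sequentially, otherwise the splitter is split and the parts are processed independently. Everything here is an assumption for illustration: the trait name `SimpleSplitter`, the array-backed `ArraySplitter`, the threshold value, and the use of `scala.concurrent.Future` in place of the course's `task` construct.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SplitterSketch {
  // Simplified splitter: an iterator that can also be divided into parts.
  trait SimpleSplitter[A] {
    def hasNext: Boolean
    def next(): A
    def remaining: Int
    def split: Seq[SimpleSplitter[A]]
  }

  // Hypothetical splitter over the index range [from, until) of an array.
  class ArraySplitter[A](xs: Array[A], private var from: Int, until: Int)
      extends SimpleSplitter[A] {
    def hasNext: Boolean = from < until
    def next(): A = { val x = xs(from); from += 1; x }
    def remaining: Int = until - from
    // Constant-time split into two halves that cover disjoint index ranges.
    def split: Seq[SimpleSplitter[A]] = {
      val mid = from + remaining / 2
      Seq(new ArraySplitter(xs, from, mid), new ArraySplitter(xs, mid, until))
    }
  }

  val threshold = 1000 // arbitrary cut-off chosen for this example

  def fold[A](s: SimpleSplitter[A], zero: A)(f: (A, A) => A): Future[A] =
    if (s.remaining <= threshold) Future {
      // Small enough: traverse sequentially inside one task.
      var acc = zero
      while (s.hasNext) acc = f(acc, s.next())
      acc
    } else {
      // Too large: split, fold the parts independently, then merge the results.
      val parts = s.split.map(child => fold(child, zero)(f))
      Future.sequence(parts).map(_.foldLeft(zero)(f))
    }

  def parallelSum(data: Array[Int]): Int =
    Await.result(fold(new ArraySplitter(data, 0, data.length), 0)(_ + _), 10.seconds)
}
```

After `split` is called, the original splitter is not used again, which matches the rule that it is left in an undefined state.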

### Builder:

```scala
trait Builder[A, Repr] { // Repr denotes the type of collection that Builder creates
    def +=(elem: A): Builder[A, Repr]
    def result: Repr
}
def newBuilder: Builder[A, Repr] // must be defined on every collection

```
* calling `result` returns a collection of type `Repr`, containing the elements that were previously added with `+=`
* calling `result` leaves the `Builder` in an undefined state
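
As a small illustration of how transformer operations can be written with a builder, the sketch below implements a sequential `filter` on top of the standard library's `scala.collection.mutable.Builder`, obtained from `Vector.newBuilder`. The function name `filterWithBuilder` is an assumption for this example. Note that the real library method is `result()` with parentheses, unlike the simplified trait above.

```scala
object BuilderSketch {
  // Sequential filter: add matching elements to a builder, then build once.
  def filterWithBuilder[A](xs: Iterable[A])(p: A => Boolean): Vector[A] = {
    val b = Vector.newBuilder[A]
    for (x <- xs) if (p(x)) b += x
    // The builder is not used after this call, since its state is then undefined.
    b.result()
  }
}
```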

### Combiner:

```scala
trait Combiner[A, Repr] extends Builder[A, Repr] {
    def combine(that: Combiner[A, Repr]): Combiner[A, Repr] 
}

def newCombiner: Combiner[A, Repr] // must be implemented on every *parallel* collection

```

* calling `combine` returns a new `Combiner` that contains the elements of both input combiners
* calling `combine` leaves both original `Combiner`s in an undefined state
* `combine` is an efficient method $O(\log n)$ or better
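
A combiner lets each worker build its own partial result and merge the partial results afterwards. The class `ChunkCombiner` below is a hypothetical, simplified combiner that stores elements in a sequence of chunks, so `combine` only concatenates chunk references instead of copying elements. It is a sketch of the idea and makes no claim to meet the complexity bound stated above. The two halves in `filterHalves` are processed one after the other here, but they touch separate combiners and could run as separate tasks.

```scala
import scala.collection.mutable.ArrayBuffer

object CombinerSketch {
  // Hypothetical combiner: elements live in chunks, and merging joins chunk lists.
  class ChunkCombiner[A](
      private val chunks: Vector[ArrayBuffer[A]] = Vector(ArrayBuffer.empty[A])) {
    def +=(elem: A): this.type = { chunks.last += elem; this }
    // Elements of `this` come first, followed by the elements of `that`.
    def combine(that: ChunkCombiner[A]): ChunkCombiner[A] =
      new ChunkCombiner(chunks ++ that.chunks)
    def result: Vector[A] = chunks.flatten
  }

  // Filter each half with its own combiner, then combine the two.
  def filterHalves(xs: Vector[Int])(p: Int => Boolean): Vector[Int] = {
    val (left, right) = xs.splitAt(xs.length / 2)
    val c1 = new ChunkCombiner[Int]()
    val c2 = new ChunkCombiner[Int]()
    left.foreach(x => if (p(x)) c1 += x)   // could run in one task
    right.foreach(x => if (p(x)) c2 += x)  // could run in another task
    c1.combine(c2).result
  }
}
```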