# Parallel Programming in Scala 

## Instructors: Viktor Kuncak and Aleksandar Prokopec

> # Week 4: Data Structures for Parallel Computing

**Author:** [Ehsan M. Kermani](https://ca.linkedin.com/in/ehsanmkermani)

The code is available [here](https://github.com/axel22/parprog-snippets/tree/master/src/main/scala/lectures/algorithms).

## Implementing Combiners:

* Recall from the previous lecture that `Builder`s can only be used for sequential transformer operations (e.g. `map`, `flatMap`, `filter`).

### Builder:

```scala
trait Builder[A, Repr] { // Repr denotes the type of collection that Builder creates
    def +=(elem: A): this.type
    def result: Repr
}
```

* To implement *parallel* transformer operations, we use the `Combiner` abstraction.

### Combiner:

```scala
trait Combiner[A, Repr] extends Builder[A, Repr] {
    def combine(that: Combiner[A, Repr]): Combiner[A, Repr] 
}

```

**Question:** How to implement an efficient `Combiner`?

*Answer:* Let's first look into some possible scenarios.

* If `Repr` is a `Set` or `Map`, `combine` represents union
* If `Repr` is a sequence, `combine` represents concatenation
* `combine` must run in $O(\log n + \log m)$ time, where $n$ and $m$ are the sizes of the two inputs
* `Array`s cannot be efficiently concatenated (because they occupy contiguous blocks of memory)
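
To make the last point concrete, here is a minimal sketch of what concatenating two arrays involves. The helper name `naiveConcat` is hypothetical and not part of the course code. A fresh array has to be allocated and every element copied, so the cost is $O(n + m)$ rather than logarithmic.

```scala
import scala.reflect.ClassTag

// Hypothetical helper, for illustration only: concatenates two arrays by copying.
def naiveConcat[T: ClassTag](xs: Array[T], ys: Array[T]): Array[T] = {
  val out = new Array[T](xs.length + ys.length) // one new contiguous block
  Array.copy(xs, 0, out, 0, xs.length)          // copies n elements
  Array.copy(ys, 0, out, xs.length, ys.length)  // copies m elements
  out
}

val joined = naiveConcat(Array(1, 2, 3), Array(4, 5))
```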

#### `Set` data structures have efficient lookup, insertion and deletion
    
1. Hash tables have expected $O(1)$ lookup, insert and delete
2. Balanced search trees have $O(\log n)$ lookup, insert and delete
3. Linked lists have $O(n)$ lookup, insert and delete

* Most standard `Set` implementations do not have an efficient *union* operation

#### `Sequences`

1. Mutable linked lists have $O(1)$ prepend and append, but $O(n)$ insertion
2. Functional (cons) linked lists have $O(1)$ prepend only; every other operation is $O(n)$
3. Array lists have *amortized* $O(1)$ append and $O(1)$ random access; other operations are $O(n)$

* Mutable linked lists have $O(1)$ concatenation, whereas for most other sequences concatenation is $O(n)$


## Two-Phase Parallel Construction:

* Most data structures can be constructed in parallel using a technique called *two-phase construction*, which goes through an additional intermediate data structure with the following characteristics:

    * The intermediate data structure has an efficient `combine` method, $O(\log n + \log m)$ or better
    * The intermediate data structure has an efficient `+=` method
    * The intermediate data structure can be converted to the resulting data structure in $O(n/p)$ time, where $n$ is the size of the data structure and $p$ is the number of processors
    
The following `ArrayCombiner` is an example of *two-phase construction* applied to arrays:

```scala
load.ivy("com.storm-enroute" %% "scalameter-core" % "0.6")
```

```scala
/**
  Taken from [[https://github.com/axel22/parprog-snippets]]
*/

import java.util.concurrent._
import scala.util.DynamicVariable

object common {

  val forkJoinPool = new ForkJoinPool

  abstract class TaskScheduler {
    def schedule[T](body: => T): ForkJoinTask[T]
    def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
      val right = task {
        taskB
      }
      val left = taskA
      (left, right.join())
    }
  }

  class DefaultTaskScheduler extends TaskScheduler {
    def schedule[T](body: => T): ForkJoinTask[T] = {
      val t = new RecursiveTask[T] {
        def compute = body
      }
      Thread.currentThread match {
        case wt: ForkJoinWorkerThread =>
          t.fork()
        case _ =>
          forkJoinPool.execute(t)
      }
      t
    }
  }

  val scheduler =
    new DynamicVariable[TaskScheduler](new DefaultTaskScheduler)

  def task[T](body: => T): ForkJoinTask[T] = {
    scheduler.value.schedule(body)
  }

  def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
    scheduler.value.parallel(taskA, taskB)
  }

  def parallel[A, B, C, D](taskA: => A, taskB: => B, taskC: => C, taskD: => D): (A, B, C, D) = {
    val ta = task { taskA }
    val tb = task { taskB }
    val tc = task { taskC }
    val td = taskD
    (ta.join(), tb.join(), tc.join(), td)
  }
}
```

```scala
// See [[https://github.com/axel22/parprog-snippets/blob/master/src/main/scala/lectures/algorithms/ArrayCombiner.scala]]

import scala.collection.parallel.Combiner
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.scalameter._
import common._

class ArrayCombiner[T <: AnyRef: ClassTag](val parallelism: Int)
extends Combiner[T, Array[T]] {
  private var numElems = 0
  private val buffers = new ArrayBuffer[ArrayBuffer[T]]
  buffers += new ArrayBuffer[T]

  def +=(elem: T) = { // amortized O(1)
    buffers.last += elem
    numElems += 1
    this
  }

  def combine[N <: T, That >: Array[T]](that: Combiner[N, That]) = { // takes O(p)
    (that: @unchecked) match {
      case that: ArrayCombiner[T] =>
        buffers ++= that.buffers // copies only the references to the nested buffers
        numElems += that.numElems
        this
    }
  }

  def size = numElems

  // Reset to the initial state: one empty buffer and no elements
  def clear() = {
    buffers.clear()
    buffers += new ArrayBuffer[T]
    numElems = 0
  }

  // Copies the elements at global positions [from, end) into `array`
  private def copyTo(array: Array[T], from: Int, end: Int): Unit = {
    // Locate the buffer j and the offset i within it that hold position `from`
    var i = from
    var j = 0
    while (i >= buffers(j).length) {
      i -= buffers(j).length
      j += 1
    }
    var k = from
    while (k < end) {
      array(k) = buffers(j)(i)
      i += 1
      if (i >= buffers(j).length) {
        i = 0
        j += 1
      }
      k += 1
    }
  }

  def result: Array[T] = {
    val step = math.max(1, numElems / parallelism)
    val array = new Array[T](numElems)
    val starts = (0 until numElems by step) :+ numElems
    val chunks = starts.zip(starts.tail)
    val tasks = for ((from, end) <- chunks) yield task {
      copyTo(array, from, end)
    }
    tasks.foreach(_.join())
    array
  }

}

object ArrayCombiner {

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 20,
    Key.exec.maxWarmupRuns -> 40,
    Key.exec.benchRuns -> 60,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  def main(args: Array[String]) {
    val size = 1000000

    def run(p: Int) {
      val taskSupport = new collection.parallel.ForkJoinTaskSupport(
        new scala.concurrent.forkjoin.ForkJoinPool(p))
      val strings = (0 until size).map(_.toString)
      val time = standardConfig measure {
        val parallelized = strings.par
        parallelized.tasksupport = taskSupport
        def newCombiner = new ArrayCombiner(p): Combiner[String, Array[String]]
        parallelized.aggregate(newCombiner)(_ += _, _ combine _).result
      }
      println(s"p = $p, time = $time ms")
    }

    run(1)
    run(2) // almost linear speedup
    run(4) // memory bottleneck
    run(8) // may even see loss in performance
  }

}
```

```scala
// Left commented out: running it in this notebook fails with a Java memory error.
// ArrayCombiner.main(Array("start"))
```

### More on two-phase construction:

#### `ArrayCombiner`:

1. Partition the indices into subintervals
2. Initialize the array in parallel
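
The two steps can be sketched in isolation, without the combiner machinery. This is a minimal illustration under stated assumptions: the names `subintervals` and `squaresInParallel` are hypothetical, plain threads stand in for the fork/join tasks used by `ArrayCombiner`, and the array is filled with squares only so that there is something to check.

```scala
// Hypothetical helper: split [0, n) into consecutive half-open subintervals of
// about n / p indices each (the same scheme `ArrayCombiner.result` uses).
def subintervals(n: Int, p: Int): Seq[(Int, Int)] = {
  val step = math.max(1, n / p)
  val starts = (0 until n by step) :+ n
  starts.zip(starts.tail)
}

// Fills array(i) = i * i using one thread per subinterval. Each thread writes
// only to its own index range, so the writes need no synchronization.
def squaresInParallel(n: Int, p: Int): Array[Int] = {
  val array = new Array[Int](n)
  val threads = subintervals(n, p).map { case (from, end) =>
    new Thread(new Runnable {
      def run(): Unit = {
        var i = from
        while (i < end) { array(i) = i * i; i += 1 }
      }
    })
  }
  threads.foreach(_.start())
  threads.foreach(_.join()) // joining makes the writes visible to the caller
  array
}
```

Note that when $p$ does not divide $n$, this scheme produces one extra, shorter subinterval at the end.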

#### `Hash Tables`:

1. Partition the hash codes into buckets according to hash prefix
2. Allocate the table and map hash codes from different buckets into different regions

#### `Search Trees`:

1. Partition the elements into non-overlapping intervals according to their ordering
2. Construct search trees in parallel and link non-overlapping trees

## Answer to efficient implementation:

* *Two-phase construction* (as seen above)
* When the data structure itself allows efficient concatenation (see next part)
* *Concurrent data structures* where different combiners share the same underlying data structure and rely on **synchronization** to correctly update the data structure when `+=` is called
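
The third option can be sketched as follows. Everything here is an assumption made for illustration: the class `SharedSetCombiner`, its `sibling` method and the `demo` function are hypothetical, and synchronization is delegated to a concurrent set from `java.util.concurrent` rather than implemented by hand. Because all combiners write into the same structure, `combine` has nothing left to merge.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch: combiners created via `sibling` share one concurrent set.
class SharedSetCombiner[T](private val shared: java.util.Set[T]) {
  def this() = this(ConcurrentHashMap.newKeySet[T]())

  // Correctness under concurrent calls relies on the concurrent set.
  def +=(elem: T): this.type = { shared.add(elem); this }

  // A new combiner backed by the same underlying set.
  def sibling: SharedSetCombiner[T] = new SharedSetCombiner[T](shared)

  // O(1): both sides already wrote into the same structure.
  def combine(that: SharedSetCombiner[T]): SharedSetCombiner[T] = {
    require(that.shared eq this.shared, "combiners must share the same set")
    this
  }

  def result: Set[T] = {
    var out = Set.empty[T]
    val it = shared.iterator()
    while (it.hasNext) out += it.next()
    out
  }
}

// Two threads fill two combiners that share one set, then the results are combined.
def demo(): Set[Int] = {
  val a = new SharedSetCombiner[Int]
  val b = a.sibling
  def writer(c: SharedSetCombiner[Int], from: Int, end: Int): Thread =
    new Thread(new Runnable {
      def run(): Unit = { var i = from; while (i < end) { c += i; i += 1 } }
    })
  val t1 = writer(a, 0, 1000)
  val t2 = writer(b, 500, 1500)
  t1.start(); t2.start()
  t1.join(); t2.join()
  (a combine b).result
}
```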

## Conc - Tree:

* **Conc data type** is the parallel counterpart of the functional cons `List`

* `List`s are built for sequential computations while trees allow parallel computations when they can be sufficiently balanced

* The following simple `Tree` abstraction is *not* suitable for parallel computation: nothing keeps the left and right subtrees balanced, so a tree can degenerate into an essentially sequential structure!

```scala
sealed trait Tree[+T]
case class Node[T](left: Tree[T], right: Tree[T]) extends Tree[T]
case class Leaf[T](elem: T) extends Tree[T]
case object Empty extends Tree[Nothing]
```
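
As a small illustration of the problem (the helper names `depth`, `degenerate` and `balanced` are hypothetical), appending leaves one at a time yields a tree whose depth grows linearly with the number of elements, while a balanced tree over the same elements has logarithmic depth:

```scala
sealed trait Tree[+T]
case class Node[T](left: Tree[T], right: Tree[T]) extends Tree[T]
case class Leaf[T](elem: T) extends Tree[T]
case object Empty extends Tree[Nothing]

// Length of the longest path from the root to a leaf
def depth[T](t: Tree[T]): Int = t match {
  case Node(l, r) => 1 + math.max(depth(l), depth(r))
  case _          => 0
}

// Appending one leaf at a time produces a list-like tree
val degenerate: Tree[Int] =
  (1 to 8).foldLeft(Empty: Tree[Int])((acc, x) => Node(acc, Leaf(x)))

// Splitting the range in half at each step produces a balanced tree
// (assumes lo < hi; holds the elements lo until hi)
def balanced(lo: Int, hi: Int): Tree[Int] =
  if (hi - lo == 1) Leaf(lo)
  else {
    val mid = (lo + hi) / 2
    Node(balanced(lo, mid), balanced(mid, hi))
  }
```

Here `depth(degenerate)` is 8, whereas `depth(balanced(1, 9))` is 3 for the same eight elements.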

* The `Conc` data type (a.k.a. conc-tree) remedies this by keeping the tree balanced

```scala

sealed trait Conc[+T] {
    def level: Int // length of the longest path from the root to a leaf
    def size: Int // number of elements in the (sub)tree
    def left: Conc[T]
    def right: Conc[T]
}

```

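One concrete implementation is as follows:

```scala

case object Empty extends Conc[Nothing] {
    def level = 0
    def size = 0
    // leaves have no children
    def left: Conc[Nothing] = sys.error("Empty has no children")
    def right: Conc[Nothing] = sys.error("Empty has no children")
}

class Single[T](val elem: T) extends Conc[T] {
    def level = 0
    def size = 1
    def left: Conc[T] = sys.error("Single has no children")
    def right: Conc[T] = sys.error("Single has no children")
}

case class <>[T](left: Conc[T], right: Conc[T]) extends Conc[T] {
    val level = 1 + math.max(left.level, right.level)
    val size = left.size + right.size
}

```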

### Invariants:

1. A `<>` node can never contain `Empty` as its subtree
2. The level difference between the `left` and `right` subtrees of a `<>` node is always $1$ or less (this guarantees balance)

This resembles an [AVL tree](https://en.wikipedia.org/wiki/AVL_tree), but a conc-tree is *not necessarily* a binary search tree.

See [this](https://github.com/axel22/parprog-snippets/blob/master/src/main/scala/lectures/algorithms/Conc.scala) for more details.

The complexity of concatenation is $O(|h_1 - h_2|)$, where $h_1$ and $h_2$ are the heights of the two trees being concatenated.
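
The following self-contained sketch shows how such a concatenation can preserve both invariants. It follows the approach of the linked `Conc.scala`, but it is reconstructed here for illustration: the wrapper name `concatTop`, the field name `x` and the error messages are assumptions. The recursion only descends along the taller tree until the levels differ by at most one, which is where the $O(|h_1 - h_2|)$ bound comes from.

```scala
sealed trait Conc[+T] {
  def level: Int
  def size: Int
  def left: Conc[T]
  def right: Conc[T]
}
case object Empty extends Conc[Nothing] {
  def level = 0
  def size = 0
  def left: Conc[Nothing] = sys.error("Empty has no children")
  def right: Conc[Nothing] = sys.error("Empty has no children")
}
class Single[T](val x: T) extends Conc[T] {
  def level = 0
  def size = 1
  def left: Conc[T] = sys.error("Single has no children")
  def right: Conc[T] = sys.error("Single has no children")
}
case class <>[T](left: Conc[T], right: Conc[T]) extends Conc[T] {
  val level = 1 + math.max(left.level, right.level)
  val size = left.size + right.size
}

// Invariant 1: never link Empty into a <> node
def concatTop[T](xs: Conc[T], ys: Conc[T]): Conc[T] =
  if (xs == Empty) ys else if (ys == Empty) xs else concat(xs, ys)

// Invariant 2: keep the level difference of siblings at 1 or less
def concat[T](xs: Conc[T], ys: Conc[T]): Conc[T] = {
  val diff = ys.level - xs.level
  if (diff >= -1 && diff <= 1) new <>(xs, ys) // already balanced: just link
  else if (diff < -1) {
    // xs is taller: push ys down the right spine of xs
    if (xs.left.level >= xs.right.level) {
      new <>(xs.left, concat(xs.right, ys))
    } else {
      // xs is right-leaning: regroup its subtrees to restore balance
      val nrr = concat(xs.right.right, ys)
      if (nrr.level == xs.level - 3) new <>(xs.left, new <>(xs.right.left, nrr))
      else new <>(new <>(xs.left, xs.right.left), nrr)
    }
  } else {
    // ys is taller: the mirror image of the case above
    if (ys.right.level >= ys.left.level) {
      new <>(concat(xs, ys.left), ys.right)
    } else {
      val nll = concat(xs, ys.left.left)
      if (nll.level == ys.level - 3) new <>(new <>(nll, ys.left.right), ys.right)
      else new <>(nll, new <>(ys.left.right, ys.right))
    }
  }
}

// Usage: build a tree of 100 elements by repeated concatenation
val tree = (1 to 100).foldLeft(Empty: Conc[Int])((acc, x) => concatTop(acc, new Single(x)))
```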

## $O(1)$-amortized append operation:

* This is crucial for implementing `Combiner` efficiently

We can implement the `+=` operation of a `Combiner` as follows, with $O(\log n)$ complexity:

```scala
var xs: Conc[T] = Empty
def +=(elem: T): Unit = {
    // `<>` here denotes balanced concatenation of two conc-trees
    xs = xs <> new Single(elem)
}
```

However, by relaxing the above `Conc` data type and adding a new `Append` node, we can achieve $O(1)$ appends with a low constant factor.

```scala
// relaxes invariant 2 by allowing appending any two Conc trees
case class Append[T](left: Conc[T], right: Conc[T]) extends Conc[T] {
    val level = 1 + math.max(left.level, right.level)
    val size = left.size + right.size
}
```

Now, appending a single element to a tree allocates a *constant* number of objects (here $2$) and takes $O(1)$ operations.

```scala
def appendLeaf[T](xs: Conc[T], y: T): Conc[T] = new Append(xs, new Single(y)) 
```

But we also need to be able to transform the tree back, eliminating the `Append` nodes in $O(\log n)$ time, while still achieving $O(1)$ appends. To make that possible, we impose another condition: a tree with $n$ elements may **not** contain *more than* $O(\log n)$ `Append` nodes. Otherwise, removing the `Append` nodes would take more time than necessary!

We can see that adding $n$ leaves to an `Append` list requires $O(n)$ work, hence the amortized $O(1)$ cost of adding a leaf. Storing $n$ leaves requires $O(\log n)$ `Append` nodes.
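
One way to make these two bounds plausible is to compare the `Append` list with a binary counter. This analogy is added here as an illustration and assumes that a tree of level $i$ hanging off the `Append` list plays the role of a 1-bit at position $i$, and that appending a leaf corresponds to incrementing the counter. Over $n$ increments starting from zero, bit $i$ changes at most $\lfloor n / 2^i \rfloor$ times, so the total work is bounded by a geometric series:

$$\sum_{i=0}^{\lfloor \log_2 n \rfloor} \left\lfloor \frac{n}{2^i} \right\rfloor \le n \sum_{i=0}^{\infty} \frac{1}{2^i} = 2n = O(n)$$

Dividing by $n$ gives $O(1)$ amortized work per append. The binary representation of $n$ has at most $\lfloor \log_2 n \rfloor + 1$ digits, which matches the $O(\log n)$ bound on the number of `Append` nodes.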

Now we can reimplement `appendLeaf` as follows

```scala
def appendLeaf[T](xs: Conc[T], ys: Single[T]): Conc[T] = xs match {
    case Empty => ys
    case xs: Single[T] => new <>(xs, ys)
    case _ <> _ => new Append(xs, ys)
    case xs: Append[T] => append(xs, ys)
}
```
See the `append` implementation (written with *tail recursion*) and more details [here](https://github.com/axel22/parprog-snippets/blob/master/src/main/scala/lectures/algorithms/Conc.scala).

Finally, we have implemented an *immutable* data structure with $O(1)$-**amortized** *append* and $O(\log n)$ concatenation.

## Conc - Tree Combiners:

The following is about the *mutable* implementation of the previous `Conc` tree.

### ConcBuffer:

* `ConcBuffer` appends elements into an array of size $k$
* When the array gets full, it's stored into a `Chunk` node and added to `Conc-tree`

```scala
class ConcBuffer[T: ClassTag](val k: Int, private var conc: Conc[T]) {
    private var chunk: Array[T] = new Array(k)
    private var chunkSize: Int = 0
}
```
If the `chunk` is full, we `expand` and push the array into the `Conc-tree`

```scala
final def +=(elem: T): Unit = {
    if (chunkSize >= k) expand() // the chunk is full once it holds k elements
    chunk(chunkSize) = elem
    chunkSize += 1
}
```

### Chunk node:

* A `Chunk` node is similar to a `Single` node, but it holds an array of elements instead of a single element

```scala
class Chunk[T](val array: Array[T], val size: Int) extends Conc[T] {
    def level = 0
}
```

Now with `Chunk` node, `expand` is simple

```scala
private def expand(): Unit = {
    conc = appendLeaf(conc, new Chunk(chunk, chunkSize))
    chunk = new Array(k)
    chunkSize = 0
}
```

Hence,

```scala
final def combine(that: ConcBuffer[T]): ConcBuffer[T] = {
    val combinedConc = this.result <> that.result
    new ConcBuffer(k, combinedConc)
}

// result packs `chunk` array into the tree and returns the resulting tree
def result: Conc[T] = {
    conc = appendLeaf(conc, new Chunk(chunk, chunkSize))
    conc
}
```
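
The chunking logic of `+=`, `expand` and `result` can be tried out in isolation with the simplified stand-in below. It is a sketch under stated assumptions, not the course implementation: the class name `ChunkedBuffer` is hypothetical, and full chunks are collected in an `ArrayBuffer` instead of a conc-tree, so only the buffering behaviour is illustrated (not the $O(\log n)$ `combine`).

```scala
import scala.reflect.ClassTag
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for ConcBuffer: full chunks go into an ArrayBuffer
// rather than a conc-tree.
class ChunkedBuffer[T: ClassTag](val k: Int) {
  // Each entry is a chunk array together with the number of slots in use
  private val chunks = ArrayBuffer.empty[(Array[T], Int)]
  private var chunk: Array[T] = new Array[T](k)
  private var chunkSize = 0

  def +=(elem: T): this.type = {
    if (chunkSize >= k) expand() // current chunk is full
    chunk(chunkSize) = elem
    chunkSize += 1
    this
  }

  // Store the current chunk and start a fresh one
  private def expand(): Unit = {
    chunks += ((chunk, chunkSize))
    chunk = new Array[T](k)
    chunkSize = 0
  }

  // Pack the last (possibly partial) chunk, then flatten all chunks in order
  def result: Array[T] = {
    expand()
    chunks.iterator.flatMap { case (arr, n) => arr.iterator.take(n) }.toArray
  }
}

val buf = new ChunkedBuffer[Int](4)
(1 to 10).foreach(buf += _)
val out = buf.result
```

With $k = 4$ and ten elements, the buffer ends up with two full chunks and one partial chunk of two elements.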