# Parallel Programming in Scala

## Instructors: Viktor Kuncak and Aleksandar Prokopec

# Week 1: Summary

**Author:** [Ehsan M.Kermani](https://ca.linkedin.com/in/ehsanmkermani)


# Parallel vs. Concurrent

## Parallel: uses parallel hardware to execute computation more quickly. Efficiency is its main concern.

* Division into subproblems
* Optimal use of parallel hardware
* Usage: Numerical, simulation and Big Data applications
* Main concern: speed-up

## Concurrent: may or may not use multiple executions at the same time. Improves modularity, responsiveness and maintainability.

* When can an execution start?
* How can information exchange occur?
* How to manage access to shared resources?
* Usage: Web servers, user interface and databases
* Main concern: convenience

# Parallelism Granularity

* Bit-level: processing multiple bits of data in parallel --> hardware
* Instruction-level: executing different instructions from the same instruction stream in parallel --> hardware
* Task-level: executing separate instruction streams in parallel --> software [Course Focus]

## NOTE: Course focus: programming multi-cores and SMPs (symmetric multi-processors) with shared memory

# Parallelism on JVM

## Process: An instance of a program executing in OS 

* The OS assigns resources, such as execution time on the CPU, memory address space, etc.
* Each process has a **unique** process id
* A process is the most coarse-grained unit of concurrency in a shared-memory system
* Multitasking: dividing the execution of different processes into (consecutive) time slices
* Each process is **isolated** from other processes, i.e. two different processes cannot access each other's memory

## Thread: 

* Each process can contain multiple independent concurrency units called threads
* Threads can be started from within the same program and share the same memory address space
* Each thread has its own program counter (the position in the method currently being executed) and program stack (a region of memory holding the sequence of method invocations currently in progress)
* Different threads cannot modify each other's program stacks
* Communication happens by modifying heap-memory

### Create and Start Threads:

* Each **JVM process** starts with a **main thread**. A sequential program uses only the main thread; a parallel program must start additional threads
* Starting additional thread:

    1. Define a *Thread* subclass
    2. Instantiate a new *Thread* object (tracking the execution)
    3. Call *start* on the *Thread* object
    
* A custom Thread subclass can be used to start multiple threads 

In [2]:
/** HelloThread **/

/* 1. Define */
class HelloThread extends Thread {
    override def run() = {
        println("Hello, world222")
    }
}

/*2. Instantiate*/
val t = new HelloThread

/*3. Start*/
t.start()

/** When main thread calls join, it blocks its execution until t completes.
    Then after t completes, main thread continues
**/
t.join()

Hello, world222


defined class HelloThread
t: $user.HelloThread = Thread[Thread-14,5,]

## Threads can overlap:

In [3]:
class HelloThread extends Thread {
    override def run() = {
        println("Hello")
        println("world")
    }
}

def main() = {
    val t = new HelloThread
    val s = new HelloThread
    t.start()
    s.start()
    t.join()
    s.join()
    println("\n")
}

/* execute as many times as necessary to see the effect */
main()
main()
main()

Hello
world
Hello
world


Hello
world
Hello
world


Hello
world
Hello
world




defined class HelloThread
defined function main

Sometimes we need to ensure that a sequence of statements in a specific thread executes at once, without interleaving with statements from other threads.

## Atomicity:

* An operation is **atomic** if it appears *as if* it occurred instantaneously from the viewpoint of other threads

In [None]:
/*NOT atomic*/
private var uidCount = 0L
def getUniqueId(): Long = {
    uidCount = uidCount + 1
    uidCount
}

def startThread() = {
    val t = new Thread {
        override def run() = {
            val uids = for(i <- 1 until 10) yield getUniqueId()
            println(uids)
        }
    }
    t.start()
    t
}

startThread()
startThread()

The ids printed by the two threads are not guaranteed to be disjoint, because `getUniqueId` is **not** atomic: the read and the write of `uidCount` performed by different threads can *interleave* arbitrarily.

## Synchronized block: to enforce atomicity

* The code block passed to a `synchronized` call on an *object* `x` is **never** executed by two different threads at the same time

* JVM ensures this by storing a *monitor* (which is a resource) in each object
* At most *one thread* can own a monitor at a particular time
* A synchronized block is an example of a *synchronization primitive* (allowing different threads to exchange information)

NOTE: *synchronized method* **must** be invoked on an instance of some object

In [None]:
private val x = new AnyRef {} /* just an object to invoke synchronized on*/
private var uidCount = 0L
/* make getUniqueId atomic*/
def getUniqueId(): Long = x.synchronized {
    uidCount = uidCount + 1
    uidCount
}

def startThread() = {
    val t = new Thread {
        override def run() = {
            val uids = for(i <- 1 until 10) yield getUniqueId()
            println(uids)
        }
    }
    t.start()
    t
}

startThread()
startThread()

## Compose (nest) synchronized blocks:

In [None]:
/** Does NOT terminate: it is a Deadlock **/

/**
    Don't want to have a global synchronized block as it becomes the bottleneck in parallel execution
**/
class Account(private var amount: Int = 0) {
    
    def transfer(target: Account, n: Int) = 
        this.synchronized { /* first acquire the monitor of this */
            target.synchronized { /* then acquire the monitor of target, holding both */
                this.amount -= n
                target.amount += n 
            }
        }
}

def startThread(a: Account, b: Account, n: Int) = {
    val t = new Thread {
        override def run() = {
            for(i <- 1 until n) {
                a.transfer(b, 1)
            }
        }
    }
    t.start()
    t
}

val a1 = new Account(100)
val a2 = new Account(200)

val t = startThread(a1, a2, 50)
val s = startThread(a2, a1, 50)

t.join()
s.join()

The code above doesn't terminate: the main thread is **blocked** waiting for `t` and `s` to complete, while `t` and `s` never complete because each is **waiting** for the other to release a monitor (deadlock).

## Deadlock: A scenario where two or more threads compete for *resources* (such as monitor ownership) and wait for each other to finish *without releasing* the already acquired resources

## Resolving Deadlock:

* One approach is to define an ordering on resources and always acquire them in that order

In [1]:
/* Global, synchronized source of unique ids shared by all accounts */
object UidGenerator {
    private var uidCount = 0L
    def getUniqueId(): Long = this.synchronized {
        uidCount = uidCount + 1
        uidCount
    }
}

class Account(var amount: Int = 0) {

    /* assigned once at construction; defines a fixed ordering on accounts */
    val uid: Long = UidGenerator.getUniqueId()

    private def lockAndTransfer(target: Account, n: Int) = {
        this.synchronized {
            target.synchronized {
                this.amount -= n
                target.amount += n
            }
        }
    }

    def transfer(target: Account, n: Int) = {
        /* always lock the account with the smaller uid first */
        if (this.uid < target.uid) lockAndTransfer(target, n)
        else target.lockAndTransfer(this, -n)
        println(s"target amount after transfer: $target")
    }

    override def toString() = amount.toString()
}

def startThread(a: Account, b: Account, n: Int) = {
    val t = new Thread {
        override def run() = {
            println(s"target amount before transfer: $b")
            a.transfer(b, n)
        }
    }
    t.start()
    t
}


val a1 = new Account(100)
val a2 = new Account(200)

val t = startThread(a1, a2, 50)
val s = startThread(a2, a1, 50)

t.join()
s.join()

target amount before transfer: 200
target amount before transfer: 100
target amount after transfer: 100
target amount after transfer: 200


defined class Account
defined function startThread
a1: $user.Account = 100
a2: $user.Account = 200
t: Thread = Thread[Thread-13,5,]
s: Thread = Thread[Thread-14,5,]

## Memory model:

A set of rules describing how threads can interact when accessing shared memory, such as:

* Two threads writing to separate locations in memory don't need synchronization
* A thread $X$ that calls `join` on other thread $Y$ is *guaranteed to observe* all the writes by thread $Y$ after `join` returns
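
As an illustration of the `join` guarantee, the following minimal sketch lets one thread fill a local array and reads it from the calling thread only after `join` returns. The object and method names (`JoinVisibilityDemo`, `sumOfSquares`) are hypothetical and chosen only for this example.

```scala
/* Hypothetical demo object; the names are illustrative only */
object JoinVisibilityDemo {
    def sumOfSquares(): Int = {
        val squares = new Array[Int](4)

        /* the writer thread is the only one touching `squares` while it runs */
        val writer = new Thread {
            override def run(): Unit = {
                for (i <- squares.indices) squares(i) = i * i
            }
        }

        writer.start()
        writer.join() /* after join returns, every write made by writer is visible here */

        squares.sum /* 0 + 1 + 4 + 9 */
    }
}
```

Calling `JoinVisibilityDemo.sumOfSquares()` returns `14`. No `synchronized` block is needed here, because the calling thread reads the array only after `join` has returned.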

## Parallel Construct:

If we have a construct `parallel` that takes two computations and executes them in parallel, we would expect it to have the following signature:

`def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {...}`

It's crucial that `taskA, taskB` are passed by *name*. Otherwise they would be evaluated before `parallel` is even called, which is the same as sequential execution!

The running time of `parallel(taskA, taskB)` is *at least* the *maximum* of the running times of `taskA` and `taskB` executed separately.

NOTE: In parallel applications, be aware of bottlenecks such as memory, network bandwidth (in cluster computing), etc.
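
One way the signature above could be realized is with a plain thread, as in the sketch below. This is only an assumption for illustration, not the course's actual implementation; the wrapper object `ParallelSketch` and the `demo` method are hypothetical names, and exceptions thrown by the tasks are not handled.

```scala
/* Hypothetical sketch: `parallel` built directly on java.lang.Thread */
object ParallelSketch {
    /* Runs taskA on a new thread and taskB on the calling thread */
    def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
        var resultA: Option[A] = None
        val worker = new Thread {
            /* taskA is evaluated here, on the worker thread */
            override def run(): Unit = { resultA = Some(taskA) }
        }
        worker.start()
        val resultB = taskB /* evaluated on the calling thread while worker runs */
        worker.join()       /* after join, the write to resultA is visible */
        (resultA.get, resultB)
    }

    def demo(): (Int, String) = parallel((1 to 100).sum, "ab" * 2)
}
```

`ParallelSketch.demo()` returns `(5050, "abab")`. Because both parameters are by-name, neither expression is evaluated until `parallel` decides where to run it.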

## More Flexible Construct with Task:

* `t = task(e)` is a *task* which starts computation in background
* Current (other) computation proceeds in parallel with t
* `t.join()` obtains the result of the computation
* `t.join()` *blocks* and waits until the result is computed
* Subsequent `t.join()` calls quickly return the same result

### Minimal interface:

```scala
trait Task[A] {
    def join(): A
}
def task[A](c: => A): Task[A]
```

where `task` and `join` establish **maps** between *computations* and *tasks*. 

**Identity:** `task(e).join()` equals evaluating `e`.

`join` can be omitted with an implicit conversion: `implicit def getJoin[T](x: Task[T]): T = x.join()`

Now we can use `task` to define the `parallel` construct as follows:

```scala
def parallel[A, B](cA: => A, cB: => B): (A, B) = {
    val t: Task[A] = task(cA) /* starts computing cA in the background */
    val s: B = cB /* computes cB directly on the current thread */
    (t.join(), s)
}
```

However, if we had called `join()` right away, as in `val t = task(cA).join()`, the current thread would wait for `cA` to finish before starting `cB`, which results in *sequential* computation, not parallel!
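
To make the minimal interface concrete, here is a sketch of a thread-backed `task`, with `parallel` defined on top of it. The thread-per-task strategy, the wrapper object `TaskSketch` and the `demo` method are assumptions made for this example and not the course's implementation; exceptions inside the computation are not handled.

```scala
/* Hypothetical sketch of the Task interface backed by one thread per task */
object TaskSketch {
    trait Task[A] {
        def join(): A
    }

    /* Starts c on a new thread immediately and returns a handle to its result */
    def task[A](c: => A): Task[A] = {
        var result: Option[A] = None
        val worker = new Thread {
            override def run(): Unit = { result = Some(c) }
        }
        worker.start()
        new Task[A] {
            /* Thread.join returns at once if the thread has already finished,
               so repeated calls are cheap and return the same value */
            def join(): A = {
                worker.join()
                result.get
            }
        }
    }

    /* parallel expressed through task: start cA, compute cB, then join */
    def parallel[A, B](cA: => A, cB: => B): (A, B) = {
        val t = task(cA)
        val s = cB
        (t.join(), s)
    }

    def demo(): (Int, Int) = parallel(6 * 7, 10 - 3)
}
```

`TaskSketch.demo()` returns `(42, 7)`. Moving `t.join()` before the evaluation of `cB` would still produce the same pair, but without any overlap between the two computations.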

## Running Time Analysis:

* Sequential `reduce()` (which covers `sum()`, `max()`, `min()`, etc.) takes $O(n)$ time
* Parallel `reduce()` takes $O(\log n)$ time given enough resources, and in general $c_1\log n + c_2\dfrac{n}{p} + c_3$ given $p$ processors
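
The logarithmic depth comes from splitting the input in half recursively and reducing both halves in parallel. The sketch below illustrates this under several assumptions: the operation `f` is associative, the array is non-empty, a thread-based `parallel` helper stands in for the course's construct, and `threshold` is an arbitrary cut-off below which the reduction is sequential. All names are hypothetical.

```scala
/* Hypothetical divide-and-conquer reduce; assumes f is associative */
object ReduceSketch {
    /* Thread-based stand-in for the `parallel` construct */
    private def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
        var resultA: Option[A] = None
        val worker = new Thread {
            override def run(): Unit = { resultA = Some(taskA) }
        }
        worker.start()
        val resultB = taskB
        worker.join()
        (resultA.get, resultB)
    }

    /* Arbitrary cut-off: segments of at most this size are reduced sequentially */
    val threshold = 1000

    /* Reduces xs(from) ... xs(until - 1); requires from < until */
    def reduceSeg(xs: Array[Int], from: Int, until: Int, f: (Int, Int) => Int): Int =
        if (until - from <= threshold) {
            var acc = xs(from)
            var i = from + 1
            while (i < until) { acc = f(acc, xs(i)); i += 1 }
            acc
        } else {
            val mid = from + (until - from) / 2
            val (l, r) = parallel(reduceSeg(xs, from, mid, f), reduceSeg(xs, mid, until, f))
            f(l, r)
        }

    def reduce(xs: Array[Int], f: (Int, Int) => Int): Int = {
        require(xs.nonEmpty, "reduce of an empty array is undefined")
        reduceSeg(xs, 0, xs.length, f)
    }

    def demoSum(): Int = reduce((1 to 10000).toArray, _ + _)
    def demoMax(): Int = reduce((1 to 10000).toArray, (a, b) => math.max(a, b))
}
```

`ReduceSketch.demoSum()` returns `50005000` and `ReduceSketch.demoMax()` returns `10000`. Starting a fresh thread at every split is expensive, so a real implementation would more likely hand the subproblems to a thread pool.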

### Definition:

* $W(e)$ (work): time for sequential execution of $e$
* $D(e)$ (depth, or span): time for parallel execution of $e$, assuming an unlimited number of parallel resources

Then

* $W(\text{parallel}(e_1, e_2)) = W(e_1) + W(e_2) + c$ for some constant $c$
* $D(\text{parallel}(e_1, e_2)) = \max(D(e_1), D(e_2)) + c'$

and for function call $f(e_1, \cdots, e_n)$ (assuming arguments are all call-by-value!)

* $W(f(e_1, \cdots, e_n)) = W(e_1) + \cdots + W(e_n) + W(f)(v_1, \cdots, v_n)$
* $D(f(e_1, \cdots, e_n)) = D(e_1) + \cdots + D(e_n) + D(f)(v_1, \cdots, v_n)$

where $v_i$ is the value of $e_i.$ 

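Given $p$ processors, the running time can never drop below $D(e)$ (because of dependencies between subcomputations) or below $\dfrac{W(e)}{p}$ (all the work has to be executed by some processor). Assuming constant communication time, a reasonable estimate of the running time is therefore:

$$D(e) + \dfrac{W(e)}{p}$$

* If $p$ is constant and the input $e$ grows, the $\dfrac{W(e)}{p}$ term typically dominates, so the parallel program has the same asymptotic complexity as the sequential one
* If $e$ is constant but $p$ grows, then $D(e) + \dfrac{W(e)}{p} \rightarrow D(e)$, so the running time stays bounded below by $D(e)$ even with unlimited processors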
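
To get a feel for the expression $D(e) + \dfrac{W(e)}{p}$, the small sketch below evaluates it for a few values of $p$. The function name `estimate` and the sample numbers (work 1024, depth 10, in arbitrary time units) are made up for illustration.

```scala
/* Hypothetical helper evaluating the estimate D(e) + W(e)/p */
object RunningTimeEstimate {
    /* work and depth are given in the same abstract time units */
    def estimate(work: Double, depth: Double, p: Int): Double = depth + work / p
}
```

With work 1024 and depth 10, `estimate` gives `1034.0` for $p = 1$, `266.0` for $p = 4$ and `11.0` for $p = 1024$: adding processors shrinks the $\dfrac{W(e)}{p}$ term, while the depth term remains.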

## Amdahl's Law:

If $f$ is the fraction of a computation that is inherently sequential and cannot be made faster, and the remaining $1-f$ fraction can be made $p$ times faster, then the **speedup** is *bounded* by

$$\dfrac{1}{f + \left(\dfrac{1-f}{p} \right)}$$

Note: Amdahl's law assumes a fixed input size. A more realistic bound is given by [Gustafson's law](https://en.wikipedia.org/wiki/Gustafson%27s_law).
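
As a quick numeric check of Amdahl's bound, the sketch below computes it directly from the formula. The object and function names are hypothetical, and the sample value $f = 0.4$ is chosen only for illustration.

```scala
/* Hypothetical helper computing the Amdahl bound 1 / (f + (1 - f) / p) */
object AmdahlSketch {
    /* f: sequential fraction in [0, 1]; p: speed-up factor of the remaining part */
    def speedup(f: Double, p: Int): Double = 1.0 / (f + (1.0 - f) / p)
}
```

For $f = 0.4$ and $p = 100$ the bound is about $2.46$, and as $p$ grows it approaches $\dfrac{1}{f} = 2.5$: no amount of parallel hardware can push the speedup beyond that.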

## Performance Factors in Benchmarking:

* Processor speed
* Number of processors
* Memory latency and throughput
* Cache behavior
* Runtime behavior (e.g. garbage collection, JIT compilation, etc.)

For more details, see [What Every Programmer Should Know About Memory](https://people.freebsd.org/~lstewart/articles/cpumemory.pdf)

For measurement methodologies, see [Statistically Rigorous Java Performance Evaluation](https://buytaert.net/files/oopsla07-georges.pdf)

### ScalaMeter: benchmarking and performance regression testing framework on JVM

Add the dependency to sbt:

`libraryDependencies += "com.storm-enroute" %% "scalameter-core" % "0.6"`

then

`import org.scalameter._`

```scala
val time = measure {
    (0 until 1000000).toArray
}
```

and, to warm up the JVM before measuring:

```scala
val time = withWarmer(new Warmer.Default) measure {
    (0 until 1000000).toArray
}
```
More configuration can be added as well. For more details, see [ScalaMeter](https://scalameter.github.io/)