---

# Barriers, Distributed Transactions and Self-Stabilization
### Liang Xu, April 2022

---

## Barrier

##### Sample scenario
A previous presentation introduced a parallel sum algorithm: it divides a huge array into several smaller parts and computes the sums of these parts concurrently, either to construct a smaller array or to compute the final result.

```
for i:= start to end
    sumTask₀ ‖ sumTask₁ ‖ sumTask₂ ‖ ... ‖ sumTaskₙ₋₁
output sum
```

In each round of this algorithm, we must wait until all threads have completed their work before entering the next round or moving to the final step. Threads may need different amounts of time to execute, which can lead to synchronization problems; a barrier helps us prevent them.
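
As a rough illustration of the scenario, the sketch below sums one array with several threads and uses the standard `java.util.concurrent.CyclicBarrier` to wait for all partial sums. The class name `ParallelSum`, the chunking scheme and the single round are assumptions made for this example, not the algorithm from the previous presentation.

```java
import java.util.concurrent.CyclicBarrier;

class ParallelSum {
    // Sums data with the given number of worker threads (workers >= 1).
    static long sum(int[] data, int workers) {
        long[] partial = new long[workers];
        long[] total = new long[1];
        // The barrier action runs once, after the last worker has arrived.
        CyclicBarrier barrier = new CyclicBarrier(workers, () -> {
            for (long p : partial) total[0] += p;
        });
        int chunk = (data.length + workers - 1) / workers;
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            final int id = w;
            threads[w] = new Thread(() -> {
                int from = Math.min(data.length, id * chunk);
                int to = Math.min(data.length, from + chunk);
                for (int i = from; i < to; i++) partial[id] += data[i];
                try {
                    barrier.await();   // wait until every partial sum is ready
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[w].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return total[0];
    }

    public static void main(String[] args) {
        int[] data = new int[1000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        System.out.println(sum(data, 4));   // sum of 1..1000
    }
}
```

Without the barrier, the combining step could read a partial sum that a slower worker has not finished writing.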



##### Barrier
- Keeps track of whether all threads have arrived;
- Stops the processes that arrive early until the last process has arrived.


##### Two approaches of waiting
- Spinning on local or remote variables;
- Falling asleep and being woken up later.



### Basic Barrier

A very basic implementation of a barrier
- `count`: tracks the number of threads that have arrived;
- `size`: records the size of the whole group of threads;
- `myWait`: increases `count` by 1 and makes the thread wait on `count` until the last thread arrives.

In [6]:
%%writefile BasicBarrier.java
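import java.util.concurrent.atomic.AtomicInteger;

public class BasicBarrier{
    AtomicInteger count;   // number of threads that have arrived
    int size;              // number of threads in the group
    public BasicBarrier(int n){   // constructor name must match the class name
        count = new AtomicInteger(0); 
        size = n;
    } 
    public void myWait() {
        int position = count.getAndIncrement(); 
        synchronized(this){   // wait() and notifyAll() require the monitor of this object
            if (position != size-1) { 
                // checking under the lock avoids missing the wake-up of the last thread
                while (count.get() != size){
                    try{wait();}catch(InterruptedException e) {}
                }
            } else {
                notifyAll();   // the last thread wakes up all waiting threads
            }
        }
    }
}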

Writing BasicBarrier.java


Drawback: 
- It cannot be reused: resetting `count` for the next phase may starve the threads that are still waiting

In [7]:
%%writefile BasicBarrier.java
import java.util.concurrent.atomic.AtomicInteger;

public class BasicBarrier{
    AtomicInteger count;
    int size;
    public BasicBarrier(int n){   // constructor name must match the class name
        count = new AtomicInteger(0); 
        size = n;
    } 
    public void myWait() {
        int position = count.getAndIncrement(); 
        synchronized(this){   // wait() and notifyAll() require the monitor of this object
            if (position != size-1) { 
                while (count.get() != size){
                    try{wait();}catch(InterruptedException e) {}
                }  
            } else {
                notifyAll();
                count.set(0);   // happens before the other threads leave the while loop: they see 0 and wait forever  ->  starvation
            }
        }
    }
}

Overwriting BasicBarrier.java


### Sense-reversing Barrier

An elegant and practical solution for reusing a barrier.

New variables used in this algorithm:
- `globalSense`: a boolean variable owned by the barrier that indicates whether the current phase is even-numbered or odd-numbered;
- `localSense`: a boolean variable owned by each thread.

Instead of the variable `count`, `globalSense` and `localSense` together determine whether the barrier gate is open.

```algorithm
var count: integer = 0
var size: integer = N
var globalSense: boolean = false
```

<div style="float:left;border-left:2em solid white">

```algorithm
procedure wait(localSense: boolean)
    ⟨var position := count; count := count + 1⟩
    if position ≠ (size - 1) then
        await localSense = globalSense
    else
        count := 0
        globalSense := ¬globalSense
    
```
</div>

<div>
<img src="img/figure1.svg"style="float:left;width: 400px;"/> 
</div>

####  Barrier using Semaphore vs. Sense-reversing Barrier

A semaphore barrier with N processes can be viewed as N sense-reversing barriers, one per process
- Similarity
    1. Release of a semaphore: a process arrives at its own barrier and increases the local count
    2. Acquire of a semaphore: a process spins on the count variables of all other barriers
- Difference
    1. A process that arrives at the semaphore barrier does not spin on its own sense field; instead it spins on the sense fields of all other barriers
    2. In a semaphore barrier, each count variable is visible to all processes but can be modified only by its own process. In a sense-reversing barrier, the count variable is visible to and can be modified by all processes (so the update must be atomic to stay thread-safe)
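
One possible way to realize a semaphore-based barrier is sketched below. It is an assumption-laden illustration, not the implementation measured later: the class `SemaphoreBarrier` and the per-pair semaphore matrix `arrived[i][j]` are choices made here so that a signal from thread `i` can only be consumed by thread `j`, which keeps the barrier reusable across phases.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreBarrier {
    private final int n;
    private final Semaphore[][] arrived;   // arrived[i][j]: signal from thread i to thread j

    SemaphoreBarrier(int n) {
        this.n = n;
        arrived = new Semaphore[n][n];
        for (int i = 0; i < n; i++)
            for (int j = 0; j < n; j++)
                arrived[i][j] = new Semaphore(0);
    }

    void await(int id) {
        for (int j = 0; j < n; j++)   // release: announce arrival to every other thread
            if (j != id) arrived[id][j].release();
        for (int j = 0; j < n; j++)   // acquire: wait for the arrival of every other thread
            if (j != id) arrived[j][id].acquireUninterruptibly();
    }

    // Runs the given number of phases; returns true if no thread passed a barrier early.
    static boolean demo(int n, int phases) {
        SemaphoreBarrier barrier = new SemaphoreBarrier(n);
        AtomicInteger arrivals = new AtomicInteger(0);
        AtomicBoolean ok = new AtomicBoolean(true);
        Thread[] workers = new Thread[n];
        for (int t = 0; t < n; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int phase = 1; phase <= phases; phase++) {
                    arrivals.incrementAndGet();
                    barrier.await(id);
                    // after phase p, every thread must have arrived at least p times
                    if (arrivals.get() < phase * n) ok.set(false);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return ok.get();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 100));
    }
}
```

Each semaphore has exactly one releasing and one acquiring thread, so, unlike the shared `count` of the sense-reversing barrier, no variable is modified by more than one process.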

    
<img src="img/figure38.svg"/> 

In [1]:
%%writefile SenseRevBarrier.java

import java.util.concurrent.atomic.AtomicInteger;

class Barrier{
    AtomicInteger count;
    int size;
    boolean globalSense;
    public Barrier(int n){ 
        count = new AtomicInteger(0); 
        size = n;
        globalSense = false;
    } 
    public boolean myWait(boolean localSense) {
        int position = count.getAndIncrement(); 
        if (position != size-1) { 
            synchronized(this){   // check and wait under the same lock to avoid a lost wake-up
                while (localSense != globalSense){
                    try{wait();}catch(Exception e) {}
                }
            }
        } else {
            count.set(0); 
            globalSense = localSense;
            synchronized(this){notifyAll();}
        }
        return !localSense;
    }
}
class Worker extends Thread {
    Barrier b;
    boolean localSense; 
    int id;
    public Worker(Barrier b, int i) {
        this.b = b;
        this.id = i;
        this.localSense = true;
    }
    public void run() {
        for (int i = 0; i <= 5; i++) {
            System.out.println("thread id: " + id + ", number of phase: " + i + ", localSense is: " + localSense + ", globalSense is: " + b.globalSense);
            localSense = b.myWait(localSense);
        }
    }
}
public class SenseRevBarrier {
    static int N = 4;
    public static void main(String args[]) {
        Barrier b = new Barrier(N);
        Thread w[] = new Thread[N];

        for (int i = 0; i < N; i++) {
            w[i] = new Worker(b, i);
            w[i].start();
        }
        for (int i = 0; i < N; i++) {
            // wait for all threads to terminate
            try {w[i].join();
            } catch (Exception e) {}
        }
    }
}

Overwriting SenseRevBarrier.java


In [2]:
!javac SenseRevBarrier.java

In [3]:
!java SenseRevBarrier

thread id: 2, number of phase: 0, localSense is: true, globalSense is: false
thread id: 0, number of phase: 0, localSense is: true, globalSense is: false
thread id: 3, number of phase: 0, localSense is: true, globalSense is: false
thread id: 1, number of phase: 0, localSense is: true, globalSense is: false
thread id: 1, number of phase: 1, localSense is: false, globalSense is: true
thread id: 3, number of phase: 1, localSense is: false, globalSense is: true
thread id: 2, number of phase: 1, localSense is: false, globalSense is: true
thread id: 0, number of phase: 1, localSense is: false, globalSense is: true
thread id: 0, number of phase: 2, localSense is: true, globalSense is: false
thread id: 1, number of phase: 2, localSense is: true, globalSense is: false
thread id: 3, number of phase: 2, localSense is: true, globalSense is: false
thread id: 2, number of phase: 2, localSense is: true, globalSense is: false
thread id: 2, number of phase: 3, localSense is: false, globalSense is: true

### Combining Tree Barrier

A combining tree barrier is a hierarchical method that splits a large barrier into several smaller barriers.
- Uses a tree structure to reduce contention on the sense field
- Each node represents a sense-reversing barrier
- Each thread spins on the sense field of its own node


#### High Contention vs Low Contention
<img src="img/figure2.svg"/>

#### How combining tree barrier works

Let `d` be the depth of the tree and `r` the number of children of each node; a combining tree barrier can then handle `rᵈ⁺¹` threads.
- The first `r-1` processes to arrive at a node spin on the sense field of this node until it matches their local sense.
- The `r`-th process to arrive at a node moves on to its parent.
- The `r`-th process to arrive at the root resets the count and reverses the sense of the root.
- Each process that moved up then revisits the nodes it passed through, resetting the count and reversing the sense of each node.
- All processes resume execution once all senses are reversed.

```algorithm
var count: integer = 0
var size: integer = N
var nodeSense: boolean = false
var parentNode
```

<div style="float:left;border-left:2em solid white">

```algorithm
procedure myWait(localSense: boolean)
    ⟨var position := count; count := count + 1⟩
    if position ≠ size-1 then
        await localSense = nodeSense
    else
        if parentNode ≠ null
            parentNode.myWait(localSense)
        count := 0
        nodeSense := ¬nodeSense
    
```
</div>

<div>
<img src="img/figure34.svg"style="float:left;width: 400px;"/> 
</div>

#### Example

- 8 processes require a tree with 7 nodes
<img src="img/figure3.svg"/>

- `p1`, `p3`, `p5`, `p7` arrive at the barrier, increase the count by 1, and spin on the sense
<img src="img/figure4.svg"/>

- `p2`, `p6` arrive at the barrier, move to the parent node, increase the count by 1, and spin on the sense
<img src="img/figure5.svg"/>

- `p4` arrives at the barrier, moves to the parent node, moves to the root node, increases the count by 1, and spins on the sense
<img src="img/figure6.svg"/>

- `p8` arrives at the barrier, moves to the parent node, then moves to the root node
<img src="img/figure7.svg"/>

- The count in the root reaches the desired value, so the count is reset and the sense is reversed
<img src="img/figure8.svg"/>

- `p4`, `p8` go back to the nodes they came from, reset the count and reverse the sense
<img src="img/figure9.svg"/>

- `p2`, `p4`, `p6`, `p8` go back to the nodes they came from, reset the count and reverse the sense. Since the senses of all leaves are now reversed, the processes resume their execution
<img src="img/figure10.svg"/>
<img src="img/figure10.svg"/>

In [4]:
%%writefile TreeBarrier.java

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;

class BarrierNode{
    AtomicInteger count;
    int size;
    boolean nodeSense;
    BarrierNode parentNode;
    public BarrierNode(int n){ 
        this.count = new AtomicInteger(0); 
        this.size = n;
        this.nodeSense = false;
        this.parentNode = null;
    } 
    public BarrierNode(int n, BarrierNode parent){ 
        this.count = new AtomicInteger(0); 
        this.size = n;
        this.nodeSense = false;
        this.parentNode = parent;
    } 
    public void myWait(boolean localSense) {
        int position = this.count.getAndIncrement(); 
        boolean reversed = !this.nodeSense;
        if (position != this.size-1) { 
            synchronized(this){   // check and wait under the same lock to avoid a lost wake-up
                while (localSense != nodeSense){
                    try{wait();}catch(Exception e) {}
                }
            } 
        } else {
            if (this.parentNode != null){
                this.parentNode.myWait(localSense);
            }
            count.set(0); 
            this.nodeSense = reversed;
            synchronized(this){notifyAll();}
        }
        
    }
}
class Worker extends Thread {
    BarrierNode b;
    boolean localSense; 
    int id;
    public Worker(BarrierNode b, int i) {
        this.b = b;
        this.id = i;
        this.localSense = true;
    }
    public void run() {
        for (int i = 0; i < 2; i++) {
            System.out.println("thread id: " + id + ", number of phase: " + i + ", localSense is: " + localSense + ", nodeSense is: " + b.nodeSense);
            b.myWait(localSense);
            this.localSense = !this.localSense;
            
        }
    }
}
public class TreeBarrier {
    static int N = 2;
    static int M = 8;
    
    public static void main(String args[]) {
        BarrierNode root = new BarrierNode(N);
        BarrierNode node1 = new BarrierNode(N, root);
        BarrierNode node2 = new BarrierNode(N, root);
        BarrierNode node1_1 = new BarrierNode(N, node1);
        BarrierNode node1_2 = new BarrierNode(N, node1);
        BarrierNode node2_1 = new BarrierNode(N, node2);
        BarrierNode node2_2 = new BarrierNode(N, node2);
        BarrierNode b[]={node1_1, node1_2, node2_1, node2_2};
        Thread w[] = new Thread[M];
        
        for (int i = 0; i < M; i++) {
            w[i] = new Worker(b[i%4], i);
            w[i].start();
        }
        for (int i = 0; i < M; i++) {
            // wait for all threads to terminate
            try {w[i].join();
            } catch (Exception e) {}
        }
    }
}

Overwriting TreeBarrier.java


In [5]:
!javac TreeBarrier.java

In [6]:
!java TreeBarrier

thread id: 4, number of phase: 0, localSense is: true, nodeSense is: false
thread id: 3, number of phase: 0, localSense is: true, nodeSense is: false
thread id: 2, number of phase: 0, localSense is: true, nodeSense is: false
thread id: 6, number of phase: 0, localSense is: true, nodeSense is: false
thread id: 1, number of phase: 0, localSense is: true, nodeSense is: false
thread id: 5, number of phase: 0, localSense is: true, nodeSense is: false
thread id: 7, number of phase: 0, localSense is: true, nodeSense is: false
thread id: 0, number of phase: 0, localSense is: true, nodeSense is: false
thread id: 0, number of phase: 1, localSense is: false, nodeSense is: true
thread id: 6, number of phase: 1, localSense is: false, nodeSense is: true
thread id: 3, number of phase: 1, localSense is: false, nodeSense is: true
thread id: 7, number of phase: 1, localSense is: false, nodeSense is: true
thread id: 4, number of phase: 1, localSense is: false, nodeSense is: true
thread id: 5, number of p

### Tournament Barrier

Another tree-structured barrier
- Handles at most `2ᵈ⁺¹` threads
- Divides each node into two parts: active and passive
- Each part has a boolean sense and a child
#### How it works

1. Similar to the combining tree with `r = 2` (binary tree).
    * Two processes are assigned to each leaf.


2. Processes are divided into two groups.
    * A process that arrives on the passive side reverses the sense of the active side and spins on the passive sense
    * A process that arrives on the active side spins on the active sense, then moves to the parent or, at the root, signals that the barrier has been completed.
    * The resuming processes reverse the sense field of every node they visited
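
To make the two groups concrete, the sketch below lists which threads are active and which are passive at each level, assuming roles are assigned statically by thread index: at level `k` (leaves are level 1) a thread is active if its index is divisible by `2ᵏ`, and only the active threads of one level take part in the next. The class `TournamentRoles` and the level numbering are assumptions made for this illustration.

```java
class TournamentRoles {
    // A thread takes part at a level only if it was active on all lower levels.
    static boolean participates(int id, int level) {
        return id % (1 << (level - 1)) == 0;
    }

    // Among the participants, the thread whose index is divisible by 2^level is active.
    static boolean isActive(int id, int level) {
        return id % (1 << level) == 0;
    }

    // Lists the participants of one level with their role (A = active, P = passive).
    static String rolesAt(int level, int threads) {
        StringBuilder sb = new StringBuilder("level " + level + ":");
        for (int id = 0; id < threads; id++) {
            if (!participates(id, level)) continue;
            sb.append(" p").append(id).append(isActive(id, level) ? "=A" : "=P");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        for (int level = 1; level <= 3; level++) {
            System.out.println(rolesAt(level, 8));
        }
    }
}
```

With 8 threads, thread 0 is active on every level, so it is the one that signals completion at the root.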

```algorithm
var nodeSenseA: boolean = false
var nodeSenseP: boolean = false
var parentNode
```

<div style="float:left;border-left:2em solid white">

```algorithm
procedure myWait(localSense: boolean)
    if current process is Active then
        await localSense = nodeSenseA
        if in root then
            nodeSenseP := ¬nodeSenseP
        else 
            decide if process is active or passive in parent
            parentNode.myWait(localSense)
            nodeSenseP := ¬nodeSenseP
    else
        nodeSenseA := ¬nodeSenseA
        await localSense = nodeSenseP
        nodeSenseP := ¬nodeSenseP
    
```
</div>

<div>
<img src="img/figure35.svg"style="float:left;width: 400px;"/> 
</div>

#### Example

- Suppose processes are assigned to active or passive according to their index
<img src="img/figure13.svg"/>

- When the last process reaches the barrier
<img src="img/figure14.svg"/>

In [7]:
%%writefile TournaBarrier.java

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;

class BarrierNode{
    boolean nodeSenseA = false;
    boolean nodeSenseP =false;
    BarrierNode parentNode;
    public BarrierNode(){ 
        this.parentNode = null;
    } 
    public BarrierNode(BarrierNode parent){ 
        this.parentNode = parent;
    } 
    public void myWait(boolean ap, boolean localSense, int i, int depth) {
        if (ap) { //Active node
            synchronized(this){   // check and wait under the same lock to avoid a lost wake-up
                while (localSense != this.nodeSenseA){
                    try{wait();}catch(Exception e) {}
                }
            }
            if (this.parentNode == null){
                this.nodeSenseP = !this.nodeSenseP;
                synchronized(this){notifyAll();}
            } else {
                boolean reversed = !this.nodeSenseP;
                boolean apTmp = true;
                int depthTmp = depth + 1;
                // a thread stays active in the parent only if its index is divisible by 2^depthTmp; otherwise it becomes passive there
                if (i % (1 << depthTmp) != 0){apTmp = false;}
                this.parentNode.myWait(apTmp, localSense, i, depthTmp);
                this.nodeSenseP = reversed;
                synchronized(this){notifyAll();}
            }
        } else { // Passive node
            boolean reversed = !this.nodeSenseP;
            this.nodeSenseA = localSense;
            synchronized(this){notifyAll();}
            synchronized(this){   // check and wait under the same lock to avoid a lost wake-up
                while (localSense != this.nodeSenseP){
                    try{wait();}catch(Exception e) {}
                }
            }
            this.nodeSenseP = reversed;
            synchronized(this){notifyAll();}
        }
    }
}
class Worker extends Thread {
    BarrierNode b;
    boolean localSense; 
    int id;
    public Worker(BarrierNode b, int i) {
        this.b = b;
        this.id = i;
        this.localSense = true;
    }
    public void run() {
        boolean ap = true;
        if (id % 2 != 0){
            ap = false;
        } 
        for (int i = 0; i < 2; i++) {
            System.out.println("thread id: " + id + ", number of phase: " + i + ", localSense is: " + localSense);
            b.myWait(ap, localSense, id, 1);
            this.localSense = !this.localSense;
        }
    }
}
public class TournaBarrier {
    static int N = 2;
    static int M = 8;
    
    public static void main(String args[]) {
        BarrierNode root = new BarrierNode();
        BarrierNode node1 = new BarrierNode(root);
        BarrierNode node2 = new BarrierNode(root);
        BarrierNode node1_1 = new BarrierNode(node1);
        BarrierNode node1_2 = new BarrierNode(node1);
        BarrierNode node2_1 = new BarrierNode(node2);
        BarrierNode node2_2 = new BarrierNode(node2);
        BarrierNode b[]={node1_1, node1_2, node2_1, node2_2};
        Thread w[] = new Thread[M];
        
        for (int i = 0; i < M/2; i++) {
            w[i*2] = new Worker(b[i], i*2);
            w[i*2+1] = new Worker(b[i], i*2+1);
            w[i*2].start();
            w[i*2+1].start();
        }
        for (int i = 0; i < M; i++) {
            // wait for all threads to terminate
            try {w[i].join();
            } catch (Exception e) {}
        }
    }
}

Overwriting TournaBarrier.java


In [8]:
!javac TournaBarrier.java

In [9]:
!java TournaBarrier

thread id: 5, number of phase: 0, localSense is: true
thread id: 7, number of phase: 0, localSense is: true
thread id: 6, number of phase: 0, localSense is: true
thread id: 2, number of phase: 0, localSense is: true
thread id: 1, number of phase: 0, localSense is: true
thread id: 3, number of phase: 0, localSense is: true
thread id: 4, number of phase: 0, localSense is: true
thread id: 0, number of phase: 0, localSense is: true
thread id: 0, number of phase: 1, localSense is: false
thread id: 1, number of phase: 1, localSense is: false
thread id: 5, number of phase: 1, localSense is: false
thread id: 7, number of phase: 1, localSense is: false
thread id: 3, number of phase: 1, localSense is: false
thread id: 4, number of phase: 1, localSense is: false
thread id: 2, number of phase: 1, localSense is: false
thread id: 6, number of phase: 1, localSense is: false


## More types of barrier





### Static tree barrier
Another tree-structured barrier
- Exactly one process per node.
- Notification travels from bottom to top.
- The process at a node waits until all of its child nodes have finished, then notifies its parent node and spins on the global sense field
- The root reverses the global sense once it has been notified by all of its children.

```algorithm
var count: integer = N
var nodeSense: boolean = false
var parentNode
var rootNode
```

<div style="float:left;border-left:2em solid white">

```algorithm
procedure myWait(localSense: boolean)
    await count = 0
    reset count
    if in non-root then
        ⟨parentNode.count := parentNode.count - 1⟩
        await localSense = rootNode.nodeSense
    else
        nodeSense := ¬nodeSense
   
```
</div>

<div>
<img src="img/figure11.svg"/>
</div>

In [10]:
%%writefile StTreeBarrier.java

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;

class BarrierNode{
    AtomicInteger count;
    int size;
    boolean nodeSense;
    BarrierNode parentNode;
    BarrierNode rootNode;
    BarrierNode childNode[];
    public BarrierNode(int n){ 
        this.count = new AtomicInteger(n); 
        this.size = n;
        this.nodeSense = false;
        this.parentNode = null;
        this.rootNode = null;
    } 
    public BarrierNode(int n, BarrierNode parent, BarrierNode root){ 
        this.count = new AtomicInteger(n); 
        this.size = n;
        this.nodeSense = false;
        this.parentNode = parent;
        this.rootNode = root;
    } 
    public void addChild(BarrierNode[] child){
        this.childNode = child;
    }
    public void countDown(){
        int position = this.count.decrementAndGet();
        synchronized(this){notifyAll();}
    }
    public void myNotifyAll(){
        synchronized(this){notifyAll();}
        for (int i = 0; i < this.size; i++) {
            childNode[i].myNotifyAll();
        }
    }
    public void myWait(boolean localSense) {
        int position = this.count.get();
        while (position != 0){
            synchronized(this){try{wait();}catch(Exception e) {}}
            position = this.count.get();
        }
        count.set(size);
        if (this.parentNode != null){
            this.parentNode.countDown();
            while (localSense != this.rootNode.nodeSense){
                synchronized(this){try{wait();}catch(Exception e) {}}
            }
        } else {
            this.nodeSense = !this.nodeSense;
            try{TimeUnit.SECONDS.sleep(1);}catch(Exception e) {}
            this.myNotifyAll();
        }
         
    }
}
class Worker extends Thread {
    BarrierNode b;
    boolean localSense; 
    int id;
    public Worker(BarrierNode b, int i) {
        this.b = b;
        this.id = i;
        this.localSense = true;
    }
    public void run() {
        for (int i = 0; i < 3; i++) {
            if (b.rootNode == null){
                System.out.println("thread id: " + id + ", number of phase: " + i + ", localSense is: " + localSense + ", globalSense is: " + b.nodeSense);
            } else {
                System.out.println("thread id: " + id + ", number of phase: " + i + ", localSense is: " + localSense + ", globalSense is: " + b.rootNode.nodeSense);
            }
            b.myWait(localSense);
            this.localSense = !this.localSense;
        }
    }
}
public class StTreeBarrier {
    static int N = 2;
    static int M = 5;
    
    public static void main(String args[]) {
        BarrierNode root = new BarrierNode(2);
        BarrierNode node0 = new BarrierNode(2, root, root);
        BarrierNode node1 = new BarrierNode(0, root, root);
        BarrierNode node2 = new BarrierNode(0, node0, root);
        BarrierNode node3 = new BarrierNode(0, node0, root);
        BarrierNode child0[] = {node0, node1};
        BarrierNode child1[] = {node2, node3};
        root.addChild(child0);
        node0.addChild(child1);
        BarrierNode b[] = {root, node0, node1, node2, node3};
        
        Thread w[] = new Thread[M]; 
        for (int i = 0; i < M; i++) {
            w[i] = new Worker(b[i], i);
            w[i].start();
        }
        for (int i = 0; i < M; i++) {
            // wait for all threads to terminate
            try {w[i].join();
            } catch (Exception e) {}
        }
    }
}

Overwriting StTreeBarrier.java


In [11]:
!javac StTreeBarrier.java

In [12]:
!java StTreeBarrier

thread id: 0, number of phase: 0, localSense is: true, globalSense is: false
thread id: 3, number of phase: 0, localSense is: true, globalSense is: false
thread id: 2, number of phase: 0, localSense is: true, globalSense is: false
thread id: 4, number of phase: 0, localSense is: true, globalSense is: false
thread id: 1, number of phase: 0, localSense is: true, globalSense is: false
thread id: 0, number of phase: 1, localSense is: false, globalSense is: true
thread id: 3, number of phase: 1, localSense is: false, globalSense is: true
thread id: 1, number of phase: 1, localSense is: false, globalSense is: true
thread id: 4, number of phase: 1, localSense is: false, globalSense is: true
thread id: 2, number of phase: 1, localSense is: false, globalSense is: true
thread id: 0, number of phase: 2, localSense is: true, globalSense is: false
thread id: 1, number of phase: 2, localSense is: true, globalSense is: false
thread id: 3, number of phase: 2, localSense is: true, globalSense is: false

### Dissemination barrier
A round-based barrier
- Each process must complete `⌈log₂N⌉` rounds of notification before it resumes.
- In each round, a process notifies one other process.
- In each round, a process waits until it has been notified by one other process.
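
The notification pattern can be tabulated without any threads. The sketch below assumes rounds are numbered from 0, so that in round `i` process `id` notifies `(id + 2ⁱ) mod n` and waits for `(id - 2ⁱ) mod n`; the class `DisseminationSchedule` and its method names are made up for this example. `everyoneInformed` propagates "who has heard from whom" as bit sets to check that `⌈log₂N⌉` rounds are enough.

```java
class DisseminationSchedule {
    // Number of rounds: the smallest r with 2^r >= n, i.e. ceil(log2 n).
    static int rounds(int n) {
        int r = 0;
        while ((1 << r) < n) r++;
        return r;
    }

    static int notifies(int id, int round, int n) {
        return (id + (1 << round)) % n;
    }

    static int waitsFor(int id, int round, int n) {
        return Math.floorMod(id - (1 << round), n);
    }

    // Simulates knowledge propagation with bit sets (assumes n < 31).
    static boolean everyoneInformed(int n) {
        int[] known = new int[n];
        for (int id = 0; id < n; id++) known[id] = 1 << id;
        for (int round = 0; round < rounds(n); round++) {
            int[] next = new int[n];
            for (int id = 0; id < n; id++) {
                // a notification carries everything the sender knew before this round
                next[id] = known[id] | known[waitsFor(id, round, n)];
            }
            known = next;
        }
        for (int k : known) {
            if (k != (1 << n) - 1) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        int n = 6;
        for (int round = 0; round < rounds(n); round++) {
            for (int id = 0; id < n; id++) {
                System.out.println("round " + round + ": p" + id + " notifies p"
                        + notifies(id, round, n) + ", waits for p" + waitsFor(id, round, n));
            }
        }
        System.out.println("everyone informed: " + everyoneInformed(n));
    }
}
```

After round `i` a process has heard, directly or indirectly, from its `2ⁱ⁺¹ - 1` predecessors on the ring, which is why the number of rounds grows only logarithmically with `N`.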



```algorithm
var n: integer = N
var round: integer = ⌈log₂N⌉
```


<div style="float:left;border-left:2em solid white">

```algorithm
procedure myWait(id: integer)
    for i := 0 to round - 1 do
        notify process (id + 2ⁱ) mod n
        await receive from process (id - 2ⁱ) mod n
   
```
</div>

<div>
<img src="img/figure12.svg" style="float:left;border-left:10em solid white">
</div>

In [13]:
%%writefile DisBarrier.java

import java.util.concurrent.*;
import java.lang.*;

class Barrier{
    Semaphore sem[][];
    int round;
    int n;
    public Barrier(int n, int r){ 
        this.sem = new Semaphore[r][n];
        for (int j = 0; j < r; j++) {
            for (int i = 0; i < n; i++) {
                sem[j][i] = new Semaphore(0);
            } 
        }
        this.round = r;
        this.n = n;
    } 
    public void myWait(int id) {
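        for (int i = 0; i < round; i++) {
            // notify the partner 2^i positions ahead by releasing the partner's semaphore
            sem[i][(id+(1<<i))%n].release();
            // wait on our own semaphore, which is released by the process 2^i positions behind;
            // acquiring sem[i][(id-2^i) mod n] instead would pair id with the process 2^(i+1) behind
            try {sem[i][id].acquire();} catch(Exception e){}
        }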
    }
}
class Worker extends Thread {
    Barrier b; 
    int id;
    public Worker(Barrier b, int i) {
        this.b = b;
        this.id = i;
    }
    public void run() {
        for (int i = 0; i < 3; i++) {
            System.out.println("thread id: " + id + ", number of phase: " + i);
            b.myWait(id);
        }
    }
}
public class DisBarrier {
    static int N = 6;
    static int R = 3;
    public static void main(String args[]) {
        Barrier b = new Barrier(N, R);

        Thread w[] = new Thread[N];
        for (int i = 0; i < N; i++) {
            w[i] = new Worker(b, i);
            w[i].start();
        }
        for (int i = 0; i < N; i++) {
            // wait for all threads to terminate
            try {w[i].join();
            } catch (Exception e) {}
        }
    }
}

Overwriting DisBarrier.java


In [14]:
!javac DisBarrier.java

In [15]:
!java DisBarrier

thread id: 4, number of phase: 0
thread id: 2, number of phase: 0
thread id: 0, number of phase: 0
thread id: 3, number of phase: 0
thread id: 1, number of phase: 0
thread id: 5, number of phase: 0
thread id: 4, number of phase: 1
thread id: 0, number of phase: 1
thread id: 3, number of phase: 1
thread id: 1, number of phase: 1
thread id: 5, number of phase: 1
thread id: 2, number of phase: 1
thread id: 5, number of phase: 2
thread id: 0, number of phase: 2
thread id: 4, number of phase: 2
thread id: 1, number of phase: 2
thread id: 3, number of phase: 2
thread id: 2, number of phase: 2


# Evaluation

- Based on the time cost of 1000 phases
- The average time over 10 runs is reported
- Only processing time is measured (loading and initialization are not included)
<br/>
<center> <h6>Average time</h6> </center>

|Thread number|Sense-reversing|Combining Tree|Tournament|Dissemination|Semaphore-based|
|---|---|---|---|---|---|
| 2 threads | 13.4 ms | 14.2 ms | 15.8 ms | 9.7 ms  | 15.4 ms |
| 4 threads | 30.0 ms | 23 ms   | 43.0 ms | 19.2 ms | 46.6 ms |
| 8 threads | 81.0 ms | 51.4 ms | 70.4 ms | 51.8 ms | 92.6 ms |
| 16 threads| 185.8 ms| 80.8 ms | 142.0 ms| 115.0 ms| 204.6 ms|
| 32 threads| 281.4 ms| 177.6 ms| 271.0 ms| 255.6 ms| 491.6 ms|


##### Device used
|||
|:-|:-|
|Model name:|Intel(R) Xeon(R) CPU E5-2687W v4 @ 3.00GHz|
| CPU(s): | 4 |
| Thread(s) per core:| 1|
| Core(s) per socket:| 1|
| Socket(s):| 4|


##### Sense-reversing Barrier
- Strength:
    1. Easy to implement (very simple structure)
- Drawback:
    1. Performance degrades when the number of threads is large
    2. The single sense field becomes a bottleneck
    3. Thread safety needs to be considered for the count variable
    
##### Combining Tree Barrier
- Strength:
    1. Divides threads into multiple groups that spin on different, smaller barriers (lower contention)
    2. Best performance when the number of threads is large
- Drawback:
    1. More difficult to implement (complex structure)
    2. Thread safety needs to be considered for the count variable
    
##### Tournament Barrier
- Strength:
    1. Divides threads into multiple groups that spin on different, smaller barriers (lower contention)
    2. No need to track the number of threads that have arrived (no read-modify-write operations)
- Drawback:
    1. More difficult to implement (complex structure)
    2. Whether a thread is active or passive must be decided before it arrives at a barrier node, which lowers performance
    
##### Dissemination Barrier
- Strength:
    1. Easy to implement (simple structure)
    2. No need to track the number of threads that have arrived (no read-modify-write operations)
    3. High performance when the number of threads is small (only 1 round is needed for 2 threads)
- Drawback:
    1. Relatively lower performance when the number of threads is large (extra rounds are needed in each phase)


## Distributed Transaction

### Basic transaction
A serializable collection of data operations executed by a single thread, with the following properties:
- Atomicity 
    - A transaction either completes entirely or appears never to have started
    - Commit or Abort
- Consistency
    - No violation of integrity constraints
    - Moves from one consistent state to another consistent state
    - Inconsistent intermediate states are hidden inside the transaction
- Isolation
    - The result of concurrent transactions should be the same as the result of a serial execution
    - Modifications made by a concurrent transaction are not visible to others before it finishes
- Durability
    - The result of a transaction should be persistent

**Distributed systems are very common in everyday life, e.g. P2P networks. Suppose you use a P2P network to download a shared resource: the download task is distributed across different devices, so a single transaction would be inefficient in this situation. What can we do if we want to process resources spread over several databases?**

### Distributed Transaction

A transaction that runs in a distributed environment and visits several different nodes of that environment.
- Contains one sub-transaction for each node involved in the distributed environment.
- An Atomic Commitment Protocol is used to ensure consistency among all sub-transactions.
    - Makes sure all sub-transactions reach the same decision (Commit or Abort)
    - Guarantees that the decision, once made, is irreversible
- A concurrency control method is used to ensure serializability
    - Read lock (shared lock): mutually exclusive with write locks
    - Write lock (exclusive lock): mutually exclusive with both read locks and write locks

### Two Phase Commit
- A basic type of Atomic Commitment Protocol
- Contains two phases:
    1. Voting phase: the main transaction asks all sub-transactions for their votes (commit/abort).
    2. Decision phase: the main transaction decides whether to commit (all voted commit) or abort (one or more voted abort).
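
The two phases can be sketched in a few lines. This is a minimal in-process illustration under strong assumptions: the `Participant` interface, its `prepare`/`finish` methods and the `voter` helper are hypothetical names invented here, and failures, timeouts and logging, which a real protocol must handle, are ignored.

```java
import java.util.Arrays;
import java.util.List;

class TwoPhaseCommit {
    enum Vote { COMMIT, ABORT }
    enum Decision { COMMIT, ABORT }

    // Hypothetical view of a sub-transaction as seen by the main transaction.
    interface Participant {
        Vote prepare();               // voting phase: report the local vote
        void finish(Decision d);      // decision phase: apply the global decision
    }

    static Decision run(List<Participant> participants) {
        // Phase 1 (voting): collect a vote from every sub-transaction.
        Decision decision = Decision.COMMIT;
        for (Participant p : participants) {
            if (p.prepare() == Vote.ABORT) decision = Decision.ABORT;
        }
        // Phase 2 (decision): tell every sub-transaction the same outcome.
        for (Participant p : participants) {
            p.finish(decision);
        }
        return decision;
    }

    // Helper for the demo: a participant that always casts the given vote.
    static Participant voter(Vote vote) {
        return new Participant() {
            public Vote prepare() { return vote; }
            public void finish(Decision d) { /* would commit or roll back local changes */ }
        };
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(voter(Vote.COMMIT), voter(Vote.COMMIT))));
        System.out.println(run(Arrays.asList(voter(Vote.COMMIT), voter(Vote.ABORT))));
    }
}
```

A single abort vote is enough to abort the whole transaction, which is what keeps all sub-transactions on the same decision.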

<img src="img/figure31.svg"/>

### Two Phase Locking

- A basic concurrency control method used during a transaction to obtain and release `Read` or `Write` locks on data.
- Divides the acquisition and release of locks into two separate phases:
    - Growing Phase: New locks can be acquired but no lock can be released
    - Shrinking Phase: Locks can be released but no new lock can be acquired
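
The two-phase rule can be enforced with a small wrapper. The sketch below assumes a hypothetical `TwoPhaseTransaction` class built on `threading.Lock`; it does not distinguish read locks from write locks and only illustrates the growing/shrinking discipline.

```python
import threading

class TwoPhaseTransaction:
    """Hypothetical helper that rejects lock acquisition after the first release."""
    def __init__(self):
        self.held = []          # locks currently held by this transaction
        self.shrinking = False  # becomes True at the first release

    def acquire(self, lock):
        # Growing phase only: acquiring after a release violates two phase locking.
        if self.shrinking:
            raise RuntimeError("cannot acquire a lock in the shrinking phase")
        lock.acquire()
        self.held.append(lock)

    def release(self, lock):
        # The first release switches the transaction to the shrinking phase.
        self.shrinking = True
        self.held.remove(lock)
        lock.release()
```

Once `release` has been called, any further `acquire` raises an error, so the transaction cannot interleave acquisitions and releases.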

<img src="img/figure32.svg"/>

##### Two Phase Locking Drawback

A transaction waits if a lock cannot be acquired, so deadlock may occur when transactions block each other.
- For example, transaction 1 holds lock 1 and waits for lock 2 while transaction 2 holds lock 2 and waits for lock 1
- A transaction aborts when its wait times out
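
A minimal sketch of the timeout-based abort, assuming a hypothetical helper `acquire_all` that a transaction calls with the locks it needs. It relies on the `timeout` parameter of `threading.Lock.acquire`; on a timeout it releases everything it already holds and reports the abort to the caller.

```python
import threading

def acquire_all(locks, timeout):
    """Try to take every lock in order; return False (abort) on a timeout."""
    taken = []
    for lock in locks:
        if lock.acquire(timeout=timeout):
            taken.append(lock)
        else:
            # Waiting timed out: release what was taken so others can proceed.
            for held in reversed(taken):
                held.release()
            return False
    return True
```

Releasing the already acquired locks on abort is what breaks the mutual blocking: the other transaction can then obtain the lock it was waiting for.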

## Self-Stabilization

Suppose we have a shared-memory system in which a temporary malfunction is acceptable, and several concurrently running transactions that must follow a certain pattern, e.g. only one transaction is allowed to execute while the others wait. However, the transactions might start in an incorrect initial configuration, since they can be added at arbitrary times.

**How can the system always end up with exactly one transaction executing at a time?**

A self-stabilizing algorithm makes sure the program always ends up in a correct configuration, even if it starts in an incorrect configuration that is unreachable in normal execution.
- Provides a fault-tolerant solution for incorrect configurations
- Guaranteed to converge to a valid state and afterwards remain within the set of valid states

### Dijkstra’s token ring for mutual exclusion

For a directed ring of `N` processes `p₀,...,pₙ₋₁`
- Each process holds a single-writer register `xᵢ` with a value less than `K` `(K ≥ N)`
- Each process can read the value of its predecessor
    1. Process `p₀` is privileged if `x₀ = xₙ₋₁`
    2. Process `pᵢ` (`i > 0`) is privileged if `xᵢ ≠ xᵢ₋₁`
- Each privileged process can change its own value to lose its privilege
    1. If `p₀` is privileged, perform `x₀ := (x₀ + 1) mod K`
    2. If `pᵢ` (`i > 0`) is privileged, perform `xᵢ := xᵢ₋₁`
- There can be multiple privileged processes initially
- We want exactly one privileged process at a time for mutual exclusion
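
The privilege rules and update rules above can be written as two small pure functions. This is a sequential sketch for experimenting with configurations by hand; the names `privileged` and `step` and the sample configuration are assumptions made here for illustration.

```python
def privileged(xs):
    """Return the indices of the privileged processes for register values xs."""
    n = len(xs)
    result = [0] if xs[0] == xs[n - 1] else []           # rule for p0
    result += [i for i in range(1, n) if xs[i] != xs[i - 1]]  # rule for pi, i > 0
    return result

def step(xs, i, k):
    """Let process i fire if it is privileged; return the new configuration."""
    ys = list(xs)
    if i == 0:
        if xs[0] == xs[-1]:
            ys[0] = (xs[0] + 1) % k
    elif xs[i] != xs[i - 1]:
        ys[i] = xs[i - 1]
    return ys

# Hypothetical initial configuration with N = K = 5 and three privileged processes.
xs = [0, 0, 1, 2, 3]
print(privileged(xs))      # [2, 3, 4]
for i in range(5):         # one round-robin pass over the ring
    xs = step(xs, i, 5)
print(xs, privileged(xs))  # [0, 0, 0, 0, 0] [0]
```

For this configuration a single pass is enough to reach a state with exactly one privileged process; other initial configurations or schedules may need more steps.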

<img src="img/figure15.svg" width="400" height="400"/>

```algorithm
Input: a directed ring of processes with random initial configuration
Output: a directed ring of processes with only one privileged process at a time
```

```algorithm
var id: integer
var x: integer  // random initial configuration
var prevProcess: process
var round: integer  // finite rounds of execution
```


<div style="float:left;border-left:2em solid white">

```algorithm
procedure process(id: integer)  // Any precondition is acceptable
    for i := 0 to round do
        prev := prevProcess.x
        if id = 0 then
            if x = prev then
                x := (x + 1) mod K
        else
            if x ≠ prev then
                x := prev    
    {postCond: (x₀ = xₙ₋₁) ⊕ (∃ i ∈ 1 .. N – 1 • xᵢ ≠ xᵢ₋₁)}
```
</div>

##### Example 
- Let `K = N = 5`, with the following initial configuration
<img src="img/figure16.svg" width="400" height="400"/>

- Process 2 finds that it is privileged and sets `x₂ := x₁ = 0`
- Process 4 finds that it is privileged and sets `x₄ := x₃ = 2`
<img src="img/figure17.svg" width="400" height="400"/>

- Process 3 finds that it is privileged and sets `x₃ := x₂ = 0`
- Now only process 4 is privileged
<img src="img/figure18.svg" width="400" height="400"/>

In [2]:
from random import randint
from threading import Thread, Semaphore

class Ring:
    def __init__(self, N, K):
        self.n, self.k = N, K
        self.xList = [randint(0, K-1) for i in range(N)] 
        self.sem = Semaphore(1)

    def getX(self, i):
        return self.xList[i]
    
    def setX(self, i, value):
        self.sem.acquire()
        self.xList[i] = value
        result = " privileged" if self.xList[0] == self.xList[-1] else " not       "
        for i in range(1, self.n):
            if self.xList[i] != self.xList[i-1]:
                result = result + " privileged"
            else:
                result = result + " not       "
        print(result)
        self.sem.release()
    
class Process(Thread):
    def __init__(self, i, ring):
        self.i, self.ring = i, ring
        super().__init__()
    def run(self):
        # The initial configuration is random, so a large number of rounds is used
        # to give the ring enough time to converge to a correct configuration.
        for i in range(1000000):
            x = self.ring.getX(self.i)
            prev = self.ring.getX(self.i-1)
            if self.i == 0:
                if x == prev:
                    self.ring.setX(self.i, (x+1)%self.ring.k)
            else:
                if x != prev:
                    self.ring.setX(self.i, prev)

                    
N, K = 5, 5
ring = Ring(N, K)
ps = [Process(i, ring) for i in range(N)]
for p in ps: p.start()
for p in ps: p.join()

 not        not        not        not        privileged
 privileged not        not        not        not       
 not        privileged not        not        not       
 not        not        privileged not        not       
 not        not        not        privileged not       
 not        not        not        not        privileged
 privileged not        not        not        not       
 not        privileged not        not        not       
 not        not        privileged not        not       
 not        not        not        privileged not       
 not        not        not        not        privileged
 privileged not        not        not        not       
 not        privileged not        not        not       
 not        not        privileged not        not       
 not        not        not        privileged not       
 not        not        not        not        privileged
 privileged not        not        not        not       


### Arora-Gouda spanning tree algorithm

Suppose you have a network of several devices connected by undirected cables
 - You want the device with the largest storage to be the main device
 - The other devices need to transmit data to the main device eventually
 - New devices can be added and any existing device can be removed

**How can you make sure the network always recovers from an incorrect configuration to a proper one?**

<img src="img/figure37.svg" width="400" height="400"/>

For undirected networks of size `N`
- The process with the largest ID eventually becomes the root of a spanning tree
- Each process keeps track of its parent in the spanning tree (null for the root)
- Each process maintains the root and its distance to the root as variables
- A process corrects its local variables if it finds an inconsistency with its parent process
- A process declares itself root when it finds an inconsistency in its own local variables




```algorithm
Input: an undirected network with any initial configuration (cycles are allowed)
Expected output: a spanning tree of the whole network, rooted at the node with the largest ID
```

```algorithm
var id: integer
var root: integer
var parent: Node
var dist: integer
var neighs: [Node]
```

<div style="float:left;border-left:2em solid white">

```algorithm
procedure selfCheck(N: integer)
    var cond1 := root < id
    var cond2 := (parent = None) and (root ≠ id or dist ≠ 0)
    var cond3 := (parent ≠ None) and (parent ∉ neighs)
    var cond4 := dist ≥ N
    if cond1 or cond2 or cond3 or cond4 then
        declare itself to be root
    {PC: (root ≥ id) ∧ (parent = None ⇒ (root = id ∧ dist = 0)) ∧ (parent ≠ None ⇒ parent ∈ neighs) ∧ (dist < N)}

    
procedure checkNeigh(N: integer)
    for neighbour in neighs do
        if neighbour.root > root then
            root := neighbour.root
            dist := neighbour.dist+1
            parent := neighbour
        else if parent = neighbour then
            root := neighbour.root
            dist := neighbour.dist+1
    {PC: (root = parent.root) ∧ (dist = parent.dist + 1)}
```
</div>
    
<img src="img/figure36.svg" width="400" height="400"/>

#### Example
- Let `N = 5`, with the following initial configuration:
<img src="img/figure19.svg" width="400" height="400"/>

- Process 3 updates its distance according to its parent and finds an inconsistency (it should have `dist < N`)
<img src="img/figure20.svg" width="400" height="400"/>

- Process 3 declares itself root and sets its parent to `null`
- Starting with process 4, each process sets its local variables based on its parent
<img src="img/figure21.svg" width="400" height="400"/>

- Process 4 notices an inconsistency (its root value is smaller than its own ID)
<img src="img/figure22.svg" width="400" height="400"/>

- Process 4 declares itself root and sets its parent to `null`
- Starting with process 0, each process sets its local variables based on its parent
<img src="img/figure23.svg" width="400" height="400"/>

In [18]:
from threading import Thread, Semaphore
import time

class Node:
    def __init__(self, i, root, dist):
        self.i, self.root, self.dist = i, root, dist
        self.parent = None
        self.neighs = []
        self.sem = Semaphore(1)
    def setParent(self, n):
        self.parent = n
    def addNeigh(self, neighs):
        self.neighs.append(neighs)
    def beRoot(self):
        self.root = self.i
        self.parent = None
        self.dist = 0
    def selfCheck(self, N):
        c0 = self.root < self.i    # root value smaller than own ID
        c1 = self.parent is None and (self.root != self.i or self.dist != 0)
        c2 = self.parent is not None and self.parent not in self.neighs
        c3 = self.dist >= N        # distance reached the bound on the network size
        if c0 or c1 or c2 or c3:
            self.beRoot()

    def checkNeigh(self):
        for n in self.neighs:
            if n.root > self.root:
                self.root = n.root
                self.dist = n.dist+1
                self.parent = n
            elif self.parent == n:
                self.root = n.root
                self.dist = n.dist+1

    
class Process(Thread):
    def __init__(self, i, n, node):
        self.i, self.n, self.node = i, n, node
        super().__init__()
    def run(self):
        for i in range(10):
            time.sleep(0.1)
            self.node.checkNeigh()
            time.sleep(0.1)
            self.node.selfCheck(self.n)
            
                   
N = 5
ns = []
ns.append(Node(0, 5, 4))
ns.append(Node(1, 5, 1))
ns.append(Node(2, 5, 2))
ns.append(Node(3, 5, 2))
ns.append(Node(4, 5, 3))
ns[0].addNeigh(ns[1])
ns[0].addNeigh(ns[3])
ns[0].addNeigh(ns[4])
ns[1].addNeigh(ns[0])
ns[1].addNeigh(ns[2])
ns[2].addNeigh(ns[1])
ns[3].addNeigh(ns[0])
ns[3].addNeigh(ns[4])
ns[4].addNeigh(ns[0])
ns[4].addNeigh(ns[3])

ns[0].setParent(ns[4])
ns[1].setParent(ns[0])
ns[2].setParent(ns[1])
ns[3].setParent(ns[0])
ns[4].setParent(ns[3])

ps = [Process(i, N, ns[i]) for i in range(N)]
for p in ps: p.start()
for p in ps: p.join()
for n in ns:
    if n.parent != None:
        print("Process",n.i,", parent",n.parent.i,", root", n.root,", distance",n.dist)
    else:
        print("Process",n.i,", parent n , root",n.root,", distance",n.dist)

Process 0 , parent 4 , root 4 , distance 1
Process 1 , parent 0 , root 4 , distance 2
Process 2 , parent 1 , root 4 , distance 3
Process 3 , parent 0 , root 4 , distance 2
Process 4 , parent n , root 4 , distance 0
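
The printed result can be checked mechanically. The sketch below assumes a hypothetical checker `is_legitimate` that takes the final variables as plain dictionaries keyed by process ID rather than the `Node` objects used above; it tests the conditions a stabilized configuration should satisfy.

```python
def is_legitimate(parent, root, dist, neighs):
    """parent/root/dist: dicts keyed by process ID; neighs: ID -> set of IDs."""
    top = max(parent)  # the largest ID should be the root of the tree
    for v in parent:
        if root[v] != top:
            return False
        if parent[v] is None:
            # Only the largest ID may be parentless, at distance 0.
            if v != top or dist[v] != 0:
                return False
        else:
            p = parent[v]
            # The parent must be a neighbour and one step closer to the root.
            if p not in neighs[v] or dist[v] != dist[p] + 1:
                return False
    return True

neighs = {0: {1, 3, 4}, 1: {0, 2}, 2: {1}, 3: {0, 4}, 4: {0, 3}}
final = is_legitimate(
    parent={0: 4, 1: 0, 2: 1, 3: 0, 4: None},
    root={v: 4 for v in range(5)},
    dist={0: 1, 1: 2, 2: 3, 3: 2, 4: 0},
    neighs=neighs,
)
print(final)  # True
```

Because every non-root distance is exactly one more than its parent's, the parent pointers cannot form a cycle, so passing the check implies a spanning tree rooted at the largest ID.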


### Afek-Kutten-Yung spanning tree algorithm

Similar to the Arora-Gouda algorithm, for undirected networks
- ***Does not require a known upper bound on the network size***
- Each process has `root`, `parent`, `dist` variables
- A process declares itself root when it finds an inconsistency in its local variables ***or in comparison with its parent***
- ***A process sends a join request to a neighbor whose root value is larger than its own***


#### Example
- With the following initial configuration:
<img src="img/figure24.svg" width="400" height="400"/>

- Process 3 finds an inconsistency with its parent and declares itself root

<img src="img/figure25.svg" width="400" height="400"/>

- Process 3 finds that `root₄` is larger than its own root and sends a join request to process 4

<img src="img/figure26.svg" width="400" height="400"/>

- Process 4 cannot forward the request to its parent, so it declares itself root
- Process 4 sends an acknowledgment back to process 3

<img src="img/figure27.svg" width="400" height="400"/>

- Process 0 finds an inconsistency with its parent and declares itself root
- Process 0 finds that `root₄` is larger than its own root and sends a join request to process 4

<img src="img/figure28.svg" width="400" height="400"/>

- Process 4 sends an acknowledgment back to process 0

<img src="img/figure29.svg" width="400" height="400"/>

- Processes 1 and 2 repeat the "find inconsistency" and "send join request" steps

<img src="img/figure30.svg" width="400" height="400"/>

In [21]:
from threading import Thread, Semaphore
import time

class Node:
    def __init__(self, i, root, dist):
        self.i, self.root, self.dist = i, root, dist
        self.parent = None
        self.neighs = []
        self.avail = True
    def setParent(self, n):
        self.parent = n
    def addNeigh(self, neighs):
        self.neighs.append(neighs)
    def beRoot(self):
        self.root = self.i
        self.parent = None
        self.dist = 0
    def joinRequest(self, root, length):
        if (self.i != root) & (self.parent != None):
            if self.parent.avail == True:
                self.avail = False
                length = length + 1
                tmp = self.parent.joinRequest(root, length)
                root, lengthTmp = tmp
                self.root = root
                self.dist = lengthTmp - length
                self.avail = True
                return root, lengthTmp       
        elif (self.i == root) & (self.parent == None):
            return self.i, length + 1 
        
        self.beRoot()
        return self.i, length + 1
        
    def selfCheck(self):
        c0 = self.root < self.i
        c1 = self.parent is None and (self.root != self.i or self.dist != 0)
        c2 = self.parent is not None and self.parent not in self.neighs
        # Additionally compare the local variables with those of the parent
        c3 = self.parent is not None and (
            self.root != self.parent.root or self.dist != self.parent.dist + 1)
        if c0 or c1 or c2 or c3:
            self.beRoot()
            
    def checkNeigh(self):
        maxN = -1
        maxNode = None
        for n in self.neighs:
            if maxN <= n.root:
                maxN = n.root
                maxNode = n
        if maxN > self.root:
            tmp = maxNode.joinRequest(self.root, 0)
            root, length = tmp
            self.root = root
            self.dist = length
            self.parent = maxNode

            
    
class Process(Thread):
    def __init__(self, i, node):
        self.i, self.node = i, node
        super().__init__()
    def run(self):
        for i in range(10):
            time.sleep(0.1)
            self.node.selfCheck()
            time.sleep(0.1)
            self.node.checkNeigh()
            
            
                   
N = 5
ns = []
ns.append(Node(0, 5, 4))
ns.append(Node(1, 5, 1))
ns.append(Node(2, 5, 2))
ns.append(Node(3, 5, 2))
ns.append(Node(4, 5, 3))
ns[0].addNeigh(ns[1])
ns[0].addNeigh(ns[3])
ns[0].addNeigh(ns[4])
ns[1].addNeigh(ns[0])
ns[1].addNeigh(ns[2])
ns[2].addNeigh(ns[1])
ns[3].addNeigh(ns[0])
ns[3].addNeigh(ns[4])
ns[4].addNeigh(ns[0])
ns[4].addNeigh(ns[3])

ns[0].setParent(ns[4])
ns[1].setParent(ns[0])
ns[2].setParent(ns[1])
ns[3].setParent(ns[0])
ns[4].setParent(ns[3])

ps = [Process(i, ns[i]) for i in range(N)]
for p in ps: p.start()
for p in ps: p.join()
for n in ns:
    if n.parent != None:
        print("Process",n.i,", parent",n.parent.i,", root", n.root,", distance",n.dist)
    else:
        print("Process",n.i,", parent n , root",n.root,", distance",n.dist)

Process 0 , parent 4 , root 4 , distance 1
Process 1 , parent 0 , root 4 , distance 2
Process 2 , parent 1 , root 4 , distance 3
Process 3 , parent 4 , root 4 , distance 1
Process 4 , parent n , root 4 , distance 0


## Reference
[1] <a href="http://cs.ipm.ac.ir/asoc2016/Resources/Theartofmulticore.pdf">Herlihy, Maurice., &amp; Shavit, Nir. The art of multiprocessor programming. Elsevier, 2009.</a>

[2] <a href="https://doc.lagout.org/science/0_Computer%20Science/2_Algorithms/Distributed%20Algorithms_%20An%20Intuitive%20Approach%20%5BFokkink%202013-12-06%5D.pdf">Fokkink, Wan. Distributed algorithms: An intuitive approach. The MIT Press, 2013.</a>

[3] Gray, Jim., &amp; Reuter, Andreas. Transaction processing: Concepts and techniques. Kaufmann, 2011.

[4] <a href="https://www.microsoft.com/en-us/research/people/philbe/book/">Bernstein, Philip. A., Goodman, Nathan., &amp; Hadzilacos, Vassos. Concurrency control and recovery in database systems. Addison-Wesley, 1995.</a>

## Question

#### In Dijkstra’s token ring for mutual exclusion, each process holds a single-writer register with a value less than K, where K ≥ N (N is the number of nodes). Show, using an example with 8 nodes, that the ring can end up in an incorrect final configuration if K is smaller than N.