In [1]:
import * as tslab from "tslab";
import { requireCytoscape, requireCarbon } from "./lib/draw";

requireCarbon();
requireCytoscape();

# Concurrency

## Where Were We?

1. Language primitives (i.e., building blocks of languages)
2. **Language paradigms** (i.e., combinations of language primitives)
    - Last time: DSLs with regex and SQL
    - This time: **concurrent** programming in TypeScript
3. Building a language (i.e., designing your own language)

## Outline

- Message passing model of concurrency
- Shared memory model of concurrency

## Why Concurrency?

At some point, a resource needs to be shared.
1. You're cooking a complex dish but you only have 1 burner ...
2. You have roommates and you need to share a bathroom ...
3. You're in an apartment complex and need to share 1 washer and 2 dryers ...

### Computer Examples

At some point, a resource needs to be shared on a computational device.
1. Operating system needs to share finite compute and memory resources among programs (browser, games, discord, zoom).
2. Web browser needs to share finite compute and memory resources among tasks (interpret javascript, render page, network)
3. Phone needs to share finite compute and resources among apps (messaging, web connection, phone connection)

### Concurrency vs. Parallelism

1. **Concurrency** is, broadly speaking, the theory of sharing resources.
2. This is not the same as **parallelism**, which concerns how to execute computations at the same time.
3. In particular, you can run computations in parallel when you do not have to share resources, which is why concurrency and parallelism co-occur.

<img src="media/concur_vs_parallel.png" alt="drawing" width="500"/>

## Models of Concurrency

1. Message Passing
2. Shared memory

## Message Passing

Each **node** can send and receive messages with other **nodes**.

### Examples

1. Each smart phone is a **node** that can send and receive **text messages** from another smart phone.
2. Each computer with Discord is a **node** that can send and recieve **messages** with another compute with Discord.

### Message Passing Node/TypeScript

Event emitter: [https://nodejs.org/api/events.html](https://nodejs.org/api/events.html)

In [2]:
import EventEmitter from 'events'; // nodejs class

In [3]:
class MyEmitter extends EventEmitter {} // create a class
const myEmitter = new MyEmitter();      // instantiate an event emitter

In [4]:
// Question: what concepts from this class does this remind you of?
let count = 0;
myEmitter.on('message-name', (x: any) => {
    count += 1;
    console.log(`Called ${count} times and passed argument ${x}.`);
});

MyEmitter {
  _events: [Object: null prototype] { [32m'message-name'[39m: [36m[Function (anonymous)][39m },
  _eventsCount: [33m1[39m,
  _maxListeners: [90mundefined[39m,
  [[32mSymbol(kCapture)[39m]: [33mfalse[39m
}


In [5]:
myEmitter.emit('message-name', 'a');
myEmitter.emit('message-name', 1);
myEmitter.emit('message-name', [1, 'a']);

Called 1 times and passed argument a.
Called 2 times and passed argument 1.
Called 3 times and passed argument 1,a.
[33mtrue[39m


### The concepts

```ts
let count = 0; // 4. closed over

// 3. myEmitter is a first-class function that accepts a function
myEmitter.on('message-name', (x: any) => {  // 1. anonymous function
    count += 1;  // 2. closure over count
    console.log(`Called ${count} times and passed argument ${x}.`);
});
```

### Events triggering events ...

In [6]:
const myEmitter1 = new MyEmitter();      // instantiate an event emitter
const myEmitter2 = new MyEmitter();      // instantiate an event emitter

let count = 0;

function callback1(emitter: MyEmitter, x: any): void {
    count += 1;
    console.log(`Called ${count} times and passed argument ${x}.`);
    if (count % 2 === 0) {
        console.log("Triggerering a second message from callback1 ...");
        emitter.emit('message-name-2', emitter, x);
    }
}

function callback2(emitter: MyEmitter, x: any): void {
    count += 1;
    console.log(`Called ${count} times and passed argument ${x}.`);
    if (count % 2 === 1) {
        console.log("Triggerering a second message from callback2 ...");
        emitter.emit('message-name-1', emitter, x);
    }
}

myEmitter1.on('message-name-1', callback1);
myEmitter2.on('message-name-2', callback2);
myEmitter2.on('message-name-1', callback2);

MyEmitter {
  _events: [Object: null prototype] {
    [32m'message-name-2'[39m: [36m[Function: callback2][39m,
    [32m'message-name-1'[39m: [36m[Function: callback2][39m
  },
  _eventsCount: [33m2[39m,
  _maxListeners: [90mundefined[39m,
  [[32mSymbol(kCapture)[39m]: [33mfalse[39m
}


In [7]:
myEmitter1.emit('message-name-1', myEmitter2, 'a');

Called 1 times and passed argument a.
[33mtrue[39m


In [8]:
myEmitter2.emit('message-name-2', myEmitter2, 'a');

Called 2 times and passed argument a.
[33mtrue[39m


In [9]:
myEmitter2.emit('message-name-1', myEmitter2, 'a');

Called 3 times and passed argument a.
Triggerering a second message from callback2 ...
Called 4 times and passed argument a.
[33mtrue[39m


### Can you trigger an infinite number of events?
    
1. Yes, the message passing version of an "infinite loop".
2. This is why reasoning in the presence of concurrency is hard.
3. Related to a concept called **deadlock**.

### How do you share resources?

1. Suppose you want to make a hamburger with mushrooms and onions.
2. But you only have 1 pan and 1 plate.
3. We'll use message passing to coordinate accessing the pan and plate.

![](media/making_burger.png) 

In [10]:
type Pan = {
    contents: string[]   // list of ingredients
    temperature: number  // in Fahrenheit
};

In [11]:
const pan = {
    contents: [],
    temperature: 68 
}

let plate = [];

In [12]:
function reset(ingredient: string): void { // Not a pure function!
    pan.contents = [];
    plate = [];
}

In [13]:
function addToPan(ingredient: string): void { // Not a pure function!
    pan.contents.push(ingredient);
}

In [14]:
function removeFromPan(ingredient: string): void {
    // Get stuff from pan, mark that it's COOKED at temperature, and put it on the plate
    plate = plate.concat(pan.contents.map(x => `COOKED ${x} at ${pan.temperature}`));
    // Empty the pan
    pan.contents = [];
}

In [15]:
function heatOrCoolPan(degrees: number): void {
    pan.temperature += degrees;
}

In [16]:
const panEmitter = new MyEmitter();
panEmitter.on('RESET', reset);
panEmitter.on('ADD', addToPan);
panEmitter.on('REMOVE', removeFromPan);
panEmitter.on('HEAT_OR_COOL', heatOrCoolPan);

MyEmitter {
  _events: [Object: null prototype] {
    RESET: [36m[Function: reset][39m,
    ADD: [36m[Function: addToPan][39m,
    REMOVE: [36m[Function: removeFromPan][39m,
    HEAT_OR_COOL: [36m[Function: heatOrCoolPan][39m
  },
  _eventsCount: [33m4[39m,
  _maxListeners: [90mundefined[39m,
  [[32mSymbol(kCapture)[39m]: [33mfalse[39m
}


In [17]:
panEmitter.emit('RESET');
panEmitter.emit('HEAT_OR_COOL', 100);
panEmitter.emit('ADD', 'HAMBURGER');
console.log(pan);
console.log(plate);

{ contents: [ [32m'HAMBURGER'[39m ], temperature: [33m168[39m }
[]


In [18]:
panEmitter.emit('REMOVE');
console.log(pan);
console.log(plate);

{ contents: [], temperature: [33m168[39m }
[ [32m'COOKED HAMBURGER at 168'[39m ]


In [19]:
panEmitter.emit('HEAT_OR_COOL', -50);
panEmitter.emit('ADD', 'ONIONS');
panEmitter.emit('ADD', 'MUSHROOMS');
console.log(pan);
console.log(plate);

{ contents: [ [32m'ONIONS'[39m, [32m'MUSHROOMS'[39m ], temperature: [33m118[39m }
[ [32m'COOKED HAMBURGER at 168'[39m ]


In [20]:
panEmitter.emit('REMOVE');
console.log(pan);
console.log(plate);

{ contents: [], temperature: [33m118[39m }
[
  [32m'COOKED HAMBURGER at 168'[39m,
  [32m'COOKED ONIONS at 118'[39m,
  [32m'COOKED MUSHROOMS at 118'[39m
]


In [21]:
panEmitter.emit('ADD', 'BUN');
console.log(pan);
console.log(plate);

{ contents: [ [32m'BUN'[39m ], temperature: [33m118[39m }
[
  [32m'COOKED HAMBURGER at 168'[39m,
  [32m'COOKED ONIONS at 118'[39m,
  [32m'COOKED MUSHROOMS at 118'[39m
]


In [22]:
panEmitter.emit('REMOVE');
console.log(pan);
console.log(plate);

{ contents: [], temperature: [33m118[39m }
[
  [32m'COOKED HAMBURGER at 168'[39m,
  [32m'COOKED ONIONS at 118'[39m,
  [32m'COOKED MUSHROOMS at 118'[39m,
  [32m'COOKED BUN at 118'[39m
]


### Message passing is used a lot in web programming

1. HTTP protocol: [https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol](https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol)
2. Web servers and clients (web browser) use message passing, i.e., concurrency model, to share resources.

In [23]:
const webServer = new MyEmitter();
webServer.on('GET', (x) => x);
webServer.on('POST', (x) => x);
webServer.on('PUT', (x) => x);

MyEmitter {
  _events: [Object: null prototype] {
    GET: [36m[Function (anonymous)][39m,
    POST: [36m[Function (anonymous)][39m,
    PUT: [36m[Function (anonymous)][39m
  },
  _eventsCount: [33m3[39m,
  _maxListeners: [90mundefined[39m,
  [[32mSymbol(kCapture)[39m]: [33mfalse[39m
}


### Final Project 

1. User input sends a message.
2. Example of message passing handler callback in excerpt from `Reducer.tsx`:
```
const newState = (() => {
    switch (type) {
      ...
      case 'SET_SONGS': {
        const songs = args.get('songs');
        return state.set('songs', songs);
      }
      case 'PLAY_SONG': {
        const notes = state
          .get('songs')
          .find((s: any) => s.get('id') === args.get('id'))
          .get('notes');
        return state.set('notes', notes);
      }
      case 'STOP_SONG': {
        return state.delete('notes');
      }
      ...
    }
  })();
```

## Shared Memory

1. Previously we saw message passing.
2. Now we'll look at an alternative model of concurrency called **shared memory**.
3. The server example with message passing is a good starting point for a mental model of shared memory.

### Aside: Threads vs. Processes

1. A process is has one stack (i.e., control-flow) and one memory space.
2. A thread has it's own stack and shares memory with other threads in the same process.
3. Thus you can use message passing for concurrency with processes.
    * Web server communicating with web browser.
    * Mobile phones communicating with each other.
4. And you can use **shared memory** with threads.
    * The implementation of a web server.

<img src="./media/thread_vs_process.png" alt="drawing" width="500"/>

In [24]:
const { Worker, isMainThread, parentPort } = require('worker_threads')

In [25]:
const {Worker} = require("worker_threads");

//Create new worker
const worker = new Worker("./worker.js");

//Listen for a message from worker
worker.on("message", result => {
  console.log(`${result.num}th Fibonacci Number: ${result.fib}`);
});

worker.on("error", error => {
  console.log(error);
});

worker.postMessage({num: 40});
worker.postMessage({num: 12});

### Shared Memory with Threads

In [26]:
const nums = [40, 35, 30, 25];

// Create shared memory
const size = Int32Array.BYTES_PER_ELEMENT*nums.length;
const sharedBuffer = new SharedArrayBuffer(size);
const sharedArray = new Int32Array(sharedBuffer);

In [27]:
// Wrong!
nums.forEach((x: number, i: number) => {
   sharedArray[i] = x; // not thread-safe
});

sharedArray

Int32Array(4) [ [33m40[39m, [33m35[39m, [33m30[39m, [33m25[39m ]


In [28]:
// Correct!
nums.forEach((num, index) => {
    Atomics.store(sharedArray, index, num);  // thread-safe
})
sharedArray

Int32Array(4) [ [33m40[39m, [33m35[39m, [33m30[39m, [33m25[39m ]


#### 1 worker

In [29]:
import { Worker } from "worker_threads"; // nodejs worker threads
import * as fs from "fs";

In [30]:
const x = fs.readFileSync("./tmp/fib_worker.js");
x.toString();

const {parentPort} = require("worker_threads");

parentPort.on("message", data => {
  const nums = data.nums;
  for (let i = data.start; i < data.stop; i++) {
     const n = nums[i];
     const res = fibonnaci(n);
     Atomics.store(nums, i, res);  // thread-safe
     parentPort.postMessage({num: n, fib: res});
  }
})

function fibonnaci(num) {
    if (num === 0) {
        return 0;
    } else if (num === 1) {
        return 1;
    } else {
        return fibonnaci(num - 1) + fibonnaci(num - 2);
    }
}




In [31]:
// Create new worker
const worker = new Worker("./tmp/fib_worker.js");

// Listen for a message from worker
worker.on("message", result => {
    console.log(`${result.num}th Fibonacci Number: ${result.fib}`);
});

nums.forEach((num, index) => {
    Atomics.store(sharedArray, index, num);  // thread-safe
})

console.log("BEFORE")
worker.postMessage({
    nums: sharedArray,
    start: 0,
    stop: sharedArray.length
});
console.log("AFTER");

BEFORE
AFTER


#### Multiple workers

In [32]:
// Create new worker
const worker1 = new Worker("./tmp/fib_worker.js");
const worker2 = new Worker("./tmp/fib_worker.js");

// Listen for messages from workers
worker1.on("message", result => {
    console.log(`${result.num}th Fibonacci Number: ${result.fib}`);
});
worker2.on("message", result => {
    console.log(`${result.num}th Fibonacci Number: ${result.fib}`);
});

nums.forEach((num, index) => {
    Atomics.store(sharedArray, index, num);  // thread-safe
})

console.log("BEFORE")
worker1.postMessage({
    nums: sharedArray,
    start: 0,
    stop: Math.floor(sharedArray.length / 2)
});
worker2.postMessage({
    nums: sharedArray,
    start: Math.floor(sharedArray.length / 2), // question: what would happen if we made these array indices overlap?
    stop: sharedArray.length
});
console.log("AFTER");

BEFORE
AFTER


#### Parallelized map

In [34]:
// Create new worker
const workers = [];
for (let i = 0; i < sharedArray.length; i++) {
    const worker = new Worker("./tmp/fib_worker.js");
    worker.on("message", result => {
        console.log(`${result.num}th Fibonacci Number: ${result.fib}`);
    });
    workers.push(worker);
}

nums.forEach((num, index) => {
    Atomics.store(sharedArray, index, num);  // thread-safe
})

console.log("BEFORE")
for (let i = 0; i < sharedArray.length; i++) {
    workers[i].postMessage({
        nums: sharedArray,
        start: i,
        stop: i+1 
    });
}
console.log("AFTER");

BEFORE
AFTER
25th Fibonacci Number: 75025
30th Fibonacci Number: 832040
35th Fibonacci Number: 9227465
40th Fibonacci Number: 102334155


#### Challenge: debugging timing issues

1. Run the cells with the 2 workers and the array of workers in quick succession
2. Why is there an error?

#### Challenge: parallelized reduce

1. Try writing parallelized reduce
2. Simplified version of [https://en.wikipedia.org/wiki/MapReduce](https://en.wikipedia.org/wiki/MapReduce)

#### Question: how would you implement message passing with shared memory?

## Summary

We covered a lot today
1. We saw that concurrency was about sharing resources whereas parallelism was about running things at the same time.
2. We saw the message passing model of concurrency.
3. We also saw the shared memory model of concurrency.
4. The message passing and shared memory model of concurrency are "equivalent".