# Threads and Concurrency in Rust
One of the main goals of Rust is fearless concurrency.
<center><img src='figures/rust-logo.png'></center>
<center><img src='figures/mandel-brot.png'></center>

<center><img src='figures/process-single.png'></center>

<center><img src='figures/process-threads.png'></center>

<center><img src='figures/process-threads-pc.png'></center>

# Threads
---

In [None]:
use std::thread;

* system is taking care of scheduling

## std :: thread :: spawn
---
Rust allows one to create a new operating system thread by calling thread::spawn() from std::thread


In [None]:
use std::thread;
use std::time::Duration;
thread::spawn(|| {
    for i in 0..2 {
        println!("hi number {} from the spawned thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
});
for i in 0..2 {
    println!("hi number {} from the main thread!", i);
    thread::sleep(Duration::from_millis(1));
}

* returns a **JoinHandle**
 * may outlive the parent (unless parent is the main thread)
* signature of spawn consists of two **constraints** (Send,'static)

[docs](https://doc.rust-lang.org/std/thread/fn.spawn.html )

## JoinHandle constraints
---

#### 'static

* lifetime until the end of the program
  *  threads can detach and outlive the lifetime they have been created in

#### Send

* the closure is passed by value
  *  from the thread where it is spawned to the new thread
  * return value will need to be passed from the new thread to the thread where it is joined
* **Send** trait expresses that it is safe to pass value from thread to thread.


[docs](https://doc.rust-lang.org/std/thread/fn.spawn.html )

## std :: thread :: JoinHandle
---

* can not be cloned!
* **joinable**, waits for the thread to finish
* **detaches** the thread when it is dropped
  * no longer any handle to thread and no way to join on it

[docs](https://doc.rust-lang.org/std/thread/struct.JoinHandle.html)

### std :: thread :: JoinHandle.join()
---

```rust
pub fn join(self) -> Result<T>
```

* if the child panics, **Err** is returned

## thread join
---

In [None]:
use std::thread;

let computation = thread::spawn(|| {
    42
});

let result = computation.join().unwrap();
println!("{}", result);

## thread move
---

In [None]:
use std::thread;
//...
let mut magic_num: i32 = 10;
let handle = thread::spawn(move || {
    magic_num += 1;
    println!("thread: {}", magic_num);
    magic_num
});
println!("join: {}", handle.join().unwrap());
println!("main: {}", magic_num);

* move – keyword to force a closure to take ownership of the values it uses

## thread sleep
---

In [None]:
use std::thread;
use std::time::Duration;
//...
thread::sleep(Duration::from_millis(1));

* avoid usage
---

# thread :: Builder
---

In [None]:
use std::thread;

let builder = thread::Builder::new()
    .name("foo".into())
    .stack_size(32 * 1024);

let handler = builder.spawn(|| 42).unwrap();

println!("Msg from thread: {}", handler.join().unwrap());

[docs](https://doc.rust-lang.org/std/thread/struct.Builder.html)

# detached thread example
---

In [None]:
use std::thread;
use std::time::Duration;
//...
let original_thread = thread::spawn(|| {
    let _detached_thread = thread::spawn(|| {
        thread::sleep(Duration::from_millis(10));
        println!("♫ Still alive ♫");
    });
});
original_thread.join();
println!("Original thread is joined.");
thread::sleep(Duration::from_millis(1000));

---
# Concurrency
* Mutual Exclusion 
* Sharing Data 
* Threading Tools

## Mutual Exclusion – Mutex
---

In [None]:
use std::sync::Mutex;
//...
let value = 5;
let mutex = Mutex::new(value);
fn zero_mutex(mtx: &Mutex<u32>) {
    let mut n = mtx.lock().unwrap();       
    *n = 0;
}
zero_mutex(&mutex);
println!("value in the mutex: {}", mutex.lock().unwrap());

* a lock is required to access the variable behind the mutex (dereferencing with \*)
* after leaving the scope the mutex is locked again
* poisoning @panic – all other threads are unable to access the data by default 

[docs](https://doc.rust-lang.org/std/sync/struct.Mutex.html)

## Mutual Exclusion – Threads
---

In [None]:
use std::time::Duration;
use std::thread;
use std::sync::Mutex;
//...
let mutex = Mutex::new(5);
let handle = thread::spawn(move || {
    let mut n = mutex.lock().unwrap();       
    *n = 0;
    println!("thread: locked");
});
mutex.lock();
println!("main: after lock");
handle.join().unwrap();

#### error: borrow of moved value "mutex"

## Sharing Data
---

## Sharing Data – Reference Counting Rc
---

In [None]:
use std::rc::Rc;

let foo = Rc::new(vec![1.0, 2.0, 3.0]);
let a = foo.clone();
let b = Rc::clone(&foo); // equivalent

println!("foo address:\t{:p}\na address:\t{:p}\nb address:\t{:p}", a, b,foo);

* enables multiple ownership 
* smart pointer enables sharing data in single-threaded cases
* counting the created references on the shared data
  * keeps track of the number of references to a value which determines whether or not a value is still in use
  * at zero references to a value, the value can be cleaned up without any references becoming invalid
* **only for use in single-threaded scenarios**

[docs](https://doc.rust-lang.org/book/ch15-04-rc.html#:~:text=To%20enable%20multiple%20ownership%2C%20Rust,value%20is%20still%20in%20use.)

## Sharing Data – Atomic Reference Counting Arc
---
'Arc' stands for 'Atomically Reference Counted'

In [None]:
use std::sync::Arc;

* thread-safe through atomic operations for its reference counting
* provides shared ownership of a value of type **T**, which must impl. **Send** & **Sync**
* more expensive than ordinary memory accesses

[docs](https://doc.rust-lang.org/std/sync/struct.Arc.html)

### Arc Example Cloning References Example
---

In [None]:
use std::sync::Arc;
//...
let foo = Arc::new(vec![1.0, 2.0, 3.0]);

let a = foo.clone();      // points to parent memory location
let b = Arc::clone(&foo); // equivalent
println!("foo address:\t{:p}\na address:\t{:p}\nb address:\t{:p}", a, b,foo);

### Sharing immutable data between threads
---

In [None]:
use std::sync::Arc;
use std::thread;
//...
let value = Arc::new(5);
for _ in 0..10 {
    let value = Arc::clone(&value);
    thread::spawn(move || {
        println!("{:?}", value);
    });
}

### Sharing mutable data between threads
---

In [None]:
use std::sync::Arc;
use std::thread;
use std::sync::Mutex;
//...
let value = Arc::new(Mutex::new(5));
for _ in 0..10 {
    let value = Arc::clone(&value);
    thread::spawn(move || {
        let mut num = value.lock().unwrap();
        *num +=1;
        println!("{:?}", num);
    });
}

## Sharing Data – Channels
---

In [None]:
use std::sync::mpsc::channel;

* message passing, communicate by sending each other messages
* consisting of a transmitter and receiver
* Multi-producer, single-consumer
* FIFO queue
* channels in two flavors
  * asynchronous – none blocking
  * synchronous – blocking 
    * rendezvous channel if buffer size set to 0
    
[docs](https://doc.rust-lang.org/book/ch16-02-message-passing.html)

### Channels – sync(producer, consumer)
---
```rust
use std::thread;
use std::sync::mpsc::sync_channel;
//...
let (tx, rx) = sync_channel(0); // rendezvous channel
thread::spawn(move|| {
    tx.send(10).unwrap();
});// receive something?
println!("receiving: {}", rx.recv().unwrap());
```

### Channels – async(n-producer, consumer)
---
```rust
use std::thread;
use std::sync::mpsc::channel;
```

```rust
let (tx, rx) = channel();
for i in 0..10 {
    let tx = tx.clone();
    thread::spawn(move || {
        tx.send(i).unwrap();
    });
}
for _ in 0..10 { // in order?
    println!("receiving: {}", rx.recv().unwrap());
}
```

---
# Threading Tools
---

## Tools – Crossbeam 
---
Toolset for concurrent programming.

In [None]:
extern crate crossbeam;

* [source](https://github.com/crossbeam-rs/crossbeam), [crates](https://crates.io/crates/crossbeam), [docs](https://docs.rs/crossbeam/0.8.0/crossbeam/index.html)
* Atomics, Data structures, Memory management, Utilities
* Thread synchronization
  * channel – multi-producer multi-consumer
  * Parker – parking primitive.
  * ShardedLock – sharded reader-writer lock, fast concurrent reads
  * WaitGroup – gather/synchronizing a group

## Crossbeam – ShardedLock
---

In [None]:
use crossbeam::sync::ShardedLock;
//...
let lock = ShardedLock::new(5);
{
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
} // unlock
{
let mut w = lock.write().unwrap();
*w += 1;
println!("w: {}",w);
w
}

* made of shards, each being a RwLock occupying a single cache line
* read operations will pick one of the shards and lock it
* write operations need to lock all shards 

---
## Tools – Rayon
---

In [None]:
extern crate rayon;

* [source](https://github.com/rayon-rs/rayon), [crates](https://crates.io/crates/rayon), [docs](https://docs.rs/rayon/1.5.0/rayon/)
* data-parallelism library
* easy to use
* race rondition free
* many parallel iterator types
* thread pools – work-load at runtime

## Rayon – parallel iterator
---

In [None]:
(0..5).for_each(|x| println!("{:?}", x));

In [None]:
use rayon::prelude::*;

(0..5).into_par_iter().for_each(|x| println!("{:?}", x));

[docs](https://docs.rs/rayon/1.5.0/rayon/iter/trait.ParallelIterator.html)