# Google Colab Rust Setup

The following cell sets up and starts a Jupyter Notebook environment with a Rust kernel using Nix and IPC Proxy.

In [None]:
# Download the gist archive pinned to a single commit and unpack it into the
# current directory; `--strip-components=1` drops the archive's top-level folder.
!wget -qO- https://gist.github.com/wiseaidev/2af6bef753d48565d11bcd478728c979/archive/3f6df40db09f3517ade41997b541b81f0976c12e.tar.gz | tar xvz --strip-components=1
# Run the kernel setup script (presumably unpacked from the archive above - verify).
!bash setup_evcxr_kernel.sh

## Managing Concurrency

### Creating and Managing Threads in Rust

In [2]:
use std::thread;

// Start a new thread; the closure passed to `spawn` is the code it runs.
let handle = thread::spawn(|| {
    // Code for a new thread goes here.
});

// Wait for the spawned thread to finish. `join` returns a `Result`, and
// `unwrap` panics here if the thread itself panicked.
handle.join().unwrap();

### Sharing Data Between Threads

In [3]:
use std::thread;

// A vector created on the current thread.
let data = vec![1, 2, 3, 4, 5];

// `move` transfers ownership of `data` into the closure, so the spawned
// thread owns the vector while it prints it.
let handle = thread::spawn(move || {
    // Code for the new thread goes here.
    println!("{:?}", data);
});

// Wait for the spawned thread to finish before the cell ends.
handle.join().unwrap();

[1, 2, 3, 4, 5]


### Synchronization Primitives

#### Basic Usage of Mutex

In [4]:
use std::sync::Mutex;

// Wrap the value in a Mutex; it is only reachable through `lock()`.
let data = Mutex::new(42);

{
    // `lock()` yields a guard that dereferences to the protected value.
    let mut guard = data.lock().unwrap();
    *guard += 1;
} // Guard goes out of scope, unlocking the Mutex

// Lock again only for the duration of this statement to read the value.
println!("Data after modification: {:?}", *data.lock().unwrap());

Data after modification: 43


#### Sharing Data Across Threads

In [5]:
use std::sync::{Mutex, Arc};
use std::thread;

// Shared counter: `Arc` gives every thread a handle to the same Mutex.
let data = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..5 {
    // Each thread gets its own `Arc` handle; this shadows the outer `data`
    // only inside the loop body.
    let data = Arc::clone(&data);

    let handle = thread::spawn(move || {
        // The guard is held until the closure returns, so the increment
        // cannot interleave with another thread's increment.
        let mut guard = data.lock().unwrap();
        *guard += 1;
    });

    handles.push(handle);
}

// Wait for all five threads so every increment has happened before reading.
for handle in handles {
    handle.join().unwrap();
}

println!("Final Counter Value: {:?}", *data.lock().unwrap());

Final Counter Value: 5


#### Avoiding Mutex Deadlocks

In [6]:
use std::sync::{Mutex, Arc};
use std::thread;

// Two counters, each behind its own Mutex.
let data1 = Arc::new(Mutex::new(0));
let data2 = Arc::new(Mutex::new(0));

// Separate mutexes that receive the combined results at the end.
let data1_final = Arc::new(Mutex::new(0));
let data2_final = Arc::new(Mutex::new(0));

let handles = vec![
    // First thread: locks `data1`, then `data2` while still holding `data1`.
    thread::spawn({
        let data1_clone = Arc::clone(&data1);
        let data2_clone = Arc::clone(&data2);
        move || {
            let mut guard1 = data1_clone.lock().unwrap();
            *guard1 += 1;
            let mut guard2 = data2_clone.lock().unwrap();
            *guard2 += 1;
            // Value handed back through `join`; summed into the totals below.
            (1, 0)
        }
    }),
    // Second thread: only ever locks `data1`. No thread takes `data2` and
    // then waits for `data1`, so the two threads cannot wait on each other
    // in a cycle.
    thread::spawn({
        let data1_clone = Arc::clone(&data1);
        move || {
            let mut guard1 = data1_clone.lock().unwrap();
            *guard1 += 1;
            // Value handed back through `join`; summed into the totals below.
            (0, 1)
        }
    }),
];

// Join both threads and keep the tuple each one returned.
let results: Vec<(i32, i32)> = handles
    .into_iter()
    .map(|handle| handle.join().unwrap())
    .collect();

// Sum the first and second tuple components across threads (1 and 1 here).
let final_value1: i32 = results.iter().map(|&(val1, _)| val1).sum();
let final_value2: i32 = results.iter().map(|&(_, val2)| val2).sum();

{
    // Each final value is the counter plus the sum of the returned tuple
    // components: data1 = 2 + 1 and data2 = 1 + 1.
    let mut guard1_final = data1_final.lock().unwrap();
    let mut guard2_final = data2_final.lock().unwrap();
    *guard1_final = *data1.lock().unwrap() + final_value1;
    *guard2_final = *data2.lock().unwrap() + final_value2;
}

println!(
    "Final values: data1 = {}, data2 = {}",
    *data1_final.lock().unwrap(),
    *data2_final.lock().unwrap()
);

Final values: data1 = 3, data2 = 2


#### Ownership of Mutex Guards

In [9]:
use std::sync::Mutex;

// A vector that can only be reached through the Mutex.
let data = Mutex::new(vec![1, 2, 3]);

{
    let mut guard = data.lock().unwrap();
    guard.push(4);
} // Guard goes out of scope, unlocking the Mutex

// Binding a guard at the top level of a notebook cell is rejected by the
// kernel (demonstrated in the next cell): the guard holds a reference to
// `data`, so the variable cannot be persisted once the cell finishes.
// This is a notebook restriction rather than a borrow-checker error.
// let mut guard = data.lock().unwrap();

// Lock only for the duration of this statement to print the contents.
println!("Data: {:?}", *data.lock().unwrap());

Data: [1, 2, 3, 4]


In [10]:
use std::sync::Mutex;

// Same setup as the previous cell; this cell fails on purpose.
let data = Mutex::new(vec![1, 2, 3]);

{
    let mut guard = data.lock().unwrap();
    guard.push(4);
} // Guard goes out of scope, unlocking the Mutex

// Deliberate failure: the kernel rejects this top-level binding because the
// guard holds a reference to `data` and so cannot be persisted after the cell
// finishes (see the error output below).
// NOTE(review): in a regular program this guard would still be held at the
// `println!` below, and locking the same Mutex again on the same thread
// would deadlock or panic.
let mut guard = data.lock().unwrap();

println!("Data: {:?}", *data.lock().unwrap());

Error: The variable `guard` contains a reference with a non-static lifetime so
can't be persisted. You can prevent this error by making sure that the
variable goes out of scope - i.e. wrapping the code in {}.

#### Handling Poisoned Mutexes

In [14]:
use std::sync::{Mutex, Arc};
use std::thread;

// Top-level version of the example. The kernel reports an error for this
// cell (see the output and TODO below); the same code wrapped in `main`
// runs in the following cell.
let data = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..5 {
    let data = Arc::clone(&data);

    let handle = thread::spawn(move || {
        let mut guard = data.lock().unwrap();
        // Stop incrementing once the counter has reached 3.
        if *guard == 3 {
            println!("Thread reached 3, stopping.");
            return; // Don't panic, just exit the thread.
        }
        *guard += 1;
    });

    handles.push(handle);
}

for handle in handles {
    handle.join().unwrap();
}

// `lock()` returns `Err` when the mutex is poisoned; `into_inner` on that
// error still yields the guard. No thread above panics, so the `Err` arm is
// only exercised if that changes.
let result = data.lock();
match result {
    Ok(guard) => println!("Final Counter Value: {:?}", *guard),
    Err(poisoned) => {
        let guard = poisoned.into_inner();
        println!("Mutex is poisoned. Recovered Counter Value: {:?}", *guard);
    }
}

// TODO: report this bug; the same code runs as expected when wrapped in a `main` function, as shown below

Error: unused variable: `data`

In [16]:
use std::sync::{Mutex, Arc};
use std::thread;

/// Increments a shared counter from five threads, capping it at 3, then
/// prints the final value.
///
/// The final `lock()` result is matched explicitly so a poisoned mutex is
/// recovered via `into_inner` instead of panicking. No worker panics here,
/// so the recovery arm is only a demonstration of the API.
fn main() {
    let counter = Arc::new(Mutex::new(0));

    let workers: Vec<_> = (0..5)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                let mut value = counter.lock().unwrap();
                if *value != 3 {
                    *value += 1;
                } else {
                    // Don't panic, just let the thread finish.
                    println!("Thread reached 3, stopping.");
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    match counter.lock() {
        Ok(value) => println!("Final Counter Value: {:?}", *value),
        Err(poisoned) => {
            let value = poisoned.into_inner();
            println!("Mutex is poisoned. Recovered Counter Value: {:?}", *value);
        }
    };
}

main()

Thread reached 3, stopping.
Thread reached 3, stopping.
Final Counter Value: 3


()

### Advanced Usage of Mutex

#### Conditional Locking

In [19]:
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Collects one value from each of five threads into a shared vector and
/// prints it once all five have arrived.
///
/// The vector and the `Condvar` travel together in one `Arc`. Each worker
/// pushes its index and notifies; the main thread sleeps on the condition
/// variable until the vector holds five items. The order of the printed
/// values depends on thread scheduling.
fn main() {
    let shared = Arc::new((Mutex::new(Vec::new()), Condvar::new()));

    let handles: Vec<_> = (0..5)
        .map(|i| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                let (lock, cvar) = &*shared;

                // Hold the lock while pushing so the waiter never observes a
                // half-updated vector.
                let mut items = lock.lock().unwrap();
                items.push(i);

                cvar.notify_one();
            })
        })
        .collect();

    {
        let (lock, cvar) = &*shared;

        // `wait_while` re-checks the predicate after every wake-up, which
        // covers spurious wake-ups and notifications sent before we waited.
        let items = cvar
            .wait_while(lock.lock().unwrap(), |items| items.len() < 5)
            .unwrap();

        println!("Data: {:?}", *items);
    } // Guard released here, before joining.

    // Join the workers so none is left running when `main` returns and a
    // panic in any of them is surfaced here.
    for handle in handles {
        handle.join().unwrap();
    }
}

main()

Data: [1, 0, 2, 3, 4]


()

#### Implementing a Mutex-Protected Queue

In [20]:
use std::sync::{Arc, Mutex};
use std::thread;

/// Pushes the numbers 0 through 4 into a mutex-protected vector, one per
/// thread, and prints the vector after every thread has finished.
///
/// The resulting order reflects whichever thread acquired the lock first.
fn main() {
    let shared = Arc::new(Mutex::new(Vec::new()));

    let workers: Vec<_> = (0..5)
        .map(|i| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                // The lock is held only for this one statement.
                shared.lock().unwrap().push(i);
            })
        })
        .collect();

    workers
        .into_iter()
        .for_each(|worker| worker.join().unwrap());

    println!("Final Queue: {:?}", *shared.lock().unwrap());
}

main()

Final Queue: [0, 1, 2, 3, 4]


()

#### Implementing a Mutex-Protected Priority Queue

In [21]:
use std::collections::BinaryHeap;
use std::sync::{Arc, Mutex};
use std::thread;

/// A priority queue that can be shared between threads.
///
/// All access goes through the inner `Mutex`, so `push` and `pop` only need
/// `&self`. Items come out greatest-first (`BinaryHeap` is a max-heap).
struct PriorityQueue<T> {
    inner: Mutex<BinaryHeap<T>>,
}

impl<T: Ord> PriorityQueue<T> {
    /// Creates an empty queue.
    fn new() -> Self {
        Self {
            inner: Mutex::new(BinaryHeap::new()),
        }
    }

    /// Adds `item` to the queue.
    ///
    /// Panics if the mutex is poisoned.
    fn push(&self, item: T) {
        self.inner.lock().unwrap().push(item);
    }

    /// Removes and returns the greatest item, or `None` if the queue is empty.
    ///
    /// Panics if the mutex is poisoned.
    fn pop(&self) -> Option<T> {
        self.inner.lock().unwrap().pop()
    }
}

/// Fills a shared `PriorityQueue` from five threads, then drains it from
/// five more.
///
/// All pushing threads are joined before any popping thread starts, so the
/// queue holds the values 1 through 5 when popping begins. The order of the
/// "Popped" lines still depends on how the popping threads are scheduled.
fn main() {
    let queue = Arc::new(PriorityQueue::new());

    // Phase 1: one pushing thread per value, 5 down to 1.
    let pushers: Vec<_> = (1..6)
        .rev()
        .map(|i| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                queue.push(i);
            })
        })
        .collect();

    for pusher in pushers {
        pusher.join().unwrap();
    }

    // Phase 2: five popping threads, each taking at most one item.
    let poppers: Vec<_> = (0..5)
        .map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                if let Some(val) = queue.pop() {
                    println!("Popped: {}", val);
                } else {
                    println!("Queue is empty!");
                }
            })
        })
        .collect();

    for popper in poppers {
        popper.join().unwrap();
    }
}

main()

Popped: 5
Popped: 4
Popped: 3
Popped: 2
Popped: 1


()

#### Handling Deadlocks with Mutex

In [22]:
use std::sync::{Arc, Mutex};
use std::thread;

/// Increments a shared counter from five threads using the non-blocking
/// `try_lock`.
///
/// A thread that cannot take the lock immediately prints a message and
/// skips its increment instead of waiting, so the final value can be less
/// than 5 when threads contend.
fn main() {
    let counter = Arc::new(Mutex::new(0));

    let workers: Vec<_> = (0..5)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                // `try_lock` returns `Err` instead of blocking when the
                // mutex is unavailable.
                match counter.try_lock() {
                    Ok(mut guard) => *guard += 1,
                    Err(_) => println!("Failed to acquire lock, continuing..."),
                };
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    println!("Final Data: {}", *counter.lock().unwrap());
}

main()

Final Data: 5


()

#### Mutex in Multi-Threaded Producer-Consumer Scenario

In [23]:
use std::sync::{Arc, Mutex};
use std::thread;

/// Number of items each producer pushes and the maximum number of pop
/// attempts each consumer makes.
const BUFFER_SIZE: usize = 5;

/// Runs three producers and two consumers against one mutex-protected vector.
///
/// Producer `i` pushes the values `i * BUFFER_SIZE .. (i + 1) * BUFFER_SIZE`.
/// Each consumer makes `BUFFER_SIZE` pop attempts and prints whatever it
/// gets; an attempt on an empty buffer is skipped silently, so not every
/// produced item is necessarily consumed.
fn main() {
    let buffer = Arc::new(Mutex::new(Vec::new()));
    let mut workers = Vec::new();

    // Producer threads
    workers.extend((0..3usize).map(|i| {
        let buffer = Arc::clone(&buffer);
        thread::spawn(move || {
            for j in 0..BUFFER_SIZE {
                // Lock per item so other threads can interleave.
                buffer.lock().unwrap().push(i * BUFFER_SIZE + j);
            }
        })
    }));

    // Consumer threads
    workers.extend((0..2).map(|_| {
        let buffer = Arc::clone(&buffer);
        thread::spawn(move || {
            for _ in 0..BUFFER_SIZE {
                // The guard stays alive while printing, as in a plain
                // lock-then-print sequence.
                let mut slots = buffer.lock().unwrap();
                match slots.pop() {
                    Some(item) => println!("Consumed: {}", item),
                    None => continue,
                }
            }
        })
    }));

    for worker in workers {
        worker.join().unwrap();
    }
}

main()

Consumed: 14
Consumed: 13
Consumed: 12
Consumed: 11
Consumed: 10
Consumed: 9
Consumed: 8
Consumed: 7
Consumed: 6
Consumed: 5


()

### Exploring RwLock

#### Managing Shared Data with RwLock

In [24]:
use std::thread;
use std::sync::{RwLock, Arc};

/// Has five threads read a shared value under a read lock, pause, and then
/// increment it under a write lock; prints the final value.
fn main() {
    let shared = Arc::new(RwLock::new(0));

    let workers: Vec<_> = (0..5)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                {
                    let current = shared.read().unwrap();
                    println!("Read: {}", *current);
                } // Read guard released here: this thread must not still
                  // hold it when it asks for the write lock below.

                thread::sleep(std::time::Duration::from_millis(100));

                // The write lock is held only for this one statement.
                *shared.write().unwrap() += 1;
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    println!("Result: {:?}", *shared.read().unwrap());
}

main()

Read: 0
Read: 0
Read: 0
Read: 0
Read: 0
Result: 5


()

#### Advanced Usages of RwLock

##### Dynamic Number of Readers

In [25]:
use std::sync::{RwLock, Arc};
use std::thread;

/// Starts ten reader threads that each take a read lock on the same value
/// and print it together with their index.
///
/// Nothing writes to the value, so every reader prints 0; the line order
/// depends on scheduling.
fn main() {
    let shared = Arc::new(RwLock::new(0));

    let readers: Vec<_> = (0..10)
        .map(|i| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                // The read guard lives for the duration of the print.
                println!("Reader {}: {}", i, *shared.read().unwrap());
            })
        })
        .collect();

    for reader in readers {
        reader.join().unwrap();
    }
}

main()

Reader 0: 0
Reader 1: 0
Reader 2: 0
Reader 4: 0
Reader 3: 0
Reader 5: 0
Reader 6: 0
Reader 7: 0
Reader 8: 0
Reader 9: 0


()

##### Timed Locking

In [31]:
use std::sync::{RwLock, Arc};
use std::thread;

/// Tries to take the write lock on `lock`, giving up once `timeout` has passed.
///
/// `std::sync::RwLock` has no timed acquisition, so this polls the
/// non-blocking `try_write` with a short sleep between attempts.
///
/// Returns `Some(guard)` on success and `None` if the lock was still held
/// when the deadline passed. Panics if the lock is poisoned, matching what
/// `write().unwrap()` would do.
fn write_with_timeout<'a, T>(
    lock: &'a RwLock<T>,
    timeout: std::time::Duration,
) -> Option<std::sync::RwLockWriteGuard<'a, T>> {
    let deadline = std::time::Instant::now() + timeout;
    loop {
        match lock.try_write() {
            Ok(guard) => return Some(guard),
            Err(std::sync::TryLockError::Poisoned(err)) => panic!("RwLock poisoned: {}", err),
            Err(std::sync::TryLockError::WouldBlock) => {
                if std::time::Instant::now() >= deadline {
                    return None;
                }
                // Back off briefly instead of spinning at full speed.
                thread::sleep(std::time::Duration::from_millis(1));
            }
        }
    }
}

/// Increments two shared values from five threads, bounding how long each
/// thread waits for either write lock.
///
/// A thread that times out prints a message and returns. Returning drops any
/// guard it already holds, so a stuck lock cannot make the threads wait on
/// each other forever.
fn main() {
    let data1 = Arc::new(RwLock::new(0));
    let data2 = Arc::new(RwLock::new(0));

    // Upper bound on the wait for each individual lock.
    let timeout = std::time::Duration::from_millis(500);

    let mut handles = vec![];

    for id in 0..5 {
        let data1 = Arc::clone(&data1);
        let data2 = Arc::clone(&data2);

        let handle = thread::spawn(move || {
            let mut data_guard1 = match write_with_timeout(&data1, timeout) {
                Some(guard) => guard,
                None => {
                    println!("Thread {}: timed out waiting for data1", id);
                    return;
                }
            };
            let mut data_guard2 = match write_with_timeout(&data2, timeout) {
                Some(guard) => guard,
                None => {
                    // `data_guard1` is dropped on return, releasing data1.
                    println!("Thread {}: timed out waiting for data2", id);
                    return;
                }
            };

            // Perform operations on both data1 and data2
            *data_guard1 += 1;
            *data_guard2 += 1;
        });

        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

##### Deadlock Avoidance

In [32]:
use std::sync::{RwLock, Arc};
use std::thread;

/// Increments two `RwLock`-protected values from five threads.
///
/// Every thread takes the write locks in the same order (`data1`, then
/// `data2`), so no thread can hold `data2` while waiting for `data1`.
fn main() {
    let data1 = Arc::new(RwLock::new(0));
    let data2 = Arc::new(RwLock::new(0));

    let workers: Vec<_> = (0..5)
        .map(|_| {
            let data1 = Arc::clone(&data1);
            let data2 = Arc::clone(&data2);
            thread::spawn(move || {
                // Fixed acquisition order: first data1, then data2.
                let mut first = data1.write().unwrap();
                let mut second = data2.write().unwrap();

                // Update both values while holding both locks.
                *first += 1;
                *second += 1;
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
}

main()

()

##### Implementing Resource Pooling

In [34]:
use std::sync::{RwLock, Arc};
use std::thread;

/// A pooled resource, identified by its position in the pool.
struct Resource {
    id: u32,
}

/// Builds a pool of three `RwLock`-protected resources and lets ten threads
/// read from it.
///
/// Thread `i` reads resource `i % NUM_RESOURCES` and prints which one it
/// accessed. Each thread receives its own clone of the pool vector; the
/// clones share the same underlying resources through `Arc`.
fn main() {
    const NUM_RESOURCES: u32 = 3;

    let mut resources: Vec<Arc<RwLock<Resource>>> = Vec::new();
    for id in 0..NUM_RESOURCES {
        resources.push(Arc::new(RwLock::new(Resource { id })));
    }

    let workers: Vec<_> = (0..10u32)
        .map(|i| {
            let pool: Vec<Arc<RwLock<Resource>>> = resources.clone();
            thread::spawn(move || {
                // Spread threads over the pool round-robin by index.
                let slot = (i % NUM_RESOURCES) as usize;
                let guard = pool[slot].read().unwrap();
                println!("Thread {} accessed Resource {}", i, guard.id);
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
}

main()

Thread 0 accessed Resource 0
Thread 1 accessed Resource 1
Thread 2 accessed Resource 2
Thread 3 accessed Resource 0
Thread 4 accessed Resource 1
Thread 5 accessed Resource 2
Thread 6 accessed Resource 0
Thread 8 accessed Resource 2
Thread 7 accessed Resource 1
Thread 9 accessed Resource 0


()

### Techniques for Thread Communication and Message Passing

#### Using Channels for Communication

In [35]:
use std::thread;
use std::sync::mpsc;

/// Sends one `String` from a spawned thread to the main thread over an
/// `mpsc` channel and prints it.
///
/// The only `Sender` is moved into the worker. If the worker ends without
/// sending, the channel is then disconnected and `recv()` returns an error
/// instead of blocking forever on a sender still held by `main`.
fn main() {
    let (sender, receiver) = mpsc::channel();

    let handle = thread::spawn(move || {
        let message = "Hello from the sender thread".to_string();
        sender.send(message).unwrap();
    });

    // Blocks until the message arrives or every sender has been dropped.
    let received_data = receiver.recv().unwrap();
    println!("Received: {}", received_data);

    handle.join().unwrap();
}

main()

Received: Hello from the sender thread


()

#### Message Passing with Structs

In [36]:
use std::thread;
use std::sync::mpsc;

/// Message type sent across the channel: a numeric id plus a text payload.
struct CustomMessage {
    id: u32,
    content: String,
}

/// Sends one `CustomMessage` from a spawned thread to the main thread and
/// prints its fields.
///
/// The only `Sender` is moved into the worker. If the worker ends without
/// sending, the channel is then disconnected and `recv()` returns an error
/// instead of blocking forever on a sender still held by `main`.
fn main() {
    let (sender, receiver) = mpsc::channel();

    let handle = thread::spawn(move || {
        let message = CustomMessage {
            id: 42,
            content: "Important data".to_string(),
        };
        sender.send(message).unwrap();
    });

    // Blocks until the message arrives or every sender has been dropped.
    let received_message = receiver.recv().unwrap();
    println!(
        "Received Message - ID: {}, Content: {}",
        received_message.id, received_message.content
    );

    handle.join().unwrap();
}

main()

Received Message - ID: 42, Content: Important data


()

#### Atomic Operations

In [39]:
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Increments a shared atomic counter from five threads and prints the total.
///
/// No mutex is needed: `fetch_add` is a single atomic read-modify-write.
/// `Ordering::Relaxed` is enough here because the counter guards no other
/// data and every thread is joined before the final load.
fn main() {
    let counter = Arc::new(AtomicUsize::new(0));

    let mut handles = vec![];

    for _ in 0..5 {
        let counter = Arc::clone(&counter);

        let handle = thread::spawn(move || {
            counter.fetch_add(1, Ordering::Relaxed);
        });

        handles.push(handle);
    }

    // Joining first guarantees all five increments are visible below.
    for handle in handles {
        handle.join().unwrap();
    }

    let final_value = counter.load(Ordering::Relaxed);
    println!("Final Counter Value: {}", final_value);
}

main()

Final Counter Value: 5


()

### Advanced Thread Communication

#### Thread Coordination with Barriers

In [40]:
use std::sync::{Arc, Barrier};
use std::thread;

/// Runs three threads that each work for two seconds and then meet at a
/// shared `Barrier` before printing their last line.
///
/// The barrier is created for exactly three participants, so no thread gets
/// past `wait()` until all three have called it.
fn main() {
    let gate = Arc::new(Barrier::new(3));

    let workers: Vec<_> = (0..3)
        .map(|id| {
            let gate = Arc::clone(&gate);
            thread::spawn(move || {
                println!("Thread {} started", id);
                thread::sleep(std::time::Duration::from_secs(2));
                println!("Thread {} completed its work", id);
                // Blocks until all three threads have arrived.
                gate.wait();
                println!("Thread {} reached the barrier", id);
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
}

main()

Thread 0 started
Thread 1 started
Thread 2 started
Thread 0 completed its work
Thread 2 completed its work
Thread 1 completed its work
Thread 1 reached the barrier
Thread 2 reached the barrier
Thread 0 reached the barrier


()

#### Thread Local Storage (TLS)

In [43]:
use std::thread;
use std::thread_local;

thread_local! {
    // Each thread that touches this key works on its own `RefCell`,
    // initialised to 42.
    static THREAD_LOCAL_DATA: std::cell::RefCell<u32> = std::cell::RefCell::new(42);
}

/// Increments the thread-local value in five spawned threads, then prints
/// the main thread's copy.
///
/// Every thread has its own value, so the increments made by the workers do
/// not show up in the value read by the main thread at the end.
fn main() {
    let workers: Vec<_> = (0..5)
        .map(|_| {
            thread::spawn(|| {
                THREAD_LOCAL_DATA.with(|cell| {
                    // The mutable borrow ends with this statement.
                    *cell.borrow_mut() += 1;
                    println!("Thread-local value: {}", *cell.borrow());
                });
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    THREAD_LOCAL_DATA.with(|cell| {
        println!("Main thread's thread-local value: {}", *cell.borrow());
    });
}

main()

// TODO: file a bug for the `thread_local!` macro not working in the evcxr kernel

Error: cannot find value `THREAD_LOCAL_DATA` in this scope

Error: cannot find value `THREAD_LOCAL_DATA` in this scope

#### Crossbeam Library

In [44]:
:dep crossbeam = {version = "0.8"}

In [47]:
use crossbeam::thread;

// `thread` here is `crossbeam::thread` (imported above), not `std::thread`.
// Threads are spawned through the scope handle `s`; per the crossbeam docs,
// `scope` returns only after they have finished, and its `Result` reports
// whether any of them panicked.
thread::scope(|s| {
    for i in 0..5 {
        // The spawned closure receives the scope as its argument; it is
        // unused here, hence `_`.
        s.spawn(move |_| {
            println!("Scoped thread {}", i);
        });
    }
}).unwrap();

Scoped thread 0
Scoped thread 1
Scoped thread 2
Scoped thread 3
Scoped thread 4


### Asynchronous Programming

In [48]:
:dep async-std = {version = "1.12.0"}

In [50]:
use async_std::task;

/// Runs two async tasks concurrently on the async-std runtime and waits for
/// both of them.
///
/// `block_on` drives the outer future from synchronous code. Both tasks are
/// spawned before either is awaited, so the one-second task finishes first
/// even though the two-second task is awaited first.
fn main() {
    task::block_on(async {
        let slow = task::spawn(async {
            println!("Task 1: Starting");
            // Simulated work: a non-blocking two-second delay.
            task::sleep(std::time::Duration::from_secs(2)).await;
            println!("Task 1: Finished");
        });

        let fast = task::spawn(async {
            println!("Task 2: Starting");
            // Simulated work: a non-blocking one-second delay.
            task::sleep(std::time::Duration::from_secs(1)).await;
            println!("Task 2: Finished");
        });

        // Awaiting a handle only waits for that task to complete.
        slow.await;
        fast.await;

        println!("All tasks have completed");
    });
}

main()

Task 1: Starting
Task 2: Starting
Task 2: Finished
Task 1: Finished
All tasks have completed


()

---
---