# Google Colab Rust Setup

The following cell sets up a Jupyter Notebook environment with a Rust kernel (evcxr) using Nix and IPC Proxy.

In [None]:
!wget -qO- https://gist.github.com/wiseaidev/2af6bef753d48565d11bcd478728c979/archive/3f6df40db09f3517ade41997b541b81f0976c12e.tar.gz | tar xvz --strip-components=1
!bash setup_evcxr_kernel.sh

# Asynchronous Programming

## Utilizing the tokio Library

In [2]:
:dep tokio = { version = "1.35.0", features = ["full"] }

In [5]:
:dep reqwest = { version = "0.11.22" }

## Harnessing the Power of Async for Responsiveness

In [6]:
use tokio::time::Duration;


/// Simulates an asynchronous HTTP request.
///
/// Waits two seconds on the Tokio timer and then yields a fixed success
/// message. No request is actually sent, so the `reqwest::Error` variant of
/// the return type is never produced by this function.
async fn fetch_data() -> Result<String, reqwest::Error> {
    // Stand-in for network latency.
    let simulated_latency = Duration::from_secs(2);
    tokio::time::sleep(simulated_latency).await;

    // The error type is inferred from the function's return type.
    Ok(String::from("Data fetched successfully"))
}

/// Stand-in for asynchronous processing: prints the received data to stdout.
///
/// Takes ownership of `data` so the resulting future can be handed to a
/// spawned task.
async fn process_data(data: String) {
    // No real work is done here; the value is only echoed.
    println!("Processing: {}", data);
}

/// Runs the fetch and process tasks concurrently and reports the fetch outcome.
///
/// Both futures are handed to `tokio::spawn` before either handle is awaited,
/// so the two tasks make progress independently of each other.
///
/// # Panics
///
/// Panics if the fetch task panicked or was cancelled. A failure of the
/// processing task is reported on stderr rather than being discarded.
async fn main_async_workflow() {
    // Spawn both tasks up front so they run concurrently.
    let fetch_task = tokio::spawn(fetch_data());
    let process_task = tokio::spawn(process_data("Sample Data".to_owned()));

    // Awaiting a `JoinHandle` fails only when the task itself panicked or was
    // cancelled; the `Result` returned by `fetch_data` is still wrapped inside.
    let fetch_result = fetch_task
        .await
        .expect("Fetch task panicked or was cancelled");
    if let Err(join_error) = process_task.await {
        eprintln!("Process task failed: {}", join_error);
    }

    // `fetch_result` is the `Result` produced by `fetch_data`, shown with `Debug`.
    println!("Fetch Result: {:?}", fetch_result);
}

/// Entry point of the cell: `#[tokio::main]` wraps this async body in a Tokio
/// runtime so it can be invoked as an ordinary function.
#[tokio::main]
async fn main() {
    // Run the example workflow to completion.
    main_async_workflow().await;
}

main()

Processing: Sample Data
Fetch Result: Ok("Data fetched successfully")


()

### Error Handling in Asynchronous Code

### Example 1: Propagating Errors from a Spawned Task

In [7]:
use std::error::Error;

/// Runs fallible work on a spawned task and propagates every failure to the caller.
///
/// Two distinct failures are possible: the task may not run to completion
/// (it panicked or was cancelled), which surfaces as a `JoinError`, or the
/// task's own logic may return `Err`.
///
/// # Errors
///
/// Returns the boxed `JoinError` if the task did not complete, otherwise
/// whatever error the task's body produced.
async fn async_with_error_handling() -> Result<(), Box<dyn Error + Send>> {
    let handle = tokio::task::spawn(async {
        // Asynchronous logic with potential errors
        Ok::<(), Box<dyn Error + Send>>(())
    });

    // `handle.await` is `Result<Result<(), _>, JoinError>`. The `?` propagates
    // the outer `JoinError`; the inner `Result` becomes the return value, so an
    // error produced inside the task is not discarded.
    handle
        .await
        .map_err(|error| Box::new(error) as Box<dyn Error + Send>)?
}

/// Entry point of the cell: runs the error-handling example and reports
/// success on stdout or the failure on stderr.
#[tokio::main]
async fn main() {
    let outcome = async_with_error_handling().await;

    if let Err(error) = outcome {
        eprintln!("Error during async operation: {}", error);
    } else {
        println!("Async operation completed successfully");
    }
}

main()

Async operation completed successfully


()

### Concurrent Task Lifetimes and Resource Management

In [8]:
use tokio::sync::Mutex;
use std::sync::Arc;

/// Spawns two tasks that update one mutex-protected counter and waits for both.
///
/// # Panics
///
/// Panics if either spawned task fails to complete.
async fn async_with_shared_data() {
    let counter = Arc::new(Mutex::new(0));

    // Each task receives its own `Arc` handle to the same counter.
    let first = tokio::spawn(async_with_shared_data_task(Arc::clone(&counter)));
    let second = tokio::spawn(async_with_shared_data_task(Arc::clone(&counter)));
    println!("Task 1 and Task 2 spawned.");

    // The handles are awaited one after the other, in spawn order.
    first.await.expect("Task 1 failed");
    println!("Task 1 completed successfully.");

    second.await.expect("Task 2 failed");
    println!("Task 2 completed successfully.");

    println!("Continuing with the results.");
}

/// Increments the shared counter by one while holding its lock.
///
/// The guard is kept until the function returns, so both messages below are
/// printed while the lock is held. Note that the "acquiring" message is
/// emitted after the lock has already been obtained.
async fn async_with_shared_data_task(shared_data: Arc<Mutex<i32>>) {
    let mut guard = shared_data.lock().await;
    println!("Task acquiring lock on shared data.");

    *guard += 1;
    println!("Task modifying shared data: {}", *guard);
}

/// Entry point of the cell: runs the shared-data example inside a Tokio
/// runtime and prints a final message once it has finished.
#[tokio::main]
async fn main() {
    // Returns only after both spawned tasks have been awaited.
    async_with_shared_data().await;
    println!("Main function completed.");
}

main()

Task acquiring lock on shared data.
Task modifying shared data: 1
Task acquiring lock on shared data.
Task modifying shared data: 2
Task 1 and Task 2 spawned.
Task 1 completed successfully.
Task 2 completed successfully.
Continuing with the results.
Main function completed.


()

## Advanced Patterns in Asynchronous Programming

### Asynchronous Streams

In [9]:
:dep tokio-stream = { version = "0.1.14" }

In [12]:
:dep rand = { version = "0.8.5" }

In [13]:
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::Duration;
use tokio_stream::{Stream, StreamExt};


/// A stream that yields the instant of every tick of a Tokio interval timer.
struct MyInterval {
    /// Timer that drives the stream.
    interval: tokio::time::Interval,
}

impl MyInterval {
    /// Creates a stream backed by `tokio::time::interval(duration)`.
    ///
    /// NOTE(review): the Tokio documentation states that `interval` panics for
    /// a zero period and that the first tick completes immediately — confirm
    /// against the Tokio version in use.
    fn new(duration: Duration) -> Self {
        Self {
            interval: tokio::time::interval(duration),
        }
    }
}

impl Stream for MyInterval {
    type Item = tokio::time::Instant;

    /// Polls the wrapped interval for its next tick.
    ///
    /// A ready tick is always wrapped in `Some`, so this implementation never
    /// signals the end of the stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `Interval::poll_tick` takes `&mut self`, so the field does not need
        // to be re-pinned; `Poll::map` wraps a ready value and passes `Pending`
        // through unchanged.
        self.interval.poll_tick(cx).map(Some)
    }
}

/// Entry point of the cell: prints five random numbers, one per interval tick.
#[tokio::main]
async fn main() {
    // Map every tick to a fresh random `u32` and stop after five items.
    let mut random_number_stream = MyInterval::new(Duration::from_secs(1))
        .map(|_| rand::random::<u32>())
        .take(5);

    while let Some(random_number) = random_number_stream.next().await {
        println!("Random Number: {}", random_number);
    }
}

main()

Random Number: 1375459144
Random Number: 2810972174
Random Number: 3420131127
Random Number: 3810768654
Random Number: 2841032359


()

### Fan-Out and Fan-In with Async Streams

In [21]:
:dep tokio-stream = { version = "0.1.14" }

In [22]:
use tokio_stream::{self as stream, Stream, StreamExt};
use tokio::time::{sleep, Duration};

/// Sleeps for `id` seconds, then returns `id` doubled.
async fn async_task(id: usize) -> usize {
    let delay = Duration::from_secs(id as u64);
    sleep(delay).await;

    id * 2
}

fn fan_out_tasks() -> impl Stream<Item = usize> {
    stream::iter(0..5).then(async_task)
}


/// Drains the stream produced by `fan_out_tasks`, printing each result as it arrives.
async fn fan_in_results() {
    let results = fan_out_tasks();

    // The stream is an opaque `impl Stream`; pin it in place so `next()` can
    // be called on it.
    tokio::pin!(results);

    while let Some(value) = results.next().await {
        println!("Received result: {:?}", value);
    }
}

/// Entry point of the cell: runs the fan-in consumer inside a Tokio runtime.
#[tokio::main]
async fn main() {
    // Returns once the result stream is exhausted.
    fan_in_results().await;
}

main()

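// TODO: file an evcxr issue? The alias from `use tokio_stream::{self as stream, ...}` is not resolved, so `stream::iter` fails with the error below.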

Error: failed to resolve: use of undeclared crate or module `stream`

### Cancellation and Timeout Handling

In [23]:
use tokio::time::{sleep, timeout, Duration};

/// Simulates a slow operation: sleeps for five seconds, then reports completion.
async fn async_task() {
    let work_time = Duration::from_secs(5);
    sleep(work_time).await;

    println!("Task completed");
}

/// Runs `async_task` under a two-second deadline.
///
/// If the deadline passes first, the timeout error is printed to stderr;
/// otherwise nothing further is reported here.
async fn cancelable_task() {
    let deadline = Duration::from_secs(2);

    if let Err(err) = timeout(deadline, async_task()).await {
        eprintln!("Error: {}", err);
    }
}

/// Entry point of the cell: runs the timeout example inside a Tokio runtime.
#[tokio::main]
async fn main() {
    // Completes when the task finishes or its deadline elapses.
    cancelable_task().await;
}

main()

Error: deadline has elapsed


()

### Dynamic Task Management

In [25]:
:dep futures = { version = "0.3.29" }

In [26]:
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio::task::spawn;
use tokio::time::{sleep, Duration};

async fn dynamic_task_manager() {
    let task_manager = FuturesUnordered::new();

    for id in 0..5 {
        let task = spawn(async move {
            sleep(Duration::from_secs(id as u64)).await;
            id * 3
        });

        task_manager.push(task);
    }

    tokio::pin!(task_manager);

    while let Some(result) = task_manager.next().await {
        println!("Task result: {:?}", result.unwrap());
    }
}

/// Entry point of the cell: runs the dynamic task manager inside a Tokio runtime.
#[tokio::main]
async fn main() {
    // Returns after every spawned task has reported its result.
    dynamic_task_manager().await;
}

main()

Task result: 0
Task result: 3
Task result: 6
Task result: 9
Task result: 12


()

### Integrating Async Code with Sync Code

In [27]:
use tokio::task;
use tokio::time::{sleep, Duration};

/// Placeholder for a resource that needs asynchronous cleanup.
#[derive(Clone)]
struct AsyncResource {
    // Resource fields
}

impl AsyncResource {
    /// Simulates asynchronous cleanup: announces the start, waits one second,
    /// then announces completion.
    async fn cleanup(&self) {
        println!("Cleaning up resources asynchronously...");

        // Stand-in for real cleanup work.
        let cleanup_time = Duration::from_secs(1);
        sleep(cleanup_time).await;

        println!("Cleanup completed");
    }
}

/// Demonstrates running a synchronous section from inside a spawned async task.
///
/// The task performs an async sleep, then executes a synchronous loop via
/// `block_in_place`, and finally resumes its async logic.
///
/// NOTE(review): per the Tokio documentation `block_in_place` requires the
/// multi-threaded runtime — confirm the runtime flavor if this is reused.
///
/// # Panics
///
/// Panics if the spawned task fails to complete.
async fn async_and_sync_integration() {
    let worker = task::spawn(async {
        println!("Asynchronous code: Start");
        sleep(Duration::from_secs(2)).await;
        println!("Asynchronous code: End");

        // Synchronous section executed from within the async task.
        task::block_in_place(|| {
            println!("Synchronous code: Start");
            (1..=3).for_each(|i| println!("Synchronous iteration: {}", i));
            println!("Synchronous code: End");
        });

        println!("Continuing with asynchronous logic");
    });

    worker.await.expect("Task failed");
}

/// Entry point of the cell: runs the async/sync integration example inside a
/// Tokio runtime and prints a final message once it has finished.
#[tokio::main]
async fn main() {
    // Returns after the spawned task has been awaited.
    async_and_sync_integration().await;

    println!("Main function completed");
}

main()

Asynchronous code: Start
Asynchronous code: End
Synchronous code: Start
Synchronous iteration: 1
Synchronous iteration: 2
Synchronous iteration: 3
Synchronous code: End
Continuing with asynchronous logic
Main function completed


()

---
---