# Async

General note: The code shown here is for educational purposes only. In production, use a ready runtime like 

For IO operations, there are two approaches: blocking and non-blocking. Here is a blocking sleep:

In [5]:
use std::{time::Duration, thread::sleep};

fn sleep_1s_and_return_5() -> i32 {
    sleep(Duration::from_secs(1));
    5
}

sleep_1s_and_return_5()

5

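And here is a non-blocking sleep. `Poll<T>` is an enum with the same shape as `Option<T>`: one variant carries a value (`Ready(T)`, like `Some(T)`) and the other carries nothing (`Pending`, like `None`). Rust favors distinct types even when their structure is identical, because it prevents a `Poll<T>` from being used by accident where an `Option<T>` is expected. The non-blocking version is considerably more complex: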

In [6]:
use std::{task::Poll, time::Instant};

struct Sleep {
    start_time: Instant,
}

impl Sleep {
    fn try_again(&self) -> Poll<i32> {
        if Instant::now() - self.start_time > Duration::from_secs(1) {
            Poll::Ready(5)
        } else {
            Poll::Pending
        }
    }
}

fn sleep_1s_and_return_5() -> Sleep {
    Sleep { start_time: Instant::now() }
}

let sleep_result = sleep_1s_and_return_5();
loop {
    match sleep_result.try_again() {
        Poll::Ready(x) => break x,
        Poll::Pending => println!("pending"),
    }
    sleep(Duration::from_millis(100)); // to prevent spamming the stdout
}

pending
pending
pending
pending
pending
pending
pending
pending
pending
pending


5
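To make the `Poll<T>` / `Option<T>` relationship concrete, here is a minimal sketch of the two conversions. The function names `poll_to_option` and `option_to_poll` are made up for this illustration; they are not part of the standard library.

```rust
use std::task::Poll;

/// Maps `Ready(v)` to `Some(v)` and `Pending` to `None` (hypothetical helper).
fn poll_to_option<T>(poll: Poll<T>) -> Option<T> {
    match poll {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

/// The inverse mapping: `Some(v)` to `Ready(v)` and `None` to `Pending`.
fn option_to_poll<T>(option: Option<T>) -> Poll<T> {
    match option {
        Some(value) => Poll::Ready(value),
        None => Poll::Pending,
    }
}
```

Because the two types are distinct, passing a `Poll<i32>` where an `Option<i32>` is expected is a compile error; a conversion like the one above has to be written out explicitly.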

The `Future` trait abstracts the functionality we implemented above. Let's use that instead:

In [7]:
use std::{future::Future, task::{Context, Wake, Waker}, pin::Pin, sync::Arc};

struct Sleep {
    start_time: Instant,
}

impl Future for Sleep {
    type Output = i32;
    fn poll(self: Pin<&mut Sleep>, _: &mut Context) -> Poll<i32> {
        if Instant::now() - self.start_time > Duration::from_secs(1) {
            Poll::Ready(5)
        } else {
            Poll::Pending
        }
    }
}

struct EmptyWaker;
impl Wake for EmptyWaker {
    fn wake(self: Arc<Self>) {
    }
}

fn sleep_1s_and_return_5() -> impl Future<Output = i32> {
    Sleep { start_time: Instant::now() }
}
{
    let future = &mut sleep_1s_and_return_5();
    let mut sleep_result = Pin::new(future);
    let waker = Arc::new(EmptyWaker).into();
    let mut context = Context::from_waker(&waker);
    loop {
        match sleep_result.as_mut().poll(&mut context) {
            Poll::Ready(x) => break x,
            Poll::Pending => println!("pending"),
        }
        sleep(Duration::from_millis(100)); // to prevent spamming the stdout
    }
}

pending
pending
pending
pending
pending
pending
pending
pending
pending
pending


5

That is a lot of added ceremony. Let's break it down:
* `Future` is a trait that represents a task which can be polled.
* `Context` gives a task a way to notify the runner that it needs to be polled again. We never notify the runner, so our implementation is technically wrong: a runner is allowed to never poll our future again after it returns `Pending`. It works here only because we also wrote the runner, and it polls unconditionally.
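As a sketch of what "notifying the runner" looks like, the future below calls `cx.waker().wake_by_ref()` before every `Pending`. The names `Countdown`, `CountingWaker` and `run_and_count_wakes` are invented for this example, and the waker only counts notifications instead of scheduling anything.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: reports `Pending` `remaining` times, then completes.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            // Ask the runner to poll us again before reporting `Pending`.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that only counts how many times it was notified.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls `fut` to completion and returns its output plus the number of wake calls.
fn run_and_count_wakes<F: Future>(fut: F) -> (F::Output, usize) {
    let mut fut = pin!(fut);
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(Arc::clone(&counter));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return (output, counter.wakes.load(Ordering::SeqCst));
        }
    }
}
```

Calling `run_and_count_wakes(Countdown { remaining: 3 })` yields `("done", 3)`: one notification per `Pending`. A real runner would use those notifications to decide when to poll, instead of looping unconditionally as this one does.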

Now imagine we want to write a non-blocking function which calls that function twice and prints between the calls. Here is the blocking version:


In [7]:
fn sleep_1s_and_return_5() -> i32 {
    sleep(Duration::from_secs(1));
    5
}

fn composed_job() -> i32 {
    println!("start");
    let x = sleep_1s_and_return_5();
    println!("middle");
    let y = sleep_1s_and_return_5();
    println!("end");
    x + y
}

composed_job()

start
middle
end


10

A manual non-blocking version is far more complex:

In [8]:
fn sleep_1s_and_return_5() -> Sleep {
    Sleep { start_time: Instant::now() }
}

struct Sleep {
    start_time: Instant,
}

impl Future for Sleep {
    type Output = i32;
    fn poll(self: Pin<&mut Sleep>, _: &mut Context) -> Poll<i32> {
        if Instant::now() - self.start_time > Duration::from_secs(1) {
            Poll::Ready(5)
        } else {
            Poll::Pending
        }
    }
}

enum ComposedJobStateMachine {
    NotStarted,
    Beginning { fut1: Pin<Box<Sleep>> },
    Middle { x: i32, fut2: Pin<Box<Sleep>> },
    End { x: i32, y: i32 },
}

use ComposedJobStateMachine::*;

impl Future for ComposedJobStateMachine {
    type Output = i32;
    fn poll(mut self: Pin<&mut ComposedJobStateMachine>, cx: &mut Context) -> Poll<i32> {
        match &mut *self {
            NotStarted => {
                println!("begining");
                *self = Beginning {
                    fut1: Box::pin(sleep_1s_and_return_5()),
                };
                Poll::Pending
            }
            Beginning { fut1 } => {
                match fut1.as_mut().poll(cx) {
                    Poll::Ready(x) => {
                        println!("middle");
                        *self = Middle {
                            x, fut2: Box::pin(sleep_1s_and_return_5()),
                        };
                        Poll::Pending
                    }
                    Poll::Pending => Poll::Pending,
                }
            }
            Middle { x, fut2 } => {
                match fut2.as_mut().poll(cx) {
                    Poll::Ready(y) => {
                        println!("end");
                        let x = *x;
                        *self = End { x, y };
                        Poll::Ready(x + y)
                    }
                    Poll::Pending => Poll::Pending,
                }
            }
            End { x, y } => Poll::Ready(*x + *y),
        }
    }
}

fn composed_job() -> impl Future<Output = i32> {
    NotStarted
}

{
    let future = &mut composed_job();
    let mut sleep_result = Pin::new(future);
    let waker = Arc::new(EmptyWaker).into();
    let mut context = Context::from_waker(&waker);
    loop {
        match sleep_result.as_mut().poll(&mut context) {
            Poll::Ready(x) => break x,
            Poll::Pending => print!("pending "),
        }
        sleep(Duration::from_millis(100)); // to prevent spamming the stdout
    }
}

begining
pending pending pending pending pending pending pending pending pending pending middle
pending pending pending pending pending pending pending pending pending pending end


10

We created a state machine to remember which line of code we had reached in the previous call. Since building such a state machine is a very good way to compose multiple non-blocking tasks, Rust supports it at the language level. We can write the code above with `async` blocks:

In [9]:
fn composed_job() -> impl Future<Output = i32> {
    async {
        println!("start");
        let x = sleep_1s_and_return_5().await;
        println!("middle");
        let y = sleep_1s_and_return_5().await;
        println!("end");
        x + y
    }
}

{
    let future = composed_job();
    let mut sleep_result = Box::pin(future);
    let waker = Arc::new(EmptyWaker).into();
    let mut context = Context::from_waker(&waker);
    loop {
        match sleep_result.as_mut().poll(&mut context) {
            Poll::Ready(x) => break x,
            Poll::Pending => print!("pending "),
        }
        sleep(Duration::from_millis(100)); // to prevent spamming the stdout
    }
}

start
pending pending pending pending pending pending pending pending pending pending middle
pending pending pending pending pending pending pending pending pending pending end


10

The `async` version looks like blocking code but works like non-blocking code. The compiler-generated version is also more performant than our manual one: it needs no heap allocation (our state machine boxed each inner future) and instead uses a self-referential structure as its state machine. It also nests the state machines of multiple async blocks inside each other; internally, async blocks are lowered to coroutines (formerly called generators). `Pin`, which you have seen in the code above, exists for this. `Pin<Pointer<T>>` guarantees that the `T` behind the pointer won't be moved (unless `T` is `Unpin`, meaning it is safe to move). `Pin` is essential for self-referential data structures, since moving such a structure invalidates its internal references.
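One way to see that an `async` block is a plain value holding its own state, with no hidden heap allocation, is to look at its size. The sketch below is an illustration only: `small_state`, `large_state` and `state_sizes` are made-up names, and exact sizes depend on the compiler version.

```rust
use std::future::{ready, Future};
use std::mem::size_of_val;

/// Keeps only a single `usize` alive across the `.await`.
fn small_state() -> impl Future<Output = usize> {
    async {
        let x = 1usize;
        ready(()).await;
        x
    }
}

/// Keeps a 1 KiB buffer alive across the `.await`, so it must live in the state machine.
fn large_state() -> impl Future<Output = usize> {
    async {
        let buffer = [1u8; 1024];
        ready(()).await;
        buffer.iter().map(|byte| *byte as usize).sum::<usize>()
    }
}

/// Returns the in-memory sizes of the two (not yet polled) futures.
fn state_sizes() -> (usize, usize) {
    (size_of_val(&small_state()), size_of_val(&large_state()))
}
```

The second future is at least 1024 bytes, because the buffer is stored inside the future itself rather than behind a pointer. That is also why such a value has to stay in place once polling has started, which is what `Pin` enforces.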

Now that we have `async`, we can write concurrent code which runs in a single thread:

In [17]:
:dep futures = "0.3"

In [17]:
fn concurent_job() -> impl Future<Output = ()> {
    async {
        futures::join!(
            async {
                println!("start");
                let x = sleep_1s_and_return_5().await;
                println!("middle");
                let y = sleep_1s_and_return_5().await;
                println!("end");
                x + y
            },
            async {
                for i in 1..=5 {
                    println!("{i}");
                    sleep_1s_and_return_5().await;
                }
            }
        );
    }
}

{
    let future = concurent_job();
    let mut sleep_result = Box::pin(future);
    let waker = Arc::new(EmptyWaker).into();
    let mut context = Context::from_waker(&waker);
    loop {
        match sleep_result.as_mut().poll(&mut context) {
            Poll::Ready(x) => break x,
            Poll::Pending => (),
        }
        sleep(Duration::from_millis(100));
    }
}

start
1
middle
2
end
3
4
5


()
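To demystify `join!`, here is a simplified two-future join written by hand with only the standard library. This is not how the `futures` crate implements it: `Join2`, `YieldOnce`, `NoopWaker`, `block_on` and `interleaving_demo` are hypothetical names, and the children are boxed just to avoid pin projection.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Polls both children on every poll until each one has produced its output.
struct Join2<A: Future, B: Future> {
    a: Pin<Box<A>>,
    b: Pin<Box<B>>,
    a_out: Option<A::Output>,
    b_out: Option<B::Output>,
}

impl<A, B> Future for Join2<A, B>
where
    A: Future,
    B: Future,
    A::Output: Unpin,
    B::Output: Unpin,
{
    type Output = (A::Output, B::Output);
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        if this.a_out.is_none() {
            if let Poll::Ready(value) = this.a.as_mut().poll(cx) {
                this.a_out = Some(value);
            }
        }
        if this.b_out.is_none() {
            if let Poll::Ready(value) = this.b.as_mut().poll(cx) {
                this.b_out = Some(value);
            }
        }
        if this.a_out.is_some() && this.b_out.is_some() {
            Poll::Ready((this.a_out.take().unwrap(), this.b_out.take().unwrap()))
        } else {
            Poll::Pending
        }
    }
}

/// Returns `Pending` exactly once, handing control back to the runner.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling runner, like the loops used earlier in this chapter.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Runs two tasks through `Join2` and records the order of their steps.
fn interleaving_demo() -> (Vec<&'static str>, (i32, i32)) {
    let log = RefCell::new(Vec::new());
    let first = async {
        log.borrow_mut().push("a1");
        YieldOnce { yielded: false }.await;
        log.borrow_mut().push("a2");
        1
    };
    let second = async {
        log.borrow_mut().push("b1");
        YieldOnce { yielded: false }.await;
        log.borrow_mut().push("b2");
        2
    };
    let joined = Join2 { a: Box::pin(first), b: Box::pin(second), a_out: None, b_out: None };
    let results = block_on(joined);
    (log.into_inner(), results)
}
```

`interleaving_demo()` records the steps in the order `a1`, `b1`, `a2`, `b2`: each task runs until its next `.await` that is not ready, and then the other one gets its turn, all on a single thread.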

Until this point, we polled each future eagerly and added a 100ms delay to avoid busy waiting. This is wrong, since a future can become ready well before the 100ms delay is over. We should use the wake mechanism to fix this:

In [14]:
use std::{thread, future::Future, task::{Context, Wake, Waker}, pin::Pin, sync::Arc};

struct Sleep {
    start_time: Instant,
    waker_started: bool,
}

impl Future for Sleep {
    type Output = i32;
    fn poll(mut self: Pin<&mut Sleep>, cx: &mut Context) -> Poll<i32> {
        let elapsed = self.start_time.elapsed();
        if elapsed >= Duration::from_secs(1) {
            Poll::Ready(5)
        } else {
            // Spawn the timer thread only once; later polls must not start another one.
            if !self.waker_started {
                self.waker_started = true;
                // Cannot underflow: this branch is only reached when `elapsed` is below one second.
                let remain = Duration::from_secs(1) - elapsed;
                let waker = cx.waker().clone();
                thread::spawn(move || {
                    thread::sleep(remain);
                    waker.wake();
                });
            }
            Poll::Pending
        }
    }
}

struct ThreadWaker(thread::Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn sleep_1s_and_return_5() -> impl Future<Output = i32> {
    Sleep { start_time: Instant::now(), waker_started: false }
}

{
    let future = &mut sleep_1s_and_return_5();
    let mut sleep_result = Pin::new(future);
    let waker = Arc::new(ThreadWaker(thread::current())).into();
    let mut context = Context::from_waker(&waker);
    loop {
        match sleep_result.as_mut().poll(&mut context) {
            Poll::Ready(x) => break x,
            Poll::Pending => println!("pending"),
        }
        thread::park();
    }
}


pending


5

Now `pending` is printed just once. The loop that creates a waker and calls `.poll` until the future finishes is called an executor, and the code inside the sleep future that spawns a thread and calls `.wake` is a reactor (a very naive one). Since Rust is a language without a runtime (unlike, for example, JS), writing these falls on the user. There are many third-party libraries which provide executors and reactors for common things like timers, networking, and IO. Some executors target embedded systems, some target operating system development, and there are multiple competing executors for ordinary computers, of which `tokio` is the most popular.
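The executor and reactor roles described above can be separated into reusable pieces. The following is a minimal sketch, assuming a single future and one helper thread per timer; `ParkWaker`, `block_on`, `Sleep` and `sleep_for` are names chosen for this example, not an existing API.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

/// Waker that records the notification and unparks the executor thread.
struct ParkWaker {
    thread: Thread,
    notified: AtomicBool,
}

impl Wake for ParkWaker {
    fn wake(self: Arc<Self>) {
        self.notified.store(true, Ordering::Release);
        self.thread.unpark();
    }
}

/// Executor: polls one future, sleeping between polls until it is woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let state = Arc::new(ParkWaker {
        thread: thread::current(),
        notified: AtomicBool::new(false),
    });
    let waker = Waker::from(Arc::clone(&state));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        // `park` may return spuriously, so wait until a wake was really recorded.
        while !state.notified.swap(false, Ordering::Acquire) {
            thread::park();
        }
    }
}

/// Reactor side: a timer future backed by one helper thread (naive on purpose).
struct Sleep {
    deadline: Instant,
    timer_started: bool,
}

impl Future for Sleep {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let now = Instant::now();
        if now >= self.deadline {
            return Poll::Ready(());
        }
        if !self.timer_started {
            self.timer_started = true;
            let remaining = self.deadline.saturating_duration_since(now);
            let waker = cx.waker().clone();
            thread::spawn(move || {
                thread::sleep(remaining);
                waker.wake();
            });
        }
        Poll::Pending
    }
}

fn sleep_for(duration: Duration) -> Sleep {
    Sleep { deadline: Instant::now() + duration, timer_started: false }
}
```

With these pieces, `block_on(async { sleep_for(Duration::from_millis(50)).await; 5 })` returns `5` after at least 50 ms without polling in between. The `notified` flag is a design choice for this sketch: it lets the executor ignore spurious returns from `thread::park`.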

In [19]:
:dep tokio = { version = "1", features = ["full"] }

In [22]:
use tokio::runtime::Runtime;

{
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        futures::join!(
            async {
                for i in 1..=5 {
                    // this sleep function returns a future whose waker works with the tokio runtime
                    tokio::time::sleep(Duration::from_millis(700)).await;
                    println!("a {i}");
                }
            },
            async {
                for i in 1..=10 {
                    tokio::time::sleep(Duration::from_millis(300)).await;
                    println!("b {i}");
                }
            }
        );
    })
}

b 1
b 2
a 1
b 3
b 4
a 2
b 5
b 6
a 3
b 7
b 8
b 9
a 4
b 10
a 5


()

This was a basic taste of async in Rust, and is nowhere near complete. There are other great resources for learning async in Rust. The goal of this chapter was only to make you aware of the existence of this feature in Rust, which doesn't exist in C and was only added to C++ in C++20 (in Rust, `tokio` and `futures` have been around since 2016, and the async/await syntax was stabilized in 2019). The C++ design is similar, but different. Both lower async code to coroutines, but C++ exposes the coroutine machinery to the user, whereas in Rust coroutines and generators are just implementation details. C++ coroutines may implicitly heap-allocate their frames, so C++ async/await is hard to use on embedded and similar targets where allocation is not available. Also, being older and having a sane package manager, Rust has a richer async ecosystem.

## Downside of Async Rust

For normal Rust, there were C and C++ to learn from and avoid repeating their mistakes, so it is very clean, consistent, and simple. But Rust was the first low-level, runtime-less language to add async syntax, so many things are not ideal.

### Async trait

Before async trait, let's introduce async functions:

In [24]:
async fn async_fn(name: &str, count: usize, delay: Duration) {
    for i in 1..=count {
        tokio::time::sleep(delay).await;
        println!("{name} {i}");
    }
}

This is just syntactic sugar for:

In [28]:
fn async_fn_desugared(name: &str, count: usize, delay: Duration) -> impl Future<Output=()> + '_ {
    async move {
        for i in 1..=count {
            tokio::time::sleep(delay).await;
            println!("{name} {i}");
        }
    }
}

And we can use them like any other function which returns a future:

In [29]:
{
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        async_fn("a", 5, Duration::from_millis(700)).await;
    })
}

a 1
a 2
a 3
a 4
a 5


()
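One consequence of this desugaring is that calling an async function runs none of its body; it only builds the future. The sketch below checks that with a counter. `bump`, `laziness_demo`, `NoopWaker` and `block_on` are hypothetical helpers, and the busy-polling runner stands in for a real executor.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling runner, sufficient for futures that never really wait.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Increments the counter and returns its new value.
async fn bump(counter: &Cell<u32>) -> u32 {
    counter.set(counter.get() + 1);
    counter.get()
}

/// Returns the counter value before the first poll and the future's output.
fn laziness_demo() -> (u32, u32) {
    let counter = Cell::new(0);
    let future = bump(&counter); // only builds the state machine
    let before_poll = counter.get(); // the body has not run yet
    let after_poll = block_on(future); // polling runs the body
    (before_poll, after_poll)
}
```

`laziness_demo()` returns `(0, 1)`: the counter is still zero after the call and only changes once the future is polled. Forgetting the `.await` therefore silently skips the work, which is why the compiler warns about unused futures.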

In real-world async code, almost all functions are async, and you will rarely see async blocks. But you can't make functions in traits async:

In [30]:
trait SomeTrait {
    async fn foo();
}

Error: functions in traits cannot be declared `async`

That is because `impl Trait` doesn't work in trait methods either:

In [33]:
trait SomeTrait {
    fn foo() -> impl Future<Output = i32>;
}

Error: `impl Trait` only allowed in function and inherent method return types, not in trait method return

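Support for `async fn` and `impl Trait` in traits was under active development when this was written, and it was stabilized in Rust 1.75; the errors above come from an older compiler. On older compilers, or when the future must be a trait object, we can use `Box<dyn Trait>`: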

In [34]:
trait SomeTrait {
    // The box is pinned so that callers can `.await` the returned future directly.
    fn foo() -> Pin<Box<dyn Future<Output = i32>>>;
}

We can use `#[async_trait]` to make this conversion automatic:

In [37]:
:dep async-trait = "0.1"

In [45]:
use async_trait::async_trait;

#[async_trait]
trait SomeTrait {
    async fn foo(self) -> i32;
}

#[async_trait]
impl SomeTrait for i32 {
    async fn foo(self) -> i32 {
        tokio::time::sleep(Duration::from_millis(200)).await;
        44 + self
    }
}

{
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        2.foo().await
    })
}

46

Since implicit heap allocation is not acceptable in Rust, the language cannot box futures for us the way `#[async_trait]` does; proper language support depends on `impl Trait` in traits.
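For comparison, the boxing that `#[async_trait]` automates can be written by hand. This is only an approximation of what the macro generates (the real expansion adds lifetime bounds), the trait name `SomeTraitBoxed` is invented, and the `tokio` sleep is left out so the sketch needs only the standard library.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hand-written counterpart of the `#[async_trait]` trait (hypothetical name).
trait SomeTraitBoxed {
    fn foo(self) -> Pin<Box<dyn Future<Output = i32> + Send>>;
}

impl SomeTraitBoxed for i32 {
    fn foo(self) -> Pin<Box<dyn Future<Output = i32> + Send>> {
        // The async block is allocated on the heap and its concrete type is erased.
        Box::pin(async move { 44 + self })
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling runner standing in for a real executor.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

`block_on(2_i32.foo())` evaluates to `46`, matching the macro-based version. The cost is visible in the signature: every call to `foo` performs one heap allocation and one dynamic dispatch.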

### Split ecosystem

Rust doesn't have an executor or reactors in the standard library, and that's a good thing, because there is no one-size-fits-all executor. A tiny embedded executor is not what you want on a multi-core server, and vice versa. But this causes interoperability problems. A library which uses `tokio::time::sleep` is incompatible with one using `embassy_executor::time::block_for`, which is the equivalent function for `embassy`, an embedded executor, even if it just sleeps and doesn't require any other functionality. There are async libraries which are executor-independent because they bring their own reactor. But they are either built for a specific purpose, or are not as performant as executor-dependent libraries (like our timer, which was executor-independent but extremely wasteful, running one thread per sleep call). This can be partially solved by async traits and functions which are generic over the executor.
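As a rough sketch of what "generic over the executor" could look like, a library can depend on a small trait instead of a concrete runtime's sleep function. Everything here is hypothetical: `Sleeper`, `wait_with_backoff` and `RecordingSleeper` are illustrative names, and a real implementation of the trait would wrap the timer of whichever runtime is in use.

```rust
use std::cell::Cell;
use std::future::{ready, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::Duration;

/// Hypothetical abstraction over "the runtime's sleep function".
trait Sleeper {
    fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + '_>>;
}

/// Library code written against the trait only, not against a runtime.
async fn wait_with_backoff<S: Sleeper>(sleeper: &S, attempts: u32) -> u32 {
    for attempt in 1..=attempts {
        sleeper.sleep(Duration::from_millis(100 * u64::from(attempt))).await;
    }
    attempts
}

/// Test double: records the requested time instead of actually waiting.
struct RecordingSleeper {
    total: Cell<Duration>,
}

impl Sleeper for RecordingSleeper {
    fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + '_>> {
        self.total.set(self.total.get() + duration);
        Box::pin(ready(()))
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling runner standing in for a real executor.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Runs the library function with the recording sleeper.
fn backoff_demo() -> (u32, Duration) {
    let sleeper = RecordingSleeper { total: Cell::new(Duration::ZERO) };
    let attempts = block_on(wait_with_backoff(&sleeper, 3));
    (attempts, sleeper.total.get())
}
```

`backoff_demo()` returns `(3, Duration::from_millis(600))`, the sum of the 100, 200 and 300 ms requests. The boxed return type keeps the sketch short but reintroduces the allocation discussed in the async trait section, so this approach is only a partial answer for embedded targets.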