Skip to content

Conversation

@jurij-jukic
Copy link

@jurij-jukic jurij-jukic commented Oct 17, 2025

Goes with: hyperware-ai/hyperprocess-macro#51

Join Handle:
Previous behavior of spawn was to just add a future to the spawn queue, and return no handle. Now it spawns a task which takes that future and awaits it. It sends the result back via a oneshot channel. The JoinHandle is a struct wrapping around the receiver of the oneshot channel.

Waker:
New struct ExecutorWakeFlag is basically a flag which tells us whether a waker has been called, and if it has, poll all the pending tasks again (amongst which there is that one task which called the waker in the first place).
The AtomicBool, ArcWake, swap, wake_by_ref, are all there to ensure that we can pass a waker reference to many tasks at the same time, and check from the outer loop if one of them had called the waker: let was_woken = wake_flag.take();

Note - previously we used a no-op waker, which doesn't work if we want to have handles for tasks, i.e. repoll once a future has completed. No-op waker would cause us to depend on the main event loop to receive a message from somewhere to repoll all tasks.

Example hyperapp code using join handles:

        println!("Demonstrating JoinHandle with two concurrent tasks...");
        // JoinHandle example: wait for two spawned tasks concurrently
        let join_demo_a: JoinHandle<String> = spawn(async {
            println!("task A sleeping...");
            sleep(150).await.ok();
            println!("task A awake");
            "result from task A".to_string()
        });
        let join_demo_b: JoinHandle<i32> = spawn(async {
            println!("task B computing 2 + 2...");
            2 + 2
        });
        let (res_a, res_b) = futures::join!(join_demo_a, join_demo_b);
        match (res_a, res_b) {
            (Ok(msg), Ok(sum)) => println!("Join demo combined -> {msg} and {sum}"),
            (a, b) => println!("Join demo failed: left = {:?}, right = {:?}", a, b),
        }

@jurij-jukic
Copy link
Author

jurij-jukic commented Oct 20, 2025

There are two edge cases when a future gets dropped:

  • we need to make sure that the future is removed from RESPONSE_REGISTRY once its dropped. For that we impl Drop and remove it from RESPONSE_REGISTRY
  • we need to make sure that if a future is dropped before receiving a response, the incoming response doesn't stay stored in the RESPONSE_REGISTRY indefinitely. So we track CANCELLED_RESPONSES against which we check incoming responses in hyperprocess macro
    (this latest commit requires the PR to be synchronized with Feat: ResponseFuture drop edge cases hyperprocess-macro#51)

So finally, we can use select now:

        let send_fut =
            hyperware_process_lib::hyperapp::send::<Result<CounterSnapshot, String>>(request)
                .map(|res| match res {
                    Ok(inner) => inner,
                    Err(err) => Err(format!("transport failed: {err}")),
                })
                .fuse();

        let timeout_fut = hyperware_process_lib::hyperapp::sleep(1)
            .map(|result| {
                let _ = result;
                Err::<CounterSnapshot, String>("timed out waiting for reply".into())
            })
            .fuse();

        pin_mut!(send_fut);
        pin_mut!(timeout_fut);

        let result = select! {
            res = send_fut => res,
            timeout_res = timeout_fut => timeout_res,
        };
        println!("Initial select result: {result:?}");

Note that, when using futures select, we need to use FutureExt::map() and fuse()

Copy link
Member

@nick1udwig nick1udwig left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested; LGTM. Very nice work ⚡

@jurij-jukic jurij-jukic merged commit d3bb304 into develop Oct 24, 2025
1 check passed
@jurij-jukic jurij-jukic deleted the j/spawn-join branch October 24, 2025 08:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants