Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] iterative when_all #55

Open
kikaxa opened this issue Jun 22, 2023 · 1 comment
Open

[feature] iterative when_all #55

kikaxa opened this issue Jun 22, 2023 · 1 comment

Comments

@kikaxa
Copy link

kikaxa commented Jun 22, 2023

Many code paths (dynamically/iteratively) generate tasks that need to be waited on.
Currently one needs to make a temporary storage for all of them and then create when_all composition.
The proposal api simply separates the when_all initialization and addition steps, allowing it to be easily used with loops, generators, fold expressions etc..

Please let me know if the feature is ok with you and any comments on proposed code below:
I can make a pr then..
// On a side note, while it should be possible to make an analogue for when_any, i dont imagine (many) use-cases for that rn. Therefore not suggesting to add it.

/// Returns when_all state that can be used with tasks from generators without intermediate storage
/// Use operator(task) to fill it with tasks and
/// readyTask is the composite result task
template<typename Ttask = async::task<void> >
auto when_all_iterative(size_t total_count)
{
    typedef typename std::decay<Ttask>::type task_type;
    typedef std::vector<task_type> result_type;

    struct when_all_iterative_state
    {
        typedef async::detail::when_all_state<result_type> state_type;

        void operator() (task_type&& source_task)
        {
            LIBASYNC_ASSERT(state, std::logic_error, "Adding to empty when_all");

            // Add a continuation to each task to add its result to the shared state
            // Last task sets the event result
            LIBASYNC_TRY {
                source_task.then(async::inline_scheduler(), async::detail::when_all_func_range<task_type, result_type>(next_index++, async::detail::ref_count_ptr<async::detail::when_all_state<result_type>>(state)));
            } LIBASYNC_CATCH(...) {
                // Make sure we don't leak memory if then() throws
                addFailed(1);
                LIBASYNC_RETHROW();
            }
        }

        void setTotal(size_t total_count)
        {
            LIBASYNC_ASSERT(!state, std::logic_error, "Initializing state twice");

            state = new state_type(total_count);
            state->result.resize(total_count);
            readyTask = state->event.get_task();
        }

        void addFailed(size_t failed = 1)
        {
            LIBASYNC_ASSERT(state, std::logic_error, "Adding to empty when_all");
            state->remove_ref(failed);
            next_index += failed;
        }

    protected:
        state_type* state = nullptr;
        size_t next_index = 0;
    public:
        async::task<result_type> readyTask = async::make_task<result_type>({});
    } state;

    if (total_count)
        state.setTotal(total_count);

    return state;
}
@kikaxa
Copy link
Author

kikaxa commented Feb 15, 2024

kikaxa@034821b

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant