Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] pegasus dataflow panic when worker not in peers broadcast #1954

Closed
lnfjpt opened this issue Aug 10, 2022 · 0 comments
Closed

[BUG] pegasus dataflow panic when worker not in peers broadcast #1954

lnfjpt opened this issue Aug 10, 2022 · 0 comments
Assignees
Labels
bug Something isn't working

Comments

@lnfjpt
Copy link
Collaborator

lnfjpt commented Aug 10, 2022

Following dataflow panic when a worker which is not in peers broadcast a batch

    conf.set_workers(2);
    let mut results = pegasus::run(conf, || {
        |input, output| {
            let worker_id = input.get_worker_index();
            let workers = pegasus::get_current_worker().total_peers() as u64;
            let stream = input.input_from(vec![worker_id as u64])?;
            stream
                .flat_map(|id| {
                    Ok(Some(id).into_iter())
                })?
                .repartition(move |id| Ok(*id % (workers/2)))
                .filter_map(|source| {
                    Ok(Some(source))
                })?
                .broadcast()
                .sink_into(output)
        }

    }).expect("run job fail;");

    while let Some(next) = results.next() {
        let n = next.unwrap();
        println!("{}", n);
    }
@lnfjpt lnfjpt added the bug Something isn't working label Aug 10, 2022
bmmcq added a commit that referenced this issue Aug 11, 2022
* [pegasus] fix bug in (#1954l)

* tidy up;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

3 participants