Emit multiple values from Pipeline stage #517

Open
bear24rw opened this issue Oct 1, 2023 · 3 comments
Labels
question Further information is requested

Comments

@bear24rw

bear24rw commented Oct 1, 2023

Is there an idiomatic pattern for emitting multiple values from a pipeline stage? For example, in the DataPipeline shown in the docs, how could I emit multiple values from the second pipe?

If I use tf::Pipeline instead, I could have the second pipe add the items it creates to a queue and have the third pipe loop over that queue. But this breaks down in a different case, where I emit multiple items from the second pipe and want to process all of them in the third pipe concurrently. It seems like I'm looking for some kind of tf::Pipeflow::emit(item) method that I could call multiple times in the second pipe, with each call triggering a separate run of the third pipe.

This seems possibly similar to corun, except that I don't want to specify a target. The target should be known automatically: it's the next stage of the pipeline.

#include <cstdio>
#include <iostream>
#include <string>
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/data_pipeline.hpp>

int main() {

  // data flow => void -> int -> std::string -> float -> void 
  tf::Taskflow taskflow("pipeline");
  tf::Executor executor;

  const size_t num_lines = 4;
  
  // create a pipeline graph
  tf::DataPipeline pl(num_lines,
    tf::make_data_pipe<void, int>(tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) -> int{
      if(pf.token() == 5) {
        pf.stop();
        return 0;
      }
      else {
        printf("first pipe returns %zu\n", pf.token());
        return static_cast<int>(pf.token());
      }
    }),

    tf::make_data_pipe<int, std::string>(tf::PipeType::SERIAL, [](int& input) {
      printf("second pipe returns a string of %d\n", input + 100);
      return std::to_string(input + 100);
      return std::to_string(input + 200); // <---------------- I want to emit both + 100 and + 200
    }),

    tf::make_data_pipe<std::string, void>(tf::PipeType::SERIAL, [](std::string& input) {
      printf("third pipe receives the input string %s\n", input.c_str());
    })
  );

  // build the pipeline graph using composition
  taskflow.composed_of(pl).name("pipeline");

  // dump the pipeline graph structure (with composition)
  taskflow.dump(std::cout);

  // run the pipeline
  executor.run(taskflow).wait();

  return 0;
}
@tsung-wei-huang
Member

In this case, we can use std::pair<std::string, std::string> as the input to the third pipe. Does that make sense to you?
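
A minimal sketch of the std::pair suggestion above, written as standalone functions rather than a full pipeline so that it does not depend on a particular Taskflow version. The names StringPair, second_pipe, and third_pipe are hypothetical. The assumption is that their bodies would become the callables passed to tf::make_data_pipe<int, StringPair> and tf::make_data_pipe<StringPair, void> in the example from the issue.

```cpp
#include <string>
#include <utility>

// Hypothetical alias for the value handed from the second pipe to the third.
using StringPair = std::pair<std::string, std::string>;

// Body for the second pipe: one int in, both derived strings out as one value.
StringPair second_pipe(int& input) {
  return {std::to_string(input + 100), std::to_string(input + 200)};
}

// Body for the third pipe: unpack the pair and handle both elements.
// It returns the message so the behavior is easy to check; inside the
// pipeline it would print the message and return void instead.
std::string third_pipe(StringPair& input) {
  return "third pipe receives " + input.first + " and " + input.second;
}
```

Both strings still travel through the third pipe as a single token, so they are handled by one invocation of that pipe rather than by two.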

tsung-wei-huang added the "question" (Further information is requested) label on Oct 2, 2023
@bear24rw
Author

bear24rw commented Oct 2, 2023

we can use std::pair<std::string, std::string> as the input to the third pipe

It makes sense but doesn't exactly solve what I'm looking for. I want to turn the 3rd pipe into a CONCURRENT pipe such that the two (or more) items emitted from the 2nd pipe are processed in parallel.

My actual application has one pipe decoding a data stream. Each time it decodes a chunk, it may emit anywhere from 0 to N decoded "packets". I want to send all of these packets to the next pipe to be processed in parallel.

It seems like the tf::Pipeline setup assumes there is always 1 item in and 1 item out from each pipe. But I have 1 item in and N items out of my pipe.
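
One possible workaround within the one-in, one-out model is to emit the whole batch as a single std::vector and fan out inside the next stage. The sketch below is not Taskflow API: it uses only std::async from the standard library, and decode_chunk, process_packet, and process_batch are hypothetical names standing in for the decoder and the per-packet work. It assumes the packets are independent of each other and that the toolchain has thread support enabled (some need -pthread).

```cpp
#include <cstddef>
#include <future>
#include <string>
#include <vector>

// Hypothetical decoder: turns one input chunk into 0..N packets.
// Chunk n simply yields n packets so the variable fan-out is visible.
std::vector<std::string> decode_chunk(int chunk) {
  std::vector<std::string> packets;
  for (int i = 0; i < chunk; ++i) {
    packets.push_back("chunk" + std::to_string(chunk) +
                      "/packet" + std::to_string(i));
  }
  return packets;
}

// Hypothetical per-packet work; here it only measures the packet.
std::size_t process_packet(const std::string& packet) {
  return packet.size();
}

// Body for the next stage: takes the whole batch as one token and
// launches one asynchronous task per packet, then collects the results
// in the original packet order.
std::vector<std::size_t> process_batch(const std::vector<std::string>& packets) {
  std::vector<std::future<std::size_t>> pending;
  pending.reserve(packets.size());
  for (const auto& packet : packets) {
    pending.push_back(std::async(std::launch::async,
                                 [&packet] { return process_packet(packet); }));
  }
  std::vector<std::size_t> results;
  results.reserve(pending.size());
  for (auto& f : pending) {
    results.push_back(f.get());
  }
  return results;
}
```

This keeps the pipeline itself one-in, one-out, at the cost of running the per-packet tasks outside the pipeline's own scheduling. An empty batch is simply a token with nothing to process.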

@bear24rw
Author

bear24rw commented Oct 26, 2023

This paper, which discusses TBB, describes exactly my situation:

https://citeseerx.ist.psu.edu/document?repid=rep1&type=pdf&doi=d5a650bf4690ad6718c31aebe87cf7ceb458c7c1

Single input, multiple outputs

The SplitBlocks stage takes a block and splits it into smaller segments; 
it takes a single input token and produces multiple output tokens. However, 
Filter objects in TBB can only take a single input and produce a single output. 
To mimic the Pthreads version, we had to resort to nested pipelines as shown
in Figure 5(b).

It would be great if Taskflow could solve this without resorting to the nested-pipeline workaround.
