Skip to content

Commit

Permalink
Merge pull request #266 from TimelyDataflow/stream_concatenate
Browse files Browse the repository at this point in the history
Add Concatenate for streams
  • Loading branch information
frankmcsherry committed Apr 26, 2019
2 parents 8d96d18 + 726179e commit 23a66da
Showing 1 changed file with 17 additions and 2 deletions.
19 changes: 17 additions & 2 deletions timely/src/dataflow/operators/concat.rs
Expand Up @@ -47,11 +47,26 @@ pub trait Concatenate<G: Scope, D: Data> {
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn concatenate(&self, _: impl IntoIterator<Item=Stream<G, D>>) -> Stream<G, D>;
fn concatenate<I>(&self, sources: I) -> Stream<G, D>
where
I: IntoIterator<Item=Stream<G, D>>;
}

impl<G: Scope, D: Data> Concatenate<G, D> for Stream<G, D> {
fn concatenate<I>(&self, sources: I) -> Stream<G, D>
where
I: IntoIterator<Item=Stream<G, D>>
{
let clone = self.clone();
self.scope().concatenate(Some(clone).into_iter().chain(sources))
}
}

impl<G: Scope, D: Data> Concatenate<G, D> for G {
fn concatenate(&self, sources: impl IntoIterator<Item=Stream<G, D>>) -> Stream<G, D> {
fn concatenate<I>(&self, sources: I) -> Stream<G, D>
where
I: IntoIterator<Item=Stream<G, D>>
{

// create an operator builder.
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
Expand Down

0 comments on commit 23a66da

Please sign in to comment.