Skip to content

Commit

Permalink
Revert unnecessary moving constant and render filtering to separate f…
Browse files Browse the repository at this point in the history
…iles.
  • Loading branch information
Andi Wang committed Sep 1, 2020
1 parent 4b5156b commit 9b2dfbf
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 129 deletions.
55 changes: 0 additions & 55 deletions src/dataflow/src/render/constant.rs

This file was deleted.

62 changes: 0 additions & 62 deletions src/dataflow/src/render/filter.rs

This file was deleted.

10 changes: 3 additions & 7 deletions src/dataflow/src/render/join.rs
Expand Up @@ -9,7 +9,6 @@

use std::iter;

use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::arrangement::Arrange;
use differential_dataflow::operators::arrange::arrangement::Arranged;
use differential_dataflow::operators::join::{Join, JoinCore};
Expand All @@ -19,7 +18,6 @@ use differential_dataflow::trace::Cursor;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::Collection;
use timely::dataflow::Scope;
use timely::progress::{timestamp::Refines, Timestamp};

use dataflow_types::*;
use expr::{RelationExpr, ScalarExpr};
Expand All @@ -28,11 +26,9 @@ use repr::{Datum, RelationType, Row, RowArena};
use crate::operator::CollectionExt;
use crate::render::context::{ArrangementFlavor, Context};

impl<G, T> Context<G, RelationExpr, Row, T>
impl<G> Context<G, RelationExpr, Row, dataflow_types::Timestamp>
where
G: Scope,
G::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice,
G: Scope<Timestamp = dataflow_types::Timestamp>,
{
pub fn render_join(
&mut self,
Expand Down Expand Up @@ -487,7 +483,7 @@ where
};

// 1c. Render the constant collection
self.render_constant(&constant_row_expr, scope, worker_index);
self.ensure_rendered(&constant_row_expr, scope, worker_index);
let (constant_collection, errs) = self.collection(&constant_row_expr).unwrap();

// 1d. Assemble parts of the Join `RelationExpr` together.
Expand Down
38 changes: 33 additions & 5 deletions src/dataflow/src/render/mod.rs
Expand Up @@ -112,6 +112,7 @@ use differential_dataflow::operators::consolidate::Consolidate;
use differential_dataflow::{AsCollection, Collection};
use futures::executor::block_on;
use timely::communication::Allocate;
use timely::dataflow::operators::to_stream::ToStream;
use timely::dataflow::operators::unordered_input::UnorderedInput;
use timely::dataflow::operators::Map;
use timely::dataflow::scopes::Child;
Expand Down Expand Up @@ -140,10 +141,8 @@ use crate::source::{self, FileSourceInfo, KafkaSourceInfo, KinesisSourceInfo};
use crate::source::{SourceConfig, SourceToken};

mod arrange_by;
mod constant;
mod context;
mod delta_join;
mod filter;
mod join;
mod reduce;
mod threshold;
Expand Down Expand Up @@ -748,8 +747,22 @@ where
// a collection or an arrangement. In either case, we associate the result with
// the `relation_expr` argument in the context.
match relation_expr {
RelationExpr::Constant { .. } => {
self.render_constant(relation_expr, scope, worker_index);
RelationExpr::Constant { rows, .. } => {
let rows = if worker_index == 0 {
rows.clone()
} else {
vec![]
};

let collection = rows
.to_stream(scope)
.map(|(x, diff)| (x, timely::progress::Timestamp::minimum(), diff))
.as_collection();

let err_collection = Collection::empty(scope);

self.collections
.insert(relation_expr.clone(), (collection, err_collection));
}

// A get should have been loaded into the context, and it is surprising to
Expand Down Expand Up @@ -912,7 +925,22 @@ where
};
(ok_collection, err_collection.map(Into::into))
} else {
self.render_filter(input, predicates.clone(), scope, worker_index)
self.ensure_rendered(input, scope, worker_index);
let temp_storage = RowArena::new();
let predicates = predicates.clone();
let (ok_collection, err_collection) = self.collection(input).unwrap();
let (ok_collection, new_err_collection) =
ok_collection.filter_fallible(move |input_row| {
let datums = input_row.unpack();
for p in &predicates {
if p.eval(&datums, &temp_storage)? != Datum::True {
return Ok(false);
}
}
Ok::<_, DataflowError>(true)
});
let err_collection = err_collection.concat(&new_err_collection);
(ok_collection, err_collection)
};
self.collections.insert(relation_expr.clone(), collections);
}
Expand Down

0 comments on commit 9b2dfbf

Please sign in to comment.