Skip to content

Commit

Permalink
dataflow/logging: Activate on data
Browse files Browse the repository at this point in the history
Currently, the logging dataflows activate once every introspection
interval to process data. This can cause a significant buildup of data
before it is processed.

In addition to activating the replay operator periodically, also
activate it once a sufficient amount of data has been accumulated.
Ideally, the data processing should be activated as soon as some data is
available, but this could cause a situation where activating causes
additional data to be produced.

For this reason, the dataflow activates after 32 batches have been
published by the logging infrastructure. This seems to be a reasonable
trade-off between memory (currently at most 8KiB per batch) and latency.

The time-based activation is still required in case there is no data for
a specific dataflow. In this case, we still need to advance the clock,
which the time-based activation takes care of.

Signed-off-by: Moritz Hoffmann <mh@materialize.com>
  • Loading branch information
antiguru committed Oct 5, 2021
1 parent 2115c46 commit cb53747
Show file tree
Hide file tree
Showing 9 changed files with 388 additions and 18 deletions.
88 changes: 88 additions & 0 deletions src/dataflow/src/activator.rs
@@ -0,0 +1,88 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Utilities to activate dataflows based on external triggers.

use std::cell::RefCell;
use std::rc::Rc;
use timely::scheduling::Activator;

/// An shared handle to multiple activators with support for triggering and acknowledging
/// activations.
#[derive(Debug, Clone)]
pub struct RcActivator {
inner: Rc<RefCell<ActivatorInner>>,
}

impl RcActivator {
/// Construct a new [RcActivator] with the given name.
pub fn new(name: String) -> Self {
let inner = ActivatorInner::new(name);
Self {
inner: Rc::new(RefCell::new(inner)),
}
}

/// Register an additional [Activator] with this [RcActivator]
pub fn register(&mut self, activator: Activator) {
self.inner.borrow_mut().register(activator)
}

/// Activate all contained activators.
///
/// The implementation is free to ignore activations and only release them once a sufficient
/// volume has been accumulated.
pub fn activate(&mut self) {
self.inner.borrow_mut().activate()
}

/// Acknowledge the activation, which enables new activations to be scheduled.
pub fn ack(&mut self) {
self.inner.borrow_mut().ack()
}
}

#[derive(Debug)]
struct ActivatorInner {
activated: usize,
activators: Vec<Activator>,
name: String,
}

impl ActivatorInner {
const THRESHOLD: usize = 32;

fn new(name: String) -> Self {
Self {
name,
activated: 0,
activators: Vec::new(),
}
}

fn register(&mut self, activator: Activator) {
self.activators.push(activator)
}

fn activate(&mut self) {
if self.activators.is_empty() {
return;
}
self.activated += 1;
if self.activated == ActivatorInner::THRESHOLD {
for activator in &self.activators {
activator.activate();
}
}
}

fn ack(&mut self) {
self.activated = 0;
}
}
128 changes: 128 additions & 0 deletions src/dataflow/src/collection.rs
@@ -0,0 +1,128 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Collection abstractions to handle `(key, value)` transparently.

use crate::operator::CollectionExt;
use differential_dataflow::difference::{Abelian, Semigroup};
use differential_dataflow::{AsCollection, Collection, Data};
use std::ops::Mul;
use timely::dataflow::operators::generic::operator;
use timely::dataflow::Scope;

/// A collection of rows split in a key and value
#[derive(Clone)]
pub struct KeyedCollection<S: Scope, D: Data, R: Semigroup> {
collection: Collection<S, (D, D), R>,
}

impl<S: Scope, D: Data, R: Semigroup> KeyedCollection<S, D, R> {
/// Construct a new KeyedCollection
pub fn new(collection: Collection<S, (D, D), R>) -> Self {
Self { collection }
}
}

impl<S: Scope, D: Data, R: Semigroup> AsCollection<S, D, R> for KeyedCollection<S, D, R> {
// TODO: We want this gone
fn as_collection(&self) -> Collection<S, D, R> {
self.collection.map(|(_k, v)| v)
}
}

impl<S: Scope, D: Data, R: Semigroup> From<Collection<S, (D, D), R>> for KeyedCollection<S, D, R> {
fn from(collection: Collection<S, (D, D), R>) -> Self {
Self::new(collection)
}
}

impl<S: Scope, D: Data, R: Abelian> KeyedCollection<S, D, R> {
pub fn negate(&self) -> Self {
self.collection.negate().into()
}
}

impl<S: Scope, D: Data, R: Semigroup> CollectionExt<S, (D, D), R> for KeyedCollection<S, D, R> {
fn empty(scope: &S) -> Collection<S, (D, D), R> {
operator::empty(scope).as_collection()
}

fn flat_map_fallible<D2, E, I, L>(
&self,
name: &str,
logic: L,
) -> (Collection<S, D2, R>, Collection<S, E, R>)
where
D2: timely::Data,
E: timely::Data,
I: IntoIterator<Item = Result<D2, E>>,
L: FnMut((D, D)) -> I + 'static,
{
todo!()
}

fn explode_fallible<D2, E, R2, I, L>(
&self,
name: &str,
logic: L,
) -> (
Collection<S, D2, <R2 as Mul<R>>::Output>,
Collection<S, E, <R2 as Mul<R>>::Output>,
)
where
D2: timely::Data,
E: timely::Data,
R2: Semigroup + Mul<R>,
<R2 as Mul<R>>::Output: timely::Data + Semigroup,
I: IntoIterator<Item = (Result<D2, E>, R2)>,
L: FnMut((D, D)) -> I + 'static,
{
todo!()
}
}

/// Concatenates multiple collections.
///
/// This method has the effect of a sequence of calls to `concat`, but it does
/// so in one operator rather than a chain of many operators.
///
/// # Examples
///
/// ```
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use differential_dataflow::input::Input;
///
/// fn main() {
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// differential_dataflow::collection::concatenate(scope, vec![odds, evens])
/// .assert_eq(&data);
/// });
/// }
/// ```
pub fn concatenate<G, D, R, I>(scope: &mut G, iterator: I) -> KeyedCollection<G, D, R>
where
G: Scope,
D: Data,
R: Semigroup,
I: IntoIterator<Item = KeyedCollection<G, D, R>>,
{
differential_dataflow::collection::concatenate(
scope,
iterator.into_iter().map(|kc| kc.collection),
)
.into()
}
2 changes: 2 additions & 0 deletions src/dataflow/src/lib.rs
Expand Up @@ -11,11 +11,13 @@

//! Driver for timely/differential dataflow.

mod activator;
mod arrangement;
mod decode;
mod metrics;
mod operator;
mod render;
mod replay;
mod server;
mod sink;

Expand Down
11 changes: 8 additions & 3 deletions src/dataflow/src/logging/differential.rs
Expand Up @@ -18,30 +18,34 @@ use differential_dataflow::operators::arrange::arrangement::Arrange;
use timely::communication::Allocate;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::capture::EventLink;
use timely::dataflow::operators::capture::Replay;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::logging::WorkerIdentifier;

use super::{DifferentialLog, LogVariant};
use crate::activator::RcActivator;
use crate::arrangement::manager::RowSpine;
use crate::arrangement::KeysValsHandle;
use crate::logging::ConsolidateBuffer;
use crate::render::datum_vec::DatumVec;
use crate::replay::MzReplay;
use repr::{Datum, Row, Timestamp};

/// Constructs the logging dataflows and returns a logger and trace handles.
pub fn construct<A: Allocate>(
worker: &mut timely::worker::Worker<A>,
config: &dataflow_types::logging::LoggingConfig,
linked: std::rc::Rc<EventLink<Timestamp, (Duration, WorkerIdentifier, DifferentialEvent)>>,
mut activator: RcActivator,
) -> HashMap<LogVariant, (Vec<usize>, KeysValsHandle)> {
let granularity_ms = std::cmp::max(1, config.granularity_ns / 1_000_000) as Timestamp;

let traces = worker.dataflow_named("Dataflow: differential logging", move |scope| {
let logs = Some(linked).replay_core(
let (act, logs) = Some(linked).mz_replay(
scope,
Some(Duration::from_nanos(config.granularity_ns as u64)),
"differential logs",
Duration::from_nanos(config.granularity_ns as u64),
);
activator.register(act);

let mut demux =
OperatorBuilder::new("Differential Logging Demux".to_string(), scope.clone());
Expand All @@ -54,6 +58,7 @@ pub fn construct<A: Allocate>(
let mut demux_buffer = Vec::new();
demux.build(move |_capability| {
move |_frontiers| {
activator.ack();
let arrangement_batches = arrangement_batches_out.activate();
let arrangement_records = arrangement_records_out.activate();
let sharing = sharing_out.activate();
Expand Down
11 changes: 8 additions & 3 deletions src/dataflow/src/logging/materialized.rs
Expand Up @@ -17,14 +17,15 @@ use differential_dataflow::operators::count::CountTotal;
use log::error;
use timely::communication::Allocate;
use timely::dataflow::operators::capture::EventLink;
use timely::dataflow::operators::capture::Replay;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::logging::WorkerIdentifier;

use super::{LogVariant, MaterializedLog};
use crate::activator::RcActivator;
use crate::arrangement::manager::RowSpine;
use crate::arrangement::KeysValsHandle;
use crate::render::datum_vec::DatumVec;
use crate::replay::MzReplay;
use expr::{GlobalId, SourceInstanceId};
use repr::{Datum, Row, Timestamp};

Expand Down Expand Up @@ -153,14 +154,17 @@ pub fn construct<A: Allocate>(
worker: &mut timely::worker::Worker<A>,
config: &dataflow_types::logging::LoggingConfig,
linked: std::rc::Rc<EventLink<Timestamp, (Duration, WorkerIdentifier, MaterializedEvent)>>,
mut activator: RcActivator,
) -> std::collections::HashMap<LogVariant, (Vec<usize>, KeysValsHandle)> {
let granularity_ms = std::cmp::max(1, config.granularity_ns / 1_000_000) as Timestamp;

let traces = worker.dataflow_named("Dataflow: mz logging", move |scope| {
let logs = Some(linked).replay_core(
let (act, logs) = Some(linked).mz_replay(
scope,
Some(Duration::from_nanos(config.granularity_ns as u64)),
"materialized logs",
Duration::from_nanos(config.granularity_ns as u64),
);
activator.register(act);

let mut demux =
OperatorBuilder::new("Materialize Logging Demux".to_string(), scope.clone());
Expand All @@ -180,6 +184,7 @@ pub fn construct<A: Allocate>(
let mut active_dataflows = std::collections::HashMap::new();
let mut peek_stash = std::collections::HashMap::new();
move |_frontiers| {
activator.ack();
let mut dataflow = dataflow_out.activate();
let mut dependency = dependency_out.activate();
let mut frontier = frontier_out.activate();
Expand Down
15 changes: 10 additions & 5 deletions src/dataflow/src/logging/reachability.rs
Expand Up @@ -19,9 +19,11 @@ use timely::dataflow::operators::capture::EventLink;
use timely::logging::WorkerIdentifier;

use super::{LogVariant, TimelyLog};
use crate::activator::RcActivator;
use crate::arrangement::manager::RowSpine;
use crate::arrangement::KeysValsHandle;
use crate::logging::ConsolidateBuffer;
use crate::replay::MzReplay;
use dataflow_types::logging::LoggingConfig;
use ore::iter::IteratorExt;
use repr::{Datum, Row, RowArena, Timestamp};
Expand All @@ -43,22 +45,24 @@ pub fn construct<A: Allocate>(
),
>,
>,
mut activator: RcActivator,
) -> std::collections::HashMap<LogVariant, (Vec<usize>, KeysValsHandle)> {
let granularity_ms = std::cmp::max(1, config.granularity_ns / 1_000_000) as Timestamp;

// A dataflow for multiple log-derived arrangements.
let traces = worker.dataflow_named("Dataflow: timely reachability logging", move |scope| {
use differential_dataflow::collection::AsCollection;
use timely::dataflow::operators::capture::Replay;

let logs = Some(linked).replay_core(
let (act, logs) = Some(linked).mz_replay(
scope,
Some(Duration::from_nanos(config.granularity_ns as u64)),
"reachability logs",
Duration::from_nanos(config.granularity_ns as u64),
);
activator.register(act);

use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;

let construct_reachability = |key: Vec<_>| {
let construct_reachability = |key: Vec<_>, mut activator: RcActivator| {
let mut flatten = OperatorBuilder::new(
"Timely Reachability Logging Flatten ".to_string(),
scope.clone(),
Expand All @@ -72,6 +76,7 @@ pub fn construct<A: Allocate>(
let mut buffer = Vec::new();
flatten.build(move |_capability| {
move |_frontiers| {
activator.ack();
let updates = updates_out.activate();
let mut updates_session = ConsolidateBuffer::new(updates, 0);

Expand Down Expand Up @@ -133,7 +138,7 @@ pub fn construct<A: Allocate>(
if config.active_logs.contains_key(&variant) {
let key = variant.index_by();
let key_clone = key.clone();
let trace = construct_reachability(key.clone())
let trace = construct_reachability(key.clone(), activator.clone())
.arrange_named::<RowSpine<_, _, _, _>>(&format!("Arrange {:?}", variant))
.trace;
result.insert(variant, (key_clone, trace));
Expand Down

0 comments on commit cb53747

Please sign in to comment.