From 5edbdeec2ef9c0595bfdc61feccd9d171312ac57 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 15 Jan 2025 09:55:12 +0100 Subject: [PATCH 1/9] Convert compute logging to columnar Signed-off-by: Moritz Hoffmann --- Cargo.lock | 29 +- Cargo.toml | 10 + src/compute-types/Cargo.toml | 1 + src/compute-types/src/plan.rs | 25 +- src/compute-types/src/plan/lowering.rs | 2 +- src/compute/Cargo.toml | 1 + src/compute/src/compute_state.rs | 4 +- src/compute/src/extensions/arrange.rs | 10 +- src/compute/src/logging.rs | 28 +- src/compute/src/logging/compute.rs | 333 ++++++++++++--------- src/compute/src/logging/differential.rs | 2 +- src/compute/src/logging/initialize.rs | 30 +- src/compute/src/logging/timely.rs | 2 +- src/compute/src/render.rs | 11 +- src/compute/src/server.rs | 2 +- src/repr/src/global_id.rs | 2 + src/repr/src/timestamp.rs | 2 + src/timely-util/Cargo.toml | 2 + src/timely-util/src/containers.rs | 371 ++++++++++++++++++++++++ 19 files changed, 648 insertions(+), 219 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2657322314fa2..388d9926de059 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1436,9 +1436,9 @@ dependencies = [ [[package]] name = "bytemuck" -version = "1.20.0" +version = "1.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b37c88a63ffd85d15b406896cc343916d7cf57838a847b3a6f2ca5d39a5695a" +checksum = "ef657dfab802224e671f5818e9a4935f9b1957ed18e58292690cc39e7a4092a3" [[package]] name = "byteorder" @@ -1755,8 +1755,7 @@ checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" [[package]] name = "columnar" version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d754936b0b004c01c338b3d81c39e0a81d2ead5b6ee9fa64bfa140e6c430b80d" +source = "git+https://github.com/frankmcsherry/columnar#ea1be6ddb64fe984dfe847f82c5f5c60f5815835" dependencies = [ "bincode", "bytemuck", @@ -1769,8 +1768,7 @@ dependencies = [ [[package]] name = "columnar_derive" version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "174af3249fb00e9845597cb3a8259f05ff62b4060650b6af6adb4e05df277f7a" +source = "git+https://github.com/frankmcsherry/columnar#ea1be6ddb64fe984dfe847f82c5f5c60f5815835" dependencies = [ "proc-macro2", "quote", @@ -5113,6 +5111,7 @@ dependencies = [ "anyhow", "async-stream", "bytesize", + "columnar", "core_affinity", "crossbeam-channel", "dec", @@ -5199,6 +5198,7 @@ dependencies = [ name = "mz-compute-types" version = "0.0.0" dependencies = [ + "columnar", "columnation", "differential-dataflow", "itertools 0.12.1", @@ -7309,6 +7309,8 @@ version = "0.0.0" dependencies = [ "ahash", "bincode", + "bytemuck", + "columnar", "columnation", "differential-dataflow", "either", @@ -10370,8 +10372,7 @@ dependencies = [ [[package]] name = "timely" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b134806db44c8452ba2a9b18e6112f3d5e0a0185c6e58524269a08ca2891149c" +source = "git+https://github.com/antiguru/timely-dataflow?branch=logger_flush#56ec0d7473e7fee0a79f40525843fa6089873347" dependencies = [ "bincode", "byteorder", @@ -10389,14 +10390,12 @@ dependencies = [ [[package]] name = "timely_bytes" version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99223f9594ab7d4dd36b55b1d7eb9bd2cd205f4304b5cc5381d5cdd417ec06f2" +source = "git+https://github.com/antiguru/timely-dataflow?branch=logger_flush#56ec0d7473e7fee0a79f40525843fa6089873347" [[package]] name = "timely_communication" version = "0.16.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f00d5c7f7bbb132a32220890240e5d951fc9582f82fb94452c011a8033c39f7a" +source = "git+https://github.com/antiguru/timely-dataflow?branch=logger_flush#56ec0d7473e7fee0a79f40525843fa6089873347" dependencies = [ "byteorder", "columnar", @@ -10411,8 +10410,7 @@ dependencies = [ [[package]] name = "timely_container" version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92b763064e76ba4f650dcdb035d82bcaad09986971fab002276e0b4cb10b3aa5" +source = "git+https://github.com/antiguru/timely-dataflow?branch=logger_flush#56ec0d7473e7fee0a79f40525843fa6089873347" dependencies = [ "columnation", "flatcontainer", @@ -10422,8 +10420,7 @@ dependencies = [ [[package]] name = "timely_logging" version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8300263e23bebac4ec9fbbeee65954a25402037609b7716d20191ed35829b14" +source = "git+https://github.com/antiguru/timely-dataflow?branch=logger_flush#56ec0d7473e7fee0a79f40525843fa6089873347" dependencies = [ "timely_container", ] diff --git a/Cargo.toml b/Cargo.toml index 74517c65d8d38..0a14fe6c1ede8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -285,6 +285,16 @@ incremental = true # merged), after which point it becomes impossible to build that historical # version of Materialize. [patch.crates-io] +columnar = { git = "https://github.com/frankmcsherry/columnar" } +#columnar = { git = "https://github.com/antiguru/columnar", branch = "attrs" } +#columnar = { path = "../columnar" } +#differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow" } +#differential-dataflow = { git = "https://github.com/antiguru/differential-dataflow", branch = "logging_fallout", version = "0.13.2" } +#differential-dataflow = { path = "../differential-dataflow" } +#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } +timely = { git = "https://github.com/antiguru/timely-dataflow", branch = "logger_flush" } +#timely = { path = "../timely-dataflow/timely" } + # Waiting on https://github.com/sfackler/rust-postgres/pull/752. postgres = { git = "https://github.com/MaterializeInc/rust-postgres" } tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres" } diff --git a/src/compute-types/Cargo.toml b/src/compute-types/Cargo.toml index a26797e591820..2188835bc6dd1 100644 --- a/src/compute-types/Cargo.toml +++ b/src/compute-types/Cargo.toml @@ -10,6 +10,7 @@ publish = false workspace = true [dependencies] +columnar = "0.2.0" columnation = "0.1.0" differential-dataflow = "0.13.3" itertools = "0.12.1" diff --git a/src/compute-types/src/plan.rs b/src/compute-types/src/plan.rs index 88260444c32f7..abd1eee3afb17 100644 --- a/src/compute-types/src/plan.rs +++ b/src/compute-types/src/plan.rs @@ -14,6 +14,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::num::NonZeroU64; +use columnar::Columnar; use mz_expr::{ CollectionPlan, EvalError, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, OptimizedMirRelationExpr, TableFunc, @@ -161,14 +162,12 @@ impl AvailableCollections { } /// An identifier for an LIR node. -/// -/// LirIds start at 1, not 0, which let's us get a better struct packing in `ComputeEvent::LirMapping`. -#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)] -pub struct LirId(NonZeroU64); +#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize, Columnar)] +pub struct LirId(u64); impl LirId { fn as_u64(&self) -> u64 { - self.0.into() + self.0 } } @@ -186,11 +185,11 @@ impl std::fmt::Display for LirId { impl RustType for LirId { fn into_proto(&self) -> u64 { - u64::from(self.0) + self.0 } fn from_proto(proto: u64) -> Result { - Ok(Self(proto.try_into()?)) + Ok(Self(proto)) } } @@ -513,7 +512,7 @@ impl Arbitrary for LirId { type Parameters = (); fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { - let lir_id = NonZeroU64::arbitrary(); + let lir_id = u64::arbitrary(); lir_id.prop_map(LirId).boxed() } } @@ -1132,16 +1131,6 @@ mod tests { use super::*; - #[mz_ore::test] - fn test_option_lirid_fits_in_usize() { - let option_lirid_size = std::mem::size_of::>(); - let usize_size = std::mem::size_of::(); - assert!( - option_lirid_size <= usize_size, - "Option (size {option_lirid_size}) should fit in usize (size {usize_size})" - ); - } - proptest! { #![proptest_config(ProptestConfig::with_cases(10))] #[mz_ore::test] diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 2fa49f3585d59..7b9d2d1a693c3 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -43,7 +43,7 @@ impl Context { pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self { Self { arrangements: Default::default(), - next_lir_id: LirId(std::num::NonZero::::MIN), + next_lir_id: LirId(1), debug_info: LirDebugInfo { debug_name, id: GlobalId::Transient(0), diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index 5f459f9edeb53..a86a71668411f 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -13,6 +13,7 @@ workspace = true anyhow = "1.0.66" async-stream = "0.3.3" bytesize = "1.1.0" +columnar = "0.2.0" crossbeam-channel = "0.5.8" dec = { version = "0.4.8", features = ["serde"] } differential-dataflow = "0.13.3" diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index 70aa3b9f419dc..1e419838c64a2 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -597,7 +597,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { // Log the receipt of the peek. if let Some(logger) = self.compute_state.compute_logger.as_mut() { - logger.log(pending.as_log_event(true)); + logger.log(&pending.as_log_event(true)); } self.process_peek(&mut Antichain::new(), pending); @@ -891,7 +891,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { // Log responding to the peek request. if let Some(logger) = self.compute_state.compute_logger.as_mut() { - logger.log(log_event); + logger.log(&log_event); } } diff --git a/src/compute/src/extensions/arrange.rs b/src/compute/src/extensions/arrange.rs index 6409d44244d7d..555bc69a0271e 100644 --- a/src/compute/src/extensions/arrange.rs +++ b/src/compute/src/extensions/arrange.rs @@ -263,10 +263,10 @@ where .stream .unary(Pipeline, "ArrangementSize", |_cap, info| { let address = info.address; - logger.log(ComputeEvent::ArrangementHeapSizeOperator( + logger.log(&ComputeEvent::ArrangementHeapSizeOperator( ArrangementHeapSizeOperator { operator_id, - address, + address: address.to_vec(), }, )); move |input, output| { @@ -281,7 +281,7 @@ where let size = size.try_into().expect("must fit"); if size != old_size { - logger.log(ComputeEvent::ArrangementHeapSize(ArrangementHeapSize { + logger.log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize { operator_id, delta_size: size - old_size, })); @@ -289,7 +289,7 @@ where let capacity = capacity.try_into().expect("must fit"); if capacity != old_capacity { - logger.log(ComputeEvent::ArrangementHeapCapacity( + logger.log(&ComputeEvent::ArrangementHeapCapacity( ArrangementHeapCapacity { operator_id, delta_capacity: capacity - old_capacity, @@ -299,7 +299,7 @@ where let allocations = allocations.try_into().expect("must fit"); if allocations != old_allocations { - logger.log(ComputeEvent::ArrangementHeapAllocations( + logger.log(&ComputeEvent::ArrangementHeapAllocations( ArrangementHeapAllocations { operator_id, delta_allocations: allocations - old_allocations, diff --git a/src/compute/src/logging.rs b/src/compute/src/logging.rs index 1aafb9847697e..b25f253937b0d 100644 --- a/src/compute/src/logging.rs +++ b/src/compute/src/logging.rs @@ -64,27 +64,27 @@ where _marker: PhantomData, } } - - /// Indicate progress up to a specific `time`. - fn report_progress(&mut self, time: &Duration) { - let time_ms = ((time.as_millis() / self.interval_ms) + 1) * self.interval_ms; - let new_time_ms: Timestamp = time_ms.try_into().expect("must fit"); - if self.time_ms < new_time_ms { - self.event_pusher - .push(Event::Progress(vec![(new_time_ms, 1), (self.time_ms, -1)])); - self.time_ms = new_time_ms; - } - } } impl BatchLogger where P: EventPusher, + C: Container, { /// Publishes a batch of logged events and advances the capability. - fn publish_batch(&mut self, time: &Duration, data: C) { - self.event_pusher.push(Event::Messages(self.time_ms, data)); - self.report_progress(time); + fn publish_batch(&mut self, time: &Duration, data: &mut Option) { + if let Some(data) = data.take() { + self.event_pusher.push(Event::Messages(self.time_ms, data)); + } else { + // Flush: Indicate progress up to `time`. + let time_ms = ((time.as_millis() / self.interval_ms) + 1) * self.interval_ms; + let new_time_ms: Timestamp = time_ms.try_into().expect("must fit"); + if self.time_ms < new_time_ms { + self.event_pusher + .push(Event::Progress(vec![(new_time_ms, 1), (self.time_ms, -1)])); + self.time_ms = new_time_ms; + } + } } } diff --git a/src/compute/src/logging/compute.rs b/src/compute/src/logging/compute.rs index cd3e3633d620e..8af72795723ff 100644 --- a/src/compute/src/logging/compute.rs +++ b/src/compute/src/logging/compute.rs @@ -15,20 +15,23 @@ use std::fmt::{Display, Write}; use std::rc::Rc; use std::time::{Duration, Instant}; +use columnar::{Columnar, Index}; use differential_dataflow::collection::AsCollection; use differential_dataflow::trace::{BatchReader, Cursor}; use differential_dataflow::Collection; use mz_compute_types::plan::LirId; use mz_ore::cast::CastFrom; use mz_repr::{Datum, Diff, GlobalId, Timestamp}; +use mz_timely_util::containers::{Column, ColumnBuilder}; use mz_timely_util::replay::MzReplay; use timely::communication::Allocate; use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::channels::pushers::buffer::Session; use timely::dataflow::channels::pushers::{Counter, Tee}; +use timely::dataflow::operators::core::Map; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; -use timely::dataflow::operators::{Filter, Operator}; +use timely::dataflow::operators::Operator; use timely::dataflow::{Scope, Stream}; use timely::scheduling::Scheduler; use timely::worker::Worker; @@ -45,10 +48,10 @@ use crate::typedefs::RowRowSpine; /// Type alias for a logger of compute events. pub type Logger = timely::logging_core::Logger; -pub type ComputeEventBuilder = CapacityContainerBuilder>; +pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>; /// A dataflow exports a global ID. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct Export { /// Identifier of the export. pub export_id: GlobalId, @@ -57,14 +60,14 @@ pub struct Export { } /// The export for a global id was dropped. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct ExportDropped { /// Identifier of the export. pub export_id: GlobalId, } /// A peek event with a [`Peek`], a [`PeekType`], and an installation status. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct PeekEvent { /// The data for the peek itself. pub peek: Peek, @@ -76,7 +79,7 @@ pub struct PeekEvent { } /// Frontier change event. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct Frontier { pub export_id: GlobalId, pub time: Timestamp, @@ -84,7 +87,7 @@ pub struct Frontier { } /// An import frontier change. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct ImportFrontier { pub import_id: GlobalId, pub export_id: GlobalId, @@ -93,7 +96,7 @@ pub struct ImportFrontier { } /// A change in an arrangement's heap size. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct ArrangementHeapSize { /// Operator index pub operator_id: usize, @@ -102,7 +105,7 @@ pub struct ArrangementHeapSize { } /// A change in an arrangement's heap capacity. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct ArrangementHeapCapacity { /// Operator index pub operator_id: usize, @@ -111,7 +114,7 @@ pub struct ArrangementHeapCapacity { } /// A change in an arrangement's heap allocation count. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct ArrangementHeapAllocations { /// Operator index pub operator_id: usize, @@ -120,30 +123,30 @@ pub struct ArrangementHeapAllocations { } /// Announcing an operator that manages an arrangement. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct ArrangementHeapSizeOperator { /// Operator index pub operator_id: usize, /// The address of the operator. - pub address: Rc<[usize]>, + pub address: Vec, } /// Drop event for an operator managing an arrangement. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct ArrangementHeapSizeOperatorDrop { /// Operator index pub operator_id: usize, } /// Dataflow shutdown event. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct DataflowShutdown { /// Timely worker index of the dataflow. pub dataflow_index: usize, } /// Error count update event. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct ErrorCount { /// Identifier of the export. pub export_id: GlobalId, @@ -152,13 +155,13 @@ pub struct ErrorCount { } /// An export is hydrated. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct Hydration { pub export_id: GlobalId, } /// Announce a mapping of an LIR operator to a dataflow operator for a global ID. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct LirMapping { /// The `GlobalId` in which the LIR operator is rendered. /// @@ -169,11 +172,11 @@ pub struct LirMapping { pub global_id: GlobalId, /// The actual mapping. /// Represented this way to reduce the size of `ComputeEvent`. - pub mapping: Box<[(LirId, LirMetadata)]>, + pub mapping: Vec<(LirId, LirMetadata)>, } /// Announce that a dataflow supports a specific global ID. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct DataflowGlobal { /// The identifier of the dataflow. pub dataflow_index: usize, @@ -182,7 +185,7 @@ pub struct DataflowGlobal { } /// A logged compute event. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub enum ComputeEvent { /// A dataflow export was created. Export(Export), @@ -218,7 +221,7 @@ pub enum ComputeEvent { } /// A peek type distinguishing between index and persist peeks. -#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)] +#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)] pub enum PeekType { /// A peek against an index. Index, @@ -237,28 +240,29 @@ impl PeekType { } /// A logged peek event. -#[derive(Debug, Clone, PartialOrd, PartialEq)] +#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)] pub struct Peek { /// The identifier of the view the peek targets. id: GlobalId, /// The logical timestamp requested. time: Timestamp, /// The ID of the peek. - uuid: Uuid, + uuid: uuid::Bytes, } impl Peek { /// Create a new peek from its arguments. pub fn new(id: GlobalId, time: Timestamp, uuid: Uuid) -> Self { + let uuid = uuid.into_bytes(); Self { id, time, uuid } } } /// Metadata for LIR operators. -#[derive(Clone, Debug, PartialEq, PartialOrd)] +#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)] pub struct LirMetadata { /// The LIR operator, as a string (see `FlatPlanNode::humanize`). - operator: Box, + operator: String, /// The LIR identifier of the parent (if any). /// Since `LirId`s are strictly positive, Rust can steal the low bit. /// (See `test_option_lirid_fits_in_usize`.) @@ -273,7 +277,7 @@ pub struct LirMetadata { impl LirMetadata { /// Construct a new LIR metadata object. pub fn new( - operator: Box, + operator: String, parent_lir_id: Option, nesting: u8, operator_span: (usize, usize), @@ -296,7 +300,7 @@ impl LirMetadata { pub(super) fn construct( worker: &mut timely::worker::Worker, config: &mz_compute_client::logging::LoggingConfig, - event_queue: EventQueue>, + event_queue: EventQueue>, shared_state: Rc>, ) -> BTreeMap { let logging_interval_ms = std::cmp::max(1, config.interval.as_millis()); @@ -305,20 +309,21 @@ pub(super) fn construct( let dataflow_index = worker.next_dataflow_index(); worker.dataflow_named("Dataflow: compute logging", move |scope| { - let (mut logs, token) = Some(event_queue.link) - .mz_replay::<_, CapacityContainerBuilder<_>, _>( - scope, - "compute logs", - config.interval, - event_queue.activator, - |mut session, data| session.give_iterator(data.iter()), - ); - - // If logging is disabled, we still need to install the indexes, but we can leave them - // empty. We do so by immediately filtering all logs events. - if !config.enable_logging { - logs = logs.filter(|_| false); - } + let enable_logging = config.enable_logging; + let (logs, token) = Some(event_queue.link).mz_replay::<_, ColumnBuilder<_>, _>( + scope, + "compute logs", + config.interval, + event_queue.activator, + move |mut session, data| { + // If logging is disabled, we still need to install the indexes, but we can leave them + // empty. We do so by immediately filtering all logs events. + if enable_logging { + session.give_iterator(data.iter()) + } + }, + ); + let logs = logs.container::>(); // Build a demux operator that splits the replayed event stream up into the separate // logging streams. @@ -369,14 +374,15 @@ pub(super) fn construct( arrangement_heap_allocations: arrangement_heap_allocations.session(&cap), error_count: error_count.session(&cap), hydration_time: hydration_time.session(&cap), - lir_mapping: lir_mapping.session(&cap), + lir_mapping: lir_mapping.session_with_builder(&cap), dataflow_global_ids: dataflow_global_ids.session(&cap), }; - for (time, event) in data.drain(..) { + let shared_state = &mut shared_state.borrow_mut(); + for (time, event) in data.drain() { DemuxHandler { state: &mut demux_state, - shared_state: &mut shared_state.borrow_mut(), + shared_state, output: &mut output_sessions, logging_interval_ms, time, @@ -428,7 +434,7 @@ pub(super) fn construct( let mut scratch = String::new(); move |PeekDatum { peek, peek_type }| { packer.pack_slice(&[ - Datum::Uuid(peek.uuid), + Datum::Uuid(Uuid::from_bytes(peek.uuid)), Datum::UInt64(u64::cast_from(worker_id)), make_string_datum(peek.id, &mut scratch), Datum::String(peek_type.name()), @@ -503,25 +509,28 @@ pub(super) fn construct( }); let packer = PermutedRowPacker::new(ComputeLog::LirMapping); - let lir_mapping = lir_mapping.as_collection().map({ - let mut scratch1 = String::new(); - let mut scratch2 = String::new(); - move |datum| { - packer.pack_slice(&[ - make_string_datum(datum.global_id, &mut scratch1), - Datum::UInt64(datum.lir_id.into()), - Datum::UInt64(u64::cast_from(worker_id)), - make_string_datum(&datum.operator, &mut scratch2), - datum - .parent_lir_id - .map(|lir_id| Datum::UInt64(lir_id.into())) - .unwrap_or_else(|| Datum::Null), - Datum::UInt16(u16::cast_from(datum.nesting)), - Datum::UInt64(u64::cast_from(datum.operator_span.0)), - Datum::UInt64(u64::cast_from(datum.operator_span.1)), - ]) - } - }); + let lir_mapping = lir_mapping + .map({ + let mut scratch1 = String::new(); + let mut scratch2 = String::new(); + move |(datum, time, diff)| { + let row = packer.pack_slice(&[ + make_string_datum(GlobalId::into_owned(datum.global_id), &mut scratch1), + Datum::UInt64(::into_owned(datum.lir_id).into()), + Datum::UInt64(u64::cast_from(worker_id)), + make_string_datum(datum.operator, &mut scratch2), + datum + .parent_lir_id + .map(|lir_id| Datum::UInt64(LirId::into_owned(lir_id).into())) + .unwrap_or_else(|| Datum::Null), + Datum::UInt16(u16::cast_from(*datum.nesting)), + Datum::UInt64(u64::cast_from(datum.operator_span.0)), + Datum::UInt64(u64::cast_from(datum.operator_span.1)), + ]); + (row, Timestamp::into_owned(time), *diff) + } + }) + .as_collection(); let packer = PermutedRowPacker::new(ComputeLog::DataflowGlobal); let dataflow_global_ids = dataflow_global_ids.as_collection().map({ @@ -665,9 +674,14 @@ struct ArrangementSizeState { type Update = (D, Timestamp, Diff); /// A pusher for updates of value `D` for vector-based containers. type Pusher = Counter>, Tee>>>; +/// A pusher for updates of value `D` for columnar containers. +type PusherColumnar = Counter>, Tee>>>; /// An output session for vector-based containers of updates `D`, using a capacity container builder. type OutputSession<'a, D> = Session<'a, Timestamp, CapacityContainerBuilder>>, Pusher>; +/// An output session for columnar containers of updates `D`, using a column builder. +type OutputSessionColumnar<'a, D> = + Session<'a, Timestamp, ColumnBuilder>, PusherColumnar>; /// Bundled output sessions used by the demux operator. struct DemuxOutput<'a> { @@ -682,7 +696,7 @@ struct DemuxOutput<'a> { arrangement_heap_allocations: OutputSession<'a, ArrangementHeapDatum>, hydration_time: OutputSession<'a, HydrationTimeDatum>, error_count: OutputSession<'a, ErrorCountDatum>, - lir_mapping: OutputSession<'a, LirMappingDatum>, + lir_mapping: OutputSessionColumnar<'a, LirMappingDatum>, dataflow_global_ids: OutputSession<'a, DataflowGlobalDatum>, } @@ -722,7 +736,7 @@ struct ArrangementHeapDatum { operator_id: usize, } -#[derive(Clone)] +#[derive(Clone, Columnar)] struct HydrationTimeDatum { export_id: GlobalId, time_ns: Option, @@ -737,11 +751,11 @@ struct ErrorCountDatum { count: i64, } -#[derive(Clone)] +#[derive(Clone, Columnar)] struct LirMappingDatum { global_id: GlobalId, lir_id: LirId, - operator: Box, + operator: String, parent_lir_id: Option, nesting: u8, operator_span: (usize, usize), @@ -778,54 +792,59 @@ impl DemuxHandler<'_, '_, A> { } /// Handle the given compute event. - fn handle(&mut self, event: ComputeEvent) { + fn handle(&mut self, event: ::Ref<'_>) { match event { - ComputeEvent::Export(export) => self.handle_export(export), - ComputeEvent::ExportDropped(export_dropped) => { + ComputeEventReference::Export(export) => self.handle_export(export), + ComputeEventReference::ExportDropped(export_dropped) => { self.handle_export_dropped(export_dropped) } - ComputeEvent::Peek( - peek @ PeekEvent { + ComputeEventReference::Peek( + peek @ PeekEventReference { installed: true, .. }, ) => self.handle_peek_install(peek), - ComputeEvent::Peek( - peek @ PeekEvent { + ComputeEventReference::Peek( + peek @ PeekEventReference { installed: false, .. }, ) => self.handle_peek_retire(peek), - ComputeEvent::Frontier(frontier) => self.handle_frontier(frontier), - ComputeEvent::ImportFrontier(import_frontier) => { + ComputeEventReference::Frontier(frontier) => self.handle_frontier(frontier), + ComputeEventReference::ImportFrontier(import_frontier) => { self.handle_import_frontier(import_frontier) } - ComputeEvent::ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner), - ComputeEvent::ArrangementHeapCapacity(inner) => { + ComputeEventReference::ArrangementHeapSize(inner) => { + self.handle_arrangement_heap_size(inner) + } + ComputeEventReference::ArrangementHeapCapacity(inner) => { self.handle_arrangement_heap_capacity(inner) } - ComputeEvent::ArrangementHeapAllocations(inner) => { + ComputeEventReference::ArrangementHeapAllocations(inner) => { self.handle_arrangement_heap_allocations(inner) } - ComputeEvent::ArrangementHeapSizeOperator(inner) => { + ComputeEventReference::ArrangementHeapSizeOperator(inner) => { self.handle_arrangement_heap_size_operator(inner) } - ComputeEvent::ArrangementHeapSizeOperatorDrop(inner) => { + ComputeEventReference::ArrangementHeapSizeOperatorDrop(inner) => { self.handle_arrangement_heap_size_operator_dropped(inner) } - ComputeEvent::DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown), - ComputeEvent::ErrorCount(error_count) => self.handle_error_count(error_count), - ComputeEvent::Hydration(hydration) => self.handle_hydration(hydration), - ComputeEvent::LirMapping(mapping) => self.handle_lir_mapping(mapping), - ComputeEvent::DataflowGlobal(global) => self.handle_dataflow_global(global), + ComputeEventReference::DataflowShutdown(shutdown) => { + self.handle_dataflow_shutdown(shutdown) + } + ComputeEventReference::ErrorCount(error_count) => self.handle_error_count(error_count), + ComputeEventReference::Hydration(hydration) => self.handle_hydration(hydration), + ComputeEventReference::LirMapping(mapping) => self.handle_lir_mapping(mapping), + ComputeEventReference::DataflowGlobal(global) => self.handle_dataflow_global(global), } } fn handle_export( &mut self, - Export { + ExportReference { export_id, dataflow_index, - }: Export, + }: ::Ref<'_>, ) { + let export_id = Columnar::into_owned(export_id); let ts = self.ts(); let datum = ExportDatum { export_id, @@ -855,7 +874,11 @@ impl DemuxHandler<'_, '_, A> { self.output.hydration_time.give((datum, ts, 1)); } - fn handle_export_dropped(&mut self, ExportDropped { export_id }: ExportDropped) { + fn handle_export_dropped( + &mut self, + ExportDroppedReference { export_id }: ::Ref<'_>, + ) { + let export_id = Columnar::into_owned(export_id); let Some(export) = self.state.exports.remove(&export_id) else { error!(%export_id, "missing exports entry at time of export drop"); return; @@ -917,7 +940,10 @@ impl DemuxHandler<'_, '_, A> { } } - fn handle_dataflow_shutdown(&mut self, DataflowShutdown { dataflow_index }: DataflowShutdown) { + fn handle_dataflow_shutdown( + &mut self, + DataflowShutdownReference { dataflow_index }: ::Ref<'_>, + ) { let ts = self.ts(); if let Some(start) = self.state.dataflow_drop_times.remove(&dataflow_index) { @@ -963,15 +989,19 @@ impl DemuxHandler<'_, '_, A> { nesting, operator_span, }; - self.output.lir_mapping.give((datum, ts, -1)); + self.output.lir_mapping.give(&(datum, ts, -1)); } } } } } - fn handle_error_count(&mut self, ErrorCount { export_id, diff }: ErrorCount) { + fn handle_error_count( + &mut self, + ErrorCountReference { export_id, diff }: ::Ref<'_>, + ) { let ts = self.ts(); + let export_id = Columnar::into_owned(export_id); let Some(export) = self.state.exports.get_mut(&export_id) else { // The export might have already been dropped, in which case we are no longer @@ -1000,8 +1030,12 @@ impl DemuxHandler<'_, '_, A> { export.error_count = new_count; } - fn handle_hydration(&mut self, Hydration { export_id }: Hydration) { + fn handle_hydration( + &mut self, + HydrationReference { export_id }: ::Ref<'_>, + ) { let ts = self.ts(); + let export_id = Columnar::into_owned(export_id); let Some(export) = self.state.exports.get_mut(&export_id) else { error!(%export_id, "hydration event for unknown export"); @@ -1032,13 +1066,14 @@ impl DemuxHandler<'_, '_, A> { fn handle_peek_install( &mut self, - PeekEvent { + PeekEventReference { peek, peek_type, installed: _, - }: PeekEvent, + }: ::Ref<'_>, ) { - let uuid = peek.uuid; + let peek = Peek::into_owned(peek); + let uuid = Uuid::from_bytes(peek.uuid); let ts = self.ts(); self.output .peek @@ -1052,13 +1087,14 @@ impl DemuxHandler<'_, '_, A> { fn handle_peek_retire( &mut self, - PeekEvent { + PeekEventReference { peek, peek_type, installed: _, - }: PeekEvent, + }: ::Ref<'_>, ) { - let uuid = peek.uuid; + let peek = Peek::into_owned(peek); + let uuid = Uuid::from_bytes(peek.uuid); let ts = self.ts(); self.output .peek @@ -1077,43 +1113,50 @@ impl DemuxHandler<'_, '_, A> { fn handle_frontier( &mut self, - Frontier { + FrontierReference { export_id, time, diff, - }: Frontier, + }: ::Ref<'_>, ) { - let diff = i64::from(diff); + let export_id = Columnar::into_owned(export_id); + let diff = i64::from(*diff); let ts = self.ts(); + let time = Columnar::into_owned(time); let datum = FrontierDatum { export_id, time }; self.output.frontier.give((datum, ts, diff)); } fn handle_import_frontier( &mut self, - ImportFrontier { + ImportFrontierReference { import_id, export_id, time, diff, - }: ImportFrontier, + }: ::Ref<'_>, ) { + let import_id = Columnar::into_owned(import_id); + let export_id = Columnar::into_owned(export_id); let ts = self.ts(); + let time = Columnar::into_owned(time); let datum = ImportFrontierDatum { export_id, import_id, time, }; - self.output.import_frontier.give((datum, ts, diff.into())); + self.output + .import_frontier + .give((datum, ts, (*diff).into())); } /// Update the allocation size for an arrangement. fn handle_arrangement_heap_size( &mut self, - ArrangementHeapSize { + ArrangementHeapSizeReference { operator_id, delta_size, - }: ArrangementHeapSize, + }: ::Ref<'_>, ) { let ts = self.ts(); let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else { @@ -1131,10 +1174,10 @@ impl DemuxHandler<'_, '_, A> { /// Update the allocation capacity for an arrangement. fn handle_arrangement_heap_capacity( &mut self, - ArrangementHeapCapacity { + ArrangementHeapCapacityReference { operator_id, delta_capacity, - }: ArrangementHeapCapacity, + }: ::Ref<'_>, ) { let ts = self.ts(); let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else { @@ -1152,10 +1195,10 @@ impl DemuxHandler<'_, '_, A> { /// Update the allocation count for an arrangement. fn handle_arrangement_heap_allocations( &mut self, - ArrangementHeapAllocations { + ArrangementHeapAllocationsReference { operator_id, delta_allocations, - }: ArrangementHeapAllocations, + }: ::Ref<'_>, ) { let ts = self.ts(); let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else { @@ -1174,12 +1217,15 @@ impl DemuxHandler<'_, '_, A> { /// Indicate that a new arrangement exists, start maintaining the heap size state. fn handle_arrangement_heap_size_operator( &mut self, - ArrangementHeapSizeOperator { + ArrangementHeapSizeOperatorReference { operator_id, address, - }: ArrangementHeapSizeOperator, + }: ::Ref<'_>, ) { - let activator = self.state.worker.activator_for(address); + let activator = self + .state + .worker + .activator_for(address.into_iter().collect()); let existing = self .state .arrangement_size @@ -1199,7 +1245,7 @@ impl DemuxHandler<'_, '_, A> { /// Indicate that an arrangement has been dropped and we can cleanup the heap size state. fn handle_arrangement_heap_size_operator_dropped( &mut self, - ArrangementHeapSizeOperatorDrop { operator_id }: ArrangementHeapSizeOperatorDrop, + ArrangementHeapSizeOperatorDropReference { operator_id }: ::Ref<'_>, ) { if let Some(state) = self.state.arrangement_size.remove(&operator_id) { let ts = self.ts(); @@ -1226,33 +1272,29 @@ impl DemuxHandler<'_, '_, A> { } /// Indicate that a new LIR operator exists; record the dataflow address it maps to. - fn handle_lir_mapping(&mut self, LirMapping { global_id, mapping }: LirMapping) { + fn handle_lir_mapping( + &mut self, + LirMappingReference { global_id, mapping }: ::Ref<'_>, + ) { + let global_id = Columnar::into_owned(global_id); // record the state (for the later drop) + let mappings = || mapping.into_iter().map(Columnar::into_owned); self.state .lir_mapping .entry(global_id) - .and_modify(|existing_mapping| existing_mapping.extend(mapping.iter().cloned())) - .or_insert_with(|| mapping.iter().cloned().collect::>()); + .and_modify(|existing_mapping| existing_mapping.extend(mappings())) + .or_insert_with(|| mappings().collect()); // send the datum out let ts = self.ts(); - for ( - lir_id, - LirMetadata { - operator, - parent_lir_id, - nesting, - operator_span, - }, - ) in mapping - { - let datum = LirMappingDatum { + for (lir_id, meta) in mapping.into_iter() { + let datum = LirMappingDatumReference { global_id, lir_id, - operator, - parent_lir_id, - nesting, - operator_span, + operator: meta.operator, + parent_lir_id: meta.parent_lir_id, + nesting: meta.nesting, + operator_span: meta.operator_span, }; self.output.lir_mapping.give((datum, ts, 1)); } @@ -1260,21 +1302,22 @@ impl DemuxHandler<'_, '_, A> { fn handle_dataflow_global( &mut self, - DataflowGlobal { + DataflowGlobalReference { dataflow_index, global_id, - }: DataflowGlobal, + }: ::Ref<'_>, ) { + let global_id = Columnar::into_owned(global_id); self.state .dataflow_global_ids .entry(dataflow_index) .and_modify(|globals| { // NB BTreeSet::insert() returns `false` when the element was already in the set - if !globals.insert(global_id.clone()) { + if !globals.insert(global_id) { error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId"); } }) - .or_insert_with(|| BTreeSet::from([global_id.clone()])); + .or_insert_with(|| BTreeSet::from([global_id])); let ts = self.ts(); let datum = DataflowGlobalDatum { @@ -1305,7 +1348,7 @@ impl CollectionLogging { dataflow_index: usize, import_ids: impl Iterator, ) -> Self { - logger.log(ComputeEvent::Export(Export { + logger.log(&ComputeEvent::Export(Export { export_id, dataflow_index, })); @@ -1346,7 +1389,7 @@ impl CollectionLogging { diff: 1, }) }); - let events = retraction.into_iter().chain(insertion); + let events = retraction.as_ref().into_iter().chain(insertion.as_ref()); self.logger.log_many(events); } } @@ -1377,14 +1420,14 @@ impl CollectionLogging { diff: 1, }) }); - let events = retraction.into_iter().chain(insertion); + let events = retraction.as_ref().into_iter().chain(insertion.as_ref()); self.logger.log_many(events); } } /// Set the collection as hydrated. pub fn set_hydrated(&self) { - self.logger.log(ComputeEvent::Hydration(Hydration { + self.logger.log(&ComputeEvent::Hydration(Hydration { export_id: self.export_id, })); } @@ -1400,7 +1443,7 @@ impl Drop for CollectionLogging { self.set_import_frontier(import_id, None); } - self.logger.log(ComputeEvent::ExportDropped(ExportDropped { + self.logger.log(&ComputeEvent::ExportDropped(ExportDropped { export_id: self.export_id, })); } @@ -1423,7 +1466,7 @@ where move |input, output| { input.for_each(|cap, data| { let diff = data.iter().map(|(_d, _t, r)| r).sum(); - logger.log(ComputeEvent::ErrorCount(ErrorCount { export_id, diff })); + logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff })); output.session(&cap).give_container(data); }); @@ -1443,7 +1486,7 @@ where move |input, output| { input.for_each(|cap, data| { let diff = data.iter().map(sum_batch_diffs).sum(); - logger.log(ComputeEvent::ErrorCount(ErrorCount { export_id, diff })); + logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff })); output.session(&cap).give_container(data); }); diff --git a/src/compute/src/logging/differential.rs b/src/compute/src/logging/differential.rs index 27149e7e7a362..f5162b2b3f939 100644 --- a/src/compute/src/logging/differential.rs +++ b/src/compute/src/logging/differential.rs @@ -298,7 +298,7 @@ impl DemuxHandler<'_, '_> { .expect("under/overflow"); if *sharing == 0 { self.state.sharing.remove(&operator_id); - logger.log(ComputeEvent::ArrangementHeapSizeOperatorDrop( + logger.log(&ComputeEvent::ArrangementHeapSizeOperatorDrop( ArrangementHeapSizeOperatorDrop { operator_id }, )); } diff --git a/src/compute/src/logging/initialize.rs b/src/compute/src/logging/initialize.rs index 5c6d2a37e9bba..96e0536fe9747 100644 --- a/src/compute/src/logging/initialize.rs +++ b/src/compute/src/logging/initialize.rs @@ -18,6 +18,7 @@ use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion}; use mz_repr::{Diff, Timestamp}; use mz_storage_operators::persist_source::Subtime; use mz_storage_types::errors::DataflowError; +use mz_timely_util::containers::Column; use mz_timely_util::operator::CollectionExt; use timely::communication::Allocate; use timely::container::flatcontainer::FlatStack; @@ -100,7 +101,7 @@ struct LoggingContext<'a, A: Allocate> { t_event_queue: EventQueue>, r_event_queue: EventQueue>, d_event_queue: EventQueue>, - c_event_queue: EventQueue>, + c_event_queue: EventQueue>, shared_state: Rc>, } @@ -173,11 +174,17 @@ impl LoggingContext<'_, A> { &self, event_queue: EventQueue, ) -> Logger { - let mut logger = BatchLogger::new(event_queue.link, self.interval_ms); - Logger::new(self.now, self.start_offset, move |time, data| { - logger.publish_batch(time, std::mem::take(data)); - event_queue.activator.activate(); - }) + let mut logger = BatchLogger::<_, _>::new(event_queue.link, self.interval_ms); + Logger::new( + self.now, + self.start_offset, + move |time, data: &mut Option| { + if data.is_none() { + event_queue.activator.activate(); + } + logger.publish_batch(time, data); + }, + ) } fn reachability_logger(&self) -> Logger { @@ -189,7 +196,13 @@ impl LoggingContext<'_, A> { Logger::new( self.now, self.start_offset, - move |time, data: &mut Vec<_>| { + move |time, data: &mut Option>| { + let Some(data) = data.as_mut() else { + logger.publish_batch(time, &mut None); + event_queue.activator.activate(); + return; + }; + let mut massaged = Vec::new(); let mut container = FlatStack::default(); for (time, event) in data.drain(..) { @@ -220,8 +233,7 @@ impl LoggingContext<'_, A> { } } } - logger.publish_batch(time, container); - logger.report_progress(time); + logger.publish_batch(time, &mut Some(container)); event_queue.activator.activate(); }, ) diff --git a/src/compute/src/logging/timely.rs b/src/compute/src/logging/timely.rs index 61fb4df11f758..61f43bbe36280 100644 --- a/src/compute/src/logging/timely.rs +++ b/src/compute/src/logging/timely.rs @@ -566,7 +566,7 @@ impl DemuxHandler<'_, '_> { fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) { // Notify compute logging about the shutdown. if let Some(logger) = &self.shared_state.compute_logger { - logger.log(ComputeEvent::DataflowShutdown(DataflowShutdown { + logger.log(&ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index, })); } diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index c5dd45ed23e08..83d2cd12d1f9f 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -956,7 +956,7 @@ where // ActiveComputeState can't have a catalog reference, so we'll need to capture the names // in some other structure and have that structure impl ExprHumanizer let metadata = if should_compute_lir_metadata { - let operator: Box = node.expr.humanize(&DummyHumanizer).into(); + let operator = node.expr.humanize(&DummyHumanizer); let operator_id_start = self.scope.peek_identifier(); Some((operator, operator_id_start)) } else { @@ -983,8 +983,7 @@ where } if let Some(lir_mapping_metadata) = lir_mapping_metadata { - let mapping: Box<[(LirId, LirMetadata)]> = lir_mapping_metadata.into(); - self.log_lir_mapping(object_id, mapping); + self.log_lir_mapping(object_id, lir_mapping_metadata); } collections @@ -1186,16 +1185,16 @@ where fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) { if let Some(logger) = &self.compute_logger { - logger.log(ComputeEvent::DataflowGlobal(DataflowGlobal { + logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal { dataflow_index, global_id, })); } } - fn log_lir_mapping(&self, global_id: GlobalId, mapping: Box<[(LirId, LirMetadata)]>) { + fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) { if let Some(logger) = &self.compute_logger { - logger.log(ComputeEvent::LirMapping(LirMapping { global_id, mapping })); + logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping })); } } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index a4440192356ca..ffb763b764be3 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -744,7 +744,7 @@ impl<'w, A: Allocate + 'static> Worker<'w, A> { for (_, peek) in std::mem::take(&mut compute_state.pending_peeks) { // Log dropping the peek request. if let Some(logger) = compute_state.compute_logger.as_mut() { - logger.log(peek.as_log_event(false)); + logger.log(&peek.as_log_event(false)); } } diff --git a/src/repr/src/global_id.rs b/src/repr/src/global_id.rs index b570a245b549f..a09def03252ba 100644 --- a/src/repr/src/global_id.rs +++ b/src/repr/src/global_id.rs @@ -11,6 +11,7 @@ use std::fmt; use std::str::FromStr; use anyhow::{anyhow, Error}; +use columnar::Columnar; use columnation::{Columnation, CopyRegion}; use mz_lowertest::MzReflect; use mz_ore::id_gen::AtomicIdGen; @@ -50,6 +51,7 @@ include!(concat!(env!("OUT_DIR"), "/mz_repr.global_id.rs")); Serialize, Deserialize, MzReflect, + Columnar, )] pub enum GlobalId { /// System namespace. diff --git a/src/repr/src/timestamp.rs b/src/repr/src/timestamp.rs index c5e689e2b11f5..c159916a8596a 100644 --- a/src/repr/src/timestamp.rs +++ b/src/repr/src/timestamp.rs @@ -11,6 +11,7 @@ use std::convert::TryFrom; use std::num::TryFromIntError; use std::time::Duration; +use columnar::Columnar; use dec::TryFromDecimalError; use mz_proto::{RustType, TryFromProtoError}; use proptest_derive::Arbitrary; @@ -34,6 +35,7 @@ include!(concat!(env!("OUT_DIR"), "/mz_repr.timestamp.rs")); Hash, Default, Arbitrary, + Columnar, )] pub struct Timestamp { /// note no `pub`. diff --git a/src/timely-util/Cargo.toml b/src/timely-util/Cargo.toml index 071b181bb7b9f..4c37d17485e92 100644 --- a/src/timely-util/Cargo.toml +++ b/src/timely-util/Cargo.toml @@ -12,6 +12,8 @@ workspace = true [dependencies] ahash = { version = "0.8.11", default-features = false } bincode = "1.3.3" +bytemuck = "1.21.0" +columnar = "0.2.0" columnation = "0.1.0" differential-dataflow = "0.13.3" either = "1" diff --git a/src/timely-util/src/containers.rs b/src/timely-util/src/containers.rs index 9431900d8edfd..6ae6db0fd9eaf 100644 --- a/src/timely-util/src/containers.rs +++ b/src/timely-util/src/containers.rs @@ -9,5 +9,376 @@ //! Reusable containers. +use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher}; +use timely::container::columnation::TimelyStack; + pub mod array; pub mod stack; + +pub use container::Column; + +mod container { + + use columnar::bytes::serialization::decode; + use columnar::common::IterOwn; + use columnar::Columnar; + use columnar::Container as _; + use columnar::{AsBytes, Clear, FromBytes, Index, Len}; + use timely::bytes::arc::Bytes; + use timely::container::PushInto; + use timely::container::SizableContainer; + use timely::dataflow::channels::ContainerBytes; + use timely::Container; + + /// A container based on a columnar store, encoded in aligned bytes. + pub enum Column { + /// The typed variant of the container. + Typed(C::Container), + /// The binary variant of the container. + Bytes(Bytes), + /// Relocated, aligned binary data, if `Bytes` doesn't work for some reason. + /// + /// Reasons could include misalignment, cloning of data, or wanting + /// to release the `Bytes` as a scarce resource. + Align(Box<[u64]>), + } + + impl Column { + /// Borrows the container as a reference. + fn borrow(&self) -> >::Borrowed<'_> { + match self { + Column::Typed(t) => t.borrow(), + Column::Bytes(b) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))), + Column::Align(a) => <>::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(a)), + } + } + } + + impl Default for Column { + fn default() -> Self { + Self::Typed(Default::default()) + } + } + + impl Clone for Column + where + C::Container: Clone, + { + fn clone(&self) -> Self { + match self { + // TODO: We could go from `Typed` to `Align` if we wanted to. + Column::Typed(t) => Column::Typed(t.clone()), + Column::Bytes(b) => { + assert_eq!(b.len() % 8, 0); + let mut alloc: Vec = vec![0; b.len() / 8]; + bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&b[..]); + Self::Align(alloc.into()) + } + Column::Align(a) => Column::Align(a.clone()), + } + } + } + + impl Container for Column { + type ItemRef<'a> = C::Ref<'a>; + type Item<'a> = C::Ref<'a>; + + fn len(&self) -> usize { + self.borrow().len() + } + + // This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into. + fn clear(&mut self) { + match self { + Column::Typed(t) => t.clear(), + Column::Bytes(_) => *self = Column::Typed(Default::default()), + Column::Align(_) => *self = Column::Typed(Default::default()), + } + } + + type Iter<'a> = IterOwn<>::Borrowed<'a>>; + + fn iter(&self) -> Self::Iter<'_> { + self.borrow().into_iter() + } + + type DrainIter<'a> = IterOwn<>::Borrowed<'a>>; + + fn drain(&mut self) -> Self::DrainIter<'_> { + self.borrow().into_iter() + } + } + + impl SizableContainer for Column { + fn at_capacity(&self) -> bool { + match self { + Self::Typed(t) => { + let length_in_bytes = t.borrow().length_in_words() * 8; + length_in_bytes >= (1 << 20) + } + Self::Bytes(_) => true, + Self::Align(_) => true, + } + } + fn ensure_capacity(&mut self, _stash: &mut Option) {} + } + + impl PushInto for Column + where + C::Container: columnar::Push, + { + #[inline] + fn push_into(&mut self, item: T) { + use columnar::Push; + match self { + Column::Typed(t) => t.push(item), + Column::Align(_) | Column::Bytes(_) => { + // We really oughtn't be calling this in this case. + // We could convert to owned, but need more constraints on `C`. + unimplemented!("Pushing into Column::Bytes without first clearing"); + } + } + } + } + + impl ContainerBytes for Column { + fn from_bytes(bytes: Bytes) -> Self { + // Our expectation / hope is that `bytes` is `u64` aligned and sized. + // If the alignment is borked, we can relocate. IF the size is borked, + // not sure what we do in that case. + assert_eq!(bytes.len() % 8, 0); + if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) { + Self::Bytes(bytes) + } else { + println!("Re-locating bytes for alignment reasons"); + let mut alloc: Vec = vec![0; bytes.len() / 8]; + bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&bytes[..]); + Self::Align(alloc.into()) + } + } + + fn length_in_bytes(&self) -> usize { + match self { + // We'll need one u64 for the length, then the length rounded up to a multiple of 8. + Column::Typed(t) => 8 * t.borrow().length_in_words(), + Column::Bytes(b) => b.len(), + Column::Align(a) => 8 * a.len(), + } + } + + fn into_bytes(&self, writer: &mut W) { + match self { + Column::Typed(t) => { + use columnar::Container; + // Columnar data is serialized as a sequence of `u64` values, with each `[u8]` slice + // serialize as first its length in bytes, and then as many `u64` values as needed. + // Padding should be added, but only for alignment; no specific values are required. + for (align, bytes) in t.borrow().as_bytes() { + assert!(align <= 8); + let length: u64 = bytes.len().try_into().unwrap(); + writer + .write_all(bytemuck::cast_slice(std::slice::from_ref(&length))) + .unwrap(); + writer.write_all(bytes).unwrap(); + let padding: usize = ((8 - (length % 8)) % 8).try_into().unwrap(); + writer.write_all(&[0; 8][..padding]).unwrap(); + } + } + Column::Bytes(b) => writer.write_all(b).unwrap(), + Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(), + } + } + } +} + +pub use builder::ColumnBuilder; +mod builder { + use std::collections::VecDeque; + + use columnar::{AsBytes, Clear, Columnar, Len, Push}; + use timely::container::PushInto; + use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder}; + + use super::Column; + + /// A container builder for `Column`. + pub struct ColumnBuilder { + /// Container that we're writing to. + current: C::Container, + /// Empty allocation. + empty: Option>, + /// Completed containers pending to be sent. + pending: VecDeque>, + } + + impl PushInto for ColumnBuilder + where + C::Container: Push, + { + #[inline] + fn push_into(&mut self, item: T) { + self.current.push(item); + // If there is less than 10% slop with 2MB backing allocations, mint a container. + use columnar::Container; + let words = self.current.borrow().length_in_words(); + let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1); + if round - words < round / 10 { + let mut alloc = Vec::with_capacity(round); + columnar::bytes::serialization::encode( + &mut alloc, + self.current.borrow().as_bytes(), + ); + self.pending + .push_back(Column::Align(alloc.into_boxed_slice())); + self.current.clear(); + } + } + } + + impl Default for ColumnBuilder { + fn default() -> Self { + ColumnBuilder { + current: Default::default(), + empty: None, + pending: Default::default(), + } + } + } + + impl ContainerBuilder for ColumnBuilder + where + C::Container: Clone, + { + type Container = Column; + + #[inline] + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(container) = self.pending.pop_front() { + self.empty = Some(container); + self.empty.as_mut() + } else { + None + } + } + + #[inline] + fn finish(&mut self) -> Option<&mut Self::Container> { + if !self.current.is_empty() { + self.pending + .push_back(Column::Typed(std::mem::take(&mut self.current))); + } + self.empty = self.pending.pop_front(); + self.empty.as_mut() + } + } + + impl LengthPreservingContainerBuilder for ColumnBuilder where C::Container: Clone {} +} + +/// A batcher for columnar storage. +pub type Col2ValBatcher = MergeBatcher< + Column<((K, V), T, R)>, + batcher::Chunker>, + ColMerger<(K, V), T, R>, +>; +pub type Col2KeyBatcher = Col2ValBatcher; + +/// Types for consolidating, merging, and extracting columnar update collections. +pub mod batcher { + use std::collections::VecDeque; + + use columnar::Columnar; + use differential_dataflow::difference::Semigroup; + use timely::container::{ContainerBuilder, PushInto}; + use timely::Container; + + use crate::containers::Column; + + #[derive(Default)] + pub struct Chunker { + /// Buffer into which we'll consolidate. + /// + /// Also the buffer where we'll stage responses to `extract` and `finish`. + /// When these calls return, the buffer is available for reuse. + empty: C, + /// Consolidated buffers ready to go. + ready: VecDeque, + } + + impl ContainerBuilder for Chunker { + type Container = C; + + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(ready) = self.ready.pop_front() { + self.empty = ready; + Some(&mut self.empty) + } else { + None + } + } + + fn finish(&mut self) -> Option<&mut Self::Container> { + self.extract() + } + } + + impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker + where + D: Columnar, + for<'b> D::Ref<'b>: Ord + Copy, + T: Columnar, + for<'b> T::Ref<'b>: Ord + Copy, + R: Columnar + Semigroup + for<'b> Semigroup>, + for<'b> R::Ref<'b>: Ord, + C2: Container + for<'b> PushInto<&'b (D, T, R)>, + { + fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) { + // Sort input data + // TODO: consider `Vec` that we retain, containing indexes. + let mut permutation = Vec::with_capacity(container.len()); + permutation.extend(container.drain()); + permutation.sort(); + + self.empty.clear(); + // Iterate over the data, accumulating diffs for like keys. + let mut iter = permutation.drain(..); + if let Some((data, time, diff)) = iter.next() { + let mut owned_data = D::into_owned(data); + let mut owned_time = T::into_owned(time); + + let mut prev_data = data; + let mut prev_time = time; + let mut prev_diff = ::into_owned(diff); + + for (data, time, diff) in iter { + if (&prev_data, &prev_time) == (&data, &time) { + prev_diff.plus_equals(&diff); + } else { + if !prev_diff.is_zero() { + D::copy_from(&mut owned_data, prev_data); + T::copy_from(&mut owned_time, prev_time); + let tuple = (owned_data, owned_time, prev_diff); + self.empty.push_into(&tuple); + owned_data = tuple.0; + owned_time = tuple.1; + } + prev_data = data; + prev_time = time; + prev_diff = ::into_owned(diff); + } + } + + if !prev_diff.is_zero() { + D::copy_from(&mut owned_data, prev_data); + T::copy_from(&mut owned_time, prev_time); + let tuple = (owned_data, owned_time, prev_diff); + self.empty.push_into(&tuple); + } + } + + if !self.empty.is_empty() { + self.ready.push_back(std::mem::take(&mut self.empty)); + } + } + } +} From a5f162daf0b6c1ef282b3ce22569f7ecbf4bddad Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 16 Jan 2025 11:58:55 +0100 Subject: [PATCH 2/9] Convert more logging dataflows to columnar Signed-off-by: Moritz Hoffmann --- Cargo.lock | 20 +- Cargo.toml | 8 +- src/compute-types/Cargo.toml | 2 +- src/compute/Cargo.toml | 2 +- src/compute/src/logging.rs | 126 ++++++- src/compute/src/logging/compute.rs | 6 +- src/compute/src/logging/initialize.rs | 63 ++-- src/compute/src/logging/reachability.rs | 116 +++---- src/compute/src/logging/timely.rs | 430 +++++++++++------------- src/compute/src/typedefs.rs | 12 +- src/repr/Cargo.toml | 2 +- src/repr/src/timestamp.rs | 1 + src/timely-util/Cargo.toml | 2 +- src/timely-util/src/containers.rs | 33 ++ 14 files changed, 463 insertions(+), 360 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 388d9926de059..deb7826f9af53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1754,8 +1754,9 @@ checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" [[package]] name = "columnar" -version = "0.2.0" -source = "git+https://github.com/frankmcsherry/columnar#ea1be6ddb64fe984dfe847f82c5f5c60f5815835" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d58a4c12223e2d2140bbf4be9fb38b3a612804230c91388dfa4e56a8a6464bf3" dependencies = [ "bincode", "bytemuck", @@ -1767,8 +1768,9 @@ dependencies = [ [[package]] name = "columnar_derive" -version = "0.2.0" -source = "git+https://github.com/frankmcsherry/columnar#ea1be6ddb64fe984dfe847f82c5f5c60f5815835" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "087b1ac5c4ecad28348b6a9957e3dbd44ac7d041d267370acfdbbfa66b514c7d" dependencies = [ "proc-macro2", "quote", @@ -10372,7 +10374,7 @@ dependencies = [ [[package]] name = "timely" version = "0.16.0" -source = "git+https://github.com/antiguru/timely-dataflow?branch=logger_flush#56ec0d7473e7fee0a79f40525843fa6089873347" +source = "git+https://github.com/TimelyDataflow/timely-dataflow#291de98e10f778799557b187888df0e78fb58f39" dependencies = [ "bincode", "byteorder", @@ -10390,12 +10392,12 @@ dependencies = [ [[package]] name = "timely_bytes" version = "0.12.2" -source = "git+https://github.com/antiguru/timely-dataflow?branch=logger_flush#56ec0d7473e7fee0a79f40525843fa6089873347" +source = "git+https://github.com/TimelyDataflow/timely-dataflow#291de98e10f778799557b187888df0e78fb58f39" [[package]] name = "timely_communication" version = "0.16.1" -source = "git+https://github.com/antiguru/timely-dataflow?branch=logger_flush#56ec0d7473e7fee0a79f40525843fa6089873347" +source = "git+https://github.com/TimelyDataflow/timely-dataflow#291de98e10f778799557b187888df0e78fb58f39" dependencies = [ "byteorder", "columnar", @@ -10410,7 +10412,7 @@ dependencies = [ [[package]] name = "timely_container" version = "0.13.1" -source = "git+https://github.com/antiguru/timely-dataflow?branch=logger_flush#56ec0d7473e7fee0a79f40525843fa6089873347" +source = "git+https://github.com/TimelyDataflow/timely-dataflow#291de98e10f778799557b187888df0e78fb58f39" dependencies = [ "columnation", "flatcontainer", @@ -10420,7 +10422,7 @@ dependencies = [ [[package]] name = "timely_logging" version = "0.13.1" -source = "git+https://github.com/antiguru/timely-dataflow?branch=logger_flush#56ec0d7473e7fee0a79f40525843fa6089873347" +source = "git+https://github.com/TimelyDataflow/timely-dataflow#291de98e10f778799557b187888df0e78fb58f39" dependencies = [ "timely_container", ] diff --git a/Cargo.toml b/Cargo.toml index 0a14fe6c1ede8..8d8cd3b79f0e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -285,14 +285,14 @@ incremental = true # merged), after which point it becomes impossible to build that historical # version of Materialize. [patch.crates-io] -columnar = { git = "https://github.com/frankmcsherry/columnar" } -#columnar = { git = "https://github.com/antiguru/columnar", branch = "attrs" } +#columnar = { git = "https://github.com/frankmcsherry/columnar" } +#columnar = { git = "https://github.com/antiguru/columnar", branch = "inline_as_bytes" } #columnar = { path = "../columnar" } #differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow" } #differential-dataflow = { git = "https://github.com/antiguru/differential-dataflow", branch = "logging_fallout", version = "0.13.2" } #differential-dataflow = { path = "../differential-dataflow" } -#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } -timely = { git = "https://github.com/antiguru/timely-dataflow", branch = "logger_flush" } +timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } +#timely = { git = "https://github.com/antiguru/timely-dataflow", branch = "logger_flush" } #timely = { path = "../timely-dataflow/timely" } # Waiting on https://github.com/sfackler/rust-postgres/pull/752. diff --git a/src/compute-types/Cargo.toml b/src/compute-types/Cargo.toml index 2188835bc6dd1..a82db2a67b378 100644 --- a/src/compute-types/Cargo.toml +++ b/src/compute-types/Cargo.toml @@ -10,7 +10,7 @@ publish = false workspace = true [dependencies] -columnar = "0.2.0" +columnar = "0.2.2" columnation = "0.1.0" differential-dataflow = "0.13.3" itertools = "0.12.1" diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index a86a71668411f..3f9ba4d140a3d 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -13,7 +13,7 @@ workspace = true anyhow = "1.0.66" async-stream = "0.3.3" bytesize = "1.1.0" -columnar = "0.2.0" +columnar = "0.2.2" crossbeam-channel = "0.5.8" dec = { version = "0.4.8", features = ["serde"] } differential-dataflow = "0.13.3" diff --git a/src/compute/src/logging.rs b/src/compute/src/logging.rs index b25f253937b0d..a5dcff76760d9 100644 --- a/src/compute/src/logging.rs +++ b/src/compute/src/logging.rs @@ -21,14 +21,23 @@ use std::marker::PhantomData; use std::rc::Rc; use std::time::Duration; +use ::timely::container::columnation::Columnation; +use ::timely::dataflow::channels::pact::Pipeline; +use ::timely::dataflow::channels::pushers::buffer::Session; +use ::timely::dataflow::channels::pushers::{Counter, Tee}; use ::timely::dataflow::operators::capture::{Event, EventLink, EventPusher}; +use ::timely::dataflow::operators::Operator; +use ::timely::dataflow::StreamCore; use ::timely::progress::Timestamp as TimelyTimestamp; use ::timely::scheduling::Activator; use ::timely::Container; +use columnar::Columnar; use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog}; use mz_expr::{permutation_for_arrangement, MirScalarExpr}; -use mz_repr::{Datum, Diff, Row, RowPacker, SharedRow, Timestamp}; +use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, SharedRow, Timestamp}; use mz_timely_util::activator::RcActivator; +use mz_timely_util::containers::{Col2ValBatcher, Column, ColumnBuilder}; +use mz_timely_util::operator::consolidate_pact; use crate::logging::compute::Logger as ComputeLogger; use crate::typedefs::RowRowAgent; @@ -72,8 +81,8 @@ where C: Container, { /// Publishes a batch of logged events and advances the capability. - fn publish_batch(&mut self, time: &Duration, data: &mut Option) { - if let Some(data) = data.take() { + fn publish_batch(&mut self, time: &Duration, data: Option) { + if let Some(data) = data { self.event_pusher.push(Event::Messages(self.time_ms, data)); } else { // Flush: Indicate progress up to `time`. @@ -175,6 +184,70 @@ impl PermutedRowPacker { } } +/// Helper to pack collections of [`Datum`]s into key and value row. +pub(crate) struct PermutedRowRefPacker { + arity: usize, + key: Vec, + value: Vec, + key_row: Row, + value_row: Row, +} + +impl PermutedRowRefPacker { + /// Construct based on the information within the log variant. + pub(crate) fn new>(variant: V) -> Self { + let variant = variant.into(); + let key = variant.index_by(); + let arity = variant.desc().arity(); + let (_, value) = permutation_for_arrangement( + &key.iter() + .cloned() + .map(MirScalarExpr::Column) + .collect::>(), + arity, + ); + Self { + arity, + key, + value, + key_row: Row::default(), + value_row: Row::default(), + } + } + + /// Pack a slice of datums suitable for the key columns in the log variant. + pub(crate) fn pack_slice(&mut self, datums: &[Datum]) -> (&RowRef, &RowRef) { + assert_eq!(datums.len(), self.arity); + self.pack_by_index(|packer, index| packer.push(datums[index])) + } + + /// Pack using a callback suitable for the key columns in the log variant. + pub(crate) fn pack_by_index( + &mut self, + logic: F, + ) -> (&RowRef, &RowRef) { + let mut packer = self.key_row.packer(); + for index in &self.key { + logic(&mut packer, *index); + } + + let mut packer = self.value_row.packer(); + for index in &self.value { + logic(&mut packer, *index); + } + + (&self.key_row, &self.value_row) + } + + pub(crate) fn key(&self) -> &RowRef { + &self.key_row + } + + pub(crate) fn value(&self) -> &RowRef { + &self.value_row + } +} + /// Information about a collection exported from a logging dataflow. struct LogCollection { /// Trace handle providing access to the logged records. @@ -184,3 +257,50 @@ struct LogCollection { /// Index of the dataflow exporting this collection. dataflow_index: usize, } + +pub(super) type Update = (D, Timestamp, Diff); +pub(super) type Pusher = + Counter>, Tee>>>; +pub(super) type OutputSession<'a, D> = Session<'a, Timestamp, ColumnBuilder>, Pusher>; + +/// A single-purpose function to consolidate and pack updates for log collection. +/// +/// The function first consolidates worker-local updates using the [`Pipeline`] pact, then converts +/// the updates into `(Row, Row)` pairs using the provided logic function. It is crucial that the +/// data is not exchanged between workers, as the consolidation would not function as desired +/// otherwise. +pub(super) fn prepare_log_collection( + input: &StreamCore>>, + log: L, + mut logic: F, +) -> StreamCore>> +where + G: ::timely::dataflow::Scope, + L: Into, + K: Columnar + Columnation + Ord, + for<'a> K::Ref<'a>: Ord + Copy, + K::Container: Clone, + V: Columnar + Columnation + Ord, + for<'a> V::Ref<'a>: Ord + Copy, + V::Container: Clone, + F: FnMut(K::Ref<'_>, V::Ref<'_>, &mut PermutedRowRefPacker) + 'static, +{ + let log = log.into(); + // TODO: Use something other than the debug representation of the log variant as a name. + let name = format!("{log:?}"); + let consolidate_name = &format!("Consolidate {name}"); + let mut packer = PermutedRowRefPacker::new(log); + consolidate_pact::, _, _, _, _>(input, Pipeline, consolidate_name) + .container::>>() + .unary::, _, _, _>(Pipeline, &format!("ToRow {name}"), |_, _| { + move |input, output| { + while let Some((time, data)) = input.next() { + let mut session = output.session_with_builder(&time); + for ((key, val), time, diff) in data.iter() { + logic(key, val, &mut packer); + session.give(((packer.key(), packer.value()), time, diff)); + } + } + } + }) +} diff --git a/src/compute/src/logging/compute.rs b/src/compute/src/logging/compute.rs index 8af72795723ff..8232c56f5ebf5 100644 --- a/src/compute/src/logging/compute.rs +++ b/src/compute/src/logging/compute.rs @@ -22,7 +22,7 @@ use differential_dataflow::Collection; use mz_compute_types::plan::LirId; use mz_ore::cast::CastFrom; use mz_repr::{Datum, Diff, GlobalId, Timestamp}; -use mz_timely_util::containers::{Column, ColumnBuilder}; +use mz_timely_util::containers::{Column, ColumnBuilder, ProvidedBuilder}; use mz_timely_util::replay::MzReplay; use timely::communication::Allocate; use timely::container::CapacityContainerBuilder; @@ -310,7 +310,7 @@ pub(super) fn construct( worker.dataflow_named("Dataflow: compute logging", move |scope| { let enable_logging = config.enable_logging; - let (logs, token) = Some(event_queue.link).mz_replay::<_, ColumnBuilder<_>, _>( + let (logs, token) = Some(event_queue.link).mz_replay::<_, ProvidedBuilder<_>, _>( scope, "compute logs", config.interval, @@ -319,7 +319,7 @@ pub(super) fn construct( // If logging is disabled, we still need to install the indexes, but we can leave them // empty. We do so by immediately filtering all logs events. if enable_logging { - session.give_iterator(data.iter()) + session.give_container(&mut data.clone()) } }, ); diff --git a/src/compute/src/logging/initialize.rs b/src/compute/src/logging/initialize.rs index 96e0536fe9747..ed3ed7384cffe 100644 --- a/src/compute/src/logging/initialize.rs +++ b/src/compute/src/logging/initialize.rs @@ -14,15 +14,13 @@ use differential_dataflow::dynamic::pointstamp::PointStamp; use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder}; use differential_dataflow::Collection; use mz_compute_client::logging::{LogVariant, LoggingConfig}; -use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion}; use mz_repr::{Diff, Timestamp}; use mz_storage_operators::persist_source::Subtime; use mz_storage_types::errors::DataflowError; -use mz_timely_util::containers::Column; +use mz_timely_util::containers::{Column, ColumnBuilder}; use mz_timely_util::operator::CollectionExt; use timely::communication::Allocate; -use timely::container::flatcontainer::FlatStack; -use timely::container::ContainerBuilder; +use timely::container::{ContainerBuilder, PushInto}; use timely::logging::{ProgressEventTimestamp, TimelyEvent, TimelyEventBuilder}; use timely::logging_core::Logger; use timely::order::Product; @@ -83,14 +81,10 @@ pub fn initialize( (logger, traces) } -/// Type to specify the region for holding reachability events. Only intended to be interpreted -/// as [`MzRegionPreference`]. -type ReachabilityEventRegionPreference = ( - OwnedRegionOpinion>, - OwnedRegionOpinion, Diff)>>, +pub(super) type ReachabilityEvent = ( + Vec, + Vec<(usize, usize, bool, Option, Diff)>, ); -pub(super) type ReachabilityEventRegion = - <(Duration, ReachabilityEventRegionPreference) as MzRegionPreference>::Region; struct LoggingContext<'a, A: Allocate> { worker: &'a mut timely::worker::Worker, @@ -99,7 +93,7 @@ struct LoggingContext<'a, A: Allocate> { now: Instant, start_offset: Duration, t_event_queue: EventQueue>, - r_event_queue: EventQueue>, + r_event_queue: EventQueue>, d_event_queue: EventQueue>, c_event_queue: EventQueue>, shared_state: Rc>, @@ -182,29 +176,21 @@ impl LoggingContext<'_, A> { if data.is_none() { event_queue.activator.activate(); } - logger.publish_batch(time, data); + logger.publish_batch(time, data.take()); }, ) } fn reachability_logger(&self) -> Logger { let event_queue = self.r_event_queue.clone(); - let mut logger = BatchLogger::, _>::new( - event_queue.link, - self.interval_ms, - ); - Logger::new( - self.now, - self.start_offset, - move |time, data: &mut Option>| { - let Some(data) = data.as_mut() else { - logger.publish_batch(time, &mut None); - event_queue.activator.activate(); - return; - }; - let mut massaged = Vec::new(); - let mut container = FlatStack::default(); + let mut logger = BatchLogger::<_, _>::new(event_queue.link, self.interval_ms); + let mut massaged = Vec::new(); + let mut builder = ColumnBuilder::default(); + + let action = move |batch_time: &Duration, data: &mut Option>| { + if let Some(data) = data { + // Handle data for (time, event) in data.drain(..) { match event { TrackerEvent::SourceUpdate(update) => { @@ -216,7 +202,7 @@ impl LoggingContext<'_, A> { }, )); - container.copy((time, (update.tracker_id, &massaged))); + builder.push_into((time, (&update.tracker_id, &massaged))); massaged.clear(); } TrackerEvent::TargetUpdate(update) => { @@ -228,15 +214,26 @@ impl LoggingContext<'_, A> { }, )); - container.copy((time, (update.tracker_id, &massaged))); + builder.push_into((time, (&update.tracker_id, &massaged))); massaged.clear(); } } + while let Some(container) = builder.extract() { + logger.publish_batch(&time, Some(std::mem::take(container))); + } + } + } else { + // Handle a flush + while let Some(container) = builder.finish() { + logger.publish_batch(batch_time, Some(std::mem::take(container))); } - logger.publish_batch(time, &mut Some(container)); + + logger.publish_batch(batch_time, None); event_queue.activator.activate(); - }, - ) + } + }; + + Logger::new(self.now, self.start_offset, action) } } diff --git a/src/compute/src/logging/reachability.rs b/src/compute/src/logging/reachability.rs index 6fd790c559a48..51f622f93ecd7 100644 --- a/src/compute/src/logging/reachability.rs +++ b/src/compute/src/logging/reachability.rs @@ -12,27 +12,24 @@ use std::collections::BTreeMap; use std::convert::TryInto; use std::rc::Rc; +use std::time::Duration; -use differential_dataflow::AsCollection; +use columnar::{Columnar, Index}; +use differential_dataflow::Hashable; use mz_compute_client::logging::LoggingConfig; -use mz_expr::{permutation_for_arrangement, MirScalarExpr}; use mz_ore::cast::CastFrom; -use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion}; -use mz_ore::iter::IteratorExt; -use mz_repr::{Datum, Diff, RowArena, SharedRow, Timestamp}; -use mz_timely_util::operator::consolidate_pact; +use mz_repr::{Datum, Diff, Row, RowRef, Timestamp}; +use mz_timely_util::containers::{Col2ValBatcher, Column, ColumnBuilder}; use mz_timely_util::replay::MzReplay; use timely::communication::Allocate; -use timely::container::flatcontainer::FlatStack; -use timely::container::CapacityContainerBuilder; -use timely::dataflow::channels::pact::Pipeline; -use timely::dataflow::operators::core::Map; +use timely::dataflow::channels::pact::ExchangeCore; +use timely::Container; -use crate::extensions::arrange::MzArrange; -use crate::logging::initialize::ReachabilityEventRegion; -use crate::logging::{EventQueue, LogCollection, LogVariant, TimelyLog}; -use crate::row_spine::{RowRowBatcher, RowRowBuilder}; -use crate::typedefs::{FlatKeyValBatcherDefault, RowRowSpine}; +use crate::extensions::arrange::MzArrangeCore; +use crate::logging::initialize::ReachabilityEvent; +use crate::logging::{prepare_log_collection, EventQueue, LogCollection, LogVariant, TimelyLog}; +use crate::row_spine::RowRowBuilder; +use crate::typedefs::RowRowSpine; /// Constructs the logging dataflow for reachability logs. /// @@ -43,7 +40,7 @@ use crate::typedefs::{FlatKeyValBatcherDefault, RowRowSpine}; pub(super) fn construct( worker: &mut timely::worker::Worker, config: &LoggingConfig, - event_queue: EventQueue>, + event_queue: EventQueue>, ) -> BTreeMap { let interval_ms = std::cmp::max(1, config.interval.as_millis()); let worker_index = worker.index(); @@ -52,16 +49,9 @@ pub(super) fn construct( // A dataflow for multiple log-derived arrangements. let traces = worker.dataflow_named("Dataflow: timely reachability logging", move |scope| { let enable_logging = config.enable_logging; - type UpdatesKey = ( - bool, - OwnedRegionOpinion>, - usize, - usize, - Option, - ); - type UpdatesRegion = <((UpdatesKey, ()), Timestamp, Diff) as MzRegionPreference>::Region; + type UpdatesKey = (bool, Vec, usize, usize, Option); - type CB = CapacityContainerBuilder>; + type CB = ColumnBuilder<((UpdatesKey, ()), Timestamp, Diff)>; let (updates, token) = Some(event_queue.link).mz_replay::<_, CB, _>( scope, "reachability logs", @@ -76,7 +66,7 @@ pub(super) fn construct( for (time, (addr, massaged)) in data.iter() { let time_ms = ((time.as_millis() / interval_ms) + 1) * interval_ms; let time_ms: Timestamp = time_ms.try_into().expect("must fit"); - for (source, port, update_type, ts, diff) in massaged { + for (source, port, update_type, ts, diff) in massaged.into_iter() { let datum = (update_type, addr, source, port, ts); session.give(((datum, ()), time_ms, diff)); } @@ -85,62 +75,36 @@ pub(super) fn construct( ); // Restrict results by those logs that are meant to be active. - let logs_active = vec![LogVariant::Timely(TimelyLog::Reachability)]; + let logs_active = [LogVariant::Timely(TimelyLog::Reachability)]; - let updates = consolidate_pact::< - FlatKeyValBatcherDefault, - _, - _, - _, - _, - >(&updates, Pipeline, "Consolidate Timely reachability") - .container::>(); + let mut addr_row = Row::default(); + let updates = prepare_log_collection( + &updates, + TimelyLog::Reachability, + move |datum, (), packer| { + let (update_type, addr, source, port, ts) = datum; + let update_type = if update_type { "source" } else { "target" }; + addr_row.packer().push_list( + addr.iter() + .chain(std::iter::once(source)) + .map(|id| Datum::UInt64(u64::cast_from(id))), + ); + packer.pack_slice(&[ + addr_row.iter().next().unwrap(), + Datum::UInt64(u64::cast_from(port)), + Datum::UInt64(u64::cast_from(worker_index)), + Datum::String(update_type), + Datum::from(>::into_owned(ts)), + ]); + } + ); let mut result = BTreeMap::new(); for variant in logs_active { if config.index_logs.contains_key(&variant) { - let key = variant.index_by(); - let (_, value) = permutation_for_arrangement( - &key.iter() - .cloned() - .map(MirScalarExpr::Column) - .collect::>(), - variant.desc().arity(), - ); - - let updates = updates - .map( - move |(((update_type, addr, source, port, ts), _), time, diff)| { - let row_arena = RowArena::default(); - let update_type = if update_type { "source" } else { "target" }; - let binding = SharedRow::get(); - let mut row_builder = binding.borrow_mut(); - row_builder.packer().push_list( - addr.iter() - .copied() - .chain_one(source) - .map(|id| Datum::UInt64(u64::cast_from(id))), - ); - let datums = &[ - row_arena.push_unary_row(row_builder.clone()), - Datum::UInt64(u64::cast_from(port)), - Datum::UInt64(u64::cast_from(worker_index)), - Datum::String(update_type), - Datum::from(ts.clone()), - ]; - row_builder.packer().extend(key.iter().map(|k| datums[*k])); - let key_row = row_builder.clone(); - row_builder - .packer() - .extend(value.iter().map(|k| datums[*k])); - let value_row = row_builder.clone(); - ((key_row, value_row), time, diff) - }, - ) - .as_collection(); - let trace = updates - .mz_arrange::, RowRowBuilder<_, _>, RowRowSpine<_, _>>( + .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>( + ExchangeCore::new(|((key, _val), _time, _diff): &((&RowRef, &RowRef), _, _)| key.hashed()), &format!("Arrange {variant:?}"), ) .trace; diff --git a/src/compute/src/logging/timely.rs b/src/compute/src/logging/timely.rs index 61f43bbe36280..fdca570eab265 100644 --- a/src/compute/src/logging/timely.rs +++ b/src/compute/src/logging/timely.rs @@ -14,34 +14,31 @@ use std::collections::BTreeMap; use std::rc::Rc; use std::time::Duration; -use differential_dataflow::consolidation::ConsolidatingContainerBuilder; -use differential_dataflow::AsCollection; +use columnar::{Columnar, Index, IndexAs}; +use differential_dataflow::Hashable; use mz_compute_client::logging::LoggingConfig; use mz_ore::cast::CastFrom; -use mz_repr::{Datum, Diff, Timestamp}; -use mz_timely_util::operator::consolidate_pact; +use mz_repr::{Datum, Diff, RowRef, Timestamp}; +use mz_timely_util::containers::{Col2ValBatcher, Column, ColumnBuilder}; use mz_timely_util::replay::MzReplay; -use serde::{Deserialize, Serialize}; use timely::communication::Allocate; use timely::container::columnation::{Columnation, CopyRegion}; -use timely::container::CapacityContainerBuilder; -use timely::dataflow::channels::pact::Pipeline; -use timely::dataflow::channels::pushers::buffer::Session; -use timely::dataflow::channels::pushers::{Counter, Tee}; +use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; -use timely::dataflow::operators::Filter; +use timely::dataflow::StreamCore; use timely::logging::{ - ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent, - TimelyEvent, + ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ParkEventReference, ScheduleEvent, + ShutdownEvent, TimelyEvent, TimelyEventReference, }; +use timely::Container; use tracing::error; -use crate::extensions::arrange::MzArrange; +use crate::extensions::arrange::MzArrangeCore; use crate::logging::compute::{ComputeEvent, DataflowShutdown}; +use crate::logging::{prepare_log_collection, LogCollection, OutputSession}; use crate::logging::{EventQueue, LogVariant, SharedLoggingState, TimelyLog}; -use crate::logging::{LogCollection, PermutedRowPacker}; -use crate::row_spine::{RowRowBatcher, RowRowBuilder}; -use crate::typedefs::{KeyValBatcher, RowRowSpine}; +use crate::row_spine::RowRowBuilder; +use crate::typedefs::RowRowSpine; /// Constructs the logging dataflow for timely logs. /// @@ -55,27 +52,29 @@ pub(super) fn construct( event_queue: EventQueue>, shared_state: Rc>, ) -> BTreeMap { - let logging_interval_ms = std::cmp::max(1, config.interval.as_millis()); + let logging_interval_ms = + u64::try_from(std::cmp::max(1, config.interval.as_millis())).expect("must fit"); let worker_id = worker.index(); let peers = worker.peers(); let dataflow_index = worker.next_dataflow_index(); worker.dataflow_named("Dataflow: timely logging", move |scope| { - let (mut logs, token) = Some(event_queue.link) - .mz_replay::<_, CapacityContainerBuilder<_>, _>( + let enable_logging = config.enable_logging; + let (logs, token): (StreamCore<_, Column<(Duration, TimelyEvent)>>, _) = + Some(event_queue.link).mz_replay::<_, ColumnBuilder<_>, _>( scope, "timely logs", config.interval, event_queue.activator, - |mut session, data| session.give_iterator(data.iter()), + move |mut session, data| { + // If logging is disabled, we still need to install the indexes, but we can leave them + // empty. We do so by immediately filtering all logs events. + if enable_logging { + session.give_iterator(data.iter()) + } + }, ); - // If logging is disabled, we still need to install the indexes, but we can leave them - // empty. We do so by immediately filtering all logs events. - if !config.enable_logging { - logs = logs.filter(|_| false); - } - // Build a demux operator that splits the replayed event stream up into the separate // logging streams. let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone()); @@ -119,8 +118,9 @@ pub(super) fn construct( batches_received: batches_received.session_with_builder(&cap), }; - for (time, event) in data.drain(..) { - if let TimelyEvent::Messages(msg) = &event { + let shared_state = &mut shared_state.borrow_mut(); + for (time, event) in data.drain() { + if let TimelyEventReference::Messages(msg) = &event { match msg.is_send { true => assert_eq!(msg.source, worker_id), false => assert_eq!(msg.target, worker_id), @@ -129,7 +129,7 @@ pub(super) fn construct( DemuxHandler { state: &mut demux_state, - shared_state: &mut shared_state.borrow_mut(), + shared_state, output: &mut output_buffers, logging_interval_ms, peers, @@ -145,181 +145,152 @@ pub(super) fn construct( // We pre-arrange the logging streams to force a consolidation and reduce the amount of // updates that reach `Row` encoding. - let packer = PermutedRowPacker::new(TimelyLog::Operates); - let operates = consolidate_pact::, _, _, _, _>( + let operates = prepare_log_collection( &operates, - Pipeline, - "Consolidate Timely operates", - ) - .as_collection() - .map(move |(id, name)| { - packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(id)), - Datum::UInt64(u64::cast_from(worker_id)), - Datum::String(&name), - ]) - }); - - let packer = PermutedRowPacker::new(TimelyLog::Channels); - let channels = consolidate_pact::, _, _, _, _>( + TimelyLog::Operates, + move |id, name, packer| { + packer.pack_slice(&[ + Datum::UInt64(u64::cast_from(id)), + Datum::UInt64(u64::cast_from(worker_id)), + Datum::String(name), + ]); + }, + ); + + let channels = prepare_log_collection( &channels, - Pipeline, - "Consolidate Timely operates", - ) - .as_collection() - .map(move |(datum, ())| { - let (source_node, source_port) = datum.source; - let (target_node, target_port) = datum.target; - packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(datum.id)), - Datum::UInt64(u64::cast_from(worker_id)), - Datum::UInt64(u64::cast_from(source_node)), - Datum::UInt64(u64::cast_from(source_port)), - Datum::UInt64(u64::cast_from(target_node)), - Datum::UInt64(u64::cast_from(target_port)), - ]) - }); - - let packer = PermutedRowPacker::new(TimelyLog::Addresses); - let addresses = consolidate_pact::, _, _, _, _>( + TimelyLog::Channels, + move |datum, (), packer| { + let (source_node, source_port) = datum.source; + let (target_node, target_port) = datum.target; + packer.pack_slice(&[ + Datum::UInt64(u64::cast_from(datum.id)), + Datum::UInt64(u64::cast_from(worker_id)), + Datum::UInt64(u64::cast_from(source_node)), + Datum::UInt64(u64::cast_from(source_port)), + Datum::UInt64(u64::cast_from(target_node)), + Datum::UInt64(u64::cast_from(target_port)), + ]); + }, + ); + + let addresses = prepare_log_collection( &addresses, - Pipeline, - "Consolidate Timely addresses", - ) - .as_collection() - .map({ - move |(id, address)| { + TimelyLog::Addresses, + move |id, address, packer| { packer.pack_by_index(|packer, index| match index { 0 => packer.push(Datum::UInt64(u64::cast_from(id))), 1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))), 2 => { - packer.push_list(address.iter().map(|i| Datum::UInt64(u64::cast_from(*i)))) + packer.push_list(address.iter().map(|i| Datum::UInt64(u64::cast_from(i)))) } _ => unreachable!("Addresses relation has three columns"), - }) - } - }); + }); + }, + ); - let packer = PermutedRowPacker::new(TimelyLog::Parks); - let parks = consolidate_pact::, _, _, _, _>( + let parks = prepare_log_collection( &parks, - Pipeline, - "Consolidate Timely parks", - ) - .as_collection() - .map(move |(datum, ())| { - packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(worker_id)), - Datum::UInt64(u64::try_from(datum.duration_pow).expect("duration too big")), - datum - .requested_pow - .map(|v| Datum::UInt64(v.try_into().expect("requested too big"))) - .unwrap_or(Datum::Null), - ]) - }); - - let packer = PermutedRowPacker::new(TimelyLog::BatchesSent); - let batches_sent = consolidate_pact::, _, _, _, _>( + TimelyLog::Parks, + move |datum, (), packer| { + packer.pack_slice(&[ + Datum::UInt64(u64::cast_from(worker_id)), + Datum::UInt64(*datum.duration_pow), + datum + .requested_pow + .map(|v| Datum::UInt64(*v)) + .unwrap_or(Datum::Null), + ]); + }, + ); + + let batches_sent = prepare_log_collection( &batches_sent, - Pipeline, - "Consolidate Timely batches sent", - ) - .as_collection() - .map(move |(datum, ())| { - packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(datum.channel)), - Datum::UInt64(u64::cast_from(worker_id)), - Datum::UInt64(u64::cast_from(datum.worker)), - ]) - }); - - let packer = PermutedRowPacker::new(TimelyLog::BatchesReceived); - let batches_received = consolidate_pact::, _, _, _, _>( + TimelyLog::BatchesSent, + move |datum, (), packer| { + packer.pack_slice(&[ + Datum::UInt64(u64::cast_from(datum.channel)), + Datum::UInt64(u64::cast_from(worker_id)), + Datum::UInt64(u64::cast_from(datum.worker)), + ]); + }, + ); + + let batches_received = prepare_log_collection( &batches_received, - Pipeline, - "Consolidate Timely batches received", - ) - .as_collection() - .map(move |(datum, ())| { - packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(datum.channel)), - Datum::UInt64(u64::cast_from(datum.worker)), - Datum::UInt64(u64::cast_from(worker_id)), - ]) - }); - - let packer = PermutedRowPacker::new(TimelyLog::MessagesSent); - let messages_sent = consolidate_pact::, _, _, _, _>( + TimelyLog::BatchesReceived, + move |datum, (), packer| { + packer.pack_slice(&[ + Datum::UInt64(u64::cast_from(datum.channel)), + Datum::UInt64(u64::cast_from(datum.worker)), + Datum::UInt64(u64::cast_from(worker_id)), + ]); + }, + ); + + + let messages_sent = prepare_log_collection( &messages_sent, - Pipeline, - "Consolidate Timely messages sent", - ) - .as_collection() - .map(move |(datum, ())| { - packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(datum.channel)), - Datum::UInt64(u64::cast_from(worker_id)), - Datum::UInt64(u64::cast_from(datum.worker)), - ]) - }); - - let packer = PermutedRowPacker::new(TimelyLog::MessagesReceived); - let messages_received = consolidate_pact::, _, _, _, _>( + TimelyLog::MessagesSent, + move |datum, (), packer| { + packer.pack_slice(&[ + Datum::UInt64(u64::cast_from(datum.channel)), + Datum::UInt64(u64::cast_from(worker_id)), + Datum::UInt64(u64::cast_from(datum.worker)), + ]); + }, + ); + + let messages_received = prepare_log_collection( &messages_received, - Pipeline, - "Consolidate Timely messages received", - ) - .as_collection() - .map(move |(datum, ())| { - packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(datum.channel)), - Datum::UInt64(u64::cast_from(datum.worker)), - Datum::UInt64(u64::cast_from(worker_id)), - ]) - }); - - let packer = PermutedRowPacker::new(TimelyLog::Elapsed); - let elapsed = consolidate_pact::, _, _, _, _>( + TimelyLog::MessagesReceived, + move |datum, (), packer| { + packer.pack_slice(&[ + Datum::UInt64(u64::cast_from(datum.channel)), + Datum::UInt64(u64::cast_from(datum.worker)), + Datum::UInt64(u64::cast_from(worker_id)), + ]); + }, + ); + + let elapsed = prepare_log_collection( &schedules_duration, - Pipeline, - "Consolidate Timely duration", - ) - .as_collection() - .map(move |(operator, _)| { - packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(operator)), - Datum::UInt64(u64::cast_from(worker_id)), - ]) - }); + TimelyLog::Elapsed, + move |operator, (), packer| { + packer.pack_slice(&[Datum::UInt64(u64::cast_from(operator)), + Datum::UInt64(u64::cast_from(worker_id)), + ]); + }, + ); - let packer = PermutedRowPacker::new(TimelyLog::Histogram); - let histogram = consolidate_pact::, _, _, _, _>( - &schedules_histogram, - Pipeline, - "Consolidate Timely histogram", - ) - .as_collection() - .map(move |(datum, _)| { - packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(datum.operator)), - Datum::UInt64(u64::cast_from(worker_id)), - Datum::UInt64(u64::try_from(datum.duration_pow).expect("duration too big")), - ]) - }); - use TimelyLog::*; - let logs = [ - (Operates, operates), - (Channels, channels), - (Elapsed, elapsed), - (Histogram, histogram), - (Addresses, addresses), - (Parks, parks), - (MessagesSent, messages_sent), - (MessagesReceived, messages_received), - (BatchesSent, batches_sent), - (BatchesReceived, batches_received), - ]; + let histogram = prepare_log_collection( + &schedules_histogram, + TimelyLog::Histogram, + move |datum, (), packer| { + packer.pack_slice(&[ + Datum::UInt64(u64::cast_from(datum.operator)), + Datum::UInt64(u64::cast_from(worker_id)), + Datum::UInt64(*datum.duration_pow), + ]); + }, + ); + + let logs = { + use TimelyLog::*; + [ + (Operates, operates), + (Channels, channels), + (Elapsed, elapsed), + (Histogram, histogram), + (Addresses, addresses), + (Parks, parks), + (MessagesSent, messages_sent), + (MessagesReceived, messages_received), + (BatchesSent, batches_sent), + (BatchesReceived, batches_received), + ] + }; // Build the output arrangements. let mut result = BTreeMap::new(); @@ -327,7 +298,8 @@ pub(super) fn construct( let variant = LogVariant::Timely(variant); if config.index_logs.contains_key(&variant) { let trace = collection - .mz_arrange::, RowRowBuilder<_, _>, RowRowSpine<_, _>>( + .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>( + ExchangeCore::new(|((key, _val), _time, _diff): &((&RowRef, &RowRef), _, _)| key.hashed()), &format!("Arrange {variant:?}"), ) .trace; @@ -364,6 +336,7 @@ struct DemuxState { schedules_data: BTreeMap>, } +#[derive(Columnar)] struct Park { /// Time when the park occurred. time: Duration, @@ -372,7 +345,7 @@ struct Park { } /// Organize message counts into number of batches and records. -#[derive(Default, Copy, Clone, Debug)] +#[derive(Default, Copy, Clone, Debug, Columnar)] struct MessageCount { /// The number of batches sent across a channel. batches: i64, @@ -380,11 +353,6 @@ struct MessageCount { records: i64, } -type Pusher = - Counter, Tee>>; -type OutputSession<'a, D> = - Session<'a, Timestamp, ConsolidatingContainerBuilder>, Pusher>; - /// Bundled output buffers used by the demux operator. // // We use tuples rather than dedicated `*Datum` structs for `operates` and `addresses` to avoid @@ -403,7 +371,8 @@ struct DemuxOutput<'a> { schedules_histogram: OutputSession<'a, (ScheduleHistogramDatum, ())>, } -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)] +#[columnar(derive(PartialEq, Eq, PartialOrd, Ord))] struct ChannelDatum { id: usize, source: (usize, usize), @@ -414,17 +383,19 @@ impl Columnation for ChannelDatum { type InnerRegion = CopyRegion; } -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)] +#[columnar(derive(PartialEq, Eq, PartialOrd, Ord))] struct ParkDatum { - duration_pow: u128, - requested_pow: Option, + duration_pow: u64, + requested_pow: Option, } impl Columnation for ParkDatum { type InnerRegion = CopyRegion; } -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)] +#[columnar(derive(PartialEq, Eq, PartialOrd, Ord))] struct MessageDatum { channel: usize, worker: usize, @@ -434,10 +405,11 @@ impl Columnation for MessageDatum { type InnerRegion = CopyRegion; } -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)] +#[columnar(derive(PartialEq, Eq, PartialOrd, Ord))] struct ScheduleHistogramDatum { operator: usize, - duration_pow: u128, + duration_pow: u64, } impl Columnation for ScheduleHistogramDatum { @@ -453,7 +425,7 @@ struct DemuxHandler<'a, 'b> { /// Demux output buffers. output: &'a mut DemuxOutput<'b>, /// The logging interval specifying the time granularity for the updates. - logging_interval_ms: u128, + logging_interval_ms: u64, /// The number of timely workers. peers: usize, /// The current event time. @@ -464,15 +436,15 @@ impl DemuxHandler<'_, '_> { /// Return the timestamp associated with the current event, based on the event time and the /// logging interval. fn ts(&self) -> Timestamp { - let time_ms = self.time.as_millis(); + let time_ms = u64::try_from(self.time.as_millis()).expect("must fit"); let interval = self.logging_interval_ms; let rounded = (time_ms / interval + 1) * interval; rounded.try_into().expect("must fit") } /// Handle the given timely event. - fn handle(&mut self, event: TimelyEvent) { - use TimelyEvent::*; + fn handle(&mut self, event: ::Ref<'_>) { + use TimelyEventReference::*; match event { Operates(e) => self.handle_operates(e), @@ -485,18 +457,20 @@ impl DemuxHandler<'_, '_> { } } - fn handle_operates(&mut self, event: OperatesEvent) { + fn handle_operates(&mut self, event: ::Ref<'_>) { let ts = self.ts(); - let datum = (event.id, event.name.clone()); + let datum = (event.id, event.name); self.output.operates.give((datum, ts, 1)); - let datum = (event.id, event.addr.clone()); + let datum = (event.id, event.addr); self.output.addresses.give((datum, ts, 1)); - self.state.operators.insert(event.id, event); + self.state + .operators + .insert(event.id, Columnar::into_owned(event)); } - fn handle_channels(&mut self, event: ChannelsEvent) { + fn handle_channels(&mut self, event: ::Ref<'_>) { let ts = self.ts(); let datum = ChannelDatum { id: event.id, @@ -505,18 +479,18 @@ impl DemuxHandler<'_, '_> { }; self.output.channels.give(((datum, ()), ts, 1)); - let datum = (event.id, event.scope_addr.clone()); + let datum = (event.id, event.scope_addr); self.output.addresses.give((datum, ts, 1)); - let dataflow_index = event.scope_addr[0]; + let dataflow_index = event.scope_addr.index_as(0); self.state .dataflow_channels .entry(dataflow_index) .or_default() - .push(event); + .push(Columnar::into_owned(event)); } - fn handle_shutdown(&mut self, event: ShutdownEvent) { + fn handle_shutdown(&mut self, event: ::Ref<'_>) { // Dropped operators should result in a negative record for // the `operates` collection, cancelling out the initial // operator announcement. @@ -529,13 +503,12 @@ impl DemuxHandler<'_, '_> { // Retract operator information. let ts = self.ts(); - let datum = (operator.id, operator.name); + let datum = (operator.id, &operator.name); self.output.operates.give((datum, ts, -1)); // Retract schedules information for the operator if let Some(schedules) = self.state.schedules_data.remove(&event.id) { - for (bucket, (count, elapsed_ns)) in schedules - .into_iter() + for (bucket, (count, elapsed_ns)) in IntoIterator::into_iter(schedules) .enumerate() .filter(|(_, (count, _))| *count != 0) { @@ -559,7 +532,7 @@ impl DemuxHandler<'_, '_> { self.handle_dataflow_shutdown(dataflow_index); } - let datum = (operator.id, operator.addr); + let datum = (operator.id, &operator.addr); self.output.addresses.give((datum, ts, -1)); } @@ -586,7 +559,7 @@ impl DemuxHandler<'_, '_> { }; self.output.channels.give(((datum, ()), ts, -1)); - let datum = (channel.id, channel.scope_addr); + let datum = (channel.id, &channel.scope_addr); self.output.addresses.give((datum, ts, -1)); // Retract messages logged for this channel. @@ -621,9 +594,9 @@ impl DemuxHandler<'_, '_> { } } - fn handle_park(&mut self, event: ParkEvent) { + fn handle_park(&mut self, event: ::Ref<'_>) { match event { - ParkEvent::Park(requested) => { + ParkEventReference::Park(requested) => { let park = Park { time: self.time, requested, @@ -633,15 +606,18 @@ impl DemuxHandler<'_, '_> { error!("park without a succeeding unpark"); } } - ParkEvent::Unpark => { + ParkEventReference::Unpark(()) => { let Some(park) = self.state.last_park.take() else { - error!("unpark without a preceeding park"); + error!("unpark without a preceding park"); return; }; let duration_ns = self.time.saturating_sub(park.time).as_nanos(); - let duration_pow = duration_ns.next_power_of_two(); - let requested_pow = park.requested.map(|r| r.as_nanos().next_power_of_two()); + let duration_pow = + u64::try_from(duration_ns.next_power_of_two()).expect("must fit"); + let requested_pow = park + .requested + .map(|r| u64::try_from(r.as_nanos().next_power_of_two()).expect("must fit")); let ts = self.ts(); let datum = ParkDatum { @@ -653,7 +629,7 @@ impl DemuxHandler<'_, '_> { } } - fn handle_messages(&mut self, event: MessagesEvent) { + fn handle_messages(&mut self, event: ::Ref<'_>) { let ts = self.ts(); let count = Diff::try_from(event.length).expect("must fit"); @@ -690,7 +666,7 @@ impl DemuxHandler<'_, '_> { } } - fn handle_schedule(&mut self, event: ScheduleEvent) { + fn handle_schedule(&mut self, event: ::Ref<'_>) { match event.start_stop { timely::logging::StartStop::Start => { let existing = self.state.schedule_starts.insert(event.id, self.time); @@ -706,7 +682,7 @@ impl DemuxHandler<'_, '_> { let elapsed_ns = self.time.saturating_sub(start_time).as_nanos(); let elapsed_diff = Diff::try_from(elapsed_ns).expect("must fit"); - let elapsed_pow = elapsed_ns.next_power_of_two(); + let elapsed_pow = u64::try_from(elapsed_ns.next_power_of_two()).expect("must fit"); let ts = self.ts(); let datum = event.id; diff --git a/src/compute/src/typedefs.rs b/src/compute/src/typedefs.rs index b7ecd5d57108c..a0432c475f351 100644 --- a/src/compute/src/typedefs.rs +++ b/src/compute/src/typedefs.rs @@ -13,7 +13,9 @@ use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::arrange::TraceAgent; -use differential_dataflow::trace::implementations::chunker::ColumnationChunker; +use differential_dataflow::trace::implementations::chunker::{ + ColumnationChunker, ContainerChunker, +}; use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher}; use differential_dataflow::trace::implementations::ord_neu::{ FlatValBatcher, FlatValBuilder, FlatValSpine, OrdValBatch, @@ -23,6 +25,7 @@ use differential_dataflow::trace::wrappers::frontier::TraceFrontier; use mz_ore::flatcontainer::MzRegionPreference; use mz_repr::Diff; use mz_storage_types::errors::DataflowError; +use timely::container::columnation::TimelyStack; use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use timely::dataflow::ScopeParent; @@ -195,6 +198,13 @@ pub type KeyBatcher = KeyValBatcher; pub type KeyValBatcher = MergeBatcher, ColumnationChunker<((K, V), T, D)>, ColMerger<(K, V), T, D>>; +pub type KeyBatcherColumnar = KeyValBatcher; +pub type KeyValBatcherColumnar = MergeBatcher< + Vec<((K, V), T, D)>, + ContainerChunker>, + ColMerger<(K, V), T, D>, +>; + pub type FlatKeyValBatch = OrdValBatch>; pub type FlatKeyValSpine = FlatValSpine>; pub type FlatKeyValSpineDefault = FlatKeyValSpine< diff --git a/src/repr/Cargo.toml b/src/repr/Cargo.toml index d09a22d1697b5..4a4f4ff354640 100644 --- a/src/repr/Cargo.toml +++ b/src/repr/Cargo.toml @@ -31,7 +31,7 @@ arrow = { version = "53.3.0", default-features = false } bitflags = "1.3.2" bytes = "1.3.0" cfg-if = "1.0.0" -columnar = "0.2.0" +columnar = "0.2.2" columnation = "0.1.0" chrono = { version = "0.4.35", default-features = false, features = ["serde", "std"] } compact_bytes = "0.1.2" diff --git a/src/repr/src/timestamp.rs b/src/repr/src/timestamp.rs index c159916a8596a..e00efa747f497 100644 --- a/src/repr/src/timestamp.rs +++ b/src/repr/src/timestamp.rs @@ -37,6 +37,7 @@ include!(concat!(env!("OUT_DIR"), "/mz_repr.timestamp.rs")); Arbitrary, Columnar, )] +#[columnar(derive(PartialEq, Eq, PartialOrd, Ord))] pub struct Timestamp { /// note no `pub`. internal: u64, diff --git a/src/timely-util/Cargo.toml b/src/timely-util/Cargo.toml index 4c37d17485e92..9d895e52c755b 100644 --- a/src/timely-util/Cargo.toml +++ b/src/timely-util/Cargo.toml @@ -13,7 +13,7 @@ workspace = true ahash = { version = "0.8.11", default-features = false } bincode = "1.3.3" bytemuck = "1.21.0" -columnar = "0.2.0" +columnar = "0.2.2" columnation = "0.1.0" differential-dataflow = "0.13.3" either = "1" diff --git a/src/timely-util/src/containers.rs b/src/timely-util/src/containers.rs index 6ae6db0fd9eaf..aa7b12b11169a 100644 --- a/src/timely-util/src/containers.rs +++ b/src/timely-util/src/containers.rs @@ -382,3 +382,36 @@ pub mod batcher { } } } + +pub use provided_builder::ProvidedBuilder; + +mod provided_builder { + use timely::container::ContainerBuilder; + use timely::Container; + + /// A container builder that doesn't support pushing elements, and is only suitable for pushing + /// whole containers at session. + pub struct ProvidedBuilder { + _marker: std::marker::PhantomData, + } + + impl Default for ProvidedBuilder { + fn default() -> Self { + Self { + _marker: std::marker::PhantomData, + } + } + } + + impl ContainerBuilder for ProvidedBuilder { + type Container = C; + + fn extract(&mut self) -> Option<&mut Self::Container> { + None + } + + fn finish(&mut self) -> Option<&mut Self::Container> { + None + } + } +} From 7af52c69db173c1b645d58a1c4f995f36eb20c68 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 17 Jan 2025 17:27:21 +0100 Subject: [PATCH 3/9] I think this isn't bad! Signed-off-by: Moritz Hoffmann --- src/compute/src/logging.rs | 131 +++++----------- src/compute/src/logging/compute.rs | 50 +++--- src/compute/src/logging/differential.rs | 64 ++++---- src/compute/src/logging/reachability.rs | 21 +-- src/compute/src/logging/timely.rs | 170 +++++++++++---------- src/compute/src/sink/copy_to_s3_oneshot.rs | 18 ++- src/timely-util/src/operator.rs | 63 +++----- 7 files changed, 224 insertions(+), 293 deletions(-) diff --git a/src/compute/src/logging.rs b/src/compute/src/logging.rs index a5dcff76760d9..6cd204cd698b5 100644 --- a/src/compute/src/logging.rs +++ b/src/compute/src/logging.rs @@ -21,7 +21,7 @@ use std::marker::PhantomData; use std::rc::Rc; use std::time::Duration; -use ::timely::container::columnation::Columnation; +use ::timely::container::ContainerBuilder; use ::timely::dataflow::channels::pact::Pipeline; use ::timely::dataflow::channels::pushers::buffer::Session; use ::timely::dataflow::channels::pushers::{Counter, Tee}; @@ -31,12 +31,11 @@ use ::timely::dataflow::StreamCore; use ::timely::progress::Timestamp as TimelyTimestamp; use ::timely::scheduling::Activator; use ::timely::Container; -use columnar::Columnar; +use differential_dataflow::trace::Batcher; use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog}; use mz_expr::{permutation_for_arrangement, MirScalarExpr}; -use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, SharedRow, Timestamp}; +use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, Timestamp}; use mz_timely_util::activator::RcActivator; -use mz_timely_util::containers::{Col2ValBatcher, Column, ColumnBuilder}; use mz_timely_util::operator::consolidate_pact; use crate::logging::compute::Logger as ComputeLogger; @@ -141,73 +140,23 @@ struct SharedLoggingState { pub(crate) struct PermutedRowPacker { key: Vec, value: Vec, -} - -impl PermutedRowPacker { - /// Construct based on the information within the log variant. - pub(crate) fn new>(variant: V) -> Self { - let variant = variant.into(); - let key = variant.index_by(); - let (_, value) = permutation_for_arrangement( - &key.iter() - .cloned() - .map(MirScalarExpr::Column) - .collect::>(), - variant.desc().arity(), - ); - Self { key, value } - } - - /// Pack a slice of datums suitable for the key columns in the log variant. - pub(crate) fn pack_slice(&self, datums: &[Datum]) -> (Row, Row) { - self.pack_by_index(|packer, index| packer.push(datums[index])) - } - - /// Pack using a callback suitable for the key columns in the log variant. - pub(crate) fn pack_by_index(&self, logic: F) -> (Row, Row) { - let binding = SharedRow::get(); - let mut row_builder = binding.borrow_mut(); - - let mut packer = row_builder.packer(); - for index in &self.key { - logic(&mut packer, *index); - } - let key_row = row_builder.clone(); - - let mut packer = row_builder.packer(); - for index in &self.value { - logic(&mut packer, *index); - } - let value_row = row_builder.clone(); - - (key_row, value_row) - } -} - -/// Helper to pack collections of [`Datum`]s into key and value row. -pub(crate) struct PermutedRowRefPacker { - arity: usize, - key: Vec, - value: Vec, key_row: Row, value_row: Row, } -impl PermutedRowRefPacker { +impl PermutedRowPacker { /// Construct based on the information within the log variant. pub(crate) fn new>(variant: V) -> Self { let variant = variant.into(); let key = variant.index_by(); - let arity = variant.desc().arity(); let (_, value) = permutation_for_arrangement( &key.iter() .cloned() .map(MirScalarExpr::Column) .collect::>(), - arity, + variant.desc().arity(), ); Self { - arity, key, value, key_row: Row::default(), @@ -217,10 +166,15 @@ impl PermutedRowRefPacker { /// Pack a slice of datums suitable for the key columns in the log variant. pub(crate) fn pack_slice(&mut self, datums: &[Datum]) -> (&RowRef, &RowRef) { - assert_eq!(datums.len(), self.arity); self.pack_by_index(|packer, index| packer.push(datums[index])) } + /// Pack a slice of datums suitable for the key columns in the log variant. + pub(crate) fn pack_slice_owned(&mut self, datums: &[Datum]) -> (Row, Row) { + let (key, value) = self.pack_by_index(|packer, index| packer.push(datums[index])); + (key.to_owned(), value.to_owned()) + } + /// Pack using a callback suitable for the key columns in the log variant. pub(crate) fn pack_by_index( &mut self, @@ -238,14 +192,6 @@ impl PermutedRowRefPacker { (&self.key_row, &self.value_row) } - - pub(crate) fn key(&self) -> &RowRef { - &self.key_row - } - - pub(crate) fn value(&self) -> &RowRef { - &self.value_row - } } /// Information about a collection exported from a logging dataflow. @@ -258,10 +204,9 @@ struct LogCollection { dataflow_index: usize, } -pub(super) type Update = (D, Timestamp, Diff); -pub(super) type Pusher = - Counter>, Tee>>>; -pub(super) type OutputSession<'a, D> = Session<'a, Timestamp, ColumnBuilder>, Pusher>; +pub(super) type Pusher = Counter>; +pub(super) type OutputSession<'a, CB> = + Session<'a, Timestamp, CB, Pusher<::Container>>; /// A single-purpose function to consolidate and pack updates for log collection. /// @@ -269,38 +214,38 @@ pub(super) type OutputSession<'a, D> = Session<'a, Timestamp, ColumnBuilder( - input: &StreamCore>>, +pub(super) fn consolidate_and_pack( + input: &StreamCore, log: L, mut logic: F, -) -> StreamCore>> +) -> StreamCore where G: ::timely::dataflow::Scope, + B: Batcher