Skip to content
4 changes: 2 additions & 2 deletions differential-dataflow/examples/columnar/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! Demonstrates columnar-backed arrangements in an iterative scope,
//! exercising Enter, Leave, Negate, ResultsIn on RecordedUpdates,
//! and Push on Updates for the reduce builder path.
//! and Push on UpdatesTyped for the reduce builder path.

use timely::container::{ContainerBuilder, PushInto};
use timely::dataflow::InputHandle;
Expand Down Expand Up @@ -87,7 +87,7 @@ fn main() {
///
/// This module exercises the container traits needed for iterative columnar
/// computation: Enter, Leave, Negate, ResultsIn on RecordedUpdates, and
/// Push on Updates for the reduce builder path.
/// Push on UpdatesTyped for the reduce builder path.
mod reachability {

use timely::order::Product;
Expand Down
56 changes: 28 additions & 28 deletions differential-dataflow/src/columnar/arrangement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
//! - Type aliases (`ValSpine`, `ValBatcher`, `ValBuilder`) glue columnar storage
//! into DD's trace machinery.
//! - `Coltainer<C>` wraps a columnar `C::Container` as a DD `BatchContainer`.
//! - `TrieChunker` strips `RecordedUpdates` down to `Updates` for the merge batcher.
//! - `batcher` contains required trait stubs for `Updates`.
//! - `TrieChunker` strips `RecordedUpdates` down to `UpdatesTyped` for the merge batcher.
//! - `batcher` contains required trait stubs for `UpdatesTyped`.
//! - `trie_merger` is the batch-at-a-time merging logic.
//! - `builder::ValMirror` is the `trace::Builder` that seals melded chunks into
//! an `OrdValBatch`.
Expand Down Expand Up @@ -122,12 +122,12 @@ pub mod batch_container {
}
}

use super::updates::Updates;
use super::updates::UpdatesTyped;
use super::RecordedUpdates;
use crate::trace::implementations::merge_batcher::MergeBatcher;
/// Merge batcher over `RecordedUpdates<U>` input, parameterized with `TrieChunker<U>`
/// as its chunker and `trie_merger::TrieMerger<U>` as its merger.
type ValBatcher2<U> = MergeBatcher<RecordedUpdates<U>, TrieChunker<U>, trie_merger::TrieMerger<U>>;

/// A chunker that unwraps `RecordedUpdates` into bare `Updates` for the merge batcher.
/// A chunker that unwraps `RecordedUpdates` into bare `UpdatesTyped` for the merge batcher.
///
/// The intended behavior is to produce chunks whose size is within 1-2x `LINK_TARGET`.
/// It ships large batches immediately, accumulates small batches, consolidates as they
Expand All @@ -137,14 +137,14 @@ type ValBatcher2<U> = MergeBatcher<RecordedUpdates<U>, TrieChunker<U>, trie_merg
/// each of which is put in `self.stage`
pub struct TrieChunker<U: super::layout::ColumnarUpdate> {
/// Insufficiently large updates we haven't figured out how to ship yet.
/// The `bool` is the `consolidated` flag supplied to `absorb`; `consolidate_blobs`
/// passes a lone entry through untouched when this flag is set.
blobs: Vec<(Updates<U>, bool)>,
blobs: Vec<(UpdatesTyped<U>, bool)>,
/// Sum of `len()` across `blobs`.
blob_records: usize,
/// Ready-to-emit chunks. Each is sorted and consolidated; size ≥ `LINK_TARGET`
/// (or smaller, only for the final chunk produced by `finish`).
ready: std::collections::VecDeque<Updates<U>>,
ready: std::collections::VecDeque<UpdatesTyped<U>>,
/// Staging area for the next pull call.
/// `extract` moves the front of `ready` here and hands out a mutable reference to it.
stage: Option<Updates<U>>,
stage: Option<UpdatesTyped<U>>,
}

impl<U: super::layout::ColumnarUpdate> Default for TrieChunker<U> {
Expand All @@ -160,7 +160,7 @@ impl<U: super::layout::ColumnarUpdate> Default for TrieChunker<U> {

impl<U: super::layout::ColumnarUpdate> TrieChunker<U> {
/// Consolidate and empty `self.blobs`, into `self.ready` if large enough or else return.
fn consolidate_blobs(&mut self) -> Updates<U> {
fn consolidate_blobs(&mut self) -> UpdatesTyped<U> {
// Single consolidated entry: pass through, no work.
if self.blobs.len() == 1 && self.blobs[0].1 {
let (result, _) = self.blobs.pop().unwrap();
Expand All @@ -169,14 +169,14 @@ impl<U: super::layout::ColumnarUpdate> TrieChunker<U> {
}

// TODO: Improve consolidation through column-oriented sorts.
let result = Updates::<U>::form_unsorted(self.blobs.iter().flat_map(|(u, _)| u.iter()));
let result = UpdatesTyped::<U>::form_unsorted(self.blobs.iter().flat_map(|(u, _)| u.iter()));
self.blobs.clear();
self.blob_records = 0;
result
}

/// Push a non-empty `Updates` into blobs and update accounting.
fn absorb(&mut self, updates: Updates<U>, consolidated: bool) {
/// Push a non-empty `UpdatesTyped` into blobs and update accounting.
///
/// `consolidated` is stored alongside the updates as the second tuple element.
fn absorb(&mut self, updates: UpdatesTyped<U>, consolidated: bool) {
// Keep `blob_records` equal to the sum of `len()` across `blobs`;
// the length is read before `updates` is moved into the vector.
self.blob_records += updates.len();
self.blobs.push((updates, consolidated));
}
Expand All @@ -194,7 +194,7 @@ impl<'a, U: super::layout::ColumnarUpdate> timely::container::PushInto<&'a mut R
// Into if small enough, as we can further consolidate, but if not we need to
// consolidate and then either ship (if large) or hold (if small) the results.

let updates = std::mem::take(&mut container.updates);
let updates = std::mem::take(&mut container.updates).into_typed();
let consolidated = container.consolidated;
let len = updates.len();

Expand Down Expand Up @@ -237,7 +237,7 @@ impl<'a, U: super::layout::ColumnarUpdate> timely::container::PushInto<&'a mut R
}

impl<U: super::layout::ColumnarUpdate> timely::container::ContainerBuilder for TrieChunker<U> {
type Container = Updates<U>;
type Container = UpdatesTyped<U>;
fn extract(&mut self) -> Option<&mut Self::Container> {
self.stage = self.ready.pop_front();
self.stage.as_mut()
Expand All @@ -253,24 +253,24 @@ impl<U: super::layout::ColumnarUpdate> timely::container::ContainerBuilder for T
}

pub mod batcher {
//! Batcher trait stubs required to plug `Updates` into DD's merge batcher.
//! Batcher trait stubs required to plug `UpdatesTyped` into DD's merge batcher.

use columnar::Len;
use timely::progress::frontier::{Antichain, AntichainRef};
use crate::trace::implementations::merge_batcher::container::InternalMerge;

use super::super::layout::ColumnarUpdate as Update;
use super::super::updates::Updates;
use super::super::updates::UpdatesTyped;

impl<U: Update> timely::container::SizableContainer for Updates<U> {
fn at_capacity(&self) -> bool { self.diffs.values.len() >= crate::columnar::LINK_TARGET }
// Sizing hooks for the columnar update container.
impl<U: Update> timely::container::SizableContainer for UpdatesTyped<U> {
// Full once the diffs column holds at least `LINK_TARGET` values.
fn at_capacity(&self) -> bool { self.view().diffs.values.len() >= crate::columnar::LINK_TARGET }
// Intentionally a no-op: nothing is reserved and `_stash` is left untouched.
fn ensure_capacity(&mut self, _stash: &mut Option<Self>) { }
}

/// Required by `reduce_abelian`'s bound `Builder::Input: InternalMerge`.
/// Not called at runtime — our batcher uses `TrieMerger` instead.
/// TODO: Relax the bound in DD's reduce to remove this requirement.
impl<U: Update> InternalMerge for Updates<U> {
impl<U: Update> InternalMerge for UpdatesTyped<U> {
type TimeOwned = U::Time;
fn len(&self) -> usize { unimplemented!() }
fn clear(&mut self) {
Expand Down Expand Up @@ -298,7 +298,7 @@ pub mod builder {
use crate::trace::implementations::ord_neu::val_batch::{OrdValBatch, OrdValStorage};
use crate::trace::Description;

use super::super::updates::Updates;
use super::super::updates::UpdatesTyped;
use super::super::layout::ColumnarUpdate as Update;
use super::super::layout::ColumnarLayout as Layout;
use super::Coltainer;
Expand All @@ -316,14 +316,14 @@ pub mod builder {
output
}

/// Trace [`Builder`](crate::trace::Builder) that accumulates `Updates`
/// Trace [`Builder`](crate::trace::Builder) that accumulates `UpdatesTyped`
/// chunks and seals them into a single [`OrdValBatch`].
pub struct ValMirror<U: Update> {
/// Chunks accumulated so far. NOTE(review): presumably appended by the builder's
/// push path and consumed when sealing — neither is visible in this hunk; confirm.
chunks: Vec<Updates<U>>,
chunks: Vec<UpdatesTyped<U>>,
}
impl<U: Update> crate::trace::Builder for ValMirror<U> {
type Time = U::Time;
type Input = Updates<U>;
type Input = UpdatesTyped<U>;
type Output = OrdValBatch<Layout<U>>;

fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
Expand All @@ -343,12 +343,12 @@ pub mod builder {

// Meld sorted, consolidated chain entries in order.
// Pre-allocate to avoid reallocations during meld.
use columnar::{Borrow, Container};
let mut updates = Updates::<U>::default();
updates.keys.reserve_for(chain.iter().map(|c| c.keys.borrow()));
updates.vals.reserve_for(chain.iter().map(|c| c.vals.borrow()));
updates.times.reserve_for(chain.iter().map(|c| c.times.borrow()));
updates.diffs.reserve_for(chain.iter().map(|c| c.diffs.borrow()));
use columnar::Container;
let mut updates = UpdatesTyped::<U>::default();
updates.keys.reserve_for(chain.iter().map(|c| c.view().keys));
updates.vals.reserve_for(chain.iter().map(|c| c.view().vals));
updates.times.reserve_for(chain.iter().map(|c| c.view().times));
updates.diffs.reserve_for(chain.iter().map(|c| c.view().diffs));
let mut builder = super::super::updates::UpdatesBuilder::new_from(updates);
for chunk in chain.iter() {
builder.meld(chunk);
Expand Down
Loading
Loading