
fix(flow): Arrange get range with batch unaligned #3552

Merged · 3 commits · Mar 22, 2024
287 changes: 269 additions & 18 deletions src/flow/src/utils.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet};
use std::ops::Bound;
use std::sync::Arc;

use itertools::Itertools;
@@ -24,11 +25,12 @@ use crate::expr::error::InternalSnafu;
use crate::expr::{EvalError, ScalarExpr};
use crate::repr::{value_to_internal_ts, Diff, DiffRow, Duration, KeyValDiffRow, Row, Timestamp};

pub type Batch = BTreeMap<Row, SmallVec<[DiffRow; 2]>>;
pub type Spine = BTreeMap<Timestamp, Batch>;
Comment on lines +28 to +29 (Member):

👍 Maybe also move/duplicate some of the documentation to those aliases.
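
For readers skimming the diff, a minimal standalone sketch of the layout these aliases describe, with plain `i64` keys and `Vec` standing in for `Row` and `SmallVec` (both stand-ins are simplifications, not the PR's types): each spine entry is a batch keyed by its upper-bound timestamp, holding per-key lists of (value, event timestamp, diff) updates.

use std::collections::BTreeMap;

// simplified stand-ins for the crate's Row/DiffRow types
type DiffRow = (i64, i64, i64); // (value, event timestamp, diff)
type Batch = BTreeMap<i64, Vec<DiffRow>>;
type Spine = BTreeMap<i64, Batch>;

fn main() {
    let mut spine: Spine = BTreeMap::new();
    // the batch keyed 2 holds updates with ts <= 2 for each key
    spine.entry(2).or_default().entry(1).or_default().push((10, 1, 1));
    spine.entry(2).or_default().entry(1).or_default().push((11, 2, 1));
    // the batch keyed 4 holds updates with 2 < ts <= 4
    spine.entry(4).or_default().entry(1).or_default().push((12, 3, 1));
    // range queries over batch keys select whole batches
    assert_eq!(spine.range(..=2).count(), 1);
    assert_eq!(spine[&2][&1].len(), 2);
}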


/// Determine when a key should expire according to the event timestamp contained in the key;
/// if a key is expired, any future updates to it should be ignored.
/// Note that a key expires based on its event timestamp (contained in the key), not the time it was inserted (system timestamp).
///
/// TODO(discord9): find a better way to handle key expiration, like writing to disk instead of throwing data away
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
pub struct KeyExpiryManager {
/// a map from event timestamp to key, used for expiring keys
@@ -121,13 +123,16 @@ pub struct Arrangement {
/// And for a consolidated batch (i.e. the batch representing now), there should be only one update per key, with `diff==1`
///
/// And since most of the time a key gets updated by a delete followed by an insert, a small vec of size 2 makes sense
spine: BTreeMap<Timestamp, BTreeMap<Row, SmallVec<[DiffRow; 2]>>>,
/// TODO: batch size balancing?
spine: Spine,
/// if set to false, the current value of the arrangement will not be updated; useful for cases like `map -> arrange -> reduce`
full_arrangement: bool,
/// flag marking that this arrangement hasn't been written to yet, so that it can be cloned and shared
is_written: bool,
/// manages the expiration state of the arrangement
expire_state: Option<KeyExpiryManager>,
/// the time the last compaction happened, also known as the current time
last_compaction_time: Option<Timestamp>,
}

impl Arrangement {
@@ -137,6 +142,7 @@ impl Arrangement {
full_arrangement: false,
is_written: false,
expire_state: None,
last_compaction_time: None,
}
}

@@ -160,6 +166,7 @@ impl Arrangement {
continue;
}
}

// the first batch whose key is greater than or equal to ts
let batch = if let Some((_, batch)) = self.spine.range_mut(ts..).next() {
batch
@@ -171,20 +178,87 @@
{
let key_updates = batch.entry(key).or_insert(smallvec![]);
key_updates.push((val, ts, diff));
// a stable sort keeps updates in order of insertion,
// without changing the relative order of updates within the same tick
key_updates.sort_by_key(|r| r.1);
}
}
Ok(max_late_by)
}
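
The stable-sort property the comment above relies on is easy to check in isolation; a tiny sketch with made-up values (not from this PR):

fn main() {
    // (value, timestamp, diff) updates arriving out of order;
    // "b" and "x" share ts == 2 and must keep their arrival order
    let mut updates = vec![("b", 2, 1), ("a", 1, 1), ("x", 2, -1)];
    // Vec::sort_by_key is a stable sort, so insertion order survives within a tick
    updates.sort_by_key(|r| r.1);
    assert_eq!(updates, vec![("a", 1, 1), ("b", 2, 1), ("x", 2, -1)]);
}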

/// find the time of the next update in the future,
/// that is, the next update with `timestamp > now`
pub fn get_next_update_time(&self, now: &Timestamp) -> Option<Timestamp> {
// iterate over batches whose key is greater than `now`, find the first non-empty batch, then take the minimum update timestamp in it
let next_batches = self.spine.range((Bound::Excluded(now), Bound::Unbounded));
for (_ts, batch) in next_batches {
let min_ts = batch
.iter()
.flat_map(|(_k, v)| v.iter().map(|(_, ts, _)| *ts))
.min();
if let Some(min_ts) = min_ts {
return Some(min_ts);
} else {
continue;
}
Comment on lines +201 to +203 (Member):

nit: unnecessary `continue`?

Suggested change:
-            } else {
-                continue;
-            }
+            }

}
// all batches are empty, so there is no next update
None
}
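
The `(Bound::Excluded(now), Bound::Unbounded)` pattern above is the standard way to express a strict `> now` lower bound on a `BTreeMap`; a standalone sketch with made-up batch keys:

use std::collections::BTreeMap;
use std::ops::Bound;

fn main() {
    let spine: BTreeMap<i64, &str> = [(1, "a"), (3, "b"), (5, "c")].into();
    let now = 3;
    // strictly greater than `now`: the batch keyed exactly 3 is skipped
    let next: Vec<i64> = spine
        .range((Bound::Excluded(now), Bound::Unbounded))
        .map(|(ts, _)| *ts)
        .collect();
    assert_eq!(next, vec![5]);
}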

/// get the last compaction time
pub fn get_compaction(&self) -> Option<Timestamp> {
self.last_compaction_time
}

/// split the spine at `now`, and return the part that is before `now` (`now` included)
fn split_lte(&mut self, now: &Timestamp) -> Spine {
let mut before = self.spine.split_off(&(now + 1));
std::mem::swap(&mut before, &mut self.spine);

// if before's last key == now, then all the keys we need have been found
if before
.last_key_value()
.map(|(k, _v)| *k == *now)
.unwrap_or(false)
{
return before;
}

// we also need to move updates with timestamp <= now from the first batch in the spine into `before`;
// all remaining updates to be split off satisfy last_key < ts <= now, and we collect them into a new batch
if let Some(mut first_batch) = self.spine.first_entry() {
let mut new_batch: Batch = Default::default();
// move matching updates out, dropping keys whose update vec becomes empty
first_batch.get_mut().retain(|key, updates| {
// move updates with ts <= now out of this batch
updates.retain(|(val, ts, diff)| {
if *ts <= *now {
new_batch.entry(key.clone()).or_insert(smallvec![]).push((
val.clone(),
*ts,
*diff,
));
}
*ts > *now
});
!updates.is_empty()
});

before.entry(*now).or_default().extend(new_batch);
}
before
}
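
The `split_off` plus `swap` idiom at the top of `split_lte` is worth seeing on its own: `split_off(&k)` leaves keys `< k` in place and returns keys `>= k`, so splitting at `now + 1` and then swapping leaves the `> now` part in the spine and hands back the `<= now` part. A standalone sketch with made-up keys:

use std::collections::BTreeMap;

fn main() {
    let mut spine: BTreeMap<i64, &str> = [(1, "a"), (3, "b"), (5, "c")].into();
    let now = 3;
    let mut before = spine.split_off(&(now + 1)); // before: keys >= 4
    std::mem::swap(&mut before, &mut spine);      // after swap, before: keys <= 3
    assert_eq!(before.keys().copied().collect::<Vec<_>>(), vec![1, 3]);
    assert_eq!(spine.keys().copied().collect::<Vec<_>>(), vec![5]);
}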

/// advance time to `now` and consolidate all older (`now` included) updates into the first batch
///
/// return by how much time the most-expired update is already late, if any key has expired
pub fn set_compaction(&mut self, now: Timestamp) -> Result<Option<Duration>, EvalError> {
let mut max_late_by: Option<Duration> = None;

let mut should_compact = self.spine.split_off(&(now + 1));
std::mem::swap(&mut should_compact, &mut self.spine);
let should_compact = self.split_lte(&now);

self.last_compaction_time = Some(now);
// if a full arrangement is not needed, we can just discard everything before and including now
if !self.full_arrangement {
return Ok(None);
@@ -221,18 +295,62 @@ impl Arrangement {
}

/// get the updates of the arrangement from the given range of time
pub fn get_updates_in_range<R: std::ops::RangeBounds<Timestamp>>(
pub fn get_updates_in_range<R: std::ops::RangeBounds<Timestamp> + Clone>(
&self,
range: R,
) -> Vec<KeyValDiffRow> {
let mut result = vec![];
for (_ts, batch) in self.spine.range(range) {
for (key, updates) in batch.clone() {
for (val, ts, diff) in updates {
result.push(((key.clone(), val), ts, diff));
// three parts:
// 1. the starting batch, whose key is the first >= range.start; it may contain updates outside the range
// 2. the batches whose keys fall within the range
// 3. the next batch past the range, whose key is the first > range.end; it may still contain updates inside the range
let mut is_first = true;
for (_ts, batch) in self.spine.range(range.clone()) {
if is_first {
for (key, updates) in batch {
let iter = updates
.iter()
.filter(|(_val, ts, _diff)| range.contains(ts))
.map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff));
result.extend(iter);
}
is_first = false;
} else {
for (key, updates) in batch.clone() {
result.extend(
updates
.iter()
.map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff)),
);
}
}
}

// deal with the end boundary:
// the next batch (with upper bound >= range.end) may still
// contain updates within the range, so search it too
let neg_bound = match range.end_bound() {
Bound::Included(b) => {
// if the boundary is aligned, the last batch in the range already covers the full range,
// so there are no further updates we need in the next batch
if self.spine.contains_key(b) {
return result;
}
Bound::Excluded(*b)
}
Bound::Excluded(b) => Bound::Included(*b),
Bound::Unbounded => return result,
};
let search_range = (neg_bound, Bound::Unbounded);
if let Some(last_batch) = self.spine.range(search_range).next() {
for (key, updates) in last_batch.1 {
let iter = updates
.iter()
.filter(|(_val, ts, _diff)| range.contains(ts))
.map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff));
result.extend(iter);
}
};
result
}
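
The bound flipping above (an `Included` end becomes an `Excluded` lower bound, and vice versa) builds the search range for the first batch strictly past the query range's end; a standalone sketch of the same trick with made-up batch keys:

use std::collections::BTreeMap;
use std::ops::Bound;

fn main() {
    let spine: BTreeMap<i64, &str> = [(2, "a"), (4, "b"), (6, "c")].into();
    // a query range 2..=5 ends between batch keys 4 and 6; the batch that may
    // still hold in-range updates is the first one with key > 5, so the
    // inclusive end bound 5 flips into an excluded lower bound
    let next = spine
        .range((Bound::Excluded(5), Bound::Unbounded))
        .next()
        .map(|(k, _)| *k);
    assert_eq!(next, Some(6));
}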

Expand Down Expand Up @@ -260,22 +378,51 @@ impl Arrangement {
/// get the current state of things,
/// useful for querying existing keys (i.e. the reduce and join operators need to query existing state)
pub fn get(&self, now: Timestamp, key: &Row) -> Option<(Row, Timestamp, Diff)> {
if self
.spine
.first_key_value()
.map(|(ts, _)| *ts >= now)
.unwrap_or(false)
if self.full_arrangement
&& self
.spine
.first_key_value()
.map(|(ts, _)| *ts >= now)
.unwrap_or(false)
{
self.spine
.first_key_value()
.and_then(|(_ts, batch)| batch.get(key).and_then(|v| v.first()).cloned())
} else {
// check keys <= now to know current value
let mut final_val = None;
for (_ts, batch) in self.spine.range(..=now) {

let with_extra_batch = {
let unaligned = self.spine.range(..=now);
if unaligned
.clone()
.last()
.map(|(ts, _)| *ts == now)
.unwrap_or(false)
{
// this extra chain exists just to make the types of both branches match
unaligned.chain(None)
} else {
// if the last key is not equal to now, we need to include the next batch,
// because we know that last batch key < now < next batch key,
// so the next batch may contain updates that we want
unaligned.chain(
self.spine
.range((Bound::Excluded(now), Bound::Unbounded))
.next(),
)
}
};
for (ts, batch) in with_extra_batch {
if let Some(new_rows) = batch.get(key).map(|v| v.iter()) {
for new_row in new_rows {
final_val = compact_diff_row(final_val, new_row);
if *ts <= now {
for new_row in new_rows {
final_val = compact_diff_row(final_val, new_row);
}
} else {
for new_row in new_rows.filter(|new_row| new_row.1 <= now) {
final_val = compact_diff_row(final_val, new_row);
}
}
}
}
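
The `chain(None)` trick above works because `Option<T>` implements `IntoIterator`, so both branches of the `if` end up with the same `Chain` iterator type; a minimal standalone sketch:

fn main() {
    let v = [1, 2, 3];
    let include_extra = false;
    // both branches have the same type: Chain<slice::Iter<i32>, option::IntoIter<&i32>>
    let iter = if include_extra {
        v.iter().chain(Some(&4))
    } else {
        v.iter().chain(None)
    };
    assert_eq!(iter.copied().collect::<Vec<_>>(), vec![1, 2, 3]);
}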
@@ -530,4 +677,108 @@ mod test {
assert_eq!(arr.get(12, &Row::new(vec![1i64.into()])), None);
}
}

/// test that, when ranges are not aligned with batch boundaries,
/// split_lte still retrieves all updates in the range, including updates sitting in batches
/// near the boundary of the input range
#[test]
fn test_split_off() {
let mut arr = Arrangement::new();
// manually create batch ..=1 and 2..=3
arr.spine.insert(1, Default::default());
arr.spine.insert(3, Default::default());
arr.apply_updates(
2,
vec![((Row::new(vec![1.into()]), Row::new(vec![2.into()])), 2, 1)],
)
.unwrap();
// the update falls into the batch 2..=3
let mut arr1 = arr.clone();
{
assert_eq!(arr.get_next_update_time(&1), Some(2));
// the split is expected to take batch ..=1 and create a new batch 2..=2 (which contains the update)
let split = &arr.split_lte(&2);
assert_eq!(split.len(), 2);
assert_eq!(split[&2].len(), 1);
let _ = &arr.split_lte(&3);
assert_eq!(arr.get_next_update_time(&1), None);
}
{
// take all updates with timestamp <= 1, which yields no updates (only the empty batch ..=1)
let split = &arr1.split_lte(&1);
assert_eq!(split.len(), 1);
}
}

/// test that get_updates_in_range returns the correct result
/// when the query range is not aligned with batch boundaries
#[test]
fn test_get_by_range() {
let mut arr = Arrangement::new();

// will form three batches: {2: [2, 1], 4: [4, 3], 6: [6, 5]}
// TODO(discord9): manually set batch
let updates: Vec<KeyValDiffRow> = vec![
((Row::new(vec![1i64.into()]), Row::empty()), 2, 1),
((Row::new(vec![1i64.into()]), Row::empty()), 1, 1),
((Row::new(vec![2i64.into()]), Row::empty()), 4, 1),
((Row::new(vec![3i64.into()]), Row::empty()), 3, 1),
((Row::new(vec![3i64.into()]), Row::empty()), 6, 1),
((Row::new(vec![1i64.into()]), Row::empty()), 5, 1),
];
arr.apply_updates(0, updates).unwrap();
assert_eq!(
arr.get_updates_in_range(2..=5),
vec![
((Row::new(vec![1i64.into()]), Row::empty()), 2, 1),
((Row::new(vec![2i64.into()]), Row::empty()), 4, 1),
((Row::new(vec![3i64.into()]), Row::empty()), 3, 1),
((Row::new(vec![1i64.into()]), Row::empty()), 5, 1),
]
);
}

/// test that get returns the correct result
/// when `now` is not aligned with a batch boundary
#[test]
fn test_get_unaligned() {
let mut arr = Arrangement::new();

// will form three batches: {2: [2, 1], 4: [4, 3], 6: [6, 5]}
// TODO(discord9): manually set batch
let key = Row::new(vec![1i64.into()]);
let updates: Vec<KeyValDiffRow> = vec![
((key.clone(), Row::new(vec![1i64.into()])), 2, 1),
((key.clone(), Row::new(vec![2i64.into()])), 1, 1),
((key.clone(), Row::new(vec![3i64.into()])), 4, 1),
((key.clone(), Row::new(vec![4i64.into()])), 3, 1),
((key.clone(), Row::new(vec![5i64.into()])), 6, 1),
((key.clone(), Row::new(vec![6i64.into()])), 5, 1),
];
arr.apply_updates(0, updates).unwrap();
// aligned with batch boundary
assert_eq!(arr.get(2, &key), Some((Row::new(vec![1i64.into()]), 2, 1)));
// unaligned with batch boundary
assert_eq!(arr.get(3, &key), Some((Row::new(vec![4i64.into()]), 3, 1)));
}

/// test that out-of-order updates are sorted correctly
#[test]
fn test_out_of_order_apply_updates() {
let mut arr = Arrangement::new();

let key = Row::new(vec![1i64.into()]);
let updates: Vec<KeyValDiffRow> = vec![
((key.clone(), Row::new(vec![5i64.into()])), 6, 1),
((key.clone(), Row::new(vec![2i64.into()])), 2, -1),
((key.clone(), Row::new(vec![1i64.into()])), 2, 1),
((key.clone(), Row::new(vec![2i64.into()])), 1, 1),
((key.clone(), Row::new(vec![3i64.into()])), 4, 1),
((key.clone(), Row::new(vec![4i64.into()])), 3, 1),
((key.clone(), Row::new(vec![6i64.into()])), 5, 1),
];
arr.apply_updates(0, updates.clone()).unwrap();
let sorted = updates.iter().sorted_by_key(|r| r.1).cloned().collect_vec();
assert_eq!(arr.get_updates_in_range(1..7), sorted);
}
}
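
Taken together, a hedged end-to-end sketch of the API touched by this PR, written as a hypothetical extra test for this module (so in-crate types like `Row` are visible); the values are illustrative and the assertions follow the unaligned-boundary semantics exercised by the tests above:

/// hypothetical usage sketch, not part of this PR
#[test]
fn sketch_end_to_end() {
    let mut arr = Arrangement::new();
    let key = Row::new(vec![1i64.into()]);
    // two updates for the same key, at event times 1 and 3
    arr.apply_updates(
        0,
        vec![
            ((key.clone(), Row::new(vec![10i64.into()])), 1, 1),
            ((key.clone(), Row::new(vec![20i64.into()])), 3, 1),
        ],
    )
    .unwrap();
    // now == 2 sits between the two updates, i.e. unaligned with batch boundaries:
    // the current value is still the ts == 1 update
    assert_eq!(arr.get(2, &key), Some((Row::new(vec![10i64.into()]), 1, 1)));
    // an unaligned range query picks up only the ts == 3 update
    assert_eq!(arr.get_updates_in_range(2..=3).len(), 1);
    // compaction (with no full arrangement kept) discards updates at or before
    // ts == 2 and records when it ran
    arr.set_compaction(2).unwrap();
    assert_eq!(arr.get_compaction(), Some(2));
    assert_eq!(arr.get_updates_in_range(3..=3).len(), 1);
}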