Skip to content

Commit

Permalink
fix(flow): Arrange get range with batch unaligned (#3552)
Browse files Browse the repository at this point in the history
* fix: Arrange get range with batch unaligned

* chore: per review

* refactor: sort at apply_updates
  • Loading branch information
discord9 committed Mar 22, 2024
1 parent c9ac72e commit 9f020aa
Showing 1 changed file with 269 additions and 18 deletions.
287 changes: 269 additions & 18 deletions src/flow/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet};
use std::ops::Bound;
use std::sync::Arc;

use itertools::Itertools;
Expand All @@ -24,11 +25,12 @@ use crate::expr::error::InternalSnafu;
use crate::expr::{EvalError, ScalarExpr};
use crate::repr::{value_to_internal_ts, Diff, DiffRow, Duration, KeyValDiffRow, Row, Timestamp};

pub type Batch = BTreeMap<Row, SmallVec<[DiffRow; 2]>>;
pub type Spine = BTreeMap<Timestamp, Batch>;

/// Determine when should a key expire according to it's event timestamp in key,
/// if a key is expired, any future updates to it should be ignored
/// Note that key is expired by it's event timestamp(contained in the key), not by the time it's inserted(system timestamp)
///
/// TODO(discord9): find a better way to handle key expiration, like write to disk or something instead of throw away
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
pub struct KeyExpiryManager {
/// a map from event timestamp to key, used for expire keys
Expand Down Expand Up @@ -121,13 +123,16 @@ pub struct Arrangement {
/// And for consolidated batch(i.e. btach representing now), there should be only one update for each key with `diff==1`
///
/// And since most time a key gots updated by first delete then insert, small vec with size of 2 make sense
spine: BTreeMap<Timestamp, BTreeMap<Row, SmallVec<[DiffRow; 2]>>>,
/// TODO: batch size balancing?
spine: Spine,
/// if set to false, will not update current value of the arrangement, useful for case like `map -> arrange -> reduce`
full_arrangement: bool,
/// flag to mark that this arrangement haven't been written to, so that it can be cloned and shared
is_written: bool,
/// manage the expire state of the arrangement
expire_state: Option<KeyExpiryManager>,
/// the time that the last compaction happened, also know as current time
last_compaction_time: Option<Timestamp>,
}

impl Arrangement {
Expand All @@ -137,6 +142,7 @@ impl Arrangement {
full_arrangement: false,
is_written: false,
expire_state: None,
last_compaction_time: None,
}
}

Expand All @@ -160,6 +166,7 @@ impl Arrangement {
continue;
}
}

// the first batch with key that's greater or equal to ts
let batch = if let Some((_, batch)) = self.spine.range_mut(ts..).next() {
batch
Expand All @@ -171,20 +178,87 @@ impl Arrangement {
{
let key_updates = batch.entry(key).or_insert(smallvec![]);
key_updates.push((val, ts, diff));
// a stable sort make updates sort in order of insertion
// without changing the order of updates within same tick
key_updates.sort_by_key(|r| r.1);
}
}
Ok(max_late_by)
}

/// find out the time of next update in the future
/// that is the next update with `timestamp > now`
pub fn get_next_update_time(&self, now: &Timestamp) -> Option<Timestamp> {
// iter over batches that only have updates of `timestamp>now` and find the first non empty batch, then get the minimum timestamp in that batch
let next_batches = self.spine.range((Bound::Excluded(now), Bound::Unbounded));
for (_ts, batch) in next_batches {
let min_ts = batch
.iter()
.flat_map(|(_k, v)| v.iter().map(|(_, ts, _)| *ts))
.min();
if let Some(min_ts) = min_ts {
return Some(min_ts);
} else {
continue;
}
}
// all batches are empty, return now
None
}

/// get the last compaction time
pub fn get_compaction(&self) -> Option<Timestamp> {
self.last_compaction_time
}

/// split spine off at `now`, and return the spine that's before `now`(including `now`)
fn split_lte(&mut self, now: &Timestamp) -> Spine {
let mut before = self.spine.split_off(&(now + 1));
std::mem::swap(&mut before, &mut self.spine);

// if before's last key == now, then all the keys we needed are found
if before
.last_key_value()
.map(|(k, _v)| *k == *now)
.unwrap_or(false)
{
return before;
}

// also need to move all keys from the first batch in spine with timestamp<=now to before
// we know that all remaining keys to be split off are last key < key <= now, we will make them into a new batch
if let Some(mut first_batch) = self.spine.first_entry() {
let mut new_batch: Batch = Default::default();
// remove all keys with val of empty vec
first_batch.get_mut().retain(|key, updates| {
// remove keys <= now from updates
updates.retain(|(val, ts, diff)| {
if *ts <= *now {
new_batch.entry(key.clone()).or_insert(smallvec![]).push((
val.clone(),
*ts,
*diff,
));
}
*ts > *now
});
!updates.is_empty()
});

before.entry(*now).or_default().extend(new_batch);
}
before
}

/// advance time to `now` and consolidate all older(`now` included) updates to the first key
///
/// return the maximum expire time(already expire by how much time) of all updates if any keys is already expired
pub fn set_compaction(&mut self, now: Timestamp) -> Result<Option<Duration>, EvalError> {
let mut max_late_by: Option<Duration> = None;

let mut should_compact = self.spine.split_off(&(now + 1));
std::mem::swap(&mut should_compact, &mut self.spine);
let should_compact = self.split_lte(&now);

self.last_compaction_time = Some(now);
// if a full arrangement is not needed, we can just discard everything before and including now
if !self.full_arrangement {
return Ok(None);
Expand Down Expand Up @@ -221,18 +295,62 @@ impl Arrangement {
}

/// get the updates of the arrangement from the given range of time
pub fn get_updates_in_range<R: std::ops::RangeBounds<Timestamp>>(
pub fn get_updates_in_range<R: std::ops::RangeBounds<Timestamp> + Clone>(
&self,
range: R,
) -> Vec<KeyValDiffRow> {
let mut result = vec![];
for (_ts, batch) in self.spine.range(range) {
for (key, updates) in batch.clone() {
for (val, ts, diff) in updates {
result.push(((key.clone(), val), ts, diff));
// three part:
// 1.the starting batch with first key >= range.start, which may contain updates that not in range
// 2. the batches with key in range
// 3. the last batch with first key > range.end, which may contain updates that are in range
let mut is_first = true;
for (_ts, batch) in self.spine.range(range.clone()) {
if is_first {
for (key, updates) in batch {
let iter = updates
.iter()
.filter(|(_val, ts, _diff)| range.contains(ts))
.map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff));
result.extend(iter);
}
is_first = false;
} else {
for (key, updates) in batch.clone() {
result.extend(
updates
.iter()
.map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff)),
);
}
}
}

// deal with boundary include start and end
// and for the next batch with upper_bound >= range.end
// we need to search for updates within range
let neg_bound = match range.end_bound() {
Bound::Included(b) => {
// if boundary is aligned, the last batch in range actually cover the full range
// then there will be no further keys we need in the next batch
if self.spine.contains_key(b) {
return result;
}
Bound::Excluded(*b)
}
Bound::Excluded(b) => Bound::Included(*b),
Bound::Unbounded => return result,
};
let search_range = (neg_bound, Bound::Unbounded);
if let Some(last_batch) = self.spine.range(search_range).next() {
for (key, updates) in last_batch.1 {
let iter = updates
.iter()
.filter(|(_val, ts, _diff)| range.contains(ts))
.map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff));
result.extend(iter);
}
};
result
}

Expand Down Expand Up @@ -260,22 +378,51 @@ impl Arrangement {
/// get current state of things
/// useful for query existing keys(i.e. reduce and join operator need to query existing state)
pub fn get(&self, now: Timestamp, key: &Row) -> Option<(Row, Timestamp, Diff)> {
if self
.spine
.first_key_value()
.map(|(ts, _)| *ts >= now)
.unwrap_or(false)
if self.full_arrangement
&& self
.spine
.first_key_value()
.map(|(ts, _)| *ts >= now)
.unwrap_or(false)
{
self.spine
.first_key_value()
.and_then(|(_ts, batch)| batch.get(key).and_then(|v| v.first()).cloned())
} else {
// check keys <= now to know current value
let mut final_val = None;
for (_ts, batch) in self.spine.range(..=now) {

let with_extra_batch = {
let unaligned = self.spine.range(..=now);
if unaligned
.clone()
.last()
.map(|(ts, _)| *ts == now)
.unwrap_or(false)
{
// this extra chain is there just to make type the same
unaligned.chain(None)
} else {
// if the last key is not equal to now, then we need to include the next batch
// because we know last batch key < now < next batch key
// therefore next batch may contain updates that we want
unaligned.chain(
self.spine
.range((Bound::Excluded(now), Bound::Unbounded))
.next(),
)
}
};
for (ts, batch) in with_extra_batch {
if let Some(new_rows) = batch.get(key).map(|v| v.iter()) {
for new_row in new_rows {
final_val = compact_diff_row(final_val, new_row);
if *ts <= now {
for new_row in new_rows {
final_val = compact_diff_row(final_val, new_row);
}
} else {
for new_row in new_rows.filter(|new_row| new_row.1 <= now) {
final_val = compact_diff_row(final_val, new_row);
}
}
}
}
Expand Down Expand Up @@ -530,4 +677,108 @@ mod test {
assert_eq!(arr.get(12, &Row::new(vec![1i64.into()])), None);
}
}

/// test if split_lte get ranges that are not aligned with batch boundaries
/// this split_lte can correctly retrieve all updates in the range, including updates that are in the batches
/// near the boundary of input range
#[test]
fn test_split_off() {
let mut arr = Arrangement::new();
// manually create batch ..=1 and 2..=3
arr.spine.insert(1, Default::default());
arr.spine.insert(3, Default::default());
arr.apply_updates(
2,
vec![((Row::new(vec![1.into()]), Row::new(vec![2.into()])), 2, 1)],
)
.unwrap();
// updates falls into the range of 2..=3
let mut arr1 = arr.clone();
{
assert_eq!(arr.get_next_update_time(&1), Some(2));
// split expect to take batch ..=1 and create a new batch 2..=2(which contain update)
let split = &arr.split_lte(&2);
assert_eq!(split.len(), 2);
assert_eq!(split[&2].len(), 1);
let _ = &arr.split_lte(&3);
assert_eq!(arr.get_next_update_time(&1), None);
}
{
// take all updates with timestamp <=1, will get no updates
let split = &arr1.split_lte(&1);
assert_eq!(split.len(), 1);
}
}

/// test if get ranges is not aligned with boundary of batch,
/// whether can get correct result
#[test]
fn test_get_by_range() {
let mut arr = Arrangement::new();

// will form {2: [2, 1], 4: [4,3], 6: [6,5]} three batch
// TODO(discord9): manually set batch
let updates: Vec<KeyValDiffRow> = vec![
((Row::new(vec![1i64.into()]), Row::empty()), 2, 1),
((Row::new(vec![1i64.into()]), Row::empty()), 1, 1),
((Row::new(vec![2i64.into()]), Row::empty()), 4, 1),
((Row::new(vec![3i64.into()]), Row::empty()), 3, 1),
((Row::new(vec![3i64.into()]), Row::empty()), 6, 1),
((Row::new(vec![1i64.into()]), Row::empty()), 5, 1),
];
arr.apply_updates(0, updates).unwrap();
assert_eq!(
arr.get_updates_in_range(2..=5),
vec![
((Row::new(vec![1i64.into()]), Row::empty()), 2, 1),
((Row::new(vec![2i64.into()]), Row::empty()), 4, 1),
((Row::new(vec![3i64.into()]), Row::empty()), 3, 1),
((Row::new(vec![1i64.into()]), Row::empty()), 5, 1),
]
);
}

/// test if get with range unaligned with batch boundary
/// can get correct result
#[test]
fn test_get_unaligned() {
let mut arr = Arrangement::new();

// will form {2: [2, 1], 4: [4,3], 6: [6,5]} three batch
// TODO(discord9): manually set batch
let key = Row::new(vec![1i64.into()]);
let updates: Vec<KeyValDiffRow> = vec![
((key.clone(), Row::new(vec![1i64.into()])), 2, 1),
((key.clone(), Row::new(vec![2i64.into()])), 1, 1),
((key.clone(), Row::new(vec![3i64.into()])), 4, 1),
((key.clone(), Row::new(vec![4i64.into()])), 3, 1),
((key.clone(), Row::new(vec![5i64.into()])), 6, 1),
((key.clone(), Row::new(vec![6i64.into()])), 5, 1),
];
arr.apply_updates(0, updates).unwrap();
// aligned with batch boundary
assert_eq!(arr.get(2, &key), Some((Row::new(vec![1i64.into()]), 2, 1)));
// unaligned with batch boundary
assert_eq!(arr.get(3, &key), Some((Row::new(vec![4i64.into()]), 3, 1)));
}

/// test if out of order updates can be sorted correctly
#[test]
fn test_out_of_order_apply_updates() {
let mut arr = Arrangement::new();

let key = Row::new(vec![1i64.into()]);
let updates: Vec<KeyValDiffRow> = vec![
((key.clone(), Row::new(vec![5i64.into()])), 6, 1),
((key.clone(), Row::new(vec![2i64.into()])), 2, -1),
((key.clone(), Row::new(vec![1i64.into()])), 2, 1),
((key.clone(), Row::new(vec![2i64.into()])), 1, 1),
((key.clone(), Row::new(vec![3i64.into()])), 4, 1),
((key.clone(), Row::new(vec![4i64.into()])), 3, 1),
((key.clone(), Row::new(vec![6i64.into()])), 5, 1),
];
arr.apply_updates(0, updates.clone()).unwrap();
let sorted = updates.iter().sorted_by_key(|r| r.1).cloned().collect_vec();
assert_eq!(arr.get_updates_in_range(1..7), sorted);
}
}

0 comments on commit 9f020aa

Please sign in to comment.