Skip to content

Commit

Permalink
fix(stream): Better cache handling for temporal join. (risingwavelabs…
Browse files Browse the repository at this point in the history
  • Loading branch information
wsx-ucb committed Mar 31, 2023
1 parent d687d71 commit ad53077
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions src/stream/src/executor/temporal_join.rs
Expand Up @@ -31,7 +31,7 @@ use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::StateStore;

use super::{Barrier, Executor, Message, MessageStream, StreamExecutorError, StreamExecutorResult};
use crate::cache::{new_with_hasher_in, ManagedLruCache};
use crate::cache::{cache_may_stale, new_with_hasher_in, ExecutorCache};
use crate::common::StreamChunkBuilder;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{ActorContextRef, BoxedExecutor, JoinType, JoinTypePrimitive, PkIndices};
Expand Down Expand Up @@ -59,7 +59,7 @@ pub struct TemporalJoinExecutor<S: StateStore, const T: JoinTypePrimitive> {
struct TemporalSide<S: StateStore> {
source: StorageTable<S>,
table_output_indices: Vec<usize>,
cache: ManagedLruCache<OwnedRow, Option<OwnedRow>, DefaultHasher, SharedStatsAlloc<Global>>,
cache: ExecutorCache<OwnedRow, Option<OwnedRow>, DefaultHasher, SharedStatsAlloc<Global>>,
}

impl<S: StateStore> TemporalSide<S> {
Expand All @@ -83,7 +83,7 @@ impl<S: StateStore> TemporalSide<S> {
})
}

fn update(&mut self, payload: Vec<StreamChunk>, join_keys: &[usize], epoch: u64) {
fn update(&mut self, payload: Vec<StreamChunk>, join_keys: &[usize]) {
payload.iter().flat_map(|c| c.rows()).for_each(|(op, row)| {
let key = row.project(join_keys).into_owned_row();
if let Some(value) = self.cache.get_mut(&key) {
Expand All @@ -93,7 +93,6 @@ impl<S: StateStore> TemporalSide<S> {
};
}
});
self.cache.update_epoch(epoch);
}
}

Expand Down Expand Up @@ -195,7 +194,11 @@ impl<S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor<S, T> {

let alloc = StatsAlloc::new(Global).shared();

let cache = new_with_hasher_in(watermark_epoch, DefaultHasher::default(), alloc);
let cache = ExecutorCache::new(new_with_hasher_in(
watermark_epoch,
DefaultHasher::default(),
alloc,
));

Self {
ctx,
Expand Down Expand Up @@ -277,9 +280,17 @@ impl<S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor<S, T> {
}
}
InternalMessage::Barrier(updates, barrier) => {
self.right_table.cache.evict();
if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) {
let prev_vnodes =
self.right_table.source.update_vnode_bitmap(vnodes.clone());
if cache_may_stale(&prev_vnodes, &vnodes) {
self.right_table.cache.clear();
}
}
self.right_table.cache.update_epoch(barrier.epoch.curr);
self.right_table.update(updates, &self.right_join_keys);
prev_epoch = Some(barrier.epoch.curr);
self.right_table
.update(updates, &self.right_join_keys, barrier.epoch.curr);
yield Message::Barrier(barrier)
}
}
Expand Down

0 comments on commit ad53077

Please sign in to comment.