Skip to content

Commit

Permalink
fix(stream): fix inner interval join (risingwavelabs#9071)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Apr 10, 2023
1 parent ed5af28 commit 10190e8
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 7 deletions.
21 changes: 20 additions & 1 deletion src/frontend/planner_test/tests/testdata/watermark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.ts) }
└─StreamTableScan { table: t2, columns: [t2.ts, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: band hash join
- name: interval join(left outer join)
sql: |
create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
create table t2 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
Expand All @@ -95,6 +95,25 @@
└─StreamExchange { dist: HashShard(t2.v1) }
└─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, (AtTimeZone((AtTimeZone(t2.ts, 'UTC':Varchar) + '00:00:00':Interval), 'UTC':Varchar) + '00:00:01':Interval) as $expr2, t2._row_id], output_watermarks: [t2.ts, $expr2] }
└─StreamTableScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: interval join (inner join)
sql: |
create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
create table t2 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
select t1.ts as t1_ts, t1.v1 as t1_v1, t1.v2 as t1_v2, t2.ts as t2_ts, t2.v1 as t2_v1, t2.v2 as t2_v2 from t1 join t2 on (t1.v1 = t2.v1 and (t1.ts >= t2.ts + INTERVAL '1' SECOND) and (t2.ts >= t1.ts + INTERVAL '1' SECOND));
logical_plan: |
LogicalProject { exprs: [t1.ts, t1.v1, t1.v2, t2.ts, t2.v1, t2.v2] }
└─LogicalJoin { type: Inner, on: (t1.v1 = t2.v1) AND (t1.ts >= (t2.ts + '00:00:01':Interval)) AND (t2.ts >= (t1.ts + '00:00:01':Interval)), output: all }
├─LogicalScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id] }
└─LogicalScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id] }
stream_plan: |
StreamMaterialize { columns: [t1_ts, t1_v1, t1_v2, t2_ts, t2_v1, t2_v2, t1._row_id(hidden), t2._row_id(hidden)], stream_key: [t1._row_id, t2._row_id, t1_v1], pk_columns: [t1._row_id, t2._row_id, t1_v1], pk_conflict: "NoCheck", watermark_columns: [t1_ts, t2_ts] }
└─StreamIntervalJoin { type: Inner, predicate: t1.v1 = t2.v1 AND (t1.ts >= $expr2) AND ($expr1 <= t2.ts), conditions_to_clean_left_state_table: (t1.ts >= $expr2), conditions_to_clean_right_state_table: ($expr1 <= t2.ts), output_watermarks: [t1.ts, t2.ts], output: [t1.ts, t1.v1, t1.v2, t2.ts, t2.v1, t2.v2, t1._row_id, t2._row_id] }
├─StreamExchange { dist: HashShard(t1.v1) }
| └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2, (AtTimeZone((AtTimeZone(t1.ts, 'UTC':Varchar) + '00:00:00':Interval), 'UTC':Varchar) + '00:00:01':Interval) as $expr1, t1._row_id], output_watermarks: [t1.ts, $expr1] }
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.v1) }
└─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, (AtTimeZone((AtTimeZone(t2.ts, 'UTC':Varchar) + '00:00:00':Interval), 'UTC':Varchar) + '00:00:01':Interval) as $expr2, t2._row_id], output_watermarks: [t2.ts, $expr2] }
└─StreamTableScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: union all
sql: |
create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ macro_rules! impl_has_variant {
impl_has_variant! {InputRef, Literal, FunctionCall, AggCall, Subquery, TableFunction, WindowFunction}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct InequalityInputPair {
pub struct InequalityInputPair {
/// Input index of greater side of inequality.
pub(crate) key_required_larger: usize,
/// Input index of less side of inequality.
Expand Down
8 changes: 6 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,11 @@ impl LogicalJoin {
// For inner joins, pull non-equal conditions to a filter operator on top of it
// We do so as the filter operator can apply the non-equal condition batch-wise (vectorized)
// as opposed to the HashJoin, which applies the condition row-wise.
let pull_filter = self.join_type() == JoinType::Inner && predicate.has_non_eq();

let stream_hash_join = StreamHashJoin::new(logical_join.core.clone(), predicate.clone());
let pull_filter = self.join_type() == JoinType::Inner
&& stream_hash_join.eq_join_predicate().has_non_eq()
&& stream_hash_join.inequality_pairs().is_empty();
if pull_filter {
let default_indices = (0..self.internal_column_num()).collect::<Vec<_>>();

Expand Down Expand Up @@ -952,7 +956,7 @@ impl LogicalJoin {
Ok(plan)
}
} else {
Ok(StreamHashJoin::new(logical_join.core, predicate).into())
Ok(stream_hash_join.into())
}
}

Expand Down
10 changes: 7 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,16 +275,20 @@ impl StreamHashJoin {
assert_eq!(dk_indices_in_jk.len(), left_dk_indices.len());
dk_indices_in_jk
}

pub fn inequality_pairs(&self) -> &Vec<(bool, InequalityInputPair)> {
&self.inequality_pairs
}
}

impl fmt::Display for StreamHashJoin {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut builder = if self.is_append_only {
f.debug_struct("StreamAppendOnlyHashJoin")
} else if self.clean_left_state_conjunction_idx.is_some()
let mut builder = if self.clean_left_state_conjunction_idx.is_some()
&& self.clean_right_state_conjunction_idx.is_some()
{
f.debug_struct("StreamIntervalJoin")
} else if self.is_append_only {
f.debug_struct("StreamAppendOnlyHashJoin")
} else {
f.debug_struct("StreamHashJoin")
};
Expand Down

0 comments on commit 10190e8

Please sign in to comment.