From 5a0dcc6c9f404a3e101448f98f2c3eab424ae4ec Mon Sep 17 00:00:00 2001 From: Max Burke Date: Mon, 6 Mar 2023 10:00:42 -0800 Subject: [PATCH 1/4] Pass booleans by value instead of by reference --- .../src/physical_optimizer/dist_enforcement.rs | 4 ++-- .../core/src/physical_plan/joins/hash_join.rs | 18 +++++++++--------- .../physical_plan/joins/symmetric_hash_join.rs | 18 +++++++++--------- datafusion/core/src/physical_plan/planner.rs | 4 +++- 4 files changed, 23 insertions(+), 21 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs index ae56b70bae4b..26d685a9072f 100644 --- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs @@ -171,7 +171,7 @@ fn adjust_input_keys_ordering( filter.clone(), join_type, PartitionMode::Partitioned, - null_equals_null, + *null_equals_null, )?) as Arc) }; Ok(Some(reorder_partitioned_join_keys( @@ -606,7 +606,7 @@ fn reorder_join_keys_to_inputs( filter.clone(), join_type, PartitionMode::Partitioned, - null_equals_null, + *null_equals_null, )?)) } else { Ok(plan) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 7752e1ebf4e3..1c2de5a149a0 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -201,7 +201,7 @@ impl HashJoinExec { filter: Option, join_type: &JoinType, partition_mode: PartitionMode, - null_equals_null: &bool, + null_equals_null: bool, ) -> Result { let left_schema = left.schema(); let right_schema = right.schema(); @@ -230,7 +230,7 @@ impl HashJoinExec { mode: partition_mode, metrics: ExecutionPlanMetricsSet::new(), column_indices, - null_equals_null: *null_equals_null, + null_equals_null, }) } @@ -265,8 +265,8 @@ impl HashJoinExec { } /// Get null_equals_null - pub fn null_equals_null(&self) -> &bool { - &self.null_equals_null + pub fn null_equals_null(&self) -> bool { + self.null_equals_null } } @@ -402,7 +402,7 @@ impl ExecutionPlan for HashJoinExec { self.filter.clone(), &self.join_type, self.mode, - &self.null_equals_null, + self.null_equals_null, )?)) } @@ -691,7 +691,7 @@ pub fn build_join_indices( on_probe: &[Column], filter: Option<&JoinFilter>, random_state: &RandomState, - null_equals_null: &bool, + null_equals_null: bool, hashes_buffer: &mut Vec, offset: Option, build_side: JoinSide, @@ -762,7 +762,7 @@ pub fn build_equal_condition_join_indices( build_on: &[Column], probe_on: &[Column], random_state: &RandomState, - null_equals_null: &bool, + null_equals_null: bool, hashes_buffer: &mut Vec, offset: Option, ) -> Result<(UInt64Array, UInt32Array)> { @@ -804,7 +804,7 @@ pub fn build_equal_condition_join_indices( row, &build_join_values, &keys_values, - *null_equals_null, + null_equals_null, )? { build_indices.append(offset_build_index as u64); probe_indices.append(row as u32); @@ -1207,7 +1207,7 @@ impl HashJoinStream { &self.on_right, self.filter.as_ref(), &self.random_state, - &self.null_equals_null, + self.null_equals_null, &mut hashes_buffer, None, JoinSide::Left, diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index c377be6d1ea0..e58362a05744 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -249,7 +249,7 @@ impl SymmetricHashJoinExec { on: JoinOn, filter: JoinFilter, join_type: &JoinType, - null_equals_null: &bool, + null_equals_null: bool, ) -> Result { let left_schema = left.schema(); let right_schema = right.schema(); @@ -349,7 +349,7 @@ impl SymmetricHashJoinExec { random_state, metrics: ExecutionPlanMetricsSet::new(), column_indices, - null_equals_null: *null_equals_null, + null_equals_null: null_equals_null, }) } @@ -379,8 +379,8 @@ impl SymmetricHashJoinExec { } /// Get null_equals_null - pub fn null_equals_null(&self) -> &bool { - &self.null_equals_null + pub fn null_equals_null(&self) -> bool { + self.null_equals_null } } @@ -472,7 +472,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { self.on.clone(), self.filter.clone(), &self.join_type, - &self.null_equals_null, + self.null_equals_null, )?)) } @@ -1054,7 +1054,7 @@ impl OneSideHashJoiner { probe_offset: usize, column_indices: &[ColumnIndex], random_state: &RandomState, - null_equals_null: &bool, + null_equals_null: bool, ) -> Result> { if self.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 { return Ok(Some(RecordBatch::new_empty(schema.clone()))); @@ -1371,7 +1371,7 @@ impl SymmetricHashJoinStream { probe_hash_joiner.offset, &self.column_indices, &self.random_state, - &self.null_equals_null, + self.null_equals_null, )?; // Increment the offset for the probe hash joiner: probe_hash_joiner.offset += probe_batch.num_rows(); @@ -1504,7 +1504,7 @@ mod tests { on, filter, join_type, - &null_equals_null, + null_equals_null, )?; let mut batches = vec![]; @@ -1551,7 +1551,7 @@ mod tests { Some(filter), join_type, PartitionMode::Partitioned, - &null_equals_null, + null_equals_null, )?; let mut batches = vec![]; diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 0a1ac4054f3c..5495a7bd71cc 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -860,6 +860,8 @@ impl DefaultPhysicalPlanner { schema: join_schema, .. }) => { + let null_equals_null = *null_equals_null; + // If join has expression equijoin keys, add physical projecton. let has_expr_join_key = keys.iter().any(|(l, r)| { !(matches!(l, Expr::Column(_)) @@ -1030,7 +1032,7 @@ impl DefaultPhysicalPlanner { join_on, *join_type, vec![SortOptions::default(); join_on_len], - *null_equals_null, + null_equals_null, )?)) } } else if session_state.config().target_partitions() > 1 From 6d898225dc03dedd8f0cfefc34839622170df84b Mon Sep 17 00:00:00 2001 From: Max Burke Date: Mon, 6 Mar 2023 10:37:18 -0800 Subject: [PATCH 2/4] fix build error --- datafusion/proto/src/physical_plan/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index b9b23a3ced55..5d28ef6c97a8 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -536,7 +536,7 @@ impl AsExecutionPlan for PhysicalPlanNode { filter, &join_type.into(), partition_mode, - &hashjoin.null_equals_null, + hashjoin.null_equals_null, )?)) } PhysicalPlanType::Union(union) => { @@ -827,7 +827,7 @@ impl AsExecutionPlan for PhysicalPlanNode { on, join_type: join_type.into(), partition_mode: partition_mode.into(), - null_equals_null: *exec.null_equals_null(), + null_equals_null: exec.null_equals_null(), filter, }, ))), From 20dbe321e0a08df6af4d8aba42e4ab1a5e41593e Mon Sep 17 00:00:00 2001 From: Max Burke Date: Wed, 8 Mar 2023 09:45:52 -0800 Subject: [PATCH 3/4] Fix tests --- .../src/physical_optimizer/dist_enforcement.rs | 2 +- .../core/src/physical_optimizer/join_selection.rs | 14 +++++++------- .../core/src/physical_optimizer/pipeline_fixer.rs | 2 +- .../core/src/physical_plan/joins/hash_join.rs | 8 ++++---- .../src/physical_plan/joins/symmetric_hash_join.rs | 2 +- datafusion/core/tests/join_fuzz.rs | 2 +- datafusion/proto/src/physical_plan/mod.rs | 2 +- 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs index 26d685a9072f..919273af7467 100644 --- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs @@ -1086,7 +1086,7 @@ mod tests { None, join_type, PartitionMode::Partitioned, - &false, + false, ) .unwrap(), ) diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 2589fa625e31..d9787881f161 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -488,7 +488,7 @@ mod tests { None, &JoinType::Left, PartitionMode::CollectLeft, - &false, + false, ) .unwrap(); @@ -536,7 +536,7 @@ mod tests { None, &JoinType::Left, PartitionMode::CollectLeft, - &false, + false, ) .unwrap(); @@ -587,7 +587,7 @@ mod tests { None, &join_type, PartitionMode::Partitioned, - &false, + false, ) .unwrap(); @@ -652,7 +652,7 @@ mod tests { None, &JoinType::Inner, PartitionMode::CollectLeft, - &false, + false, ) .unwrap(); let child_schema = child_join.schema(); @@ -668,7 +668,7 @@ mod tests { None, &JoinType::Left, PartitionMode::CollectLeft, - &false, + false, ) .unwrap(); @@ -705,7 +705,7 @@ mod tests { None, &JoinType::Inner, PartitionMode::CollectLeft, - &false, + false, ) .unwrap(); @@ -930,7 +930,7 @@ mod tests { None, &JoinType::Inner, PartitionMode::Auto, - &false, + false, ) .unwrap(); diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs index 99bcf264fadd..7e85f0c0d5f7 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs @@ -710,7 +710,7 @@ mod hash_join_tests { None, &t.initial_join_type, t.initial_mode, - &false, + false, )?; let initial_hash_join_state = PipelineStatePropagator { diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 1c2de5a149a0..021f41ef1f76 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -1356,7 +1356,7 @@ mod tests { None, join_type, PartitionMode::CollectLeft, - &null_equals_null, + null_equals_null, ) } @@ -1375,7 +1375,7 @@ mod tests { Some(filter), join_type, PartitionMode::CollectLeft, - &null_equals_null, + null_equals_null, ) } @@ -1429,7 +1429,7 @@ mod tests { None, join_type, PartitionMode::Partitioned, - &null_equals_null, + null_equals_null, )?; let columns = columns(&join.schema()); @@ -2680,7 +2680,7 @@ mod tests { &[Column::new("a", 0)], &[Column::new("a", 0)], &random_state, - &false, + false, &mut vec![0; right.num_rows()], None, )?; diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index e58362a05744..8ce13c214654 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -2465,7 +2465,7 @@ mod tests { right_side_joiner.offset, &join_column_indices, &random_state, - &false, + false, )?; assert_eq!(left_side_joiner.visited_rows.is_empty(), should_be_empty); Ok(()) diff --git a/datafusion/core/tests/join_fuzz.rs b/datafusion/core/tests/join_fuzz.rs index 40966843cec4..48e3da188678 100644 --- a/datafusion/core/tests/join_fuzz.rs +++ b/datafusion/core/tests/join_fuzz.rs @@ -153,7 +153,7 @@ async fn run_join_test( None, &join_type, PartitionMode::Partitioned, - &false, + false, ) .unwrap(), ); diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 5d28ef6c97a8..59aa98756b15 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1367,7 +1367,7 @@ mod roundtrip_tests { None, join_type, *partition_mode, - &false, + false, )?))?; } } From 7c6a9dfa56edc8132166a79cedf612c718cdc986 Mon Sep 17 00:00:00 2001 From: Max Burke Date: Wed, 8 Mar 2023 11:25:38 -0800 Subject: [PATCH 4/4] clippy --- datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index 8ce13c214654..3af983d8f06a 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -349,7 +349,7 @@ impl SymmetricHashJoinExec { random_state, metrics: ExecutionPlanMetricsSet::new(), column_indices, - null_equals_null: null_equals_null, + null_equals_null, }) }