Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-10173: [Rust][DataFusion] Implement support for direct comparison to scalar values #8660

Closed

Conversation

yordan-pavlov
Copy link
Contributor

This PR addresses the inefficient comparison to scalar values, where an array is built with the scalar value repeated, by
changing the return value of expressions from Result<ArrayRef> to Result<ColumnarValue> where ColumnarValue is defined as:

pub enum ColumnarValue {
    /// Array of values
    Array(ArrayRef),
    /// A single value 
    Scalar(ScalarValue)
}

This enables scalar values to be used in comparison operators directly, and for the simple query used in the benchmark ("select f32, f64 from t where f32 >= 250 and f64 > 250") shows approximately 10x performance improvement:

before:
filter_scalar time: [35.733 ms 36.613 ms 37.924 ms]

after:
filter_scalar time: [3.5938 ms 3.6450 ms 3.7035 ms]
change: [-90.048% -89.846% -89.625%] (p = 0.00 < 0.05)

I have also added a benchmark to compare the change in performance when comparing two arrays (using query "select f32, f64 from t where f32 >= f64") and it is negligible:

before:
filter_array time: [11.601 ms 11.656 ms 11.718 ms]

after:
filter_array time: [11.854 ms 11.957 ms 12.070 ms]
change: [+1.8032% +3.6391% +5.5671%] (p = 0.00 < 0.05)

@andygrove @alamb let me know what you think

@github-actions
Copy link

}
(ColumnarValue::Scalar(scalar), ColumnarValue::Array(array)) => {
// if right is literal and left is array - reverse operator and parameters
let result: Result<ArrayRef> = match &self.op {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this block is duplicated for the two match arms and could be moved into a function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good question; the code blocks are not exactly the same, there is a small difference; notice how in (ColumnarValue::Array(array), ColumnarValue::Scalar(scalar)) we have Operator::Lt => binary_array_op_scalar!(array, scalar.clone(), lt), but under (ColumnarValue::Scalar(scalar), ColumnarValue::Array(array)) we have Operator::Lt => binary_array_op_scalar!(array, scalar.clone(), gt);
this is because there is only one version of arrow comparison kernel functions for scalar comparison where the scalar value can only be on one side of the comparison, for example pub fn lt_scalar<T>(left: &PrimitiveArray<T>, right: T::Native) -> Result<BooleanArray>

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that makes sense. I hadn't looked closely enough to see the differences.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another structure would be to normalize the invocations by finding the array, and the literal and then have a single call site for invoking the comparison

Like turning both array > lit_1 and lit_1 < array into

A = array
lit = lit_
op = >

However this involves changing the comparison ops and I am not sure I can claim the code would be any simpler / potentially less bug prone.

@@ -100,6 +100,30 @@ pub enum Distribution {
SinglePartition,
}

/// Represents the result from an expression
pub enum ColumnarValue {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could push the new ColumnarValue enum down to the core arrow crate since it isn't specific to DataFusion?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possibly; where / how could you see the ColumnarValue enum used in core arrow? also wouldn't ScalarValue need to move as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C++ uses a Datum, which is also an enum over Scalar, Array, and a few other things.
We could have a separate module called arrow::scalar, then in the long run we could convert the compute kernels to take Datum, and push the optimisations of "array vs scalar" there

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree.

It would also simplify the API of many vertical operations (e.g. aggregates), as their input and result would have a common type. I have a branch on which I am doing that, but I have not finish it yet.

(spoiler alert: it is not so easy ^_^)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also went down that rabbit hole over a year ago, yeah it's not easy

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think starting with ColumnValue in DataFusion and then hoisting it out into arrow makes a lot of sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @alamb ; I think if we wanted to move ColumnarValue into arrow it would be better to do that in a separate PR after this one has been merged

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that our use of Datum in C++ is far from perfect, it's actually a relatively complex data structure and I've contemplated using something simplified that doesn't have as many non-trivial C++ objects inside it in the internals of function execution

@andygrove
Copy link
Member

@yordan-pavlov I took a quick skim through and this is looking really good! Could you rebase?

@nevi-me nevi-me added the needs-rebase A PR that needs to be rebased by the author label Nov 14, 2020
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love this PR -- thanks @yordan-pavlov -- I think we should merge this once we get this rebased and the tests are passing. Let me know if you need any help -- this optimization is directly relevant to work we are doing in my work project.

I am going to try and fire up my TPCH benchmark locally and see if I can get any more performance data

)?))
} else {
Err(ExecutionError::General(format!(
"compute_utf8_op_scalar failed to cast literal value {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"compute_utf8_op_scalar failed to cast literal value {}",
"internal error: compute_utf8_op_scalar failed to cast literal value {}",

The point being that if this code is hit it isn't likely a bug in how someone is using datafusion, it is a bug in datafusion itself.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have an error for this: Internal

}
(ColumnarValue::Scalar(scalar), ColumnarValue::Array(array)) => {
// if right is literal and left is array - reverse operator and parameters
let result: Result<ArrayRef> = match &self.op {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another structure would be to normalize the invocations by finding the array, and the literal and then have a single call site for invoking the comparison

Like turning both array > lit_1 and lit_1 < array into

A = array
lit = lit_
op = >

However this involves changing the comparison ops and I am not sure I can claim the code would be any simpler / potentially less bug prone.

let (left, right) = match (left_value, right_value) {
// if both arrays - extract and continue execution
(ColumnarValue::Array(left), ColumnarValue::Array(right)) => (left, right),
// if both literals - not supported
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine -- we should handle such scalar op scalar things in the planner / optimizer, in my opinion

@@ -1571,24 +1754,6 @@ impl Literal {
}
}

/// Build array containing the same literal value repeated. This is necessary because the Arrow
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

let array_to_sort = match values_to_sort {
ColumnarValue::Array(array) => array,
ColumnarValue::Scalar(scalar) => {
return Err(ExecutionError::General(format!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, I like this approach -- we should be removing scalar values out of Sort exprs in the planner, not during execution

@@ -100,6 +100,30 @@ pub enum Distribution {
SinglePartition,
}

/// Represents the result from an expression
pub enum ColumnarValue {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think starting with ColumnValue in DataFusion and then hoisting it out into arrow makes a lot of sense

self.to_array_of_size(1)
}

/// Converts a scalar value into an 1-row array.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Converts a scalar value into an 1-row array.
/// Converts a scalar value into an array of `size` rows.

@alamb
Copy link
Contributor

alamb commented Nov 14, 2020

I tried to run this code against our simple TPCH Q1 implementation (which is dominated by evaluating expressions against constants), and sadly it hit an error

cd arrow/rust/benchmarks
cargo run --release --bin tpch -- --iterations 3 --path /Users/alamb/Software/tpch_data/SF10-parquet-64 --format parquet --query 1 --batch-size 4096
    Finished release [optimized] target(s) in 0.13s
     Running `/Users/alamb/Software/arrow2/rust/target/release/tpch --iterations 3 --path /Users/alamb/Software/tpch_data/SF10-parquet-64 --format parquet --query 1 --batch-size 4096`
Running benchmarks with the following options: TpchOpt { query: 1, debug: false, iterations: 3, concurrency: 2, batch_size: 4096, path: "/Users/alamb/Software/tpch_data/SF10-parquet-64", file_format: "parquet" }

Error: ArrowError(ExternalError(General("Scalar values on left side of operator - are not supported")))

I plan to figure out what is going on and try and fix it tomorrow morning, US Eastern time. As I said this I think this PR is a major step forward and I want to see it merged!

Andrew

@andygrove
Copy link
Member

I would recommend that we implement an optimizer rule to "swap" the order of expressions when we see an unsupported combination such as a scalar on the left. The logic for this rule already exists in this PR. The benefit of having it as an optimizer rule is that it would handle invalid plans created from both the SQL and DataFrame APIs.

@yordan-pavlov
Copy link
Contributor Author

thanks for all the comments, I will try to rebase this evening

@yordan-pavlov yordan-pavlov force-pushed the impl_scalar_expr_results branch 2 times, most recently from 66d8080 to 3e12c10 Compare November 14, 2020 22:39
@yordan-pavlov
Copy link
Contributor Author

I have now rebased the branch on the latest changes from master; will try to review comments tomorrow.

@yordan-pavlov
Copy link
Contributor Author

I tried to run this code against our simple TPCH Q1 implementation (which is dominated by evaluating expressions against constants), and sadly it hit an error

@alamb it might be necessary to fallback to generating an array where the scalar value is repeated, for some operations that do not have a version which accepts a scalar argument

@nevi-me nevi-me removed the needs-rebase A PR that needs to be rebased by the author label Nov 16, 2020
@alamb
Copy link
Contributor

alamb commented Nov 16, 2020

@alamb it might be necessary to fallback to generating an array where the scalar value is repeated, for some operations that do not have a version which accepts a scalar argument

@yordan-pavlov I agree with this plan. While long term I would like to see the ability to evaluate every function with constant arguments, that is definitely beyond the scope of this PR, and so keeping the fall back to constant arrays is a good idea.

Given how important this functionality is, I suggest we focus on getting this PR into master as soon as possible and then iterating on additional functionality like additional function support, converting to canonical expr _op_ colref form, constant folding, hoisting ColumValue into arrow, etc. There is so much great work to do!

@yordan-pavlov
Copy link
Contributor Author

@alamb I have now implemented falling back to scalar arrays for operations where scalar arguments are currently not supported; this should now work for operators such as "-" where previously an error was returned ("Scalar values on left side of operator - are not supported"); I have also added an extra test for this;
could you try running the TPCH test again?

@alamb
Copy link
Contributor

alamb commented Nov 16, 2020

@yordan-pavlov When I ran the benchmark locally again on my laptop:

cargo run --release --bin tpch -- --iterations 3 --path /Users/alamb/Software/tpch_data/SF10-parquet-64 --format parquet --query 1 --batch-size 4096

benchmarks on this branch (fd77fd6)

Running benchmarks with the following options: TpchOpt { query: 1, debug: false, iterations: 3, concurrency: 2, batch_size: 4096, path: "/Users/alamb/Software/tpch_data/SF10-parquet-64", file_format: "parquet", mem_table: false }
Query 1 iteration 0 took 5371 ms
Query 1 iteration 1 took 6121 ms
Query 1 iteration 2 took 6160 ms
alamb@ip-192-168-0-133 benchmarks %

benchmark on master e5fce7f

Running benchmarks with the following options: TpchOpt { query: 1, debug: false, iterations: 3, concurrency: 2, batch_size: 4096, path: "/Users/alamb/Software/tpch_data/SF10-parquet-64", file_format: "parquet", mem_table: false }
Query 1 iteration 0 took 7640 ms
Query 1 iteration 1 took 7808 ms
Query 1 iteration 2 took 7765 ms

So it seems to me the the performance improved a bit. I am second guessing my performance setup here - it is with a laptop and I do wonder if my CPU is being throttled.

Regardless, I think this is good enough results!

@codecov-io
Copy link

Codecov Report

Merging #8660 (e5fce7f) into master (6b910ab) will decrease coverage by 0.01%.
The diff coverage is 75.00%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #8660      +/-   ##
==========================================
- Coverage   84.56%   84.54%   -0.02%     
==========================================
  Files         177      177              
  Lines       43611    43645      +34     
==========================================
+ Hits        36879    36901      +22     
- Misses       6732     6744      +12     
Impacted Files Coverage Δ
rust/arrow/src/compute/kernels/cast.rs 96.66% <ø> (ø)
rust/arrow/src/util/bit_util.rs 100.00% <ø> (ø)
...ust/datafusion/src/physical_plan/hash_aggregate.rs 86.68% <69.04%> (-2.59%) ⬇️
rust/arrow/src/buffer.rs 95.65% <100.00%> (ø)
rust/arrow/src/compute/kernels/aggregate.rs 100.00% <100.00%> (ø)
rust/datafusion/src/physical_plan/merge.rs 66.12% <0.00%> (+1.61%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 6b910ab...fd77fd6. Read the comment docs.

@yordan-pavlov
Copy link
Contributor Author

@alamb thanks for running that test again; even if comparisons against scalar values are significantly faster, there is much more to the tests, such as loading data, etc.; I will be looking at that next. Overall though, good result as you said.

Copy link
Member

@jorgecarleitao jorgecarleitao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Super cool!

)?))
} else {
Err(ExecutionError::General(format!(
"compute_utf8_op_scalar failed to cast literal value {}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have an error for this: Internal

GeorgeAp pushed a commit to sirensolutions/arrow that referenced this pull request Jun 7, 2021
…on to scalar values

This PR addresses the inefficient comparison to scalar values, where an array is built with the scalar value repeated, by
changing the return value of expressions from `Result<ArrayRef>` to `Result<ColumnarValue>`  where `ColumnarValue` is defined as:
```
pub enum ColumnarValue {
    /// Array of values
    Array(ArrayRef),
    /// A single value
    Scalar(ScalarValue)
}
```

This enables scalar values to be used in comparison operators directly, and for the simple query used in the benchmark ("select f32, f64 from t where f32 >= 250 and f64 > 250") shows approximately 10x performance improvement:

before:
filter_scalar time: [35.733 ms 36.613 ms 37.924 ms]

after:
filter_scalar time: [3.5938 ms 3.6450 ms 3.7035 ms]
change: [-90.048% -89.846% -89.625%] (p = 0.00 < 0.05)

I have also added a benchmark to compare the change in performance when comparing two arrays (using query "select f32, f64 from t where f32 >= f64") and it is negligible:

before:
filter_array time: [11.601 ms 11.656 ms 11.718 ms]

after:
filter_array time: [11.854 ms 11.957 ms 12.070 ms]
change: [+1.8032% +3.6391% +5.5671%] (p = 0.00 < 0.05)

@andygrove @alamb let me know what you think

Closes apache#8660 from yordan-pavlov/impl_scalar_expr_results

Lead-authored-by: Yordan Pavlov <yordan.pavlov@outlook.com>
Co-authored-by: Yordan Pavlov <64363766+yordan-pavlov@users.noreply.github.com>
Signed-off-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

7 participants