GH-34911: [C++] Add first and last aggregator #34912

Merged
merged 15 commits into from
Apr 28, 2023

Contributor

@icexelloss icexelloss commented Apr 5, 2023

Rationale for this change

This PR adds "first" and "last" aggregators and supports using them with Acero's segmented aggregation.

What changes are included in this PR?

  • Numeric Scalar Aggregator (bool, int types, floating types)
  • Numeric Hash Aggregator (bool, int types, floating types)
  • Docstring
  • Non-Numeric Scalar Aggregator (string, binary, fixed binary, temporal)
  • Non-Numeric Hash Aggregator (string, binary, fixed binary, temporal)
  • Add ordered flag in aggregate kernels
  • Implement and test skip null
  • Update compute.rst

Are these changes tested?

  • Compute Kernel Test (Scalar Kernels, all supported datatypes)
  • Hash Aggregate Test (Hash Kernels, all supported datatypes)
  • Segmented Aggregation Test (Both Scalar and Hash Kernels)

Are there any user-facing changes?

Yes. This adds the First and Last aggregators.

@icexelloss
Contributor Author

This is a work in progress, but I want to put it up because I got segmented aggregation to work with numeric types. I will finish the remaining items (see the checkboxes in the description).

@icexelloss
Contributor Author

@westonpace I pretty much followed the way the min/max aggregator is implemented, so hopefully there are no surprises here. Still, I would appreciate it if you could take a look at whether this general approach is correct.

Member

@westonpace westonpace left a comment

A few general questions.

Also, how does first compare with something like LIMIT 1?

Do you think there is value in instead implementing something like nth_value? E.g. https://docs.snowflake.com/en/sql-reference/functions/nth_value

cpp/src/arrow/compute/kernels/hash_aggregate.cc (outdated)
Comment on lines 336 to 463
for (int64_t i = 0; i < arr.length(); i++) {
  local.MergeOne(arr.GetView(i));
}
Member

Wouldn't you break as soon as you encounter a value? Why do you need to iterate the entire array? I suppose if you want both first AND last then you might need to iterate from both directions. Something like...

if (!has_first) {
  int index = 0;
  while (index < length) {
    if (arr[index] != null || !skip_nulls) {
      has_first = true;
      first = arr[index];
      break;
    } else {
      index++;
    }
  }
}
// No need to check has_last here since we always assume the current batch is replacing the last
int index = length - 1;
while (index >= 0) {
  if (arr[index] != null || !skip_nulls) {
    last = arr[index];
    break;
  } else {
    index--;
  }
}

Also, it appears that last carries quite a bit more cost than first. Imagine you were searching for first and skip_nulls=false. All you need to do is look at one value and you can skip all future batches.

Given this I'm not sure if we want to combine first/last into a single kernel. Or at least, make it possible in some way to skip data if last isn't needed.
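
To make the suggestion above concrete, here is a minimal, self-contained sketch of the forward/backward scan. It is not the kernel code from this PR: `FirstLastState` and `ConsumeBatch` are hypothetical names, and a `std::optional` vector stands in for an Arrow array with a validity bitmap.

```cpp
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical state for illustration only; not the kernel's actual layout.
struct FirstLastState {
  bool has_first = false;
  std::optional<int64_t> first;  // std::nullopt models a null value
  std::optional<int64_t> last;
};

// Consume one batch. std::nullopt entries stand in for nulls.
void ConsumeBatch(const std::vector<std::optional<int64_t>>& batch,
                  bool skip_nulls, FirstLastState* state) {
  // first: scan forward only until an acceptable value is found, and never
  // scan again once has_first is set.
  if (!state->has_first) {
    for (const auto& v : batch) {
      if (v.has_value() || !skip_nulls) {
        state->first = v;
        state->has_first = true;
        break;
      }
    }
  }
  // last: scan backward. A later batch always supersedes earlier ones, so
  // there is no has_last check; an all-null batch leaves `last` untouched
  // when skip_nulls is true.
  for (auto it = batch.rbegin(); it != batch.rend(); ++it) {
    if (it->has_value() || !skip_nulls) {
      state->last = *it;
      break;
    }
  }
}
```

With `skip_nulls=false` the forward loop stops at the very first element, which is the asymmetry between first and last described above: first can ignore all later batches, while last has to look at every batch.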

Contributor Author

I updated this to be close to what you suggested. Could you check whether it looks fine to you?

Comment on lines 1258 to 1288
template <typename CType>
struct NullSentinel {
  static constexpr CType value() { return std::numeric_limits<CType>::min(); }
};

template <>
struct NullSentinel<float> {
  static constexpr float value() { return std::numeric_limits<float>::infinity(); }
};

template <>
struct NullSentinel<double> {
  static constexpr double value() { return std::numeric_limits<double>::infinity(); }
};
Member

Maybe UninitializedSentinel?

Contributor Author

I removed this and ended up reusing AntiExtrema for the sentinel values.

@icexelloss
Contributor Author

> Also, how does first compare with something like LIMIT 1?

I see them as different things. To me, limit means "get partial results from the full results", while first is an aggregation function that requires ordered input and can be used with group by and window aggregations. limit can only be used (IIRC) as the last statement in the query plan, while first can appear in the middle.
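
As a rough illustration of the difference, the sketch below computes first and last per group over rows that are assumed to arrive in a meaningful order. `GroupedFirstLast` and `FirstLast` are hypothetical names used only for this example; this is not the hash aggregate kernel from this PR.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical per-group result for illustration.
struct FirstLast {
  int64_t first;
  int64_t last;
};

// Rows are (group key, value) pairs and are assumed to be already ordered.
// The result has one entry per group, which a trailing LIMIT 1 cannot express.
std::map<std::string, FirstLast> GroupedFirstLast(
    const std::vector<std::pair<std::string, int64_t>>& rows) {
  std::map<std::string, FirstLast> out;
  for (const auto& [key, value] : rows) {
    // The first row seen for a key sets both first and last.
    auto [it, inserted] = out.try_emplace(key, FirstLast{value, value});
    // Every later row for the same key only overwrites last.
    if (!inserted) it->second.last = value;
  }
  return out;
}
```

The output can then feed further operators, which is what "first can appear in the middle" of a plan means here.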

> Do you think there is value in instead implementing something like nth_value? E.g. https://docs.snowflake.com/en/sql-reference/functions/nth_value

I think there is value. I can look into whether this is easy to do.

@icexelloss icexelloss force-pushed the acero-first-last-agg-2 branch 2 times, most recently from c0a8463 to e8ddf80 Compare April 17, 2023 14:47
@icexelloss icexelloss changed the title GH-34911: [C++] [WIP] Add first and last aggregator GH-34911: [C++] Add first and last aggregator Apr 18, 2023
}

template <typename T = Type>
enable_if_base_binary<T, Status> MakeOffsetsValues(
Contributor Author

This is copied from the binary version of the grouped min/max kernel. I will try to refactor this to a base class.

Contributor Author

@icexelloss icexelloss Apr 20, 2023

I was unable to address this because the templates make it hard to refactor and share the logic between "MinMax" and "FirstLast" for binary types. If that's OK, I would like to leave the refactoring as a follow-up and avoid adding more complexity to this PR.

@icexelloss
Contributor Author

icexelloss commented Apr 27, 2023

@westonpace I think I have addressed all the comments.

Supporting skip_nulls=false turned out to be non-trivial, and I ended up adding three more flags (first_is_null, last_is_null, has_any_values) to support it. Please take a look.

https://github.com/apache/arrow/pull/34912/files#diff-395ffe24a47c8284e800ec4bc812075ac9efe77d8e24f430c5a4bbe2b5809940R343
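
For readers who do not want to open the diff, the sketch below shows one way the three flags could interact. Only the flag names come from the comment above; the `State` struct, the extra `has_values` flag, and the exact merge rules are assumptions made for illustration and may differ from the code in the PR.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical state; only first_is_null, last_is_null and has_any_values
// are names taken from the PR discussion.
struct State {
  int64_t first = 0;
  int64_t last = 0;
  bool has_values = false;      // assumed flag: saw at least one non-null value
  bool has_any_values = false;  // saw at least one row, null or not
  bool first_is_null = false;   // the first row seen was null
  bool last_is_null = false;    // the most recent row seen was null

  void Consume(const std::optional<int64_t>& v) {
    if (!has_any_values) {
      first_is_null = !v.has_value();
      has_any_values = true;
    }
    last_is_null = !v.has_value();
    if (v.has_value()) {
      if (!has_values) {
        first = *v;
        has_values = true;
      }
      last = *v;
    }
  }

  // Merge a state built from rows that come AFTER this state's rows.
  void Merge(const State& rhs) {
    if (!has_values) first = rhs.first;
    if (rhs.has_values) last = rhs.last;
    if (!has_any_values) first_is_null = rhs.first_is_null;
    if (rhs.has_any_values) last_is_null = rhs.last_is_null;
    has_values = has_values || rhs.has_values;
    has_any_values = has_any_values || rhs.has_any_values;
  }

  // With skip_nulls=false, a null in the first position makes the result null.
  std::optional<int64_t> First(bool skip_nulls) const {
    if (!has_values || (!skip_nulls && first_is_null)) return std::nullopt;
    return first;
  }

  std::optional<int64_t> Last(bool skip_nulls) const {
    if (!has_values || (!skip_nulls && last_is_null)) return std::nullopt;
    return last;
  }
};
```

The point of tracking `has_any_values` separately is that an all-null prefix must still pin `first_is_null`, even though no non-null value has been recorded yet.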

@ianmcook
Member

The additions to compute.rst look good to me, thanks!

Member

@westonpace westonpace left a comment

This matches the null behavior I would expect. Thanks for fixing that. This looks good, appreciate the persistence.

@westonpace westonpace merged commit 34bfbd9 into apache:main Apr 28, 2023
@ursabot

ursabot commented Apr 28, 2023

Benchmark runs are scheduled for baseline = 05a61d6 and contender = 34bfbd9. 34bfbd9 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️3.8% ⬆️2.21%] test-mac-arm
[Failed ⬇️19.69% ⬆️0.0% ⚠️ Contender and baseline run contexts do not match] ursa-i9-9960x
[Finished ⬇️3.1% ⬆️1.26% ⚠️ Contender and baseline run contexts do not match] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] 34bfbd93 ec2-t3-xlarge-us-east-2
[Failed] 34bfbd93 test-mac-arm
[Failed] 34bfbd93 ursa-i9-9960x
[Finished] 34bfbd93 ursa-thinkcentre-m75q
[Finished] 05a61d6f ec2-t3-xlarge-us-east-2
[Finished] 05a61d6f test-mac-arm
[Failed] 05a61d6f ursa-i9-9960x
[Finished] 05a61d6f ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@ursabot

ursabot commented Apr 28, 2023

['Python', 'R'] benchmarks have high level of regressions.
test-mac-arm
ursa-i9-9960x

westonpace added a commit that referenced this pull request May 2, 2023
…d segmentation fault (#35384)

### Rationale for this change

The recent change (#34912) calculates the max concurrency using `plan->query_context()->executor()->GetCapacity()`. This is later used to initialize the kernel states. However, this differs from the method used previously, `plan->query_context()->max_concurrency()`, which returns a slightly different value (if the aggregate node IS run in parallel then we initialize one state for each CPU thread, one for each I/O thread, and one for the calling user thread).

This is unfortunately a bit complicated as `max_concurrency` would not be a good indicator to use when determining if the plan is running in parallel or not.  So we need to query both properties and use them in their respective spots.

### What changes are included in this PR?

Now, `max_concurrency` is used to figure out how many thread local states need to be initialized and `GetCapacity` is used to figure out if there are multiple CPU threads or not.
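
A simplified model of the distinction, using hypothetical stand-ins rather than the real Acero types: `ExecInfo`, `MaxConcurrency`, `RunsInParallel`, and `KernelState` are invented for this sketch, and the arithmetic only follows the description above (one state per CPU thread, one per I/O thread, one for the calling thread).

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the executor configuration.
struct ExecInfo {
  std::size_t cpu_threads;  // models executor()->GetCapacity()
  std::size_t io_threads;   // models the I/O thread pool size
};

// Models max_concurrency() in the parallel case: CPU threads + I/O threads
// + the calling user thread.
std::size_t MaxConcurrency(const ExecInfo& info) {
  return info.cpu_threads + info.io_threads + 1;
}

// Whether there are multiple CPU threads is a separate question and is
// answered by the CPU capacity alone.
bool RunsInParallel(const ExecInfo& info) { return info.cpu_threads > 1; }

struct KernelState {
  long partial = 0;
};

// Size the thread-local states by max concurrency, so any thread index that
// can reach the node has a slot.
std::vector<KernelState> MakeThreadLocalStates(const ExecInfo& info) {
  return std::vector<KernelState>(MaxConcurrency(info));
}

// Bounds-checked access; sizing by cpu_threads alone would make this throw
// for indices owned by I/O threads or the calling thread.
KernelState& StateFor(std::vector<KernelState>& states, std::size_t thread_index) {
  return states.at(thread_index);
}
```

In this model, sizing the vector with `cpu_threads` instead of `MaxConcurrency(info)` is the kind of mismatch that can lead to an out-of-range state access.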

### Are these changes tested?

The bug was caught by the benchmarks, which is a bit concerning. Most CI machines have a very small number of CPU threads and don't experience much concurrency, so I think we just didn't see this pattern. Or possibly, this pattern only occurs in the legacy way that pyarrow launches exec plans.

### Are there any user-facing changes?

No.
* Closes: #35383

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
liujiacheng777 pushed a commit to LoongArch-Python/arrow that referenced this pull request May 11, 2023
### Rationale for this change
This PR adds "first" and "last" aggregators and supports using them with Acero's segmented aggregation.

### What changes are included in this PR?
- [x] Numeric Scalar Aggregator (bool, int types, floating types)
- [x] Numeric Hash Aggregator (bool, int types, floating types)
- [x] Docstring
- [x] Non-Numeric Scalar Aggregator (string, binary, fixed binary, temporal)
- [x] Non-Numeric Hash Aggregator (string, binary, fixed binary, temporal)
- [x] Add `ordered` flag in aggregate kernels
- [x] Implement and test skip null
- [x] Update compute.rst

### Are these changes tested?
- [x] Compute Kernel Test (Scalar Kernels, all supported datatypes)
- [x] Hash Aggregate Test (Hash Kernels, all supported datatypes)
- [x] Segmented Aggregation Test (Both Scalar and Hash Kernels)

### Are there any user-facing changes?
Yes. This adds the First and Last aggregators.

Authored-by: Li Jin <ice.xelloss@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
liujiacheng777 pushed a commit to LoongArch-Python/arrow that referenced this pull request May 11, 2023
ArgusLi pushed a commit to Bit-Quill/arrow that referenced this pull request May 15, 2023
ArgusLi pushed a commit to Bit-Quill/arrow that referenced this pull request May 15, 2023
rtpsw pushed a commit to rtpsw/arrow that referenced this pull request May 16, 2023
rtpsw pushed a commit to rtpsw/arrow that referenced this pull request May 16, 2023