GH-32884: [C++] Add ordered aggregation (#34311)
This PR adds "Segmented Aggregation" to the existing aggregation
node to improve aggregation over ordered data.

A segment group is defined as "a contiguous chunk of data that has the
same segment key value". For example, if the input data looks like

```
[0, 0, 0, 1, 2, 2] 
```

then there are three segment groups: `[0, 0, 0]`, `[1]`, `[2, 2]`.

(Note: the "group" in "segment group" is added to differentiate it from
"segment", which is defined as "a contiguous chunk of data within an
ExecBatch".)

Segmented aggregation can replace the existing hash aggregation when the
data are ordered on the segment keys. The benefits are:
(1) Aggregation results can be output earlier, as soon as a segment group
is fully consumed.
(2) Only the partial aggregation state for a single segment group needs to
be held, reducing memory usage.
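
For illustration, here is a minimal usage sketch of the new `segment_keys` option (not part of this commit's diff). It reuses the test helpers that appear in `plan_test.cc` further down (`BatchesWithSchema`, `ExecBatchFromJSON`, `DeclarationToExecBatches`, `AssertExecBatchesEqualIgnoringOrder`); the data values and the expected output layout simply mirror those tests (aggregate columns first, then keys, then segment keys):

```
// Sketch: sum "c" per key "b" within each segment group of the ordered column "a".
TEST(ExecPlanExecution, SegmentedAggregationUsageSketch) {
  BatchesWithSchema data;
  // "a" is non-decreasing, so it can serve as a segment key.
  data.batches = {ExecBatchFromJSON({int32(), int32(), int32()},
                                    "[[1, 1, 10], [1, 2, 20], [2, 1, 30]]")};
  data.schema = schema({field("a", int32()), field("b", int32()), field("c", int32())});

  Declaration plan = Declaration::Sequence(
      {{"source",
        SourceNodeOptions{data.schema, data.gen(/*parallel=*/false, /*slow=*/false)}},
       {"aggregate",
        AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr, "c", "sum(c)"}},
                             /*keys=*/{"b"}, /*segment_keys=*/{"a"}}}});

  // Segmented aggregation is single-threaded in this commit, so threads are disabled.
  ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema out,
                       DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
  // One row per (segment group, key): [sum(c), b, a].
  auto expected = ExecBatchFromJSON({int64(), int32(), int32()},
                                    "[[10, 1, 1], [20, 2, 1], [30, 1, 2]]");
  AssertExecBatchesEqualIgnoringOrder(out.schema, out.batches, {expected});
}
```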

See https://issues.apache.org/jira/browse/ARROW-17642

Replaces #14352
* Closes: #32884

Follow-ups
=======
* #34475 
* #34529

---------

Co-authored-by: Li Jin <ice.xelloss@gmail.com>
rtpsw and icexelloss committed Mar 10, 2023
1 parent 4c05a3b commit 9baefea
Showing 11 changed files with 1,731 additions and 195 deletions.
12 changes: 12 additions & 0 deletions cpp/src/arrow/compute/exec.cc
@@ -147,6 +147,18 @@ ExecBatch ExecBatch::Slice(int64_t offset, int64_t length) const {
return out;
}

Result<ExecBatch> ExecBatch::SelectValues(const std::vector<int>& ids) const {
  std::vector<Datum> selected_values;
  selected_values.reserve(ids.size());
  for (int id : ids) {
    if (id < 0 || static_cast<size_t>(id) >= values.size()) {
      return Status::Invalid("ExecBatch invalid value selection: ", id);
    }
    selected_values.push_back(values[id]);
  }
  return ExecBatch(std::move(selected_values), length);
}

namespace {

enum LengthInferenceError {
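The new `ExecBatch::SelectValues` above simply projects a subset of the batch's values by index; a small hedged usage sketch (the helper name and the chosen indices are illustrative only):

```
// Keep only values 2 and 0 of a batch, in that order; out-of-range indices
// yield Status::Invalid rather than undefined behavior.
Result<ExecBatch> KeepSelectedColumns(const ExecBatch& batch) {
  return batch.SelectValues({2, 0});
}
```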
8 changes: 8 additions & 0 deletions cpp/src/arrow/compute/exec.h
@@ -181,6 +181,12 @@ struct ARROW_EXPORT ExecBatch {
/// \brief Infer the ExecBatch length from values.
static Result<int64_t> InferLength(const std::vector<Datum>& values);

/// \brief Create an ExecBatch with length validation.
///
/// If any values are given, they must all have a common length. If the given length is
/// negative, the length of the ExecBatch is set to this common length, or to 1 if no
/// values are given. Otherwise, the given length must equal the common length of the
/// values, if any are given.
static Result<ExecBatch> Make(std::vector<Datum> values, int64_t length = -1);

Result<std::shared_ptr<RecordBatch>> ToRecordBatch(
@@ -240,6 +246,8 @@ struct ARROW_EXPORT ExecBatch {

ExecBatch Slice(int64_t offset, int64_t length) const;

Result<ExecBatch> SelectValues(const std::vector<int>& ids) const;

/// \brief A convenience for returning the types from the batch.
std::vector<TypeHolder> GetTypes() const {
std::vector<TypeHolder> result;
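A short illustration of the length rules documented above for `ExecBatch::Make`; `ArrayFromJSON` and the assertion macros are assumed from Arrow's testing helpers, and the snippet only restates the documented behavior (the exact error type on a length mismatch is an assumption):

```
// Two arrays of length 3 and the default length of -1: the batch length becomes 3.
ASSERT_OK_AND_ASSIGN(ExecBatch batch,
                     ExecBatch::Make({ArrayFromJSON(int32(), "[1, 2, 3]"),
                                      ArrayFromJSON(int32(), "[4, 5, 6]")}));
// No values and a negative length: the batch length defaults to 1.
ASSERT_OK_AND_ASSIGN(ExecBatch scalar_batch, ExecBatch::Make({}));
// A given non-negative length must match the common length of the values
// (assumed to raise Invalid).
ASSERT_RAISES(Invalid,
              ExecBatch::Make({ArrayFromJSON(int32(), "[1, 2, 3]")}, /*length=*/2));
```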
309 changes: 276 additions & 33 deletions cpp/src/arrow/compute/exec/aggregate_node.cc

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions cpp/src/arrow/compute/exec/exec_plan.h
@@ -241,8 +241,7 @@ class ARROW_EXPORT ExecNode {
/// concurrently, potentially even before the call to StartProducing
/// has finished.
/// - PauseProducing(), ResumeProducing(), StopProducing() may be called
-///   by the downstream nodes' InputReceived(), ErrorReceived(), InputFinished()
-///   methods
+///   by the downstream nodes' InputReceived(), InputFinished() methods
///
/// StopProducing may be called due to an error, by the user (e.g. cancel), or
/// because a node has all the data it needs (e.g. limit, top-k on sorted data).
24 changes: 20 additions & 4 deletions cpp/src/arrow/compute/exec/options.h
@@ -221,21 +221,37 @@ class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions {
std::vector<std::string> names;
};

-/// \brief Make a node which aggregates input batches, optionally grouped by keys.
+/// \brief Make a node which aggregates input batches, optionally grouped by keys and
+/// optionally segmented by segment-keys. Both keys and segment-keys determine the group.
+/// However, segment-keys are also used for determining grouping segments, which should be
+/// large, and allow streaming a partial aggregation result after processing each segment.
+/// One common use-case for segment-keys is ordered aggregation, in which the segment-key
+/// attribute specifies a column with non-decreasing values or a lexicographically-ordered
+/// set of such columns.
///
/// If the keys attribute is a non-empty vector, then each aggregate in `aggregates` is
/// expected to be a HashAggregate function. If the keys attribute is an empty vector,
/// then each aggregate is assumed to be a ScalarAggregate function.
+///
+/// If the segment_keys attribute is a non-empty vector, then segmented aggregation, as
+/// described above, applies.
+///
+/// The keys and segment_keys vectors must be disjoint.
class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
public:
  explicit AggregateNodeOptions(std::vector<Aggregate> aggregates,
-                               std::vector<FieldRef> keys = {})
-     : aggregates(std::move(aggregates)), keys(std::move(keys)) {}
+                               std::vector<FieldRef> keys = {},
+                               std::vector<FieldRef> segment_keys = {})
+     : aggregates(std::move(aggregates)),
+       keys(std::move(keys)),
+       segment_keys(std::move(segment_keys)) {}

  // aggregations which will be applied to the targeted fields
  std::vector<Aggregate> aggregates;
- // keys by which aggregations will be grouped
+ // keys by which aggregations will be grouped (optional)
  std::vector<FieldRef> keys;
+ // keys by which aggregations will be segmented (optional)
+ std::vector<FieldRef> segment_keys;
};

constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30; // 1GiB
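The doc comment above also implies a second mode: with empty `keys` and non-empty `segment_keys`, plain ScalarAggregate functions are applied once per segment group. A hedged sketch of that combination, again reusing the plan_test.cc helpers shown below (the exact output layout here is an assumption, not taken from this commit's tests):

```
// Assumed usage: no grouping keys, only a segment key, so the ScalarAggregate
// "sum" is finalized once per segment group of the ordered column "a".
BatchesWithSchema data;
data.batches = {ExecBatchFromJSON({int32(), int32()}, "[[1, 10], [1, 20], [2, 30]]")};
data.schema = schema({field("a", int32()), field("c", int32())});

Declaration plan = Declaration::Sequence(
    {{"source",
      SourceNodeOptions{data.schema, data.gen(/*parallel=*/false, /*slow=*/false)}},
     {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "c", "sum(c)"}},
                                        /*keys=*/{}, /*segment_keys=*/{"a"}}}});
ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema out,
                     DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
// Assumed result: one output row per segment group, e.g. [[30, 1], [30, 2]].
```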
103 changes: 103 additions & 0 deletions cpp/src/arrow/compute/exec/plan_test.cc
@@ -1578,5 +1578,108 @@ TEST(ExecPlan, SourceEnforcesBatchLimit) {
}
}

TEST(ExecPlanExecution, SegmentedAggregationWithMultiThreading) {
  BatchesWithSchema data;
  data.batches = {ExecBatchFromJSON({int32()}, "[[1]]")};
  data.schema = schema({field("i32", int32())});
  Declaration plan = Declaration::Sequence(
      {{"source",
        SourceNodeOptions{data.schema, data.gen(/*parallel=*/false, /*slow=*/false)}},
       {"aggregate", AggregateNodeOptions{/*aggregates=*/{
                                              {"count", nullptr, "i32", "count(i32)"},
                                          },
                                          /*keys=*/{"i32"}, /*segment_keys=*/{"i32"}}}});
  EXPECT_RAISES_WITH_MESSAGE_THAT(NotImplemented, HasSubstr("multi-threaded"),
                                  DeclarationToExecBatches(std::move(plan)));
}

TEST(ExecPlanExecution, SegmentedAggregationWithOneSegment) {
  BatchesWithSchema data;
  data.batches = {
      ExecBatchFromJSON({int32(), int32(), int32()}, "[[1, 1, 1], [1, 2, 1], [1, 1, 2]]"),
      ExecBatchFromJSON({int32(), int32(), int32()},
                        "[[1, 2, 2], [1, 1, 3], [1, 2, 3]]")};
  data.schema = schema({
      field("a", int32()),
      field("b", int32()),
      field("c", int32()),
  });

  Declaration plan = Declaration::Sequence(
      {{"source",
        SourceNodeOptions{data.schema, data.gen(/*parallel=*/false, /*slow=*/false)}},
       {"aggregate", AggregateNodeOptions{/*aggregates=*/{
                                              {"hash_sum", nullptr, "c", "sum(c)"},
                                              {"hash_mean", nullptr, "c", "mean(c)"},
                                          },
                                          /*keys=*/{"b"}, /*segment_keys=*/{"a"}}}});
  ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema actual_batches,
                       DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));

  auto expected = ExecBatchFromJSON({int64(), float64(), int32(), int32()},
                                    R"([[6, 2, 1, 1], [6, 2, 2, 1]])");
  AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
                                      {expected});
}

TEST(ExecPlanExecution, SegmentedAggregationWithTwoSegments) {
  BatchesWithSchema data;
  data.batches = {
      ExecBatchFromJSON({int32(), int32(), int32()}, "[[1, 1, 1], [1, 2, 1], [1, 1, 2]]"),
      ExecBatchFromJSON({int32(), int32(), int32()},
                        "[[2, 2, 2], [2, 1, 3], [2, 2, 3]]")};
  data.schema = schema({
      field("a", int32()),
      field("b", int32()),
      field("c", int32()),
  });

  Declaration plan = Declaration::Sequence(
      {{"source",
        SourceNodeOptions{data.schema, data.gen(/*parallel=*/false, /*slow=*/false)}},
       {"aggregate", AggregateNodeOptions{/*aggregates=*/{
                                              {"hash_sum", nullptr, "c", "sum(c)"},
                                              {"hash_mean", nullptr, "c", "mean(c)"},
                                          },
                                          /*keys=*/{"b"}, /*segment_keys=*/{"a"}}}});
  ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema actual_batches,
                       DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));

  auto expected = ExecBatchFromJSON(
      {int64(), float64(), int32(), int32()},
      R"([[3, 1.5, 1, 1], [1, 1, 2, 1], [3, 3, 1, 2], [5, 2.5, 2, 2]])");
  AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
                                      {expected});
}

TEST(ExecPlanExecution, SegmentedAggregationWithBatchCrossingSegment) {
  BatchesWithSchema data;
  data.batches = {
      ExecBatchFromJSON({int32(), int32(), int32()}, "[[1, 1, 1], [1, 1, 1], [2, 2, 2]]"),
      ExecBatchFromJSON({int32(), int32(), int32()},
                        "[[2, 2, 2], [3, 3, 3], [3, 3, 3]]")};
  data.schema = schema({
      field("a", int32()),
      field("b", int32()),
      field("c", int32()),
  });

  Declaration plan = Declaration::Sequence(
      {{"source",
        SourceNodeOptions{data.schema, data.gen(/*parallel=*/false, /*slow=*/false)}},
       {"aggregate", AggregateNodeOptions{/*aggregates=*/{
                                              {"hash_sum", nullptr, "c", "sum(c)"},
                                              {"hash_mean", nullptr, "c", "mean(c)"},
                                          },
                                          /*keys=*/{"b"}, /*segment_keys=*/{"a"}}}});
  ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema actual_batches,
                       DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));

  auto expected = ExecBatchFromJSON({int64(), float64(), int32(), int32()},
                                    R"([[2, 1, 1, 1], [4, 2, 2, 2], [6, 3, 3, 3]])");
  AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
                                      {expected});
}

} // namespace compute
} // namespace arrow
