ARROW-16894: [C++] Add Benchmarks for Asof Join Node #13426

Merged
merged 91 commits into from
Jul 28, 2022
Changes from 82 commits
Commits
23b8c71
wip
icexelloss Apr 18, 2022
f4b2106
wip
icexelloss Apr 18, 2022
7ab446d
wip
icexelloss Apr 21, 2022
138daee
wip
icexelloss Apr 26, 2022
94a8453
wip: First test pass
icexelloss Apr 26, 2022
6466b80
Fix code style and lint (partial)
icexelloss Apr 28, 2022
4c33452
Add support for mutliple tables; Add more tests
icexelloss May 9, 2022
c6c6093
Clean up code style (Pass ninja lint now), switch to unbounded queue
icexelloss May 10, 2022
643e368
Clean up some files
icexelloss May 10, 2022
0781a16
Clean up some files
icexelloss May 11, 2022
fc75844
Minor clean up
icexelloss May 11, 2022
26bc862
Fix nulls in test result
icexelloss May 11, 2022
5a6afbd
Clean up includes
icexelloss May 11, 2022
4f7cac7
Clean up error handling
icexelloss May 12, 2022
8773317
Error handling
icexelloss May 12, 2022
22c9941
Fix compiler warning
icexelloss May 17, 2022
6b27e6b
Fix Wshorten-64-to-32 error
icexelloss May 23, 2022
775be1d
Fix lint
icexelloss May 23, 2022
2dc5691
Fix lint
icexelloss May 23, 2022
a9dd980
Fix compiler warning Wunused-result
icexelloss May 24, 2022
0f39fce
Fix format
icexelloss May 24, 2022
761e5de
Remove debug statement
icexelloss May 24, 2022
0387e5c
Update cpp/src/arrow/compute/exec/asof_join_node.cc
icexelloss May 26, 2022
15ba43d
Update cpp/src/arrow/compute/exec/asof_join_node.cc
icexelloss May 26, 2022
b92a303
Update cpp/src/arrow/compute/exec/asof_join_node.cc
icexelloss May 26, 2022
f0edd17
Update cpp/src/arrow/compute/exec/asof_join.h
icexelloss May 26, 2022
58f229d
Update cpp/src/arrow/compute/exec/asof_join_node.cc
icexelloss May 26, 2022
15e2783
Update cpp/src/arrow/compute/exec/asof_join_node.cc
icexelloss May 31, 2022
7aa252a
Update cpp/src/arrow/compute/exec/asof_join_node.cc
icexelloss May 31, 2022
9c332eb
Apply suggestions from code review
icexelloss May 31, 2022
cae5592
Address PR comments
icexelloss May 31, 2022
8f32500
Took another pass of remaing functions to mixed style
icexelloss May 31, 2022
eead16e
ninja lint
icexelloss May 31, 2022
d0aec2f
Remove asof_join.h
icexelloss Jun 1, 2022
f783818
Address comments
icexelloss Jun 1, 2022
1a79d10
Clean up tests
icexelloss Jun 1, 2022
83398c8
ninja format
icexelloss Jun 1, 2022
9f3d5c9
Use implicit ctor for optional
icexelloss Jun 1, 2022
e61b9c1
Refactor tests
icexelloss Jun 1, 2022
1b4f26b
Address comments and add check/test for unsuported datatypes
icexelloss Jun 2, 2022
7287e84
Merge branch 'master' of https://github.com/apache/arrow into asof_jo…
Jun 10, 2022
554f7c3
Merge branch 'master' of https://github.com/apache/arrow into asof_jo…
Jun 10, 2022
a221715
wip: initial asof benchmark
Jun 10, 2022
0c72a94
fix linting and formatting issues
Jun 10, 2022
d4cb13e
update as of join with static testing files
Jun 13, 2022
121b3bd
Apply suggestions from code review
icexelloss Jun 14, 2022
66b6b98
Apply suggestions from code review
icexelloss Jun 14, 2022
776ff18
reroute stdout node output to stderr, add hashjoin benchmarks, add as…
Jun 16, 2022
b4cd1e5
Merge remote-tracking branch 'ice/arrow-asof-join' into asof_join_ben…
Jun 16, 2022
6154eae
refactor, fix bytes/second
Jun 21, 2022
599f408
add asymmetric benchmarks
Jun 23, 2022
2009b8a
remove isolated benchmark
Jun 23, 2022
a33eba0
add system call to bamboo-streaming setup and teardown, refactor benc…
Jun 24, 2022
780e964
lint, add more documentation, change function name
Jun 24, 2022
4020830
update benchmarks, remove nonsalient benchmarks
Jun 29, 2022
5d73258
add verification tests
Jul 6, 2022
4df409a
add struct, add suggestions for code cleanup
Jul 7, 2022
fd87a66
add benchmark generation files, refactor benchmarks with tables, shor…
Jul 8, 2022
4a8d67c
remove assignment from StartAndCollect
Jul 8, 2022
16e86bf
remove datagen
Jul 8, 2022
b3228f2
update data generation scripts
Jul 8, 2022
aaa2b04
update data generation scripts, temporary directories
Jul 12, 2022
798d2b3
fix benchmark scripts
Jul 12, 2022
c414e8a
fix typo in benchmark scripts naming
Jul 12, 2022
b8fb090
remove join schema (unused)
Jul 12, 2022
c9a608c
add benchmarking readme
Jul 12, 2022
47aeb70
remove unused csv writer
Jul 12, 2022
1752a87
add mem pools
Jul 12, 2022
a7fc6c6
update asof join node
Jul 12, 2022
0a74f7e
Merge branch 'master' of https://github.com/apache/arrow into asof_join2
Jul 12, 2022
89f25cc
remove python data generation scripts, write data gen in cpp
Jul 15, 2022
6f9bb79
Merge branch 'master' of https://github.com/apache/arrow into asof_jo…
Jul 15, 2022
c69336a
undo memory pool changes
Jul 15, 2022
9adf631
remove unused include
Jul 15, 2022
8a2d0a8
add documentation, fix benchmarks
Jul 15, 2022
ca37af5
remove hashjoin benchmarks, rename function
Jul 15, 2022
3622d5e
remove unecessary includes
Jul 15, 2022
cc17f47
refactor and clean up code
Jul 15, 2022
5c6a32c
fix variables in MakeRandomTable
Jul 15, 2022
d9f7c14
adjust seeding so all columns are different
Jul 15, 2022
ac5bcd1
fix data generation to be time major-order instead of id major-oder
Jul 15, 2022
7512796
refactor asof, other style changes from code rev
Jul 18, 2022
ff84700
follow comment format
Jul 18, 2022
0111994
make spacing consistent
Jul 18, 2022
df1a566
Merge branch 'master' of https://github.com/apache/arrow into asof_jo…
Jul 19, 2022
7816ade
change TableGenerationProperties.seed to int from uint
Jul 21, 2022
fd33711
adjust data types to silence build warnings
Jul 21, 2022
cdbf5f2
respond to code review from weston
Jul 26, 2022
3658a3a
remove table gen export
Jul 26, 2022
3a212e5
add documentation change, add arrow check to randomtable
Jul 27, 2022
bb3565a
ninja lint format
Jul 27, 2022
2 changes: 2 additions & 0 deletions cpp/src/arrow/compute/exec/CMakeLists.txt
@@ -62,6 +62,8 @@ add_arrow_benchmark(project_benchmark
benchmark_util.cc
project_benchmark.cc)

add_arrow_benchmark(asof_join_benchmark PREFIX "arrow-compute")

add_arrow_benchmark(tpch_benchmark PREFIX "arrow-compute")

if(ARROW_BUILD_OPENMP_BENCHMARKS)
171 changes: 171 additions & 0 deletions cpp/src/arrow/compute/exec/asof_join_benchmark.cc
@@ -0,0 +1,171 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <string>

#include "benchmark/benchmark.h"

#include "arrow/compute/exec/test_util.h"
#include "arrow/dataset/file_parquet.h"
#include "arrow/table.h"
#include "arrow/testing/future_util.h"

namespace arrow {
namespace compute {

static const char* kTimeCol = "time";
static const char* kKeyCol = "id";
const int kDefaultStart = 0;
const int kDefaultEnd = 500;
const int kDefaultMinColumnVal = -10000;
const int kDefaultMaxColumnVal = 10000;

struct TableStats {
std::shared_ptr<Table> table;
size_t total_rows;
size_t total_bytes;
};

static TableStats MakeTable(const TableGenerationProperties& properties) {
std::shared_ptr<Table> table = MakeRandomTimeSeriesTable(properties);
size_t row_size = sizeof(double) * (table.get()->schema()->num_fields() - 2) +
sizeof(int64_t) + sizeof(int32_t);
Comment on lines +46 to +47
Member

There are some utilities you can use in arrow/util/byte_size.h too if you wanted a more accurate version of the size (e.g. will report size used by validity bitmaps).

However, this is fine too I think. It represents a more conceptual data size.

Contributor Author

After some testing, it seems these numbers are identical.
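
The two size estimates discussed in this thread can be compared with a small standalone sketch. It does not call Arrow: `ConceptualBytes` mirrors the `row_size` formula in `MakeTable`, and `BytesWithValidityBitmaps` is a hypothetical helper that assumes every column also carries a validity bitmap of one bit per row. The Arrow columnar format allows a column with no nulls to omit that bitmap, so the second figure is only an upper estimate under that assumption.

```cpp
#include <cstddef>
#include <cstdint>

// Conceptual size: one int64 time, one int32 id, and num_value_columns doubles
// per row. Mirrors the row_size formula used in MakeTable.
std::size_t ConceptualBytes(std::size_t num_rows, std::size_t num_value_columns) {
  const std::size_t row_size = sizeof(double) * num_value_columns +
                               sizeof(std::int64_t) + sizeof(std::int32_t);
  return num_rows * row_size;
}

// Hypothetical upper estimate: assumes each column stores a validity bitmap of
// one bit per row, rounded up to whole bytes. Padding and alignment are ignored.
std::size_t BytesWithValidityBitmaps(std::size_t num_rows,
                                     std::size_t num_value_columns) {
  const std::size_t total_columns = num_value_columns + 2;  // time + id + values
  const std::size_t bitmap_bytes_per_column = (num_rows + 7) / 8;
  return ConceptualBytes(num_rows, num_value_columns) +
         total_columns * bitmap_bytes_per_column;
}
```

For 50,500 rows and 20 value columns, `ConceptualBytes` gives 8,686,000 bytes and the bitmap assumption adds 138,886 bytes, about 1.6%.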

size_t rows = table.get()->num_rows();
return {table, rows, rows * row_size};
}

static ExecNode* MakeTableSourceNode(std::shared_ptr<arrow::compute::ExecPlan> plan,
std::shared_ptr<Table> table, int batch_size) {
return *arrow::compute::MakeExecNode(
"table_source", plan.get(), {},
arrow::compute::TableSourceNodeOptions(table, batch_size));
}

static void TableJoinOverhead(benchmark::State& state,
TableGenerationProperties left_table_properties,
int left_table_batch_size,
TableGenerationProperties right_table_properties,
int right_table_batch_size, int num_right_tables,
std::string factory_name, ExecNodeOptions& options) {
ExecContext ctx(default_memory_pool(), nullptr);

left_table_properties.column_prefix = "lt";
left_table_properties.seed = 0;
TableStats left_table_stats = MakeTable(left_table_properties);

size_t right_hand_rows = 0;
size_t right_hand_bytes = 0;
std::vector<TableStats> right_input_tables;
right_input_tables.reserve(num_right_tables);

for (int i = 0; i < num_right_tables; i++) {
right_table_properties.column_prefix = "rt" + std::to_string(i);
right_table_properties.seed = i + 1;
TableStats right_table_stats = MakeTable(right_table_properties);
right_hand_rows += right_table_stats.total_rows;
right_hand_bytes += right_table_stats.total_bytes;
right_input_tables.push_back(right_table_stats);
}

for (auto _ : state) {
state.PauseTiming();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
ExecPlan::Make(&ctx));
std::vector<ExecNode*> input_nodes = {
MakeTableSourceNode(plan, left_table_stats.table, left_table_batch_size)};
input_nodes.reserve(right_input_tables.size() + 1);
for (TableStats table_stats : right_input_tables) {
input_nodes.push_back(
MakeTableSourceNode(plan, table_stats.table, right_table_batch_size));
}
ASSERT_OK_AND_ASSIGN(arrow::compute::ExecNode * join_node,
MakeExecNode(factory_name, plan.get(), input_nodes, options));
AsyncGenerator<util::optional<ExecBatch>> sink_gen;
MakeExecNode("sink", plan.get(), {join_node}, SinkNodeOptions{&sink_gen});
state.ResumeTiming();
ASSERT_FINISHES_OK(StartAndCollect(plan.get(), sink_gen));
}

state.counters["total_rows_per_second"] = benchmark::Counter(
static_cast<double>(state.iterations() *
(left_table_stats.total_rows + right_hand_rows)),
benchmark::Counter::kIsRate);

state.counters["total_bytes_per_second"] = benchmark::Counter(
static_cast<double>(state.iterations() *
(left_table_stats.total_bytes + right_hand_bytes)),
benchmark::Counter::kIsRate);

state.counters["maximum_peak_memory"] =
benchmark::Counter(static_cast<double>(ctx.memory_pool()->max_memory()));
}

static void AsOfJoinOverhead(benchmark::State& state) {
int64_t tolerance = 0;
AsofJoinNodeOptions options = AsofJoinNodeOptions(kTimeCol, kKeyCol, tolerance);
TableJoinOverhead(
state,
TableGenerationProperties{int(state.range(0)), int(state.range(1)),
int(state.range(2)), "", kDefaultMinColumnVal,
kDefaultMaxColumnVal, 0, kDefaultStart, kDefaultEnd},
int(state.range(3)),
TableGenerationProperties{int(state.range(5)), int(state.range(6)),
int(state.range(7)), "", kDefaultMinColumnVal,
kDefaultMaxColumnVal, 0, kDefaultStart, kDefaultEnd},
int(state.range(8)), int(state.range(4)), "asofjoin", options);
}

// this generates the set of right hand tables to test on.
void SetArgs(benchmark::internal::Benchmark* bench) {
bench
->ArgNames({"left_freq", "left_cols", "left_ids", "left_batch_size",
"num_right_tables", "right_freq", "right_cols", "right_ids",
"right_batch_size"})
->UseRealTime();
int default_freq = 5;
int default_cols = 20;
int default_ids = 500;
int default_num_tables = 1;
int default_batch_size = 100;

for (int freq : {1, 5, 10}) {
bench->Args({freq, default_cols, default_ids, default_batch_size, default_num_tables,
freq, default_cols, default_ids, default_batch_size});
}

for (int cols : {10, 20, 100}) {
bench->Args({default_freq, cols, default_ids, default_batch_size, default_num_tables,
default_freq, cols, default_ids, default_batch_size});
}
for (int ids : {100, 500, 1000}) {
bench->Args({default_freq, default_cols, ids, default_batch_size, default_num_tables,
default_freq, default_cols, ids, default_batch_size});
}
for (int num_tables : {1, 10, 50}) {
bench->Args({default_freq, default_cols, default_ids, default_batch_size, num_tables,
default_freq, default_cols, default_ids, default_batch_size});
}
for (int batch_size : {1, 500, 1000}) {
bench->Args({default_freq, default_cols, default_ids, batch_size, default_num_tables,
default_freq, default_cols, default_ids, batch_size});
}
}

BENCHMARK(AsOfJoinOverhead)->Apply(SetArgs);

} // namespace compute
} // namespace arrow
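
The sweep in `SetArgs` varies one parameter at a time around a default configuration. Below is a standalone sketch, without Google Benchmark, that enumerates the same argument sets. `ArgSet`, `SweepArgs` and `DistinctCount` are illustrative names and are not part of this PR.

```cpp
#include <array>
#include <cstddef>
#include <initializer_list>
#include <set>
#include <vector>

// Order matches ArgNames in SetArgs: left_freq, left_cols, left_ids,
// left_batch_size, num_right_tables, right_freq, right_cols, right_ids,
// right_batch_size.
using ArgSet = std::array<int, 9>;

// Enumerates the argument sets in the same order as the loops in SetArgs.
std::vector<ArgSet> SweepArgs() {
  const int freq = 5, cols = 20, ids = 500, tables = 1, batch = 100;
  std::vector<ArgSet> out;
  for (int f : {1, 5, 10})
    out.push_back(ArgSet{f, cols, ids, batch, tables, f, cols, ids, batch});
  for (int c : {10, 20, 100})
    out.push_back(ArgSet{freq, c, ids, batch, tables, freq, c, ids, batch});
  for (int n : {100, 500, 1000})
    out.push_back(ArgSet{freq, cols, n, batch, tables, freq, cols, n, batch});
  for (int t : {1, 10, 50})
    out.push_back(ArgSet{freq, cols, ids, batch, t, freq, cols, ids, batch});
  for (int b : {1, 500, 1000})
    out.push_back(ArgSet{freq, cols, ids, b, tables, freq, cols, ids, b});
  return out;
}

// Number of distinct argument sets (duplicates collapse in the set).
std::size_t DistinctCount(const std::vector<ArgSet>& args) {
  return std::set<ArgSet>(args.begin(), args.end()).size();
}
```

Counted this way, the sweep registers 15 argument sets, 12 of them distinct: the all-default configuration is reached once in each of the frequency, column, id and table-count loops.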
37 changes: 37 additions & 0 deletions cpp/src/arrow/compute/exec/test_util.cc
@@ -40,6 +40,7 @@
#include "arrow/datum.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/testing/builder.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/type.h"
@@ -459,5 +460,41 @@ void PrintTo(const Declaration& decl, std::ostream* os) {
*os << "}";
}

std::shared_ptr<Table> MakeRandomTimeSeriesTable(
const TableGenerationProperties& properties) {
int total_columns = properties.num_columns + 2;
std::vector<std::shared_ptr<Array>> columns;
columns.reserve(total_columns);
arrow::FieldVector field_vector;
field_vector.reserve(total_columns);

field_vector.push_back(field("time", int64()));
field_vector.push_back(field("id", int32()));

Int64Builder time_column_builder;
Int32Builder id_column_builder;
for (int time = properties.start; time <= properties.end;
time += properties.time_frequency) {
for (int id = 0; id < properties.num_ids; id++) {
time_column_builder.Append(time);
id_column_builder.Append(id);
}
}

int num_rows = time_column_builder.length();
columns.push_back(time_column_builder.Finish().ValueOrDie());
columns.push_back(id_column_builder.Finish().ValueOrDie());
Contributor Author

@iChauster iChauster Jul 18, 2022

Is using ValueOrDie() here okay? I tried using CHECK_OK_AND_ASSIGN but I don't think that works in a non-void function.

Member

If you change the function to return Result<std::shared_ptr<Table>> (which you should), then you can use ARROW_ASSIGN_OR_RAISE. CHECK_... and ASSERT_... should only be used in the test/benchmark files themselves. In helper functions (e.g. test_util.cc) you should return a Status or a Result<T>.
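
The control flow suggested here can be sketched without Arrow. `MiniResult` below is a hypothetical stand-in, not `arrow::Result`, and `MakeTimeColumn` is an illustrative helper rather than code from this PR. The point is only that the helper reports a failure to its caller instead of aborting, and the test or benchmark decides what to do with it.

```cpp
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a Result<T>. This is NOT arrow::Result.
template <typename T>
struct MiniResult {
  bool ok;            // true when value is meaningful
  T value;            // payload on success
  std::string error;  // message on failure
};

// Helper in the style of a test utility: bad input is returned as an error
// rather than triggering an abort inside the helper.
MiniResult<std::vector<std::int64_t>> MakeTimeColumn(int start, int end,
                                                     int frequency) {
  if (frequency <= 0) {
    return {false, {}, "frequency must be positive"};
  }
  std::vector<std::int64_t> times;
  for (int t = start; t <= end; t += frequency) {
    times.push_back(t);
  }
  return {true, std::move(times), ""};
}
```

A caller would check `ok` before touching `value`, which is the role the assertion macros play in the test and benchmark files themselves.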


for (int i = 0; i < properties.num_columns; i++) {
field_vector.push_back(
field(properties.column_prefix + std::to_string(i), float64()));
random::RandomArrayGenerator rand = random::RandomArrayGenerator(properties.seed + i);
columns.push_back(
rand.Float64(num_rows, properties.min_column_value, properties.max_column_value));
}
std::shared_ptr<arrow::Schema> schema = arrow::schema(std::move(field_vector));
return Table::Make(schema, columns, num_rows);
}

} // namespace compute
} // namespace arrow
34 changes: 34 additions & 0 deletions cpp/src/arrow/compute/exec/test_util.h
@@ -145,5 +145,39 @@ class Random64Bit {
std::uniform_int_distribution<uint64_t> dist_;
};

// Specify properties of a table to be generated.
struct TableGenerationProperties {
/// Indicates the amount of time between data points that lie between
/// the start and end parameters.
int time_frequency;
/// The number of additional random columns in the table.
int num_columns;
/// The number of unique keys in the table.
int num_ids;
/// Specifies the prefix of each randomly generated column.
std::string column_prefix;
/// Specifies the minimum value in the randomly generated column(s).
int min_column_value;
/// Specifies the maximum value in the randomly generated column(s).
int max_column_value;
/// The random seed the random array generator is given to generate the additional
/// columns.
uint seed;
/// Specifies the beginning of 'time' recorded in the table, inclusive.
int start;
/// Specifies the end of 'time' recorded in the table, inclusive.
int end;
};

/// The table generated in accordance to the TableGenerationProperties has the following
/// schema: time (int64) id (int32) [properties.column_prefix]0 (float64)
Member

What's the 0 in [properties.column_prefix]0?

Contributor Author

Hmm, I think this one got caught in the linting / formatter and made it a bit unclear, but each column is numbered from 0 to n - 1 inclusive, so each column name is something like [properties.column_prefix][i] where i = {0...n-1}. Is there a way I can make this clearer through the comments?
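
The naming scheme described in this reply can be written out as a short standalone sketch. `ValueColumnNames` is an illustrative name, not a function from this PR; it only reproduces the `column_prefix + std::to_string(i)` pattern used in `MakeRandomTimeSeriesTable`.

```cpp
#include <string>
#include <vector>

// Builds the value-column names: prefix + "0", prefix + "1", ...,
// prefix + (num_columns - 1).
std::vector<std::string> ValueColumnNames(const std::string& prefix,
                                          int num_columns) {
  std::vector<std::string> names;
  for (int i = 0; i < num_columns; i++) {
    names.push_back(prefix + std::to_string(i));
  }
  return names;
}
```

With prefix `"lt"` and three columns this yields `lt0`, `lt1`, `lt2`. A right-hand prefix such as `"rt0"` yields `rt00`, `rt01`, and so on.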

/// [properties.column_prefix]1 (float64)
/// ...
/// [properties.column_prefix][properties.num_columns - 1] (float64)
/// Each id has rows corresponding to a singular data point in the time range (start, end,
/// time_frequency). The table is sorted by time.
std::shared_ptr<Table> MakeRandomTimeSeriesTable(
const TableGenerationProperties& properties);

} // namespace compute
} // namespace arrow
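
The row layout produced by `MakeRandomTimeSeriesTable` (time-major order, one row per id at each time step, both bounds inclusive) can be illustrated with a standalone sketch that does not depend on Arrow. `TimeMajorKeys` and `ExpectedRows` are illustrative names and are not part of this PR.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// (time, id) pairs in time-major order: for each time step from start to end
// inclusive, every id from 0 to num_ids - 1 appears once.
std::vector<std::pair<std::int64_t, std::int32_t>> TimeMajorKeys(
    int start, int end, int time_frequency, int num_ids) {
  assert(time_frequency > 0);  // a non-positive step would never terminate
  std::vector<std::pair<std::int64_t, std::int32_t>> keys;
  for (int time = start; time <= end; time += time_frequency) {
    for (int id = 0; id < num_ids; id++) {
      keys.emplace_back(time, id);
    }
  }
  return keys;
}

// Closed-form row count for the same layout.
std::int64_t ExpectedRows(int start, int end, int time_frequency, int num_ids) {
  assert(time_frequency > 0);
  if (end < start || num_ids <= 0) return 0;
  const std::int64_t steps =
      (static_cast<std::int64_t>(end) - start) / time_frequency + 1;
  return steps * num_ids;
}
```

With the benchmark defaults (start 0, end 500, frequency 5, 500 ids) this gives 101 time steps and 50,500 rows per table.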