Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-41149: [C++][Acero] Fix asof join race #41614

Merged
merged 13 commits into from
May 14, 2024
73 changes: 44 additions & 29 deletions cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,10 @@ class InputState {
// true when the queue is empty and, when memo may have future entries (the case of a
// positive tolerance), when the memo is empty.
// used when checking whether RHS is up to date with LHS.
bool CurrentEmpty() const {
return memo_.no_future_ ? Empty() : memo_.times_.empty() && Empty();
// NOTE: The emptiness must be decided by a single call to Empty() in caller, due to the
// potential race with Push(), see GH-41614.
bool CurrentEmpty(bool empty) const {
return memo_.no_future_ ? empty : (memo_.times_.empty() && empty);
}

// in case memo may not have future entries (the case of a non-positive tolerance),
Expand Down Expand Up @@ -650,13 +652,15 @@ class InputState {
// timestamp, update latest_time and latest_ref_row to the value that immediately passes
// the horizon. Update the memo-store with any entries or future entries so observed.
// Returns true if updates were made, false if not.
Result<bool> AdvanceAndMemoize(OnType ts) {
// NOTE: The emptiness must be decided by a single call to Empty() in caller, due to the
// potential race with Push(), see GH-41614.
Result<bool> AdvanceAndMemoize(OnType ts, bool empty) {
// Advance the right side row index until we reach the latest right row (for each key)
// for the given left timestamp.
DEBUG_SYNC(node_, "Advancing input ", index_, DEBUG_MANIP(std::endl));

// Check if already updated for TS (or if there is no latest)
if (Empty()) { // can't advance if empty and no future entries
if (empty) { // can't advance if empty and no future entries
return memo_.no_future_ ? false : memo_.RemoveEntriesWithLesserTime(ts);
}

Expand Down Expand Up @@ -918,34 +922,46 @@ class CompositeTableBuilder {
// guaranteeing this probability is below 1 in a billion. The fix is 128-bit hashing.
// See ARROW-17653
class AsofJoinNode : public ExecNode {
// Advances the RHS as far as possible to be up to date for the current LHS timestamp
Result<bool> UpdateRhs() {
// A simple wrapper for the result of a single call to UpdateRhs(), identifying:
// 1) If any RHS has advanced.
// 2) If all RHS are up to date with LHS.
struct RhsUpdateState {
bool any_advanced;
bool all_up_to_date_with_lhs;
};
// Advances the RHS as far as possible to be up to date for the current LHS timestamp,
// and checks if all RHS are up to date with LHS. The reason they have to be performed
// together is that they both depend on the emptiness of the RHS, which can be changed
// by Push() executing in another thread.
Result<RhsUpdateState> UpdateRhs() {
auto& lhs = *state_.at(0);
auto lhs_latest_time = lhs.GetLatestTime();
bool any_updated = false;
for (size_t i = 1; i < state_.size(); ++i) {
ARROW_ASSIGN_OR_RAISE(bool advanced, state_[i]->AdvanceAndMemoize(lhs_latest_time));
any_updated |= advanced;
}
return any_updated;
}

// Returns false if RHS not up to date for LHS
bool IsUpToDateWithLhsRow() const {
auto& lhs = *state_[0];
if (lhs.Empty()) return false; // can't proceed if nothing on the LHS
OnType lhs_ts = lhs.GetLatestTime();
RhsUpdateState update_state{/*any_advanced=*/false, /*all_up_to_date_with_lhs=*/true};
for (size_t i = 1; i < state_.size(); ++i) {
auto& rhs = *state_[i];
if (!rhs.Finished()) {

// Obtain RHS emptiness once for subsequent AdvanceAndMemoize() and CurrentEmpty().
bool rhs_empty = rhs.Empty();
// Obtain RHS current time here because AdvanceAndMemoize() can change the
// emptiness.
OnType rhs_current_time = rhs_empty ? OnType{} : rhs.GetLatestTime();

ARROW_ASSIGN_OR_RAISE(bool advanced,
rhs.AdvanceAndMemoize(lhs_latest_time, rhs_empty));
update_state.any_advanced |= advanced;

if (update_state.all_up_to_date_with_lhs && !rhs.Finished()) {
// If RHS is finished, then we know it's up to date
if (rhs.CurrentEmpty())
return false; // RHS isn't finished, but is empty --> not up to date
if (lhs_ts > rhs.GetCurrentTime())
return false; // RHS isn't up to date (and not finished)
if (rhs.CurrentEmpty(rhs_empty)) {
// RHS isn't finished, but is empty --> not up to date
update_state.all_up_to_date_with_lhs = false;
} else if (lhs_latest_time > rhs_current_time) {
// RHS isn't up to date (and not finished)
update_state.all_up_to_date_with_lhs = false;
}
}
}
return true;
return update_state;
}

Result<std::shared_ptr<RecordBatch>> ProcessInner() {
Expand All @@ -963,20 +979,19 @@ class AsofJoinNode : public ExecNode {
// If LHS is finished or empty then there's nothing we can do here
if (lhs.Finished() || lhs.Empty()) break;

// Advance each of the RHS as far as possible to be up to date for the LHS timestamp
ARROW_ASSIGN_OR_RAISE(bool any_rhs_advanced, UpdateRhs());
ARROW_ASSIGN_OR_RAISE(auto rhs_update_state, UpdateRhs());

// If we have received enough inputs to produce the next output batch
// (decided by UpdateRhs() via all_up_to_date_with_lhs), we will perform the join and
// materialize the output batch. The join is done by advancing through
// the LHS and adding joined row to rows_ (done by Emplace). Finally,
// input batches that are no longer needed are removed to free up memory.
if (IsUpToDateWithLhsRow()) {
if (rhs_update_state.all_up_to_date_with_lhs) {
dst.Emplace(state_, tolerance_);
ARROW_ASSIGN_OR_RAISE(bool advanced, lhs.Advance());
if (!advanced) break; // if we can't advance LHS, we're done for this batch
} else {
if (!any_rhs_advanced) break; // need to wait for new data
if (!rhs_update_state.any_advanced) break; // need to wait for new data
}
}

Expand Down
54 changes: 54 additions & 0 deletions cpp/src/arrow/acero/asof_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1678,5 +1678,59 @@ TEST(AsofJoinTest, BackpressureWithBatchesGen) {
/*slow_r0=*/false);
}

// Reproduction of GH-40675: A logical race between Process() and Push() that can be more
// easily observed with a single small batch.
//
// Feeds exactly one batch per side into an "asofjoin" declaration and checks the joined
// rows. Only the left row (1, "a") is expected to pick up a right-side value (1.0); every
// other left row is expected to carry null in the joined float64 column.
TEST(AsofJoinTest, RhsEmptinessRace) {
// Left input: five (int64, utf8) rows.
auto left_batch = ExecBatchFromJSON(
pitrou marked this conversation as resolved.
Show resolved Hide resolved
{int64(), utf8()}, R"([[1, "a"], [1, "b"], [5, "a"], [6, "b"], [7, "f"]])");
// Right input: three (int64, utf8, float64) rows.
auto right_batch = ExecBatchFromJSON(
{int64(), utf8(), float64()}, R"([[2, "a", 1.0], [9, "b", 3.0], [15, "g", 5.0]])");

// Each side is a source node that emits its single batch.
Declaration left{
"exec_batch_source",
ExecBatchSourceNodeOptions(schema({field("colA", int64()), field("col2", utf8())}),
{std::move(left_batch)})};
Declaration right{
"exec_batch_source",
ExecBatchSourceNodeOptions(schema({field("colB", int64()), field("col3", utf8()),
field("colC", float64())}),
{std::move(right_batch)})};
// Keys per input: colA/col2 for the left, colB/col3 for the right.
// NOTE(review): the trailing 1 is presumably the join tolerance -- confirm against the
// AsofJoinNodeOptions constructor.
AsofJoinNodeOptions asof_join_opts({{{"colA"}, {{"col2"}}}, {{"colB"}, {{"col3"}}}}, 1);
Declaration asof_join{
"asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)};

// Fails the test if building or running the declaration reports an error.
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join)));

// Expected output is checked with the order-insensitive batch comparison helper.
auto exp_batch = ExecBatchFromJSON(
{int64(), utf8(), float64()},
R"([[1, "a", 1.0], [1, "b", null], [5, "a", null], [6, "b", null], [7, "f", null]])");
AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches);
}

// Reproduction of GH-41149: Another case of the same root cause as GH-40675, but with
// empty "by" columns.
//
// Feeds exactly one batch per side into an "asofjoin" declaration keyed only on the "on"
// column and checks that the left rows 1, 2 and 3 are joined with "Z", "Z" and "B"
// respectively.
TEST(AsofJoinTest, RhsEmptinessRaceEmptyBy) {
// Left input: three single-column int64 rows.
auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])");
pitrou marked this conversation as resolved.
Show resolved Hide resolved
// Right input: three (utf8, int64) rows; the int64 column is the one named "on" below.
auto right_batch =
ExecBatchFromJSON({utf8(), int64()}, R"([["Z", 2], ["B", 3], ["A", 4]])");

// Each side is a source node that emits its single batch.
Declaration left{"exec_batch_source",
ExecBatchSourceNodeOptions(schema({field("on", int64())}),
{std::move(left_batch)})};
Declaration right{
"exec_batch_source",
ExecBatchSourceNodeOptions(schema({field("colVals", utf8()), field("on", int64())}),
{std::move(right_batch)})};
// Both inputs use "on" as the key and pass an empty list for the "by" columns.
// NOTE(review): the trailing 1 is presumably the join tolerance -- confirm against the
// AsofJoinNodeOptions constructor.
AsofJoinNodeOptions asof_join_opts({{{"on"}, {}}, {{"on"}, {}}}, 1);
Declaration asof_join{
"asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)};

// Fails the test if building or running the declaration reports an error.
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join)));

// Expected output is checked with the order-insensitive batch comparison helper.
auto exp_batch =
ExecBatchFromJSON({int64(), utf8()}, R"([[1, "Z"], [2, "Z"], [3, "B"]])");
AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches);
}

} // namespace acero
} // namespace arrow
Loading