ARROW-17381: [C++][Acero] Centralize error handling in ExecPlan #13848
Conversation
(force-pushed 9dbb7e2 to 25dbf30, then 25dbf30 to 2cdc239)
I like the cleanup, this is definitely simplifying ExecNode/ExecPlan. I have some initial thoughts.
// COMMIT cd5346e14450d7e5ca156acb4c2f396885c77aa0
// The task group isn't relevant in synchronous execution mode
if (!exec_context_->executor()) return Future<>::Make();
Eventually this case will go away
// producers precede consumers
sorted_nodes_ = TopoSort();

Status st = Status::OK();

using rev_it = std::reverse_iterator<NodeVector::iterator>;
for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it != end; ++it) {
  auto node = *it;

  EVENT(span_, "StartProducing:" + node->label(),
        {{"node.label", node->label()}, {"node.kind_name", node->kind_name()}});
  st = node->StartProducing();
  EVENT(span_, "StartProducing:" + node->label(), {{"status", st.ToString()}});
  if (!st.ok()) {
    // Stop nodes that successfully started, in reverse order
    stopped_ = true;
    StopProducingImpl(it.base(), sorted_nodes_.end());
    for (NodeVector::iterator fw_it = sorted_nodes_.begin(); fw_it != it.base();
         ++fw_it) {
      Future<> fut = (*fw_it)->finished();
      if (!fut.is_finished()) fut.MarkFinished();
    }
    Abort();
Why does the order no longer matter here?
// StartProducing will have added some tasks to the task group.
// Now we end the task group so that as soon as we run out of tasks,
// we've finished executing.
EndTaskGroup();
Wouldn't a more appropriate place to trigger EndTaskGroup be when InputFinished is received on all sinks?
EndTaskGroup has a nice property that it ends when it runs out of tasks to perform, here's the comment:
/// It is allowed for tasks to be added after this call provided the future has not yet
/// completed. This should be safe as long as the tasks being added are added as part
/// of a task that is tracked. As soon as the count of running tasks reaches 0 this
/// future will be marked complete.
So we will end when all of the tasks have finished running and no new tasks have been scheduled.
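The counting behaviour that comment describes can be sketched as follows. Note this `SimpleTaskGroup` is a hypothetical, simplified stand-in for illustration only, not Arrow's actual task-group implementation: the group is "finished" once `End()` has been called and the running-task count reaches zero, yet new tasks may still be added after `End()` as long as some tracked task is running.

```cpp
#include <mutex>

// Hypothetical simplified task group (not Arrow code). Tasks may be added
// after End() while a tracked task is still running; the group only becomes
// finished once End() was called and the running count drops to zero.
class SimpleTaskGroup {
 public:
  void AddTask() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++running_;
  }
  void FinishTask() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (--running_ == 0 && ended_) finished_ = true;
  }
  void End() {
    std::lock_guard<std::mutex> lock(mutex_);
    ended_ = true;
    if (running_ == 0) finished_ = true;
  }
  bool finished() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return finished_;
  }

 private:
  mutable std::mutex mutex_;
  int running_ = 0;
  bool ended_ = false;
  bool finished_ = false;
};
```

This mirrors the quoted doc comment: calling `End()` (analogous to `EndTaskGroup()`) right after `StartProducing` is safe because any follow-up work is scheduled from within tasks that are still tracked.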
if (aborted_) {
  for (std::unique_ptr<ExecNode>& node : nodes_) node->Abort();
}
Wouldn't we call node->Abort when we transition to aborted_ = true?
We want to avoid any possible race conditions while aborting/doing cleanup and running tasks, so it's only safe to Abort when we're sure that no other tasks are running.
START_COMPUTE_SPAN(
    span_, std::string(kind_name()) + ":" + label(),
    {{"node.label", label()}, {"node.detail", ToString()}, {"node.kind", kind_name()}});
return Status::OK();
}
Very happy to see this move into the base class.
ARROW_ASSIGN_OR_RAISE(ExecBatch projected, DoProject(std::move(batch)));
return output_->InputReceived(this, std::move(projected));
At this point maybe we should just move the body of DoProject into this method?
Status InputFinished(ExecNode* input, int num_batches) override {
  END_SPAN(span_);
  return output_->InputFinished(this, num_batches);
}
Can this be a default implementation for ExecNode::InputFinished?
Yeah, it probably can be. Actually, this span handling is a bit broken in general right now, because we don't enforce that InputFinished is called after all batches have been output. InputFinished merely specifies the total number of batches that will be output; so, for example, for scalar aggregates that only ever output one row, InputFinished is called in StartProducing, and a project above a scalar aggregate node would have its span ended immediately.
/// Nodes are stopped in topological order, such that any node
/// is stopped before all of its outputs.
void StopProducing();
void Abort();
Can we keep the comment?
/// \brief A future which will be marked finished when this node has stopped producing.
virtual Future<> finished() { return finished_; }
/// \brief Abort execution and perform any needed cleanup (such as closing files, etc.)
What does "Abort execution" mean for a node? In theory all "execution" is handled via the scheduler, so does a node really need to do anything here? Why ExecNode::Abort instead of doing the cleanup in the ExecNode destructor?
@zagto do you mind taking a look at this when you get a chance?
Nice work. I love seeing the code become cleaner and easier to understand.
fut.AddCallback([this](const Status& status) {
  if (!status.ok()) {
    std::lock_guard<std::mutex> guard(abort_mutex_);
    errors_.emplace_back(std::move(status));
I don't think this std::move does anything, given that status is a const reference.
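For reference, a minimal illustration of why the reviewer is right: `std::move` applied to a const lvalue yields a const rvalue, which cannot bind to a move constructor's non-const rvalue-reference parameter, so overload resolution silently falls back to the copy constructor and the source object is left intact. The `Take` function below is a made-up example (using `std::string` in place of `Status`), not code from the PR:

```cpp
#include <string>
#include <utility>

// std::move(status) yields a const rvalue here, which can't bind to
// std::string's move constructor (std::string&&), so the copy constructor
// is selected instead: this compiles, but silently copies.
std::string Take(const std::string& status) {
  return std::move(status);
}
```

In other words, the `std::move` in the quoted `errors_.emplace_back(std::move(status))` is harmless but misleading; dropping it (or copying explicitly) would make the intent clearer.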
-  Process();
+  Status st = Process();
+  if (!st.ok()) {
+    plan_->Abort();
If we get a non-ok status here, would that mean we just abort while discarding the Status/message? This seems confusing to the user. Maybe we could have an ExecPlan::Abort(Status) that adds the status to ExecPlanImpl::errors_?
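A sketch of what that suggested overload might look like, reusing the `errors_` and `abort_mutex_` member names quoted earlier in the thread. The `PlanLike` class and the string-based stand-in for `Status` are hypothetical illustrations, not Arrow code:

```cpp
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch (not Arrow code): an Abort overload that records the
// triggering status in the plan's error list before aborting, so the error
// message is surfaced to the user instead of being discarded.
class PlanLike {
 public:
  void Abort(std::string status) {
    {
      std::lock_guard<std::mutex> lock(abort_mutex_);
      errors_.push_back(std::move(status));
    }
    aborted_ = true;  // stand-in for the real abort/cleanup logic
  }
  const std::vector<std::string>& errors() const { return errors_; }
  bool aborted() const { return aborted_; }

 private:
  std::mutex abort_mutex_;
  std::vector<std::string> errors_;
  bool aborted_ = false;
};
```

With something like this, the call site above could become `plan_->Abort(std::move(st))` and the status would end up in the same error list the abort callback already populates.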
[](const Datum& value) { return value.is_scalar(); }));

-  auto values = target.values;
+  auto values = batch.values;
Suggested change:
-  auto values = batch.values;
+  auto values = std::move(batch.values);
}

Status Init() override {
  RETURN_NOT_OK(ExecNode::Init());
was this intentional?
// Source should finish fairly quickly
ASSERT_FINISHES_OK(source->finished());
SleepABit();
SleepABit();
Why do we need 3 calls to SleepABit? Probably because one may not be enough on slower systems, but I think a comment would be helpful here.
(force-pushed 1cc334d to 279bf83, then 279bf83 to 1c75db4)
Are you interested in dusting this off and rebasing now that the previous cleanup has merged?
/// \brief Stop producing definitively to a single output
///
/// This call is a hint that an output node has completed and is not willing
/// to receive any further data.
virtual void StopProducing(ExecNode* output) = 0;
I've since learned that this is still needed. This covers the case where a LIMIT X node is placed on one branch of a query. It is intended to stop part of the plan but not abort the entire plan. Do you think we can leave it in?
@save-buffer are you interested in rebasing this?
Closing because it has been untouched for a while, in case it's still relevant feel free to reopen and move it forward 👍