Description
Current Behavior
The SourceNode is affected by backpressure: once PauseProducing is invoked, the SourceNode stops scheduling new tasks and awaits a future, and once that future is marked finished, execution continues. However, any task that has already been scheduled is not affected by backpressure.
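For illustration, here is a minimal standalone sketch of that pattern (using arrow::Future directly, not the actual Acero code): scheduling consults the gate, but anything scheduled before the pause still runs.
#include <arrow/util/future.h>
#include <iostream>

int main() {
  // A pending future models "paused" (PauseProducing was invoked);
  // marking it finished models "resumed" (ResumeProducing).
  arrow::Future<> gate = arrow::Future<>::Make();

  auto try_schedule = [&](const char* when) {
    if (!gate.is_finished()) {
      // Only the scheduling of *new* work stops here; tasks that were
      // already scheduled are unaffected by the pause.
      std::cout << when << ": paused, no new task scheduled\n";
      return;
    }
    std::cout << when << ": task scheduled\n";
  };

  try_schedule("while paused");
  gate.MarkFinished();  // resume
  try_schedule("after resume");
  return 0;
}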
Example
My SourceNode fetches data and sends it to a MapNode, which decompresses the data and sends it on to a SinkNode that streams it out.
The generator in my SourceNode creates an ExecBatch without awaiting any other futures:
arrow::Future<std::optional<arrow::ExecBatch>> operator()() {
  try {
    return produceNextBatch();
  } catch (const std::exception& exception) {
    return arrow::Status::ExecutionError(exception.what());
  }
}
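For context, this is roughly how such a generator is wired into a source declaration (a hedged sketch: MyGenerator stands in for my actual functor, its body is elided, and the custom decompressing MapNode and the SinkNode are omitted):
#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/api.h>
#include <arrow/compute/exec.h>
#include <arrow/util/future.h>

#include <memory>
#include <optional>

// Hypothetical stand-in for the generator shown above; returning an
// empty optional ends the stream immediately.
struct MyGenerator {
  arrow::Future<std::optional<arrow::compute::ExecBatch>> operator()() {
    return std::optional<arrow::compute::ExecBatch>{};
  }
};

arrow::acero::Declaration MakeSourceDecl(std::shared_ptr<arrow::Schema> schema) {
  auto options = std::make_shared<arrow::acero::SourceNodeOptions>(
      std::move(schema), MyGenerator{});
  return arrow::acero::Declaration{"source", std::move(options)};
}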
Here, I noticed that this SourceNode generates all batches and schedules them for propagation through the ExecPlan before the first result is ever consumed.
The backpressure that the SinkNode eventually applies after receiving the first few batches doesn't stop any of these already-scheduled tasks from starting. Once they start, the batches are decompressed, creating a large memory spike.
I had hoped that backpressure could prevent this memory spike.
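For reference, these are the sink-side thresholds behind that behaviour (a hedged sketch; field names follow arrow::acero::BackpressureOptions, and the rest of the plan setup is elided):
#include <arrow/acero/options.h>

// The sink requests a pause once its queued bytes exceed pause_if_above
// and a resume once they fall back below resume_if_below.
arrow::acero::BackpressureOptions backpressure =
    arrow::acero::BackpressureOptions::DefaultBackpressure();
// The thresholds are plain byte counts; they can be tuned via
// backpressure.pause_if_above / backpressure.resume_if_below before
// passing the struct to the sink's options.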
Why does this happen?
The limitation of the current backpressure implementation is that the SourceNode only checks backpressure when scheduling tasks ...
return generator_().Then(
    [this](
        const std::optional<ExecBatch>& morsel_or_end) -> Future<ControlFlow<int>> {
      std::unique_lock<std::mutex> lock(mutex_);
      if (IsIterationEnd(morsel_or_end) || stop_requested_) {
        return Break(batch_count_);
      }
      lock.unlock();
      SliceAndDeliverMorsel(*morsel_or_end);
      lock.lock();
      if (!backpressure_future_.is_finished()) {
        EVENT_ON_CURRENT_SPAN("SourceNode::BackpressureApplied");
        return backpressure_future_.Then(
            []() -> ControlFlow<int> { return Continue(); });
      }
      return Future<ControlFlow<int>>::MakeFinished(Continue());
    });
... not when the scheduled tasks are eventually executed in SliceAndDeliverMorsel:
plan_->query_context()->ScheduleTask(
    [this, morsel_length, use_legacy_batching, initial_batch_index, morsel,
     has_ordering = !ordering_.is_unordered()]() {
      int64_t offset = 0;
      int batch_index = initial_batch_index;
      do {
        int64_t batch_size =
            std::min<int64_t>(morsel_length - offset, ExecPlan::kMaxBatchSize);
        // In order for the legacy batching model to work we must
        // not slice batches from the source
        if (use_legacy_batching) {
          batch_size = morsel_length;
        }
        ExecBatch batch = morsel.Slice(offset, batch_size);
        UnalignedBufferHandling unaligned_buffer_handling =
            plan_->query_context()->options().unaligned_buffer_handling.value_or(
                GetDefaultUnalignedBufferHandling());
        ARROW_RETURN_NOT_OK(
            HandleUnalignedBuffers(&batch, unaligned_buffer_handling));
        if (has_ordering) {
          batch.index = batch_index;
        }
        offset += batch_size;
        batch_index++;
        ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(batch)));
      } while (offset < morsel.length);
      return Status::OK();
    },
    "SourceNode::ProcessMorsel");
Possible mitigation
A solution could be to make the lambda scheduled by the SourceNode susceptible to backpressure by having it await the corresponding future, backpressure_future_.
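Roughly like this (an untested sketch of the idea, not a patch; it reuses the existing mutex_ and backpressure_future_ members and elides the unchanged parts of the loop):
// Inside the loop of SourceNode::ProcessMorsel:
do {
  // ... slice, handle alignment, and deliver the batch as before ...

  // Proposed addition: before producing the next slice, wait for any
  // pending backpressure to clear. Note that Wait() blocks a CPU
  // thread-pool thread, which is exactly the deadlock hazard discussed
  // below if every pool thread ends up parked here.
  arrow::Future<> backpressure;
  {
    std::lock_guard<std::mutex> guard(mutex_);
    backpressure = backpressure_future_;
  }
  if (!backpressure.is_finished()) {
    backpressure.Wait();
  }
} while (offset < morsel.length);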
I also saw that there is a ThrottledAsyncTaskScheduler that solves a similar problem, but using it (like the blocking sketched above) might be dangerous with respect to deadlocks: a task that blocks on backpressure occupies a thread-pool thread, and if all pool threads end up blocked, the downstream work that would release the backpressure can never run.
I would also be interested in whether there are other ways to avoid the memory spikes in such cases.
Component(s)
C++