-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-15374: [C++][FlightRPC] Add support for MemoryManager in data methods #12239
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
if (ARROW_PREDICT_FALSE(!memory_manager_->is_cpu() && data->body)) { | ||
ARROW_ASSIGN_OR_RAISE(data->body, Buffer::ViewOrCopy(data->body, memory_manager_)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Does it make sense to move this code inside peekable_reader_->Next()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could, it does make error reporting more complicated though since PeekableReader doesn't have a way to report errors.
return Status::Invalid("Expected buffer on device: ", device.ToString(), | ||
". Was allocated on device: ", buffer->device()->ToString()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also recurse into child data here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I realize the test might only use primitive data, in which case it doesn't matter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a guard against this.
|
||
FlightStreamChunk chunk; | ||
ASSERT_OK(reader->Next(&chunk)); | ||
for (const auto& column : chunk.data->columns()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also check the contents of the returned data here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, I added a helper to do this.
// This will likely lead to abort as gRPC cannot recover from an error here | ||
return ToGrpcStatus(status); | ||
} | ||
slices.push_back(slice); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(not sure how grpc::Slice
behaves wrt copies)
slices.push_back(slice); | |
slices.push_back(std::move(slice)); |
Benchmark runs are scheduled for baseline = 690e22f and contender = a7f518c. a7f518c is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
This enables using Flight with something like CUDA without having to manually copy data. Given the work around UCX on the mailing list, this would enable alternative backends to optimize based on where data is allocated.