Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-7233: [C++] Use Result<T> in remaining value-returning IPC APIs #6867

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions c_glib/arrow-glib/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,13 +507,11 @@ garrow_record_batch_file_reader_read_record_batch(GArrowRecordBatchFileReader *r
GError **error)
{
auto arrow_reader = garrow_record_batch_file_reader_get_raw(reader);
std::shared_ptr<arrow::RecordBatch> arrow_record_batch;
auto status = arrow_reader->ReadRecordBatch(i, &arrow_record_batch);
auto arrow_record_batch = arrow_reader->ReadRecordBatch(i);

if (garrow_error_check(error,
status,
"[record-batch-file-reader][read-record-batch]")) {
return garrow_record_batch_new_raw(&arrow_record_batch);
if (garrow::check(error, arrow_record_batch,
"[record-batch-file-reader][read-record-batch]")) {
return garrow_record_batch_new_raw(&(*arrow_record_batch));
} else {
return NULL;
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/file_ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ class IpcScanTask : public ScanTask {
return nullptr;
}

std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(reader_->ReadRecordBatch(i_++, &batch));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> batch,
reader_->ReadRecordBatch(i_++));
return projector_.Project(*batch, pool_);
}

Expand Down
16 changes: 12 additions & 4 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "arrow/ipc/writer.h"
#include "arrow/memory_pool.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/util/logging.h"
Expand Down Expand Up @@ -279,7 +280,15 @@ class GrpcIpcMessageReader : public ipc::MessageReader {
stream_(std::move(stream)),
stream_finished_(false) {}

Status ReadNextMessage(std::unique_ptr<ipc::Message>* out) override {
::arrow::Result<std::unique_ptr<ipc::Message>> ReadNextMessage() override {
std::unique_ptr<ipc::Message> out;
RETURN_NOT_OK(GetNextMessage(&out));
return out;
}

protected:
Status GetNextMessage(std::unique_ptr<ipc::Message>* out) {
// TODO: Use Result APIs
if (stream_finished_) {
*out = nullptr;
flight_reader_->last_app_metadata_ = nullptr;
Expand All @@ -303,7 +312,6 @@ class GrpcIpcMessageReader : public ipc::MessageReader {
return Status::OK();
}

protected:
Status OverrideWithServerError(Status&& st) {
// Get the gRPC status if not OK, to propagate any server error message
RETURN_NOT_OK(internal::FromGrpcStatus(stream_->Finish(), &rpc_->context));
Expand Down Expand Up @@ -484,8 +492,8 @@ Status GrpcStreamWriter::Open(
std::unique_ptr<GrpcStreamWriter> result(new GrpcStreamWriter(writer));
std::unique_ptr<ipc::internal::IpcPayloadWriter> payload_writer(new DoPutPayloadWriter(
descriptor, std::move(rpc), std::move(response), read_mutex, writer, result.get()));
RETURN_NOT_OK(ipc::internal::OpenRecordBatchWriter(std::move(payload_writer), schema,
&result->batch_writer_));
ARROW_ASSIGN_OR_RAISE(result->batch_writer_, ipc::internal::OpenRecordBatchWriter(
std::move(payload_writer), schema));
*out = std::move(result);
return Status::OK();
}
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/flight/internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ Status FromProto(const pb::FlightData& pb_data, FlightDescriptor* descriptor,
if (header_buf == nullptr || body_buf == nullptr) {
return Status::UnknownError("Could not create buffers from protobuf");
}
return ipc::Message::Open(header_buf, body_buf, message);
return ipc::Message::Open(header_buf, body_buf).Value(message);
}

// FlightEndpoint
Expand Down Expand Up @@ -466,10 +466,10 @@ Status FromProto(const pb::SchemaResult& pb_result, std::string* result) {

Status SchemaToString(const Schema& schema, std::string* out) {
// TODO(wesm): Do we care about better memory efficiency here?
std::shared_ptr<Buffer> serialized_schema;
ipc::DictionaryMemo unused_dict_memo;
RETURN_NOT_OK(ipc::SerializeSchema(schema, &unused_dict_memo, default_memory_pool(),
&serialized_schema));
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<Buffer> serialized_schema,
ipc::SerializeSchema(schema, &unused_dict_memo, default_memory_pool()));
*out = std::string(reinterpret_cast<const char*>(serialized_schema->data()),
static_cast<size_t>(serialized_schema->size()));
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/serialization_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ grpc::Status FlightDataDeserialize(ByteBuffer* buffer, FlightData* out) {
}

Status FlightData::OpenMessage(std::unique_ptr<ipc::Message>* message) {
return ipc::Message::Open(metadata, body, message);
return ipc::Message::Open(metadata, body).Value(message);
}

// The pointer bitcast hack below causes legitimate warnings, silence them.
Expand Down
15 changes: 11 additions & 4 deletions cpp/src/arrow/flight/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,17 @@ class FlightIpcMessageReader : public ipc::MessageReader {
std::shared_ptr<Buffer>* last_metadata)
: reader_(reader), app_metadata_(last_metadata) {}

Status ReadNextMessage(std::unique_ptr<ipc::Message>* out) override {
const FlightDescriptor& descriptor() const { return descriptor_; }

::arrow::Result<std::unique_ptr<ipc::Message>> ReadNextMessage() override {
std::unique_ptr<ipc::Message> out;
RETURN_NOT_OK(GetNextMessage(&out));
return out;
}

protected:
Status GetNextMessage(std::unique_ptr<ipc::Message>* out) {
// TODO: Migrate to Result APIs
if (stream_finished_) {
*out = nullptr;
*app_metadata_ = nullptr;
Expand Down Expand Up @@ -131,9 +141,6 @@ class FlightIpcMessageReader : public ipc::MessageReader {
return Status::OK();
}

const FlightDescriptor& descriptor() const { return descriptor_; }

protected:
grpc::ServerReaderWriter<pb::PutResult, pb::FlightData>* reader_;
bool stream_finished_ = false;
bool first_message_ = true;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/feather.cc
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ class ReaderV2 : public Reader {
ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchFileReader::Open(source_, options));
std::vector<std::shared_ptr<RecordBatch>> batches(reader->num_record_batches());
for (int i = 0; i < reader->num_record_batches(); ++i) {
RETURN_NOT_OK(reader->ReadRecordBatch(i, &batches[i]));
ARROW_ASSIGN_OR_RAISE(batches[i], reader->ReadRecordBatch(i));
}

// XXX: Handle included_fields in RecordBatchFileReader::schema
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/ipc/file_to_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ Status ConvertToStream(const char* path) {
ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(in_file.get()));
ARROW_ASSIGN_OR_RAISE(auto writer, ipc::NewStreamWriter(&sink, reader->schema()));
for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> chunk;
RETURN_NOT_OK(reader->ReadRecordBatch(i, &chunk));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> chunk, reader->ReadRecordBatch(i));
RETURN_NOT_OK(writer->WriteRecordBatch(*chunk));
}
return writer->Close();
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/arrow/ipc/json_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ static Status ConvertArrowToJson(const std::string& arrow_path,
RETURN_NOT_OK(internal::json::JsonWriter::Open(reader->schema(), &writer));

for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(reader->ReadRecordBatch(i, &batch));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> batch, reader->ReadRecordBatch(i));
RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
}

Expand Down Expand Up @@ -152,7 +151,7 @@ static Status ValidateArrowVsJson(const std::string& arrow_path,
std::shared_ptr<RecordBatch> json_batch;
for (int i = 0; i < json_nbatches; ++i) {
RETURN_NOT_OK(json_reader->ReadRecordBatch(i, &json_batch));
RETURN_NOT_OK(arrow_reader->ReadRecordBatch(i, &arrow_batch));
ARROW_ASSIGN_OR_RAISE(arrow_batch, arrow_reader->ReadRecordBatch(i));
Status valid_st = json_batch->ValidateFull();
if (!valid_st.ok()) {
return Status::Invalid("JSON record batch ", i, " did not validate:\n",
Expand Down
72 changes: 34 additions & 38 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ namespace ipc {

class Message::MessageImpl {
public:
explicit MessageImpl(const std::shared_ptr<Buffer>& metadata,
const std::shared_ptr<Buffer>& body)
: metadata_(metadata), message_(nullptr), body_(body) {}
explicit MessageImpl(std::shared_ptr<Buffer> metadata, std::shared_ptr<Buffer> body)
: metadata_(std::move(metadata)), message_(nullptr), body_(std::move(body)) {}

Status Open() {
RETURN_NOT_OK(
Expand Down Expand Up @@ -112,15 +111,15 @@ class Message::MessageImpl {
std::shared_ptr<Buffer> body_;
};

Message::Message(const std::shared_ptr<Buffer>& metadata,
const std::shared_ptr<Buffer>& body) {
impl_.reset(new MessageImpl(metadata, body));
Message::Message(std::shared_ptr<Buffer> metadata, std::shared_ptr<Buffer> body) {
impl_.reset(new MessageImpl(std::move(metadata), std::move(body)));
}

Status Message::Open(const std::shared_ptr<Buffer>& metadata,
const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out) {
out->reset(new Message(metadata, body));
return (*out)->impl_->Open();
Result<std::unique_ptr<Message>> Message::Open(std::shared_ptr<Buffer> metadata,
std::shared_ptr<Buffer> body) {
std::unique_ptr<Message> result(new Message(std::move(metadata), std::move(body)));
RETURN_NOT_OK(result->impl_->Open());
return result;
}

Message::~Message() {}
Expand Down Expand Up @@ -182,8 +181,8 @@ Status CheckMetadataAndGetBodyLength(const Buffer& metadata, int64_t* body_lengt
return Status::OK();
}

Status Message::ReadFrom(std::shared_ptr<Buffer> metadata, io::InputStream* stream,
std::unique_ptr<Message>* out) {
Result<std::unique_ptr<Message>> Message::ReadFrom(std::shared_ptr<Buffer> metadata,
io::InputStream* stream) {
RETURN_NOT_OK(MaybeAlignMetadata(&metadata));
int64_t body_length = -1;
RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));
Expand All @@ -194,11 +193,12 @@ Status Message::ReadFrom(std::shared_ptr<Buffer> metadata, io::InputStream* stre
" bytes for message body, got ", body->size());
}

return Message::Open(metadata, body, out);
return Message::Open(metadata, body);
}

Status Message::ReadFrom(const int64_t offset, std::shared_ptr<Buffer> metadata,
io::RandomAccessFile* file, std::unique_ptr<Message>* out) {
Result<std::unique_ptr<Message>> Message::ReadFrom(const int64_t offset,
std::shared_ptr<Buffer> metadata,
io::RandomAccessFile* file) {
RETURN_NOT_OK(MaybeAlignMetadata(&metadata));
int64_t body_length = -1;
RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));
Expand All @@ -209,7 +209,7 @@ Status Message::ReadFrom(const int64_t offset, std::shared_ptr<Buffer> metadata,
" bytes for message body, got ", body->size());
}

return Message::Open(metadata, body, out);
return Message::Open(metadata, body);
}

Status WritePadding(io::OutputStream* stream, int64_t nbytes) {
Expand Down Expand Up @@ -261,8 +261,8 @@ std::string FormatMessageType(Message::Type type) {
return "unknown";
}

Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
std::unique_ptr<Message>* message) {
Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_length,
io::RandomAccessFile* file) {
if (static_cast<size_t>(metadata_length) < sizeof(int32_t)) {
return Status::Invalid("metadata_length should be at least 4");
}
Expand Down Expand Up @@ -309,7 +309,7 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile

std::shared_ptr<Buffer> metadata =
SliceBuffer(buffer, prefix_size, buffer->size() - prefix_size);
return Message::ReadFrom(offset + metadata_length, metadata, file, message);
return Message::ReadFrom(offset + metadata_length, metadata, file);
}

Status AlignStream(io::InputStream* stream, int32_t alignment) {
Expand All @@ -336,9 +336,7 @@ Status CheckAligned(io::FileInterface* stream, int32_t alignment) {
}
}

namespace {

Result<std::unique_ptr<Message>> DoReadMessage(io::InputStream* file, MemoryPool* pool) {
Result<std::unique_ptr<Message>> ReadMessage(io::InputStream* file, MemoryPool* pool) {
int32_t continuation = 0;
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, file->Read(sizeof(int32_t), &continuation));

Expand Down Expand Up @@ -374,19 +372,7 @@ Result<std::unique_ptr<Message>> DoReadMessage(io::InputStream* file, MemoryPool
ARROW_ASSIGN_OR_RAISE(metadata,
Buffer::ViewOrCopy(metadata, CPUDevice::memory_manager(pool)));

std::unique_ptr<Message> message;
RETURN_NOT_OK(Message::ReadFrom(metadata, file, &message));
return std::move(message);
}

} // namespace

Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* out) {
return DoReadMessage(file, default_memory_pool()).Value(out);
}

Result<std::unique_ptr<Message>> ReadMessage(io::InputStream* file, MemoryPool* pool) {
return DoReadMessage(file, pool);
return Message::ReadFrom(metadata, file);
}

Status WriteMessage(const Buffer& message, const IpcWriteOptions& options,
Expand Down Expand Up @@ -436,9 +422,7 @@ class InputStreamMessageReader : public MessageReader {

~InputStreamMessageReader() {}

Status ReadNextMessage(std::unique_ptr<Message>* message) {
return ReadMessage(stream_, message);
}
Result<std::unique_ptr<Message>> ReadNextMessage() { return ReadMessage(stream_); }

private:
io::InputStream* stream_;
Expand All @@ -454,5 +438,17 @@ std::unique_ptr<MessageReader> MessageReader::Open(
return std::unique_ptr<MessageReader>(new InputStreamMessageReader(owned_stream));
}

// ----------------------------------------------------------------------
// Deprecated functions

Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
std::unique_ptr<Message>* message) {
return ReadMessage(offset, metadata_length, file).Value(message);
}

Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* out) {
return ReadMessage(file, default_memory_pool()).Value(out);
}

} // namespace ipc
} // namespace arrow
Loading