Skip to content

Commit

Permalink
ARROW-7233: [C++] Use Result<T> in remaining value-returning IPC APIs
Browse files Browse the repository at this point in the history
There were a number of APIs left.

Closes #6867 from wesm/result-review-ipc-apis

Lead-authored-by: Wes McKinney <wesm+git@apache.org>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
wesm and pitrou committed Apr 9, 2020
1 parent a3f2678 commit 4593754
Show file tree
Hide file tree
Showing 27 changed files with 302 additions and 245 deletions.
10 changes: 4 additions & 6 deletions c_glib/arrow-glib/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,13 +507,11 @@ garrow_record_batch_file_reader_read_record_batch(GArrowRecordBatchFileReader *r
GError **error)
{
auto arrow_reader = garrow_record_batch_file_reader_get_raw(reader);
std::shared_ptr<arrow::RecordBatch> arrow_record_batch;
auto status = arrow_reader->ReadRecordBatch(i, &arrow_record_batch);
auto arrow_record_batch = arrow_reader->ReadRecordBatch(i);

if (garrow_error_check(error,
status,
"[record-batch-file-reader][read-record-batch]")) {
return garrow_record_batch_new_raw(&arrow_record_batch);
if (garrow::check(error, arrow_record_batch,
"[record-batch-file-reader][read-record-batch]")) {
return garrow_record_batch_new_raw(&(*arrow_record_batch));
} else {
return NULL;
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/file_ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ class IpcScanTask : public ScanTask {
return nullptr;
}

std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(reader_->ReadRecordBatch(i_++, &batch));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> batch,
reader_->ReadRecordBatch(i_++));
return projector_.Project(*batch, pool_);
}

Expand Down
16 changes: 12 additions & 4 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "arrow/ipc/writer.h"
#include "arrow/memory_pool.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/util/logging.h"
Expand Down Expand Up @@ -279,7 +280,15 @@ class GrpcIpcMessageReader : public ipc::MessageReader {
stream_(std::move(stream)),
stream_finished_(false) {}

Status ReadNextMessage(std::unique_ptr<ipc::Message>* out) override {
::arrow::Result<std::unique_ptr<ipc::Message>> ReadNextMessage() override {
std::unique_ptr<ipc::Message> out;
RETURN_NOT_OK(GetNextMessage(&out));
return out;
}

protected:
Status GetNextMessage(std::unique_ptr<ipc::Message>* out) {
// TODO: Use Result APIs
if (stream_finished_) {
*out = nullptr;
flight_reader_->last_app_metadata_ = nullptr;
Expand All @@ -303,7 +312,6 @@ class GrpcIpcMessageReader : public ipc::MessageReader {
return Status::OK();
}

protected:
Status OverrideWithServerError(Status&& st) {
// Get the gRPC status if not OK, to propagate any server error message
RETURN_NOT_OK(internal::FromGrpcStatus(stream_->Finish(), &rpc_->context));
Expand Down Expand Up @@ -484,8 +492,8 @@ Status GrpcStreamWriter::Open(
std::unique_ptr<GrpcStreamWriter> result(new GrpcStreamWriter(writer));
std::unique_ptr<ipc::internal::IpcPayloadWriter> payload_writer(new DoPutPayloadWriter(
descriptor, std::move(rpc), std::move(response), read_mutex, writer, result.get()));
RETURN_NOT_OK(ipc::internal::OpenRecordBatchWriter(std::move(payload_writer), schema,
&result->batch_writer_));
ARROW_ASSIGN_OR_RAISE(result->batch_writer_, ipc::internal::OpenRecordBatchWriter(
std::move(payload_writer), schema));
*out = std::move(result);
return Status::OK();
}
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/flight/internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ Status FromProto(const pb::FlightData& pb_data, FlightDescriptor* descriptor,
if (header_buf == nullptr || body_buf == nullptr) {
return Status::UnknownError("Could not create buffers from protobuf");
}
return ipc::Message::Open(header_buf, body_buf, message);
return ipc::Message::Open(header_buf, body_buf).Value(message);
}

// FlightEndpoint
Expand Down Expand Up @@ -466,10 +466,10 @@ Status FromProto(const pb::SchemaResult& pb_result, std::string* result) {

Status SchemaToString(const Schema& schema, std::string* out) {
// TODO(wesm): Do we care about better memory efficiency here?
std::shared_ptr<Buffer> serialized_schema;
ipc::DictionaryMemo unused_dict_memo;
RETURN_NOT_OK(ipc::SerializeSchema(schema, &unused_dict_memo, default_memory_pool(),
&serialized_schema));
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<Buffer> serialized_schema,
ipc::SerializeSchema(schema, &unused_dict_memo, default_memory_pool()));
*out = std::string(reinterpret_cast<const char*>(serialized_schema->data()),
static_cast<size_t>(serialized_schema->size()));
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/serialization_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ grpc::Status FlightDataDeserialize(ByteBuffer* buffer, FlightData* out) {
}

Status FlightData::OpenMessage(std::unique_ptr<ipc::Message>* message) {
return ipc::Message::Open(metadata, body, message);
return ipc::Message::Open(metadata, body).Value(message);
}

// The pointer bitcast hack below causes legitimate warnings, silence them.
Expand Down
15 changes: 11 additions & 4 deletions cpp/src/arrow/flight/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,17 @@ class FlightIpcMessageReader : public ipc::MessageReader {
std::shared_ptr<Buffer>* last_metadata)
: reader_(reader), app_metadata_(last_metadata) {}

Status ReadNextMessage(std::unique_ptr<ipc::Message>* out) override {
const FlightDescriptor& descriptor() const { return descriptor_; }

::arrow::Result<std::unique_ptr<ipc::Message>> ReadNextMessage() override {
std::unique_ptr<ipc::Message> out;
RETURN_NOT_OK(GetNextMessage(&out));
return out;
}

protected:
Status GetNextMessage(std::unique_ptr<ipc::Message>* out) {
// TODO: Migrate to Result APIs
if (stream_finished_) {
*out = nullptr;
*app_metadata_ = nullptr;
Expand Down Expand Up @@ -131,9 +141,6 @@ class FlightIpcMessageReader : public ipc::MessageReader {
return Status::OK();
}

const FlightDescriptor& descriptor() const { return descriptor_; }

protected:
grpc::ServerReaderWriter<pb::PutResult, pb::FlightData>* reader_;
bool stream_finished_ = false;
bool first_message_ = true;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/feather.cc
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ class ReaderV2 : public Reader {
ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchFileReader::Open(source_, options));
std::vector<std::shared_ptr<RecordBatch>> batches(reader->num_record_batches());
for (int i = 0; i < reader->num_record_batches(); ++i) {
RETURN_NOT_OK(reader->ReadRecordBatch(i, &batches[i]));
ARROW_ASSIGN_OR_RAISE(batches[i], reader->ReadRecordBatch(i));
}

// XXX: Handle included_fields in RecordBatchFileReader::schema
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/ipc/file_to_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ Status ConvertToStream(const char* path) {
ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(in_file.get()));
ARROW_ASSIGN_OR_RAISE(auto writer, ipc::NewStreamWriter(&sink, reader->schema()));
for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> chunk;
RETURN_NOT_OK(reader->ReadRecordBatch(i, &chunk));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> chunk, reader->ReadRecordBatch(i));
RETURN_NOT_OK(writer->WriteRecordBatch(*chunk));
}
return writer->Close();
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/arrow/ipc/json_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ static Status ConvertArrowToJson(const std::string& arrow_path,
RETURN_NOT_OK(internal::json::JsonWriter::Open(reader->schema(), &writer));

for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(reader->ReadRecordBatch(i, &batch));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> batch, reader->ReadRecordBatch(i));
RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
}

Expand Down Expand Up @@ -152,7 +151,7 @@ static Status ValidateArrowVsJson(const std::string& arrow_path,
std::shared_ptr<RecordBatch> json_batch;
for (int i = 0; i < json_nbatches; ++i) {
RETURN_NOT_OK(json_reader->ReadRecordBatch(i, &json_batch));
RETURN_NOT_OK(arrow_reader->ReadRecordBatch(i, &arrow_batch));
ARROW_ASSIGN_OR_RAISE(arrow_batch, arrow_reader->ReadRecordBatch(i));
Status valid_st = json_batch->ValidateFull();
if (!valid_st.ok()) {
return Status::Invalid("JSON record batch ", i, " did not validate:\n",
Expand Down
72 changes: 34 additions & 38 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ namespace ipc {

class Message::MessageImpl {
public:
explicit MessageImpl(const std::shared_ptr<Buffer>& metadata,
const std::shared_ptr<Buffer>& body)
: metadata_(metadata), message_(nullptr), body_(body) {}
explicit MessageImpl(std::shared_ptr<Buffer> metadata, std::shared_ptr<Buffer> body)
: metadata_(std::move(metadata)), message_(nullptr), body_(std::move(body)) {}

Status Open() {
RETURN_NOT_OK(
Expand Down Expand Up @@ -112,15 +111,15 @@ class Message::MessageImpl {
std::shared_ptr<Buffer> body_;
};

Message::Message(const std::shared_ptr<Buffer>& metadata,
const std::shared_ptr<Buffer>& body) {
impl_.reset(new MessageImpl(metadata, body));
Message::Message(std::shared_ptr<Buffer> metadata, std::shared_ptr<Buffer> body) {
impl_.reset(new MessageImpl(std::move(metadata), std::move(body)));
}

Status Message::Open(const std::shared_ptr<Buffer>& metadata,
const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out) {
out->reset(new Message(metadata, body));
return (*out)->impl_->Open();
Result<std::unique_ptr<Message>> Message::Open(std::shared_ptr<Buffer> metadata,
std::shared_ptr<Buffer> body) {
std::unique_ptr<Message> result(new Message(std::move(metadata), std::move(body)));
RETURN_NOT_OK(result->impl_->Open());
return std::move(result);
}

Message::~Message() {}
Expand Down Expand Up @@ -182,8 +181,8 @@ Status CheckMetadataAndGetBodyLength(const Buffer& metadata, int64_t* body_lengt
return Status::OK();
}

Status Message::ReadFrom(std::shared_ptr<Buffer> metadata, io::InputStream* stream,
std::unique_ptr<Message>* out) {
Result<std::unique_ptr<Message>> Message::ReadFrom(std::shared_ptr<Buffer> metadata,
io::InputStream* stream) {
RETURN_NOT_OK(MaybeAlignMetadata(&metadata));
int64_t body_length = -1;
RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));
Expand All @@ -194,11 +193,12 @@ Status Message::ReadFrom(std::shared_ptr<Buffer> metadata, io::InputStream* stre
" bytes for message body, got ", body->size());
}

return Message::Open(metadata, body, out);
return Message::Open(metadata, body);
}

Status Message::ReadFrom(const int64_t offset, std::shared_ptr<Buffer> metadata,
io::RandomAccessFile* file, std::unique_ptr<Message>* out) {
Result<std::unique_ptr<Message>> Message::ReadFrom(const int64_t offset,
std::shared_ptr<Buffer> metadata,
io::RandomAccessFile* file) {
RETURN_NOT_OK(MaybeAlignMetadata(&metadata));
int64_t body_length = -1;
RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));
Expand All @@ -209,7 +209,7 @@ Status Message::ReadFrom(const int64_t offset, std::shared_ptr<Buffer> metadata,
" bytes for message body, got ", body->size());
}

return Message::Open(metadata, body, out);
return Message::Open(metadata, body);
}

Status WritePadding(io::OutputStream* stream, int64_t nbytes) {
Expand Down Expand Up @@ -261,8 +261,8 @@ std::string FormatMessageType(Message::Type type) {
return "unknown";
}

Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
std::unique_ptr<Message>* message) {
Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_length,
io::RandomAccessFile* file) {
if (static_cast<size_t>(metadata_length) < sizeof(int32_t)) {
return Status::Invalid("metadata_length should be at least 4");
}
Expand Down Expand Up @@ -309,7 +309,7 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile

std::shared_ptr<Buffer> metadata =
SliceBuffer(buffer, prefix_size, buffer->size() - prefix_size);
return Message::ReadFrom(offset + metadata_length, metadata, file, message);
return Message::ReadFrom(offset + metadata_length, metadata, file);
}

Status AlignStream(io::InputStream* stream, int32_t alignment) {
Expand All @@ -336,9 +336,7 @@ Status CheckAligned(io::FileInterface* stream, int32_t alignment) {
}
}

namespace {

Result<std::unique_ptr<Message>> DoReadMessage(io::InputStream* file, MemoryPool* pool) {
Result<std::unique_ptr<Message>> ReadMessage(io::InputStream* file, MemoryPool* pool) {
int32_t continuation = 0;
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, file->Read(sizeof(int32_t), &continuation));

Expand Down Expand Up @@ -374,19 +372,7 @@ Result<std::unique_ptr<Message>> DoReadMessage(io::InputStream* file, MemoryPool
ARROW_ASSIGN_OR_RAISE(metadata,
Buffer::ViewOrCopy(metadata, CPUDevice::memory_manager(pool)));

std::unique_ptr<Message> message;
RETURN_NOT_OK(Message::ReadFrom(metadata, file, &message));
return std::move(message);
}

} // namespace

Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* out) {
return DoReadMessage(file, default_memory_pool()).Value(out);
}

Result<std::unique_ptr<Message>> ReadMessage(io::InputStream* file, MemoryPool* pool) {
return DoReadMessage(file, pool);
return Message::ReadFrom(metadata, file);
}

Status WriteMessage(const Buffer& message, const IpcWriteOptions& options,
Expand Down Expand Up @@ -436,9 +422,7 @@ class InputStreamMessageReader : public MessageReader {

~InputStreamMessageReader() {}

Status ReadNextMessage(std::unique_ptr<Message>* message) {
return ReadMessage(stream_, message);
}
Result<std::unique_ptr<Message>> ReadNextMessage() { return ReadMessage(stream_); }

private:
io::InputStream* stream_;
Expand All @@ -454,5 +438,17 @@ std::unique_ptr<MessageReader> MessageReader::Open(
return std::unique_ptr<MessageReader>(new InputStreamMessageReader(owned_stream));
}

// ----------------------------------------------------------------------
// Deprecated functions

Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
std::unique_ptr<Message>* message) {
return ReadMessage(offset, metadata_length, file).Value(message);
}

Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* out) {
return ReadMessage(file, default_memory_pool()).Value(out);
}

} // namespace ipc
} // namespace arrow
Loading

0 comments on commit 4593754

Please sign in to comment.