Skip to content

Commit

Permalink
ARROW-17052: [C++][Python][FlightRPC] expose flight structures serial…
Browse files Browse the repository at this point in the history
…ize (#13986)

Expose serialize and deserialize for flight structures in C++ and Python: Action, ActionType, Criteria, FlightEndpoint, Result and SchemaResult

The Cython bindings are unchanged for the following:
* Criteria and PutResult (FlightClient::DoPutResult) aren't exposed directly
* ActionType is implemented as a named tuple

Lead-authored-by: Quang Hoang <quanghgx@gmail.com>
Co-authored-by: Quang Hoang Xuan <quanghgx@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
  • Loading branch information
quanghgx committed Sep 15, 2022
1 parent 5e49174 commit 7cfdfbb
Show file tree
Hide file tree
Showing 7 changed files with 466 additions and 12 deletions.
53 changes: 49 additions & 4 deletions cpp/src/arrow/flight/flight_internals_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,60 @@ TEST(FlightTypes, LocationUnknownScheme) {
}

TEST(FlightTypes, RoundTripTypes) {
ActionType action_type{"action-type1", "action-type1-description"};
ASSERT_OK_AND_ASSIGN(std::string action_type_serialized,
action_type.SerializeToString());
ASSERT_OK_AND_ASSIGN(ActionType action_type_deserialized,
ActionType::Deserialize(action_type_serialized));
ASSERT_EQ(action_type, action_type_deserialized);

Criteria criteria{"criteria1"};
ASSERT_OK_AND_ASSIGN(std::string criteria_serialized, criteria.SerializeToString());
ASSERT_OK_AND_ASSIGN(Criteria criteria_deserialized,
Criteria::Deserialize(criteria_serialized));
ASSERT_EQ(criteria, criteria_deserialized);

Action action{"action1", Buffer::FromString("action1-content")};
ASSERT_OK_AND_ASSIGN(std::string action_serialized, action.SerializeToString());
ASSERT_OK_AND_ASSIGN(Action action_deserialized,
Action::Deserialize(action_serialized));
ASSERT_EQ(action, action_deserialized);

Result result{Buffer::FromString("result1-content")};
ASSERT_OK_AND_ASSIGN(std::string result_serialized, result.SerializeToString());
ASSERT_OK_AND_ASSIGN(Result result_deserialized,
Result::Deserialize(result_serialized));
ASSERT_EQ(result, result_deserialized);

BasicAuth basic_auth{"username1", "password1"};
ASSERT_OK_AND_ASSIGN(std::string basic_auth_serialized, basic_auth.SerializeToString());
ASSERT_OK_AND_ASSIGN(BasicAuth basic_auth_deserialized,
BasicAuth::Deserialize(basic_auth_serialized));
ASSERT_EQ(basic_auth, basic_auth_deserialized);

SchemaResult schema_result{"schema_result1"};
ASSERT_OK_AND_ASSIGN(std::string schema_result_serialized,
schema_result.SerializeToString());
ASSERT_OK_AND_ASSIGN(SchemaResult schema_result_deserialized,
SchemaResult::Deserialize(schema_result_serialized));
ASSERT_EQ(schema_result, schema_result_deserialized);

Ticket ticket{"foo"};
ASSERT_OK_AND_ASSIGN(std::string ticket_serialized, ticket.SerializeToString());
ASSERT_OK_AND_ASSIGN(Ticket ticket_deserialized,
Ticket::Deserialize(ticket_serialized));
ASSERT_EQ(ticket.ticket, ticket_deserialized.ticket);
ASSERT_EQ(ticket, ticket_deserialized);

FlightDescriptor desc = FlightDescriptor::Command("select * from foo;");
ASSERT_OK_AND_ASSIGN(std::string desc_serialized, desc.SerializeToString());
ASSERT_OK_AND_ASSIGN(FlightDescriptor desc_deserialized,
FlightDescriptor::Deserialize(desc_serialized));
ASSERT_TRUE(desc.Equals(desc_deserialized));
ASSERT_EQ(desc, desc_deserialized);

desc = FlightDescriptor::Path({"a", "b", "test.arrow"});
ASSERT_OK_AND_ASSIGN(desc_serialized, desc.SerializeToString());
ASSERT_OK_AND_ASSIGN(desc_deserialized, FlightDescriptor::Deserialize(desc_serialized));
ASSERT_TRUE(desc.Equals(desc_deserialized));
ASSERT_EQ(desc, desc_deserialized);

FlightInfo::Data data;
std::shared_ptr<Schema> schema =
Expand All @@ -114,10 +152,17 @@ TEST(FlightTypes, RoundTripTypes) {
ASSERT_OK_AND_ASSIGN(std::string info_serialized, info->SerializeToString());
ASSERT_OK_AND_ASSIGN(std::unique_ptr<FlightInfo> info_deserialized,
FlightInfo::Deserialize(info_serialized));
ASSERT_TRUE(info->descriptor().Equals(info_deserialized->descriptor()));
ASSERT_EQ(info->descriptor(), info_deserialized->descriptor());
ASSERT_EQ(info->endpoints(), info_deserialized->endpoints());
ASSERT_EQ(info->total_records(), info_deserialized->total_records());
ASSERT_EQ(info->total_bytes(), info_deserialized->total_bytes());

FlightEndpoint flight_endpoint{ticket, {location1, location2}};
ASSERT_OK_AND_ASSIGN(std::string flight_endpoint_serialized,
flight_endpoint.SerializeToString());
ASSERT_OK_AND_ASSIGN(FlightEndpoint flight_endpoint_deserialized,
FlightEndpoint::Deserialize(flight_endpoint_serialized));
ASSERT_EQ(flight_endpoint, flight_endpoint_deserialized);
}

TEST(FlightTypes, RoundtripStatus) {
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/flight/serialization_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Status FromProto(const pb::SchemaResult& pb_result, std::string* result);
Status FromProto(const pb::BasicAuth& pb_basic_auth, BasicAuth* info);

// ToProto overloads: each takes a Flight C++ value by const reference and a
// pointer to the matching pb:: message to fill in, and reports the outcome
// as a Status.
Status ToProto(const FlightDescriptor& descr, pb::FlightDescriptor* pb_descr);
Status ToProto(const FlightEndpoint& endpoint, pb::FlightEndpoint* pb_endpoint);
Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info);
Status ToProto(const ActionType& type, pb::ActionType* pb_type);
Status ToProto(const Action& action, pb::Action* pb_action);
Expand Down
185 changes: 181 additions & 4 deletions cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,42 @@ Status SchemaResult::GetSchema(ipc::DictionaryMemo* dictionary_memo,
return GetSchema(dictionary_memo).Value(out);
}

// Two SchemaResult values are equal when their raw serialized schemas match.
bool SchemaResult::Equals(const SchemaResult& other) const {
  const auto& their_schema = other.raw_schema_;
  return raw_schema_ == their_schema;
}

// Converts this SchemaResult to its protobuf message and returns the
// message's wire encoding. Propagates any ToProto failure; returns IOError
// when protobuf reports that serialization failed.
arrow::Result<std::string> SchemaResult::SerializeToString() const {
  pb::SchemaResult proto;
  RETURN_NOT_OK(internal::ToProto(*this, &proto));

  std::string payload;
  if (proto.SerializeToString(&payload)) {
    return payload;
  }
  return Status::IOError("Serialized SchemaResult exceeded 2 GiB limit");
}

// Parses a protobuf-encoded SchemaResult from `serialized`.
// Returns Invalid when the input is larger than INT_MAX bytes or when
// protobuf cannot parse it.
arrow::Result<SchemaResult> SchemaResult::Deserialize(
    arrow::util::string_view serialized) {
  // The input stream below is given the length as an int, so reject anything
  // that would not survive the narrowing cast.
  const size_t max_size = static_cast<size_t>(std::numeric_limits<int>::max());
  if (serialized.size() > max_size) {
    return Status::Invalid("Serialized SchemaResult size should not exceed 2 GiB");
  }

  pb::SchemaResult proto;
  google::protobuf::io::ArrayInputStream stream(serialized.data(),
                                                static_cast<int>(serialized.size()));
  const bool parsed = proto.ParseFromZeroCopyStream(&stream);
  if (!parsed) {
    return Status::Invalid("Not a valid SchemaResult");
  }
  return SchemaResult{proto.schema()};
}

// Converts this FlightDescriptor to its protobuf message and returns the
// message's wire encoding. Propagates any ToProto failure; returns IOError
// when protobuf reports that serialization failed.
arrow::Result<std::string> FlightDescriptor::SerializeToString() const {
  pb::FlightDescriptor pb_descriptor;
  RETURN_NOT_OK(internal::ToProto(*this, &pb_descriptor));

  std::string out;
  if (!pb_descriptor.SerializeToString(&out)) {
    // Single failure path; the message names the type, matching the other
    // SerializeToString implementations in this file.
    return Status::IOError("Serialized FlightDescriptor exceeded 2 GiB limit");
  }
  return out;
}
Expand All @@ -186,7 +215,7 @@ arrow::Result<FlightDescriptor> FlightDescriptor::Deserialize(
google::protobuf::io::ArrayInputStream input(serialized.data(),
static_cast<int>(serialized.size()));
if (!pb_descriptor.ParseFromZeroCopyStream(&input)) {
return Status::Invalid("Not a valid descriptor");
return Status::Invalid("Not a valid FlightDescriptor");
}
FlightDescriptor out;
RETURN_NOT_OK(internal::FromProto(pb_descriptor, &out));
Expand All @@ -206,7 +235,7 @@ arrow::Result<std::string> Ticket::SerializeToString() const {

std::string out;
if (!pb_ticket.SerializeToString(&out)) {
return Status::IOError("Serialized ticket exceeded 2 GiB limit");
return Status::IOError("Serialized Ticket exceeded 2 GiB limit");
}
return out;
}
Expand All @@ -223,7 +252,7 @@ arrow::Result<Ticket> Ticket::Deserialize(arrow::util::string_view serialized) {
google::protobuf::io::ArrayInputStream input(serialized.data(),
static_cast<int>(serialized.size()));
if (!pb_ticket.ParseFromZeroCopyStream(&input)) {
return Status::Invalid("Not a valid ticket");
return Status::Invalid("Not a valid Ticket");
}
Ticket out;
RETURN_NOT_OK(internal::FromProto(pb_ticket, &out));
Expand Down Expand Up @@ -370,10 +399,154 @@ bool FlightEndpoint::Equals(const FlightEndpoint& other) const {
return ticket == other.ticket && locations == other.locations;
}

// Converts this FlightEndpoint to its protobuf message and returns the
// message's wire encoding. Propagates any ToProto failure; returns IOError
// when protobuf reports that serialization failed.
arrow::Result<std::string> FlightEndpoint::SerializeToString() const {
  pb::FlightEndpoint proto;
  RETURN_NOT_OK(internal::ToProto(*this, &proto));

  std::string payload;
  const bool serialized_ok = proto.SerializeToString(&payload);
  if (!serialized_ok) {
    return Status::IOError("Serialized FlightEndpoint exceeded 2 GiB limit");
  }
  return payload;
}

// Parses a protobuf-encoded FlightEndpoint from `serialized`.
// Returns Invalid when the input is larger than INT_MAX bytes or when
// protobuf cannot parse it; propagates any FromProto failure.
arrow::Result<FlightEndpoint> FlightEndpoint::Deserialize(
    arrow::util::string_view serialized) {
  // The input stream below is given the length as an int, so reject anything
  // that would not survive the narrowing cast.
  const size_t max_size = static_cast<size_t>(std::numeric_limits<int>::max());
  if (serialized.size() > max_size) {
    return Status::Invalid("Serialized FlightEndpoint size should not exceed 2 GiB");
  }

  pb::FlightEndpoint proto;
  google::protobuf::io::ArrayInputStream stream(serialized.data(),
                                                static_cast<int>(serialized.size()));
  const bool parsed = proto.ParseFromZeroCopyStream(&stream);
  if (!parsed) {
    return Status::Invalid("Not a valid FlightEndpoint");
  }

  FlightEndpoint endpoint;
  RETURN_NOT_OK(internal::FromProto(proto, &endpoint));
  return endpoint;
}

// Two ActionType values are equal when both `type` and `description` match.
bool ActionType::Equals(const ActionType& other) const {
  if (!(type == other.type)) {
    return false;
  }
  return description == other.description;
}

// Converts this ActionType to its protobuf message and returns the message's
// wire encoding. Propagates any ToProto failure; returns IOError when
// protobuf reports that serialization failed.
arrow::Result<std::string> ActionType::SerializeToString() const {
  pb::ActionType proto;
  RETURN_NOT_OK(internal::ToProto(*this, &proto));

  std::string payload;
  if (proto.SerializeToString(&payload)) {
    return payload;
  }
  return Status::IOError("Serialized ActionType exceeded 2 GiB limit");
}

// Parses a protobuf-encoded ActionType from `serialized`.
// Returns Invalid when the input is larger than INT_MAX bytes or when
// protobuf cannot parse it; propagates any FromProto failure.
arrow::Result<ActionType> ActionType::Deserialize(arrow::util::string_view serialized) {
  // The input stream below is given the length as an int, so reject anything
  // that would not survive the narrowing cast.
  const size_t max_size = static_cast<size_t>(std::numeric_limits<int>::max());
  if (serialized.size() > max_size) {
    return Status::Invalid("Serialized ActionType size should not exceed 2 GiB");
  }

  pb::ActionType proto;
  google::protobuf::io::ArrayInputStream stream(serialized.data(),
                                                static_cast<int>(serialized.size()));
  const bool parsed = proto.ParseFromZeroCopyStream(&stream);
  if (!parsed) {
    return Status::Invalid("Not a valid ActionType");
  }

  ActionType decoded;
  RETURN_NOT_OK(internal::FromProto(proto, &decoded));
  return decoded;
}

// Two Criteria values are equal when their `expression` members match.
bool Criteria::Equals(const Criteria& other) const {
  const auto& their_expression = other.expression;
  return expression == their_expression;
}

// Converts this Criteria to its protobuf message and returns the message's
// wire encoding. Propagates any ToProto failure; returns IOError when
// protobuf reports that serialization failed.
arrow::Result<std::string> Criteria::SerializeToString() const {
  pb::Criteria proto;
  RETURN_NOT_OK(internal::ToProto(*this, &proto));

  std::string payload;
  const bool serialized_ok = proto.SerializeToString(&payload);
  if (!serialized_ok) {
    return Status::IOError("Serialized Criteria exceeded 2 GiB limit");
  }
  return payload;
}

// Parses a protobuf-encoded Criteria from `serialized`.
// Returns Invalid when the input is larger than INT_MAX bytes or when
// protobuf cannot parse it; propagates any FromProto failure.
arrow::Result<Criteria> Criteria::Deserialize(arrow::util::string_view serialized) {
  // The input stream below is given the length as an int, so reject anything
  // that would not survive the narrowing cast.
  const size_t max_size = static_cast<size_t>(std::numeric_limits<int>::max());
  if (serialized.size() > max_size) {
    return Status::Invalid("Serialized Criteria size should not exceed 2 GiB");
  }

  pb::Criteria proto;
  google::protobuf::io::ArrayInputStream stream(serialized.data(),
                                                static_cast<int>(serialized.size()));
  const bool parsed = proto.ParseFromZeroCopyStream(&stream);
  if (!parsed) {
    return Status::Invalid("Not a valid Criteria");
  }

  Criteria decoded;
  RETURN_NOT_OK(internal::FromProto(proto, &decoded));
  return decoded;
}

// Two Action values are equal when their `type` matches and their bodies
// either compare equal as handles or are both set with equal contents.
bool Action::Equals(const Action& other) const {
  if (!(type == other.type)) {
    return false;
  }
  // Handle-level equality covers the case where neither side has a body.
  if (body == other.body) {
    return true;
  }
  // Only dereference when both sides are set.
  if (!body || !other.body) {
    return false;
  }
  return body->Equals(*other.body);
}

// Converts this Action to its protobuf message and returns the message's
// wire encoding. Propagates any ToProto failure; returns IOError when
// protobuf reports that serialization failed.
arrow::Result<std::string> Action::SerializeToString() const {
  pb::Action proto;
  RETURN_NOT_OK(internal::ToProto(*this, &proto));

  std::string payload;
  if (proto.SerializeToString(&payload)) {
    return payload;
  }
  return Status::IOError("Serialized Action exceeded 2 GiB limit");
}

// Parses a protobuf-encoded Action from `serialized`.
// Returns Invalid when the input is larger than INT_MAX bytes or when
// protobuf cannot parse it; propagates any FromProto failure.
arrow::Result<Action> Action::Deserialize(arrow::util::string_view serialized) {
  // The input stream below is given the length as an int, so reject anything
  // that would not survive the narrowing cast.
  const size_t max_size = static_cast<size_t>(std::numeric_limits<int>::max());
  if (serialized.size() > max_size) {
    return Status::Invalid("Serialized Action size should not exceed 2 GiB");
  }

  pb::Action proto;
  google::protobuf::io::ArrayInputStream stream(serialized.data(),
                                                static_cast<int>(serialized.size()));
  const bool parsed = proto.ParseFromZeroCopyStream(&stream);
  if (!parsed) {
    return Status::Invalid("Not a valid Action");
  }

  Action decoded;
  RETURN_NOT_OK(internal::FromProto(proto, &decoded));
  return decoded;
}

// Two Result values are equal when their bodies either compare equal as
// handles or are both set with equal contents.
bool Result::Equals(const Result& other) const {
  // Handle-level equality covers the case where neither side has a body.
  if (body == other.body) {
    return true;
  }
  // Only dereference when both sides are set.
  if (!body || !other.body) {
    return false;
  }
  return body->Equals(*other.body);
}

// Converts this Result to its protobuf message and returns the message's
// wire encoding. Propagates any ToProto failure; returns IOError when
// protobuf reports that serialization failed.
arrow::Result<std::string> Result::SerializeToString() const {
  pb::Result proto;
  RETURN_NOT_OK(internal::ToProto(*this, &proto));

  std::string payload;
  const bool serialized_ok = proto.SerializeToString(&payload);
  if (!serialized_ok) {
    return Status::IOError("Serialized Result exceeded 2 GiB limit");
  }
  return payload;
}

// Parses a protobuf-encoded Result from `serialized`.
// Returns Invalid when the input is larger than INT_MAX bytes or when
// protobuf cannot parse it; propagates any FromProto failure.
arrow::Result<Result> Result::Deserialize(arrow::util::string_view serialized) {
  // The input stream below is given the length as an int, so reject anything
  // that would not survive the narrowing cast.
  const size_t max_size = static_cast<size_t>(std::numeric_limits<int>::max());
  if (serialized.size() > max_size) {
    return Status::Invalid("Serialized Result size should not exceed 2 GiB");
  }

  pb::Result proto;
  google::protobuf::io::ArrayInputStream stream(serialized.data(),
                                                static_cast<int>(serialized.size()));
  const bool parsed = proto.ParseFromZeroCopyStream(&stream);
  if (!parsed) {
    return Status::Invalid("Not a valid Result");
  }

  Result decoded;
  RETURN_NOT_OK(internal::FromProto(proto, &decoded));
  return decoded;
}

// Out-parameter form of Next(): calls the no-argument Next() overload and
// hands its outcome to `info` via Value(), returning the resulting Status.
Status ResultStream::Next(std::unique_ptr<Result>* info) { return Next().Value(info); }

Status MetadataRecordBatchReader::Next(FlightStreamChunk* next) {
Expand Down Expand Up @@ -468,6 +641,10 @@ arrow::Result<std::unique_ptr<Result>> SimpleResultStream::Next() {
return std::unique_ptr<Result>(new Result(std::move(results_[position_++])));
}

// Two BasicAuth values are equal when both `username` and `password` match.
bool BasicAuth::Equals(const BasicAuth& other) const {
  if (!(username == other.username)) {
    return false;
  }
  return password == other.password;
}

arrow::Result<BasicAuth> BasicAuth::Deserialize(arrow::util::string_view serialized) {
pb::BasicAuth pb_result;
if (serialized.size() > static_cast<size_t>(std::numeric_limits<int>::max())) {
Expand Down
Loading

0 comments on commit 7cfdfbb

Please sign in to comment.