Skip to content

Commit

Permalink
apacheGH-34852: [C++][Go][Java][FlightRPC] Add support for ordered da…
Browse files Browse the repository at this point in the history
…ta (apache#35178)

### Rationale for this change

Having no way to express ordering is unnecessarily limiting.  Systems can and do implement distributed sorts, but they can’t reflect this in Flight RPC.

### What changes are included in this PR?

These changes add `FlightInfo.ordered`.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.

**This PR includes breaking changes to public APIs.**

* Closes: apache#34852
* Closes: apache#35085

Lead-authored-by: Sutou Kouhei <kou@clear-code.com>
Co-authored-by: David Li <li.davidm96@gmail.com>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
  • Loading branch information
4 people authored and ArgusLi committed May 15, 2023
1 parent 2f707e2 commit 39c4c8c
Show file tree
Hide file tree
Showing 24 changed files with 751 additions and 215 deletions.
21 changes: 11 additions & 10 deletions cpp/src/arrow/flight/flight_internals_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,26 +213,27 @@ TEST(FlightTypes, FlightInfo) {
auto endpoint1 = FlightEndpoint{Ticket{"foo"}, {}};
auto endpoint2 = FlightEndpoint{Ticket{"foo"}, {location}};
std::vector<FlightInfo> values = {
MakeFlightInfo(schema1, desc1, {}, -1, -1),
MakeFlightInfo(schema1, desc2, {}, -1, -1),
MakeFlightInfo(schema2, desc1, {}, -1, -1),
MakeFlightInfo(schema1, desc1, {endpoint1}, -1, 42),
MakeFlightInfo(schema1, desc2, {endpoint1, endpoint2}, 64, -1),
MakeFlightInfo(schema1, desc1, {}, -1, -1, false),
MakeFlightInfo(schema1, desc2, {}, -1, -1, true),
MakeFlightInfo(schema2, desc1, {}, -1, -1, false),
MakeFlightInfo(schema1, desc1, {endpoint1}, -1, 42, true),
MakeFlightInfo(schema1, desc2, {endpoint1, endpoint2}, 64, -1, false),
};
std::vector<std::string> reprs = {
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[] total_records=-1 total_bytes=-1>",
"endpoints=[] total_records=-1 total_bytes=-1 ordered=false>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='bar'> "
"endpoints=[] total_records=-1 total_bytes=-1>",
"endpoints=[] total_records=-1 total_bytes=-1 ordered=true>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[] total_records=-1 total_bytes=-1>",
"endpoints=[] total_records=-1 total_bytes=-1 ordered=false>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[]>] "
"total_records=-1 total_bytes=42>",
"total_records=-1 total_bytes=42 ordered=true>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='bar'> "
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[]>, "
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations="
"[grpc+tcp://localhost:1234]>] total_records=64 total_bytes=-1>",
"[grpc+tcp://localhost:1234]>] total_records=64 total_bytes=-1 "
"ordered=false>",
};

ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::FlightInfo>(values, reprs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ TEST(FlightIntegration, AuthBasicProto) { ASSERT_OK(RunScenario("auth:basic_prot

TEST(FlightIntegration, Middleware) { ASSERT_OK(RunScenario("middleware")); }

TEST(FlightIntegration, Ordered) { ASSERT_OK(RunScenario("ordered")); }

TEST(FlightIntegration, FlightSql) { ASSERT_OK(RunScenario("flight_sql")); }

TEST(FlightIntegration, FlightSqlExtension) {
Expand Down
156 changes: 149 additions & 7 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "arrow/array/array_binary.h"
#include "arrow/array/array_nested.h"
#include "arrow/array/array_primitive.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/flight/client_middleware.h"
#include "arrow/flight/server_middleware.h"
#include "arrow/flight/sql/client.h"
Expand All @@ -37,6 +38,8 @@
#include "arrow/flight/types.h"
#include "arrow/ipc/dictionary.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/table_builder.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/checked_cast.h"

Expand Down Expand Up @@ -210,8 +213,8 @@ class MiddlewareServer : public FlightServerBase {
// Return a fake location - the test doesn't read it
ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 10010));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{"foo"}, {location}}};
ARROW_ASSIGN_OR_RAISE(auto info,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1));
ARROW_ASSIGN_OR_RAISE(
auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
*result = std::make_unique<FlightInfo>(info);
return Status::OK();
}
Expand Down Expand Up @@ -271,6 +274,142 @@ class MiddlewareScenario : public Scenario {
std::shared_ptr<TestClientMiddlewareFactory> client_middleware_;
};

/// \brief The server used for testing FlightInfo.ordered.
///
/// If the given command is "ordered", the server sets
/// FlightInfo.ordered. The client that supports FlightInfo.ordered
/// must read data from endpoints from front to back. The client that
/// doesn't support FlightInfo.ordered may read data from endpoints in
/// random order.
///
/// This scenario is passed only when the client supports
/// FlightInfo.ordered.
class OrderedServer : public FlightServerBase {
  /// \brief Describe the flight as three endpoints with empty locations.
  ///
  /// For the command descriptor "ordered" the tickets are listed as
  /// "1", "2", "3" and FlightInfo.ordered is set. For any other
  /// descriptor the tickets are listed as "1", "3", "2" and
  /// FlightInfo.ordered is left unset.
  Status GetFlightInfo(const ServerCallContext& context,
                       const FlightDescriptor& descriptor,
                       std::unique_ptr<FlightInfo>* result) override {
    const bool is_ordered_request =
        descriptor.type == FlightDescriptor::DescriptorType::CMD &&
        descriptor.cmd == "ordered";
    auto schema = BuildSchema();

    // Ticket "1" always comes first; only the positions of "2" and "3"
    // depend on whether the ordered command was requested.
    const char* second_ticket = is_ordered_request ? "2" : "3";
    const char* third_ticket = is_ordered_request ? "3" : "2";
    std::vector<FlightEndpoint> endpoints;
    endpoints.push_back(FlightEndpoint{Ticket{"1"}, {}});
    endpoints.push_back(FlightEndpoint{Ticket{second_ticket}, {}});
    endpoints.push_back(FlightEndpoint{Ticket{third_ticket}, {}});

    ARROW_ASSIGN_OR_RAISE(auto flight_info,
                          FlightInfo::Make(*schema, descriptor, endpoints, -1, -1,
                                           is_ordered_request));
    *result = std::make_unique<FlightInfo>(flight_info);
    return Status::OK();
  }

  /// \brief Stream the single record batch that belongs to a ticket.
  ///
  /// Ticket "1" yields 1, 2, 3; ticket "2" yields 10, 20, 30; ticket "3"
  /// yields 100, 200, 300. Any other ticket is answered with a KeyError.
  Status DoGet(const ServerCallContext& context, const Ticket& request,
               std::unique_ptr<FlightDataStream>* stream) override {
    ARROW_ASSIGN_OR_RAISE(
        auto batch_builder,
        RecordBatchBuilder::Make(BuildSchema(), arrow::default_memory_pool()));
    auto* numbers = batch_builder->GetFieldAs<Int32Builder>(0);

    // Every ticket serves the digits 1..3 multiplied by a per-ticket scale.
    int32_t scale = 0;
    if (request.ticket == "1") {
      scale = 1;
    } else if (request.ticket == "2") {
      scale = 10;
    } else if (request.ticket == "3") {
      scale = 100;
    } else {
      return Status::KeyError("Could not find flight: ", request.ticket);
    }
    for (int32_t digit = 1; digit <= 3; ++digit) {
      ARROW_RETURN_NOT_OK(numbers->Append(digit * scale));
    }

    ARROW_ASSIGN_OR_RAISE(auto batch, batch_builder->Flush());
    const std::vector<std::shared_ptr<RecordBatch>> batches = {batch};
    ARROW_ASSIGN_OR_RAISE(auto batch_reader, RecordBatchReader::Make(batches));
    *stream = std::make_unique<RecordBatchStream>(batch_reader);
    return Status::OK();
  }

 private:
  /// \brief Schema of the served data: one non-nullable int32 column "number".
  std::shared_ptr<Schema> BuildSchema() {
    auto number_field = arrow::field("number", arrow::int32(), false);
    return arrow::schema({number_field});
  }
};

/// \brief The ordered scenario.
///
/// This tests that the server sets FlightInfo.ordered and that reading the
/// endpoints from front to back yields the data in the expected order.
class OrderedScenario : public Scenario {
Status MakeServer(std::unique_ptr<FlightServerBase>* server,
FlightServerOptions* options) override {
server->reset(new OrderedServer());
return Status::OK();
}

Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }

Status RunClient(std::unique_ptr<FlightClient> client) override {
ARROW_ASSIGN_OR_RAISE(auto info,
client->GetFlightInfo(FlightDescriptor::Command("ordered")));
if (!info->ordered()) {
return Status::Invalid("Server must return FlightInfo.ordered = true");
}
std::vector<std::shared_ptr<arrow::Table>> tables;
for (const auto& endpoint : info->endpoints()) {
if (!endpoint.locations.empty()) {
std::stringstream ss;
ss << "[";
for (const auto& location : endpoint.locations) {
if (ss.str().size() != 1) {
ss << ", ";
}
ss << location.ToString();
}
ss << "]";
return Status::Invalid(
"Expected to receive empty locations to use the original service: ",
ss.str());
}
ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
tables.push_back(table);
}
ARROW_ASSIGN_OR_RAISE(auto table, ConcatenateTables(tables));

// Build expected table
auto schema = arrow::schema({arrow::field("number", arrow::int32(), false)});
ARROW_ASSIGN_OR_RAISE(auto builder,
RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));
auto number_builder = builder->GetFieldAs<Int32Builder>(0);
ARROW_RETURN_NOT_OK(number_builder->Append(1));
ARROW_RETURN_NOT_OK(number_builder->Append(2));
ARROW_RETURN_NOT_OK(number_builder->Append(3));
ARROW_RETURN_NOT_OK(number_builder->Append(10));
ARROW_RETURN_NOT_OK(number_builder->Append(20));
ARROW_RETURN_NOT_OK(number_builder->Append(30));
ARROW_RETURN_NOT_OK(number_builder->Append(100));
ARROW_RETURN_NOT_OK(number_builder->Append(200));
ARROW_RETURN_NOT_OK(number_builder->Append(300));
ARROW_ASSIGN_OR_RAISE(auto expected_record_batch, builder->Flush());
std::vector<std::shared_ptr<RecordBatch>> expected_record_batches{
expected_record_batch};
ARROW_ASSIGN_OR_RAISE(auto expected_table,
Table::FromRecordBatches(expected_record_batches));

// Check read data
if (!table->Equals(*expected_table)) {
return Status::Invalid("Read data isn't expected\n", "Expected:\n",
expected_table->ToString(), "Actual:\n", table->ToString());
}
return Status::OK();
}
};

/// \brief Schema to be returned for mocking the statement/prepared statement results.
///
/// Must be the same across all languages.
Expand Down Expand Up @@ -382,8 +521,8 @@ class FlightSqlScenarioServer : public sql::FlightSqlServerBase {
}
ARROW_ASSIGN_OR_RAISE(auto handle, sql::CreateStatementQueryTicket(ticket));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{handle}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1));
ARROW_ASSIGN_OR_RAISE(
auto result, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
return std::make_unique<FlightInfo>(result);
}

Expand All @@ -407,8 +546,8 @@ class FlightSqlScenarioServer : public sql::FlightSqlServerBase {
}
ARROW_ASSIGN_OR_RAISE(auto handle, sql::CreateStatementQueryTicket(ticket));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{handle}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1));
ARROW_ASSIGN_OR_RAISE(
auto result, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
return std::make_unique<FlightInfo>(result);
}

Expand Down Expand Up @@ -851,7 +990,7 @@ class FlightSqlScenarioServer : public sql::FlightSqlServerBase {
const FlightDescriptor& descriptor, const std::shared_ptr<Schema>& schema) {
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{descriptor.cmd}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1))
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false))

return std::make_unique<FlightInfo>(result);
}
Expand Down Expand Up @@ -1330,6 +1469,9 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>*
} else if (scenario_name == "middleware") {
*out = std::make_shared<MiddlewareScenario>();
return Status::OK();
} else if (scenario_name == "ordered") {
*out = std::make_shared<OrderedScenario>();
return Status::OK();
} else if (scenario_name == "flight_sql") {
*out = std::make_shared<FlightSqlScenario>();
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/perf_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class FlightPerfServer : public FlightServerBase {
perf_request.stream_count() * perf_request.records_per_stream();

*info = std::make_unique<FlightInfo>(
MakeFlightInfo(*perf_schema_, request, endpoints, total_records, -1));
MakeFlightInfo(*perf_schema_, request, endpoints, total_records, -1, false));
return Status::OK();
}

Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/flight/serialization_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info) {

info->total_records = pb_info.total_records();
info->total_bytes = pb_info.total_bytes();
info->ordered = pb_info.ordered();
return Status::OK();
}

Expand Down Expand Up @@ -236,6 +237,7 @@ Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info) {

pb_info->set_total_records(info.total_records());
pb_info->set_total_bytes(info.total_bytes());
pb_info->set_ordered(info.ordered());
return Status::OK();
}

Expand Down
7 changes: 4 additions & 3 deletions cpp/src/arrow/flight/sql/example/acero_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,10 @@ class AceroFlightSqlServer : public FlightSqlServerBase {
ARROW_ASSIGN_OR_RAISE(auto ticket, CreateStatementQueryTicket(plan));
std::vector<FlightEndpoint> endpoints{
FlightEndpoint{Ticket{std::move(ticket)}, /*locations=*/{}}};
ARROW_ASSIGN_OR_RAISE(auto info,
FlightInfo::Make(schema, descriptor, std::move(endpoints),
/*total_records=*/-1, /*total_bytes=*/-1));
ARROW_ASSIGN_OR_RAISE(
auto info,
FlightInfo::Make(schema, descriptor, std::move(endpoints),
/*total_records=*/-1, /*total_bytes=*/-1, /*ordered=*/false));
return std::make_unique<FlightInfo>(std::move(info));
}

Expand Down
11 changes: 7 additions & 4 deletions cpp/src/arrow/flight/sql/example/sqlite_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoForCommand(
const FlightDescriptor& descriptor, const std::shared_ptr<Schema>& schema) {
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{descriptor.cmd}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1))
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false))

return std::make_unique<FlightInfo>(result);
}
Expand Down Expand Up @@ -304,8 +304,11 @@ class SQLiteFlightSqlServer::Impl {
ARROW_ASSIGN_OR_RAISE(auto ticket,
EncodeTransactionQuery(query, command.transaction_id));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{std::move(ticket), {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1))
// TODO: Set true only when "ORDER BY" is used in a main "SELECT"
// in the given query.
const bool ordered = false;
ARROW_ASSIGN_OR_RAISE(
auto result, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, ordered));

return std::make_unique<FlightInfo>(result);
}
Expand Down Expand Up @@ -392,7 +395,7 @@ class SQLiteFlightSqlServer::Impl {
auto result,
FlightInfo::Make(include_schema ? *SqlSchema::GetTablesSchemaWithIncludedSchema()
: *SqlSchema::GetTablesSchema(),
descriptor, endpoints, -1, -1))
descriptor, endpoints, -1, -1, false))

return std::make_unique<FlightInfo>(std::move(result));
}
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/flight/sql/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -890,8 +890,9 @@ arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlServerBase::GetFlightInfoSql
}

std::vector<FlightEndpoint> endpoints{FlightEndpoint{{descriptor.cmd}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result, FlightInfo::Make(*SqlSchema::GetSqlInfoSchema(),
descriptor, endpoints, -1, -1))
ARROW_ASSIGN_OR_RAISE(
auto result, FlightInfo::Make(*SqlSchema::GetSqlInfoSchema(), descriptor, endpoints,
-1, -1, false))

return std::make_unique<FlightInfo>(result);
}
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/arrow/flight/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -531,9 +531,9 @@ std::unique_ptr<FlightServerBase> ExampleTestServer() {

FlightInfo MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes) {
int64_t total_records, int64_t total_bytes, bool ordered) {
EXPECT_OK_AND_ASSIGN(auto info, FlightInfo::Make(schema, descriptor, endpoints,
total_records, total_bytes));
total_records, total_bytes, ordered));
return info;
}

Expand Down Expand Up @@ -619,10 +619,10 @@ std::vector<FlightInfo> ExampleFlightInfo() {
auto schema4 = ExampleFloatSchema();

return {
MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000),
MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000),
MakeFlightInfo(*schema3, descr3, {endpoint4}, -1, -1),
MakeFlightInfo(*schema4, descr4, {endpoint5}, 1000, 100000),
MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000, false),
MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000, false),
MakeFlightInfo(*schema3, descr3, {endpoint4}, -1, -1, false),
MakeFlightInfo(*schema4, descr4, {endpoint5}, 1000, 100000, false),
};
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ std::vector<ActionType> ExampleActionTypes();
ARROW_FLIGHT_EXPORT
FlightInfo MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes);
int64_t total_records, int64_t total_bytes, bool ordered);

// ----------------------------------------------------------------------
// A pair of authentication handlers that check for a predefined password
Expand Down

0 comments on commit 39c4c8c

Please sign in to comment.