Skip to content

Commit

Permalink
GH-31521: [C++][Flight] Migrate Flight SQL client to Result (#36559)
Browse files Browse the repository at this point in the history
### Rationale for this change

Arrow introduced a `Result` return object that can be used in place of passing output parameters. Most of Flight has already been migrated. 

### Are these changes tested?

C++ tests pass, which involve the sqlite and acero example servers. Any other testing we would like to see?

```
100% tests passed, 0 tests failed out of 84

Label Time Summary:
arrow-tests         = 168.18 sec*proc (35 tests)
arrow_acero         = 106.32 sec*proc (12 tests)
arrow_compute       =  76.46 sec*proc (13 tests)
arrow_dataset       =  78.09 sec*proc (12 tests)
arrow_flight        = 123.81 sec*proc (2 tests)
arrow_flight_sql    =  19.29 sec*proc (1 test)
filesystem          =  10.39 sec*proc (2 tests)
parquet-tests       =  55.24 sec*proc (9 tests)
unittest            = 627.40 sec*proc (84 tests)

Total Test time (real) = 120.83 sec
```

### Are there any user-facing changes?

Yes. The protected methods DoPut, DoGet, and DoAction for the SQL client have been updated. DoGet already exposed a public API and that has been updated to be virtual.
* Closes: #31521

Authored-by: Dane Pitkin <dane@voltrondata.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
  • Loading branch information
danepitkin committed Jul 8, 2023
1 parent 41e0879 commit 33ddff3
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 60 deletions.
67 changes: 25 additions & 42 deletions cpp/src/arrow/flight/sql/client.cc
Expand Up @@ -213,22 +213,20 @@ arrow::Result<int64_t> FlightSqlClient::ExecuteUpdate(const FlightCallOptions& o
ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor,
GetFlightDescriptorForCommand(command));

std::unique_ptr<FlightStreamWriter> writer;
std::unique_ptr<FlightMetadataReader> reader;

ARROW_RETURN_NOT_OK(DoPut(options, descriptor, arrow::schema({}), &writer, &reader));
ARROW_ASSIGN_OR_RAISE(auto result, DoPut(options, descriptor, arrow::schema({})))
std::shared_ptr<Buffer> metadata;
ARROW_RETURN_NOT_OK(reader->ReadMetadata(&metadata));
ARROW_RETURN_NOT_OK(writer->Close());
ARROW_RETURN_NOT_OK(result.reader->ReadMetadata(&metadata));
ARROW_RETURN_NOT_OK(result.writer->Close());

if (!metadata) return Status::IOError("Server did not send a response");

flight_sql_pb::DoPutUpdateResult result;
if (!result.ParseFromArray(metadata->data(), static_cast<int>(metadata->size()))) {
flight_sql_pb::DoPutUpdateResult update_result;
if (!update_result.ParseFromArray(metadata->data(),
static_cast<int>(metadata->size()))) {
return Status::Invalid("Unable to parse DoPutUpdateResult");
}

return result.record_count();
return update_result.record_count();
}

arrow::Result<int64_t> FlightSqlClient::ExecuteSubstraitUpdate(
Expand All @@ -243,21 +241,19 @@ arrow::Result<int64_t> FlightSqlClient::ExecuteSubstraitUpdate(
ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor,
GetFlightDescriptorForCommand(command));

std::unique_ptr<FlightStreamWriter> writer;
std::unique_ptr<FlightMetadataReader> reader;

ARROW_RETURN_NOT_OK(DoPut(options, descriptor, arrow::schema({}), &writer, &reader));
ARROW_ASSIGN_OR_RAISE(auto result, DoPut(options, descriptor, arrow::schema({})));

std::shared_ptr<Buffer> metadata;
ARROW_RETURN_NOT_OK(reader->ReadMetadata(&metadata));
ARROW_RETURN_NOT_OK(writer->Close());
ARROW_RETURN_NOT_OK(result.reader->ReadMetadata(&metadata));
ARROW_RETURN_NOT_OK(result.writer->Close());

flight_sql_pb::DoPutUpdateResult result;
if (!result.ParseFromArray(metadata->data(), static_cast<int>(metadata->size()))) {
flight_sql_pb::DoPutUpdateResult update_result;
if (!update_result.ParseFromArray(metadata->data(),
static_cast<int>(metadata->size()))) {
return Status::Invalid("Unable to parse DoPutUpdateResult");
}

return result.record_count();
return update_result.record_count();
}

arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetCatalogs(
Expand Down Expand Up @@ -478,10 +474,7 @@ arrow::Result<std::unique_ptr<SchemaResult>> FlightSqlClient::GetSqlInfoSchema(

arrow::Result<std::unique_ptr<FlightStreamReader>> FlightSqlClient::DoGet(
const FlightCallOptions& options, const Ticket& ticket) {
std::unique_ptr<FlightStreamReader> stream;
ARROW_RETURN_NOT_OK(DoGet(options, ticket, &stream));

return std::move(stream);
return impl_->DoGet(options, ticket);
}

arrow::Result<std::shared_ptr<PreparedStatement>> FlightSqlClient::Prepare(
Expand All @@ -493,9 +486,8 @@ arrow::Result<std::shared_ptr<PreparedStatement>> FlightSqlClient::Prepare(
request.set_transaction_id(transaction.transaction_id());
}

std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("CreatePreparedStatement", request));
ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action))

return PreparedStatement::ParseResponse(this, std::move(results));
}
Expand All @@ -509,9 +501,8 @@ arrow::Result<std::shared_ptr<PreparedStatement>> FlightSqlClient::PrepareSubstr
request.set_transaction_id(transaction.transaction_id());
}

std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("CreatePreparedSubstraitPlan", request));
ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action))

return PreparedStatement::ParseResponse(this, std::move(results));
}
Expand Down Expand Up @@ -657,9 +648,8 @@ Status PreparedStatement::Close(const FlightCallOptions& options) {
flight_sql_pb::ActionClosePreparedStatementRequest request;
request.set_prepared_statement_handle(handle_);

std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("ClosePreparedStatement", request));
ARROW_RETURN_NOT_OK(client_->DoAction(options, action, &results));
ARROW_ASSIGN_OR_RAISE(auto results, client_->DoAction(options, action));
ARROW_RETURN_NOT_OK(results->Drain());

is_closed_ = true;
Expand All @@ -670,9 +660,8 @@ ::arrow::Result<Transaction> FlightSqlClient::BeginTransaction(
const FlightCallOptions& options) {
flight_sql_pb::ActionBeginTransactionRequest request;

std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("BeginTransaction", request));
ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));

flight_sql_pb::ActionBeginTransactionResult transaction;
ARROW_RETURN_NOT_OK(ReadResult(results.get(), &transaction));
Expand All @@ -695,9 +684,8 @@ ::arrow::Result<Savepoint> FlightSqlClient::BeginSavepoint(
request.set_transaction_id(transaction.transaction_id());
request.set_name(name);

std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("BeginSavepoint", request));
ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));

flight_sql_pb::ActionBeginSavepointResult savepoint;
ARROW_RETURN_NOT_OK(ReadResult(results.get(), &savepoint));
Expand All @@ -719,9 +707,8 @@ Status FlightSqlClient::Commit(const FlightCallOptions& options,
request.set_transaction_id(transaction.transaction_id());
request.set_action(flight_sql_pb::ActionEndTransactionRequest::END_TRANSACTION_COMMIT);

std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndTransaction", request));
ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));

ARROW_RETURN_NOT_OK(results->Drain());
return Status::OK();
Expand All @@ -737,9 +724,8 @@ Status FlightSqlClient::Release(const FlightCallOptions& options,
request.set_savepoint_id(savepoint.savepoint_id());
request.set_action(flight_sql_pb::ActionEndSavepointRequest::END_SAVEPOINT_RELEASE);

std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndSavepoint", request));
ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));

ARROW_RETURN_NOT_OK(results->Drain());
return Status::OK();
Expand All @@ -756,9 +742,8 @@ Status FlightSqlClient::Rollback(const FlightCallOptions& options,
request.set_action(
flight_sql_pb::ActionEndTransactionRequest::END_TRANSACTION_ROLLBACK);

std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndTransaction", request));
ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));

ARROW_RETURN_NOT_OK(results->Drain());
return Status::OK();
Expand All @@ -774,9 +759,8 @@ Status FlightSqlClient::Rollback(const FlightCallOptions& options,
request.set_savepoint_id(savepoint.savepoint_id());
request.set_action(flight_sql_pb::ActionEndSavepointRequest::END_SAVEPOINT_ROLLBACK);

std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndSavepoint", request));
ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));

ARROW_RETURN_NOT_OK(results->Drain());
return Status::OK();
Expand All @@ -788,9 +772,8 @@ ::arrow::Result<CancelResult> FlightSqlClient::CancelQuery(
ARROW_ASSIGN_OR_RAISE(auto serialized_info, info.SerializeToString());
request.set_info(std::move(serialized_info));

std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("CancelQuery", request));
ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action))

flight_sql_pb::ActionCancelQueryResult result;
ARROW_RETURN_NOT_OK(ReadResult(results.get(), &result));
Expand Down
26 changes: 8 additions & 18 deletions cpp/src/arrow/flight/sql/client.h
Expand Up @@ -131,7 +131,7 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
/// \param[in] options Per-RPC options
/// \param[in] ticket The flight ticket to use
/// \return The returned RecordBatchReader
arrow::Result<std::unique_ptr<FlightStreamReader>> DoGet(
virtual arrow::Result<std::unique_ptr<FlightStreamReader>> DoGet(
const FlightCallOptions& options, const Ticket& ticket);

/// \brief Request a list of tables.
Expand Down Expand Up @@ -364,25 +364,15 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
Status Close();

protected:
virtual Status DoPut(const FlightCallOptions& options,
const FlightDescriptor& descriptor,
const std::shared_ptr<Schema>& schema,
std::unique_ptr<FlightStreamWriter>* writer,
std::unique_ptr<FlightMetadataReader>* reader) {
ARROW_ASSIGN_OR_RAISE(auto result, impl_->DoPut(options, descriptor, schema));
*writer = std::move(result.writer);
*reader = std::move(result.reader);
return Status::OK();
virtual ::arrow::Result<FlightClient::DoPutResult> DoPut(
const FlightCallOptions& options, const FlightDescriptor& descriptor,
const std::shared_ptr<Schema>& schema) {
return impl_->DoPut(options, descriptor, schema);
}

virtual Status DoGet(const FlightCallOptions& options, const Ticket& ticket,
std::unique_ptr<FlightStreamReader>* stream) {
return impl_->DoGet(options, ticket).Value(stream);
}

virtual Status DoAction(const FlightCallOptions& options, const Action& action,
std::unique_ptr<ResultStream>* results) {
return impl_->DoAction(options, action).Value(results);
virtual ::arrow::Result<std::unique_ptr<ResultStream>> DoAction(
const FlightCallOptions& options, const Action& action) {
return impl_->DoAction(options, action);
}
};

Expand Down

0 comments on commit 33ddff3

Please sign in to comment.