diff --git a/google/cloud/pubsub/schema_admin_client.cc b/google/cloud/pubsub/schema_admin_client.cc index 58363b336d67e..60b032c94dda8 100644 --- a/google/cloud/pubsub/schema_admin_client.cc +++ b/google/cloud/pubsub/schema_admin_client.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/pubsub/schema_admin_client.h" +#include "google/cloud/pubsub/internal/defaults.h" namespace google { namespace cloud { @@ -20,37 +21,42 @@ namespace pubsub { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN SchemaAdminClient::SchemaAdminClient( - std::shared_ptr connection) - : connection_(std::move(connection)) {} + std::shared_ptr connection, Options opts) + : connection_(std::move(connection)), + options_(internal::MergeOptions( + std::move(opts), + pubsub_internal::DefaultCommonOptions(connection_->options()))) {} StatusOr SchemaAdminClient::CreateAvroSchema( - Schema const& schema, std::string schema_definition) { + Schema const& schema, std::string schema_definition, Options opts) { google::pubsub::v1::CreateSchemaRequest request; request.set_parent("projects/" + schema.project_id()); request.set_schema_id(schema.schema_id()); request.mutable_schema()->set_type(google::pubsub::v1::Schema::AVRO); request.mutable_schema()->set_definition(std::move(schema_definition)); - return CreateSchema(request); + return CreateSchema(request, std::move(opts)); } StatusOr SchemaAdminClient::CreateProtobufSchema( - Schema const& schema, std::string schema_definition) { + Schema const& schema, std::string schema_definition, Options opts) { google::pubsub::v1::CreateSchemaRequest request; request.set_parent("projects/" + schema.project_id()); request.set_schema_id(schema.schema_id()); request.mutable_schema()->set_type( google::pubsub::v1::Schema::PROTOCOL_BUFFER); request.mutable_schema()->set_definition(std::move(schema_definition)); - return CreateSchema(request); + return CreateSchema(request, std::move(opts)); } StatusOr SchemaAdminClient::CreateSchema( - google::pubsub::v1::CreateSchemaRequest const& request) { + google::pubsub::v1::CreateSchemaRequest const& request, Options opts) { + internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_)); return connection_->CreateSchema(request); } StatusOr SchemaAdminClient::GetSchema( - Schema const& schema, google::pubsub::v1::SchemaView view) { + Schema const& schema, google::pubsub::v1::SchemaView view, Options opts) { + internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_)); google::pubsub::v1::GetSchemaRequest request; request.set_name(schema.FullName()); request.set_view(view); @@ -58,14 +64,17 @@ StatusOr SchemaAdminClient::GetSchema( } ListSchemasRange SchemaAdminClient::ListSchemas( - std::string const& project_id, google::pubsub::v1::SchemaView view) { + std::string const& project_id, google::pubsub::v1::SchemaView view, + Options opts) { + internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_)); google::pubsub::v1::ListSchemasRequest request; request.set_parent("projects/" + project_id); request.set_view(view); return connection_->ListSchemas(request); } -Status SchemaAdminClient::DeleteSchema(Schema const& schema) { +Status SchemaAdminClient::DeleteSchema(Schema const& schema, Options opts) { + internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_)); google::pubsub::v1::DeleteSchemaRequest request; request.set_name(schema.FullName()); return connection_->DeleteSchema(request); @@ -73,25 +82,29 @@ Status SchemaAdminClient::DeleteSchema(Schema const& schema) { StatusOr SchemaAdminClient::ValidateAvroSchema(std::string const& project_id, - std::string schema_definition) { + std::string schema_definition, + Options opts) { google::pubsub::v1::Schema schema; schema.set_definition(std::move(schema_definition)); schema.set_type(google::pubsub::v1::Schema::AVRO); - return ValidateSchema(project_id, std::move(schema)); + return ValidateSchema(project_id, std::move(schema), std::move(opts)); } StatusOr SchemaAdminClient::ValidateProtobufSchema(std::string const& project_id, - std::string schema_definition) { + std::string schema_definition, + Options opts) { google::pubsub::v1::Schema schema; schema.set_definition(std::move(schema_definition)); schema.set_type(google::pubsub::v1::Schema::PROTOCOL_BUFFER); - return ValidateSchema(project_id, std::move(schema)); + return ValidateSchema(project_id, std::move(schema), std::move(opts)); } StatusOr SchemaAdminClient::ValidateSchema(std::string const& project_id, - google::pubsub::v1::Schema schema) { + google::pubsub::v1::Schema schema, + Options opts) { + internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_)); google::pubsub::v1::ValidateSchemaRequest request; request.set_parent("projects/" + project_id); *request.mutable_schema() = std::move(schema); @@ -101,32 +114,32 @@ SchemaAdminClient::ValidateSchema(std::string const& project_id, StatusOr SchemaAdminClient::ValidateMessageWithNamedSchema( google::pubsub::v1::Encoding encoding, std::string message, - Schema const& named_schema) { + Schema const& named_schema, Options opts) { google::pubsub::v1::ValidateMessageRequest request; request.set_parent("projects/" + named_schema.project_id()); request.set_message(std::move(message)); request.set_encoding(encoding); request.set_name(named_schema.FullName()); - return ValidateMessage(request); + return ValidateMessage(request, std::move(opts)); } StatusOr SchemaAdminClient::ValidateMessageWithAvro( google::pubsub::v1::Encoding encoding, std::string message, - std::string project_id, std::string schema_definition) { + std::string project_id, std::string schema_definition, Options opts) { google::pubsub::v1::ValidateMessageRequest request; request.set_parent("projects/" + std::move(project_id)); request.set_message(std::move(message)); request.set_encoding(encoding); request.mutable_schema()->set_type(google::pubsub::v1::Schema::AVRO); request.mutable_schema()->set_definition(std::move(schema_definition)); - return ValidateMessage(request); + return ValidateMessage(request, std::move(opts)); } StatusOr SchemaAdminClient::ValidateMessageWithProtobuf( google::pubsub::v1::Encoding encoding, std::string message, - std::string project_id, std::string schema_definition) { + std::string project_id, std::string schema_definition, Options opts) { google::pubsub::v1::ValidateMessageRequest request; request.set_parent("projects/" + std::move(project_id)); request.set_message(std::move(message)); @@ -134,12 +147,13 @@ SchemaAdminClient::ValidateMessageWithProtobuf( request.mutable_schema()->set_type( google::pubsub::v1::Schema::PROTOCOL_BUFFER); request.mutable_schema()->set_definition(std::move(schema_definition)); - return ValidateMessage(request); + return ValidateMessage(request, std::move(opts)); } StatusOr SchemaAdminClient::ValidateMessage( - google::pubsub::v1::ValidateMessageRequest const& request) { + google::pubsub::v1::ValidateMessageRequest const& request, Options opts) { + internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_)); return connection_->ValidateMessage(request); } diff --git a/google/cloud/pubsub/schema_admin_client.h b/google/cloud/pubsub/schema_admin_client.h index 4dd76b9667d4a..e0e47fa050359 100644 --- a/google/cloud/pubsub/schema_admin_client.h +++ b/google/cloud/pubsub/schema_admin_client.h @@ -61,7 +61,8 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN */ class SchemaAdminClient { public: - explicit SchemaAdminClient(std::shared_ptr connection); + explicit SchemaAdminClient(std::shared_ptr connection, + Options opts = {}); /** * The default constructor is deleted. @@ -71,22 +72,22 @@ class SchemaAdminClient { SchemaAdminClient() = delete; /** - * @copydoc CreateSchema(google::pubsub::v1::CreateSchemaRequest const&) + * @copydoc CreateSchema(google::pubsub::v1::CreateSchemaRequest const&,Options) * * @par Example * @snippet samples.cc create-avro-schema */ StatusOr CreateAvroSchema( - Schema const& schema, std::string schema_definition); + Schema const& schema, std::string schema_definition, Options opts = {}); /** - * @copydoc CreateSchema(google::pubsub::v1::CreateSchemaRequest const&) + * @copydoc CreateSchema(google::pubsub::v1::CreateSchemaRequest const&,Options) * * @par Example * @snippet samples.cc create-protobuf-schema */ StatusOr CreateProtobufSchema( - Schema const& schema, std::string schema_definition); + Schema const& schema, std::string schema_definition, Options opts = {}); /** * Creates a new Cloud Pub/Sub schema. @@ -97,7 +98,8 @@ class SchemaAdminClient { * as a consequence of retrying a successful (but reported as failed) request. */ StatusOr CreateSchema( - google::pubsub::v1::CreateSchemaRequest const& request); + google::pubsub::v1::CreateSchemaRequest const& request, + Options opts = {}); /** * Gets information about an existing Cloud Pub/Sub schema. @@ -111,10 +113,13 @@ class SchemaAdminClient { * @param schema the full name of the schema * @param view Use `BASIC` to include the name and type of the schema, but not * the definition. Use `FULL` to include the definition. + * @param opts Override the class-level options, such as retry and backoff + * policies. */ StatusOr GetSchema( Schema const& schema, - google::pubsub::v1::SchemaView view = google::pubsub::v1::BASIC); + google::pubsub::v1::SchemaView view = google::pubsub::v1::BASIC, + Options opts = {}); /** * Lists all the schemas for a given project id. @@ -128,10 +133,13 @@ class SchemaAdminClient { * @param project_id lists the schemas in this project * @param view Use `BASIC` to include the name and type of each schema, but * not the definition. Use `FULL` to include the definition. + * @param opts Override the class-level options, such as retry and backoff + * policies. */ ListSchemasRange ListSchemas( std::string const& project_id, - google::pubsub::v1::SchemaView view = google::pubsub::v1::BASIC); + google::pubsub::v1::SchemaView view = google::pubsub::v1::BASIC, + Options opts = {}); /** * Deletes an existing schema in Cloud Pub/Sub. @@ -146,8 +154,10 @@ class SchemaAdminClient { * @snippet samples.cc delete-schema * * @param schema the name of the schema to be deleted. + * @param opts Override the class-level options, such as retry and backoff + * policies. */ - Status DeleteSchema(Schema const& schema); + Status DeleteSchema(Schema const& schema, Options opts = {}); /** * Validates a schema definition. @@ -159,7 +169,8 @@ class SchemaAdminClient { * @snippet samples.cc validate-avro-schema */ StatusOr ValidateAvroSchema( - std::string const& project_id, std::string schema_definition); + std::string const& project_id, std::string schema_definition, + Options opts = {}); /** * Validates a schema definition. @@ -171,7 +182,8 @@ class SchemaAdminClient { * @snippet samples.cc validate-protobuf-schema */ StatusOr ValidateProtobufSchema( - std::string const& project_id, std::string schema_definition); + std::string const& project_id, std::string schema_definition, + Options opts = {}); /** * Validates a schema definition. @@ -180,10 +192,11 @@ class SchemaAdminClient { * This is a read-only operation and therefore always idempotent and retried. */ StatusOr ValidateSchema( - std::string const& project_id, google::pubsub::v1::Schema schema); + std::string const& project_id, google::pubsub::v1::Schema schema, + Options opts = {}); /** - * @copydoc ValidateMessage(google::pubsub::v1::ValidateMessageRequest const&) + * @copydoc ValidateMessage(google::pubsub::v1::ValidateMessageRequest const&,Options) * * @par Example * @snippet samples.cc validate-message-named-schema @@ -192,14 +205,16 @@ class SchemaAdminClient { * support some encodings. * @param message the message to validate * @param named_schema the name of an existing schema to validate against + * @param opts Override the class-level options, such as retry and backoff + * policies. */ StatusOr ValidateMessageWithNamedSchema(google::pubsub::v1::Encoding encoding, std::string message, - Schema const& named_schema); + Schema const& named_schema, Options opts = {}); /** - * @copydoc ValidateMessage(google::pubsub::v1::ValidateMessageRequest const&) + * @copydoc ValidateMessage(google::pubsub::v1::ValidateMessageRequest const&,Options) * * @par Example * @snippet samples.cc validate-message-avro @@ -209,13 +224,15 @@ class SchemaAdminClient { * @param message the message to validate * @param project_id the project used to perform the validation * @param schema_definition the schema definition, in AVRO format + * @param opts Override the class-level options, such as retry and backoff + * policies. */ StatusOr ValidateMessageWithAvro( google::pubsub::v1::Encoding encoding, std::string message, - std::string project_id, std::string schema_definition); + std::string project_id, std::string schema_definition, Options opts = {}); /** - * @copydoc ValidateMessage(google::pubsub::v1::ValidateMessageRequest const&) + * @copydoc ValidateMessage(google::pubsub::v1::ValidateMessageRequest const&,Options) * * @par Example * @snippet samples.cc validate-message-protobuf @@ -225,11 +242,13 @@ class SchemaAdminClient { * @param message the message to validate * @param project_id the project used to perform the validation * @param schema_definition the schema definition, in protocol buffers format + * @param opts Override the class-level options, such as retry and backoff + * policies. */ StatusOr ValidateMessageWithProtobuf(google::pubsub::v1::Encoding encoding, std::string message, std::string project_id, - std::string schema_definition); + std::string schema_definition, Options opts = {}); /** * Validates a message against a schema. @@ -238,10 +257,12 @@ class SchemaAdminClient { * This is a read-only operation and therefore always idempotent and retried. */ StatusOr ValidateMessage( - google::pubsub::v1::ValidateMessageRequest const& request); + google::pubsub::v1::ValidateMessageRequest const& request, + Options opts = {}); private: std::shared_ptr connection_; + Options options_; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/pubsub/schema_admin_client_test.cc b/google/cloud/pubsub/schema_admin_client_test.cc index 9dfda818dd265..446fc658313fb 100644 --- a/google/cloud/pubsub/schema_admin_client_test.cc +++ b/google/cloud/pubsub/schema_admin_client_test.cc @@ -30,6 +30,14 @@ using ::google::cloud::testing_util::IsProtoEqual; using ::google::protobuf::TextFormat; using ::testing::ElementsAre; +std::string CurrentOptionsProbe() { + return internal::CurrentOptions().get(); +} + +Options TestOptions() { + return Options{}.set("test-options"); +} + TEST(SchemaAdminClient, CreateAvroProtobuf) { auto mock = std::make_shared(); Schema const schema("test-project", "test-schema"); @@ -44,6 +52,7 @@ TEST(SchemaAdminClient, CreateAvroProtobuf) { EXPECT_CALL(*mock, CreateSchema) .WillOnce([&](google::pubsub::v1::CreateSchemaRequest const& r) { + EXPECT_EQ(CurrentOptionsProbe(), "test-options"); EXPECT_THAT(r, IsProtoEqual(expected)); google::pubsub::v1::Schema response = r.schema(); response.set_name(schema.FullName()); @@ -51,7 +60,7 @@ TEST(SchemaAdminClient, CreateAvroProtobuf) { }); SchemaAdminClient client(mock); auto const response = - client.CreateAvroSchema(schema, "test-only-invalid-avro"); + client.CreateAvroSchema(schema, "test-only-invalid-avro", TestOptions()); EXPECT_THAT(response, IsOk()); EXPECT_EQ(schema.FullName(), response->name()); } @@ -70,14 +79,15 @@ TEST(SchemaAdminClient, CreateSchemaProtobuf) { EXPECT_CALL(*mock, CreateSchema) .WillOnce([&](google::pubsub::v1::CreateSchemaRequest const& r) { + EXPECT_EQ(CurrentOptionsProbe(), "test-options"); EXPECT_THAT(r, IsProtoEqual(expected)); google::pubsub::v1::Schema response = r.schema(); response.set_name(schema.FullName()); return make_status_or(response); }); SchemaAdminClient client(mock); - auto const response = - client.CreateProtobufSchema(schema, "test-only-invalid-protobuf"); + auto const response = client.CreateProtobufSchema( + schema, "test-only-invalid-protobuf", TestOptions()); EXPECT_THAT(response, IsOk()); EXPECT_EQ(schema.FullName(), response->name()); } @@ -95,13 +105,15 @@ TEST(SchemaAdminClient, GetSchemaDefault) { EXPECT_CALL(*mock, GetSchema) .WillOnce([&](google::pubsub::v1::GetSchemaRequest const& r) { + EXPECT_EQ(CurrentOptionsProbe(), "test-options"); EXPECT_THAT(r, IsProtoEqual(request)); google::pubsub::v1::Schema response; response.set_name(schema.FullName()); return make_status_or(response); }); SchemaAdminClient client(mock); - auto const response = client.GetSchema(schema); + auto const response = + client.GetSchema(schema, google::pubsub::v1::BASIC, TestOptions()); EXPECT_THAT(response, IsOk()); EXPECT_EQ(schema.FullName(), response->name()); } @@ -119,13 +131,15 @@ TEST(SchemaAdminClient, GetSchemaFull) { EXPECT_CALL(*mock, GetSchema) .WillOnce([&](google::pubsub::v1::GetSchemaRequest const& r) { + EXPECT_EQ(CurrentOptionsProbe(), "test-options"); EXPECT_THAT(r, IsProtoEqual(request)); google::pubsub::v1::Schema response; response.set_name(schema.FullName()); return make_status_or(response); }); SchemaAdminClient client(mock); - auto const response = client.GetSchema(schema, google::pubsub::v1::FULL); + auto const response = + client.GetSchema(schema, google::pubsub::v1::FULL, TestOptions()); EXPECT_THAT(response, IsOk()); EXPECT_EQ(schema.FullName(), response->name()); } @@ -144,6 +158,7 @@ TEST(SchemaAdminClient, ListSchemasDefault) { EXPECT_CALL(*mock, ListSchemas) .WillOnce([&](google::pubsub::v1::ListSchemasRequest const& r) { + EXPECT_EQ(CurrentOptionsProbe(), "test-options"); EXPECT_THAT(r, IsProtoEqual(request)); return internal::MakePaginationRange( google::pubsub::v1::ListSchemasRequest{}, @@ -161,7 +176,8 @@ TEST(SchemaAdminClient, ListSchemasDefault) { }); SchemaAdminClient client(mock); std::vector names; - for (auto const& t : client.ListSchemas("test-project")) { + for (auto const& t : client.ListSchemas( + "test-project", google::pubsub::v1::BASIC, TestOptions())) { ASSERT_THAT(t, IsOk()); names.push_back(t->name()); } @@ -181,6 +197,7 @@ TEST(SchemaAdminClient, ListSchemasFull) { EXPECT_CALL(*mock, ListSchemas) .WillOnce([&](google::pubsub::v1::ListSchemasRequest const& r) { + EXPECT_EQ(CurrentOptionsProbe(), "test-options"); EXPECT_THAT(r, IsProtoEqual(request)); return internal::MakePaginationRange( google::pubsub::v1::ListSchemasRequest{}, @@ -198,8 +215,8 @@ TEST(SchemaAdminClient, ListSchemasFull) { }); SchemaAdminClient client(mock); std::vector names; - for (auto const& t : - client.ListSchemas("test-project", google::pubsub::v1::FULL)) { + for (auto const& t : client.ListSchemas( + "test-project", google::pubsub::v1::FULL, TestOptions())) { ASSERT_THAT(t, IsOk()); names.push_back(t->name()); } @@ -222,7 +239,7 @@ TEST(SchemaAdminClient, DeleteSchema) { return Status{}; }); SchemaAdminClient client(mock); - auto const response = client.DeleteSchema(schema); + auto const response = client.DeleteSchema(schema, TestOptions()); EXPECT_THAT(response, IsOk()); } @@ -238,13 +255,14 @@ TEST(SchemaAdminClient, ValidateSchemaAvro) { EXPECT_CALL(*mock, ValidateSchema) .WillOnce([&](google::pubsub::v1::ValidateSchemaRequest const& r) { + EXPECT_EQ(CurrentOptionsProbe(), "test-options"); EXPECT_THAT(r, IsProtoEqual(expected)); google::pubsub::v1::ValidateSchemaResponse response; return make_status_or(response); }); SchemaAdminClient client(mock); - auto const response = - client.ValidateAvroSchema("test-project", "test-only-invalid-avro"); + auto const response = client.ValidateAvroSchema( + "test-project", "test-only-invalid-avro", TestOptions()); EXPECT_THAT(response, IsOk()); } @@ -260,13 +278,14 @@ TEST(SchemaAdminClient, ValidateSchemaProtobuf) { EXPECT_CALL(*mock, ValidateSchema) .WillOnce([&](google::pubsub::v1::ValidateSchemaRequest const& r) { + EXPECT_EQ(CurrentOptionsProbe(), "test-options"); EXPECT_THAT(r, IsProtoEqual(expected)); google::pubsub::v1::ValidateSchemaResponse response; return make_status_or(response); }); SchemaAdminClient client(mock); auto const response = client.ValidateProtobufSchema( - "test-project", "test-only-invalid-protobuf"); + "test-project", "test-only-invalid-protobuf", TestOptions()); EXPECT_THAT(response, IsOk()); } @@ -285,6 +304,7 @@ TEST(SchemaAdminClient, ValidateMessageWithNamedSchema) { EXPECT_CALL(*mock, ValidateMessage) .WillOnce([&](google::pubsub::v1::ValidateMessageRequest const& r) { + EXPECT_EQ(CurrentOptionsProbe(), "test-options"); EXPECT_THAT(r, IsProtoEqual(expected)); google::pubsub::v1::ValidateMessageResponse response; return make_status_or(response); @@ -292,7 +312,7 @@ TEST(SchemaAdminClient, ValidateMessageWithNamedSchema) { SchemaAdminClient client(mock); auto const response = client.ValidateMessageWithNamedSchema( google::pubsub::v1::BINARY, "test-only-invalid-message", - Schema("test-project", "test-schema")); + Schema("test-project", "test-schema"), TestOptions()); EXPECT_THAT(response, IsOk()); } @@ -310,6 +330,7 @@ TEST(SchemaAdminClient, ValidateMessageWithAvro) { EXPECT_CALL(*mock, ValidateMessage) .WillOnce([&](google::pubsub::v1::ValidateMessageRequest const& r) { + EXPECT_EQ(CurrentOptionsProbe(), "test-options"); EXPECT_THAT(r, IsProtoEqual(expected)); google::pubsub::v1::ValidateMessageResponse response; return make_status_or(response); @@ -317,7 +338,7 @@ TEST(SchemaAdminClient, ValidateMessageWithAvro) { SchemaAdminClient client(mock); auto const response = client.ValidateMessageWithAvro( google::pubsub::v1::BINARY, "test-only-invalid-message", "test-project", - "test-only-invalid-schema"); + "test-only-invalid-schema", TestOptions()); EXPECT_THAT(response, IsOk()); } @@ -335,6 +356,7 @@ TEST(SchemaAdminClient, ValidateMessageWithProtobuf) { EXPECT_CALL(*mock, ValidateMessage) .WillOnce([&](google::pubsub::v1::ValidateMessageRequest const& r) { + EXPECT_EQ(CurrentOptionsProbe(), "test-options"); EXPECT_THAT(r, IsProtoEqual(expected)); google::pubsub::v1::ValidateMessageResponse response; return make_status_or(response); @@ -342,7 +364,7 @@ TEST(SchemaAdminClient, ValidateMessageWithProtobuf) { SchemaAdminClient client(mock); auto const response = client.ValidateMessageWithProtobuf( google::pubsub::v1::BINARY, "test-only-invalid-message", "test-project", - "test-only-invalid-schema"); + "test-only-invalid-schema", TestOptions()); EXPECT_THAT(response, IsOk()); } diff --git a/google/cloud/pubsub/schema_admin_connection.cc b/google/cloud/pubsub/schema_admin_connection.cc index 52401804c3443..2105262a3b3d2 100644 --- a/google/cloud/pubsub/schema_admin_connection.cc +++ b/google/cloud/pubsub/schema_admin_connection.cc @@ -37,21 +37,17 @@ class SchemaAdminConnectionImpl : public pubsub::SchemaAdminConnection { public: explicit SchemaAdminConnectionImpl( std::unique_ptr background, - std::shared_ptr stub, - std::unique_ptr retry_policy, - std::unique_ptr backoff_policy) + std::shared_ptr stub, Options options) : background_(std::move(background)), stub_(std::move(stub)), - retry_policy_(std::move(retry_policy)), - backoff_policy_(std::move(backoff_policy)) {} + options_(std::move(options)) {} ~SchemaAdminConnectionImpl() override = default; StatusOr CreateSchema( google::pubsub::v1::CreateSchemaRequest const& request) override { return RetryLoop( - retry_policy_->clone(), backoff_policy_->clone(), - Idempotency::kIdempotent, + retry_policy(), backoff_policy(), Idempotency::kIdempotent, [this](grpc::ClientContext& context, google::pubsub::v1::CreateSchemaRequest const& request) { return stub_->CreateSchema(context, request); @@ -61,8 +57,7 @@ class SchemaAdminConnectionImpl : public pubsub::SchemaAdminConnection { StatusOr GetSchema( google::pubsub::v1::GetSchemaRequest const& request) override { return RetryLoop( - retry_policy_->clone(), backoff_policy_->clone(), - Idempotency::kIdempotent, + retry_policy(), backoff_policy(), Idempotency::kIdempotent, [this](grpc::ClientContext& context, google::pubsub::v1::GetSchemaRequest const& request) { return stub_->GetSchema(context, request); @@ -75,10 +70,9 @@ class SchemaAdminConnectionImpl : public pubsub::SchemaAdminConnection { auto& stub = stub_; // Because we do not have C++14 generalized lambda captures we cannot just // use the unique_ptr<> here, so convert to shared_ptr<> instead. - auto retry = - std::shared_ptr(retry_policy_->clone()); + auto retry = std::shared_ptr(retry_policy()); auto backoff = - std::shared_ptr(backoff_policy_->clone()); + std::shared_ptr(backoff_policy()); char const* function_name = __func__; auto list_functor = [stub, retry, backoff, @@ -106,8 +100,7 @@ class SchemaAdminConnectionImpl : public pubsub::SchemaAdminConnection { Status DeleteSchema( google::pubsub::v1::DeleteSchemaRequest const& request) override { return RetryLoop( - retry_policy_->clone(), backoff_policy_->clone(), - Idempotency::kIdempotent, + retry_policy(), backoff_policy(), Idempotency::kIdempotent, [this](grpc::ClientContext& context, google::pubsub::v1::DeleteSchemaRequest const& request) { return stub_->DeleteSchema(context, request); @@ -117,8 +110,7 @@ class SchemaAdminConnectionImpl : public pubsub::SchemaAdminConnection { StatusOr ValidateSchema( google::pubsub::v1::ValidateSchemaRequest const& request) override { return RetryLoop( - retry_policy_->clone(), backoff_policy_->clone(), - Idempotency::kIdempotent, + retry_policy(), backoff_policy(), Idempotency::kIdempotent, [this](grpc::ClientContext& context, google::pubsub::v1::ValidateSchemaRequest const& request) { return stub_->ValidateSchema(context, request); @@ -128,8 +120,7 @@ class SchemaAdminConnectionImpl : public pubsub::SchemaAdminConnection { StatusOr ValidateMessage( google::pubsub::v1::ValidateMessageRequest const& request) override { return RetryLoop( - retry_policy_->clone(), backoff_policy_->clone(), - Idempotency::kIdempotent, + retry_policy(), backoff_policy(), Idempotency::kIdempotent, [this](grpc::ClientContext& context, google::pubsub::v1::ValidateMessageRequest const& request) { return stub_->ValidateMessage(context, request); @@ -137,11 +128,28 @@ class SchemaAdminConnectionImpl : public pubsub::SchemaAdminConnection { request, __func__); } + Options options() const override { return options_; } + private: + std::unique_ptr retry_policy() { + auto const& options = internal::CurrentOptions(); + if (options.has()) { + return options.get()->clone(); + } + return options_.get()->clone(); + } + + std::unique_ptr backoff_policy() { + auto const& options = internal::CurrentOptions(); + if (options.has()) { + return options.get()->clone(); + } + return options_.get()->clone(); + } + std::unique_ptr background_; std::shared_ptr stub_; - std::unique_ptr retry_policy_; - std::unique_ptr backoff_policy_; + Options options_; }; // Decorates a SchemaAdminStub. This works for both mock and real stubs. @@ -185,19 +193,7 @@ std::shared_ptr MakeSchemaAdminConnection(Options opts) { stub = DecorateSchemaAdminStub(opts, std::move(auth), std::move(stub)); return std::make_shared( - std::move(background), std::move(stub), - opts.get()->clone(), - opts.get()->clone()); -} - -std::shared_ptr MakeSchemaAdminConnection( - pubsub::ConnectionOptions const& options, - std::unique_ptr retry_policy, - std::unique_ptr backoff_policy) { - auto opts = internal::MakeOptions(options); - if (retry_policy) opts.set(retry_policy->clone()); - if (backoff_policy) opts.set(backoff_policy->clone()); - return MakeSchemaAdminConnection(std::move(opts)); + std::move(background), std::move(stub), std::move(opts)); } GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END @@ -214,9 +210,7 @@ std::shared_ptr MakeTestSchemaAdminConnection( stub = pubsub::DecorateSchemaAdminStub(opts, std::move(auth), std::move(stub)); return std::make_shared( - std::move(background), std::move(stub), - opts.get()->clone(), - opts.get()->clone()); + std::move(background), std::move(stub), opts); } GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/pubsub/schema_admin_connection.h b/google/cloud/pubsub/schema_admin_connection.h index 4a0b5594ee35e..afd2f7220dd98 100644 --- a/google/cloud/pubsub/schema_admin_connection.h +++ b/google/cloud/pubsub/schema_admin_connection.h @@ -89,6 +89,9 @@ class SchemaAdminConnection { /// Defines the interface for `SchemaAdminClient::ValidateMessage()` virtual StatusOr ValidateMessage( google::pubsub::v1::ValidateMessageRequest const&) = 0; + + /// Return the options used to create the connection. + virtual Options options() const { return Options{}; } }; /**