diff --git a/google/cloud/bigtable/app_profile.py b/google/cloud/bigtable/app_profile.py index 5d6dbdb81..8cde66146 100644 --- a/google/cloud/bigtable/app_profile.py +++ b/google/cloud/bigtable/app_profile.py @@ -59,6 +59,11 @@ class AppProfile(object): when routing_policy_type is ROUTING_POLICY_TYPE_SINGLE. + :type: multi_cluster_ids: list + :param: multi_cluster_ids: (Optional) The set of clusters to route to. + The order is ignored; clusters will be tried in order of distance. + If left empty, all clusters are eligible. + :type: allow_transactional_writes: bool :param: allow_transactional_writes: (Optional) If true, allow transactional writes for @@ -72,6 +77,7 @@ def __init__( routing_policy_type=None, description=None, cluster_id=None, + multi_cluster_ids=None, allow_transactional_writes=None, ): self.app_profile_id = app_profile_id @@ -79,6 +85,7 @@ def __init__( self.routing_policy_type = routing_policy_type self.description = description self.cluster_id = cluster_id + self.multi_cluster_ids = multi_cluster_ids self.allow_transactional_writes = allow_transactional_writes @property @@ -184,13 +191,17 @@ def _update_from_pb(self, app_profile_pb): self.routing_policy_type = None self.allow_transactional_writes = None self.cluster_id = None - + self.multi_cluster_ids = None self.description = app_profile_pb.description routing_policy_type = None if app_profile_pb._pb.HasField("multi_cluster_routing_use_any"): routing_policy_type = RoutingPolicyType.ANY self.allow_transactional_writes = False + if app_profile_pb.multi_cluster_routing_use_any.cluster_ids: + self.multi_cluster_ids = ( + app_profile_pb.multi_cluster_routing_use_any.cluster_ids + ) else: routing_policy_type = RoutingPolicyType.SINGLE self.cluster_id = app_profile_pb.single_cluster_routing.cluster_id @@ -215,7 +226,9 @@ def _to_pb(self): if self.routing_policy_type == RoutingPolicyType.ANY: multi_cluster_routing_use_any = ( - instance.AppProfile.MultiClusterRoutingUseAny() + instance.AppProfile.MultiClusterRoutingUseAny( + cluster_ids=self.multi_cluster_ids + ) ) else: single_cluster_routing = instance.AppProfile.SingleClusterRouting( @@ -312,6 +325,7 @@ def update(self, ignore_warnings=None): ``routing_policy_type`` ``description`` ``cluster_id`` + ``multi_cluster_ids`` ``allow_transactional_writes`` For example: diff --git a/google/cloud/bigtable/instance.py b/google/cloud/bigtable/instance.py index e838ec9ad..6d092cefd 100644 --- a/google/cloud/bigtable/instance.py +++ b/google/cloud/bigtable/instance.py @@ -711,6 +711,7 @@ def app_profile( routing_policy_type=None, description=None, cluster_id=None, + multi_cluster_ids=None, allow_transactional_writes=None, ): """Factory to create AppProfile associated with this instance. @@ -742,6 +743,11 @@ def app_profile( when routing_policy_type is ROUTING_POLICY_TYPE_SINGLE. + :type: multi_cluster_ids: list + :param: multi_cluster_ids: (Optional) The set of clusters to route to. + The order is ignored; clusters will be tried in order of distance. + If left empty, all clusters are eligible. + :type: allow_transactional_writes: bool :param: allow_transactional_writes: (Optional) If true, allow transactional writes for @@ -756,6 +762,7 @@ def app_profile( routing_policy_type=routing_policy_type, description=description, cluster_id=cluster_id, + multi_cluster_ids=multi_cluster_ids, allow_transactional_writes=allow_transactional_writes, ) diff --git a/tests/system/test_instance_admin.py b/tests/system/test_instance_admin.py index 8c09f6d87..a2ad229af 100644 --- a/tests/system/test_instance_admin.py +++ b/tests/system/test_instance_admin.py @@ -24,6 +24,7 @@ def _create_app_profile_helper( routing_policy_type, description=None, cluster_id=None, + multi_cluster_ids=None, allow_transactional_writes=None, ignore_warnings=None, ): @@ -33,6 +34,7 @@ def _create_app_profile_helper( routing_policy_type=routing_policy_type, description=description, cluster_id=cluster_id, + multi_cluster_ids=multi_cluster_ids, allow_transactional_writes=allow_transactional_writes, ) assert app_profile.allow_transactional_writes == allow_transactional_writes @@ -40,7 +42,7 @@ def _create_app_profile_helper( app_profile.create(ignore_warnings=ignore_warnings) # Load a different app_profile objec form the server and - # verrify that it is the same + # verify that it is the same alt_app_profile = instance.app_profile(app_profile_id) alt_app_profile.reload() @@ -50,6 +52,7 @@ def _create_app_profile_helper( app_profile.description == alt_app_profile.description assert not app_profile.allow_transactional_writes assert not alt_app_profile.allow_transactional_writes + assert app_profile.multi_cluster_ids == alt_app_profile.multi_cluster_ids return app_profile @@ -67,6 +70,7 @@ def _modify_app_profile_helper( routing_policy_type, description=None, cluster_id=None, + multi_cluster_ids=None, allow_transactional_writes=None, ignore_warnings=None, ): @@ -75,6 +79,7 @@ def _modify_app_profile_helper( routing_policy_type=routing_policy_type, description=description, cluster_id=cluster_id, + multi_cluster_ids=multi_cluster_ids, allow_transactional_writes=allow_transactional_writes, ) @@ -87,6 +92,7 @@ def _modify_app_profile_helper( assert alt_profile.description == description assert alt_profile.routing_policy_type == routing_policy_type assert alt_profile.cluster_id == cluster_id + assert alt_profile.multi_cluster_ids == multi_cluster_ids assert alt_profile.allow_transactional_writes == allow_transactional_writes @@ -395,6 +401,99 @@ def test_instance_create_w_two_clusters( _delete_app_profile_helper(app_profile) +def test_instance_create_app_profile_create_with_multi_cluster_ids( + admin_client, + unique_suffix, + admin_instance_populated, + admin_cluster, + location_id, + instance_labels, + instances_to_delete, + skip_on_emulator, +): + alt_instance_id = f"dif{unique_suffix}" + instance = admin_client.instance( + alt_instance_id, + instance_type=enums.Instance.Type.PRODUCTION, + labels=instance_labels, + ) + + serve_nodes = 1 + + alt_cluster_id_1 = f"{alt_instance_id}-c1" + cluster_1 = instance.cluster( + alt_cluster_id_1, + location_id=location_id, + serve_nodes=serve_nodes, + default_storage_type=enums.StorageType.HDD, + ) + + alt_cluster_id_2 = f"{alt_instance_id}-c2" + location_id_2 = "us-central1-f" + cluster_2 = instance.cluster( + alt_cluster_id_2, + location_id=location_id_2, + serve_nodes=serve_nodes, + default_storage_type=enums.StorageType.HDD, + ) + operation = instance.create(clusters=[cluster_1, cluster_2]) + instances_to_delete.append(instance) + operation.result(timeout=240) # Ensure the operation completes. + + # Create a new instance and make sure it is the same. + instance_alt = admin_client.instance(alt_instance_id) + instance_alt.reload() + + assert instance == instance_alt + assert instance.display_name == instance_alt.display_name + assert instance.type_ == instance_alt.type_ + + clusters, failed_locations = instance_alt.list_clusters() + assert failed_locations == [] + alt_cluster_1, alt_cluster_2 = sorted(clusters, key=lambda x: x.name) + + assert cluster_1.location_id == alt_cluster_1.location_id + assert cluster_2.location_id == alt_cluster_2.location_id + + # Test create app profile with multi_cluster_routing policy + app_profiles_to_delete = [] + description = "routing policy-multi" + app_profile_id_1 = "app_profile_id_1" + routing = enums.RoutingPolicyType.ANY + + multi_cluster_ids = [alt_cluster_id_1, alt_cluster_id_2] + app_profile_1 = _create_app_profile_helper( + app_profile_id_1, + instance, + routing_policy_type=routing, + description=description, + ignore_warnings=True, + multi_cluster_ids=multi_cluster_ids, + ) + assert len(app_profile_1.multi_cluster_ids) == len(multi_cluster_ids) + assert app_profile_1.multi_cluster_ids == multi_cluster_ids + + # remove a cluster from the multi_cluster_ids + app_profile_1.multi_cluster_ids.pop() + app_profile_1.update() + + assert len(app_profile_1.multi_cluster_ids) == 1 + assert app_profile_1.multi_cluster_ids == [alt_cluster_id_1] + + # add a cluster from the multi_cluster_ids + app_profile_1.multi_cluster_ids.append(alt_cluster_id_2) + app_profile_1.update() + + assert len(app_profile_1.multi_cluster_ids) == 2 + assert app_profile_1.multi_cluster_ids == [alt_cluster_id_1, alt_cluster_id_2] + + app_profiles_to_delete.append(app_profile_1) + + # # Test delete app profiles + for app_profile in app_profiles_to_delete: + _delete_app_profile_helper(app_profile) + + def test_instance_create_w_two_clusters_cmek( admin_client, unique_suffix, diff --git a/tests/unit/test_app_profile.py b/tests/unit/test_app_profile.py index d45c9ca0a..660ee7899 100644 --- a/tests/unit/test_app_profile.py +++ b/tests/unit/test_app_profile.py @@ -24,6 +24,7 @@ PROJECT, INSTANCE_ID, APP_PROFILE_ID ) CLUSTER_ID = "cluster-id" +CLUSTER_ID_2 = "cluster-id-2" OP_ID = 8765 OP_NAME = "operations/projects/{}/instances/{}/appProfiles/{}/operations/{}".format( PROJECT, INSTANCE_ID, APP_PROFILE_ID, OP_ID @@ -54,6 +55,7 @@ def test_app_profile_constructor_defaults(): assert app_profile.routing_policy_type is None assert app_profile.description is None assert app_profile.cluster_id is None + assert app_profile.multi_cluster_ids is None assert app_profile.allow_transactional_writes is None @@ -92,9 +94,32 @@ def test_app_profile_constructor_explicit(): assert app_profile2.routing_policy_type == SINGLE assert app_profile2.description == DESCRIPTION_2 assert app_profile2.cluster_id == CLUSTER_ID + assert app_profile2.multi_cluster_ids is None assert app_profile2.allow_transactional_writes == ALLOW_WRITES +def test_app_profile_constructor_multi_cluster_ids(): + from google.cloud.bigtable.enums import RoutingPolicyType + + ANY = RoutingPolicyType.ANY + DESCRIPTION_1 = "routing policy any" + client = _Client(PROJECT) + instance = _Instance(INSTANCE_ID, client) + + app_profile1 = _make_app_profile( + APP_PROFILE_ID, + instance, + routing_policy_type=ANY, + description=DESCRIPTION_1, + multi_cluster_ids=[CLUSTER_ID, CLUSTER_ID_2], + ) + assert app_profile1.app_profile_id == APP_PROFILE_ID + assert app_profile1._instance is instance + assert app_profile1.routing_policy_type == ANY + assert app_profile1.description == DESCRIPTION_1 + assert app_profile1.multi_cluster_ids == [CLUSTER_ID, CLUSTER_ID_2] + + def test_app_profile_name(): credentials = _make_credentials() client = _make_client(project=PROJECT, credentials=credentials, admin=True) @@ -147,13 +172,44 @@ def test_app_profile_from_pb_success_w_routing_any(): client = _Client(PROJECT) instance = _Instance(INSTANCE_ID, client) - desctiption = "routing any" + description = "routing any" routing = RoutingPolicyType.ANY multi_cluster_routing_use_any = data_v2_pb2.AppProfile.MultiClusterRoutingUseAny() app_profile_pb = data_v2_pb2.AppProfile( name=APP_PROFILE_NAME, - description=desctiption, + description=description, + multi_cluster_routing_use_any=multi_cluster_routing_use_any, + ) + + app_profile = AppProfile.from_pb(app_profile_pb, instance) + assert isinstance(app_profile, AppProfile) + assert app_profile._instance is instance + assert app_profile.app_profile_id == APP_PROFILE_ID + assert app_profile.description == description + assert app_profile.routing_policy_type == routing + assert app_profile.cluster_id is None + assert app_profile.multi_cluster_ids is None + assert app_profile.allow_transactional_writes is False + + +def test_app_profile_from_pb_success_w_routing_any_multi_cluster_ids(): + from google.cloud.bigtable_admin_v2.types import instance as data_v2_pb2 + from google.cloud.bigtable.app_profile import AppProfile + from google.cloud.bigtable.enums import RoutingPolicyType + + client = _Client(PROJECT) + instance = _Instance(INSTANCE_ID, client) + + description = "routing any" + routing = RoutingPolicyType.ANY + multi_cluster_routing_use_any = data_v2_pb2.AppProfile.MultiClusterRoutingUseAny( + cluster_ids=[CLUSTER_ID, CLUSTER_ID_2] + ) + + app_profile_pb = data_v2_pb2.AppProfile( + name=APP_PROFILE_NAME, + description=description, multi_cluster_routing_use_any=multi_cluster_routing_use_any, ) @@ -161,10 +217,11 @@ def test_app_profile_from_pb_success_w_routing_any(): assert isinstance(app_profile, AppProfile) assert app_profile._instance is instance assert app_profile.app_profile_id == APP_PROFILE_ID - assert app_profile.description == desctiption + assert app_profile.description == description assert app_profile.routing_policy_type == routing assert app_profile.cluster_id is None assert app_profile.allow_transactional_writes is False + assert app_profile.multi_cluster_ids == [CLUSTER_ID, CLUSTER_ID_2] def test_app_profile_from_pb_success_w_routing_single(): @@ -175,7 +232,7 @@ def test_app_profile_from_pb_success_w_routing_single(): client = _Client(PROJECT) instance = _Instance(INSTANCE_ID, client) - desctiption = "routing single" + description = "routing single" allow_transactional_writes = True routing = RoutingPolicyType.SINGLE single_cluster_routing = data_v2_pb2.AppProfile.SingleClusterRouting( @@ -185,7 +242,7 @@ def test_app_profile_from_pb_success_w_routing_single(): app_profile_pb = data_v2_pb2.AppProfile( name=APP_PROFILE_NAME, - description=desctiption, + description=description, single_cluster_routing=single_cluster_routing, ) @@ -193,9 +250,10 @@ def test_app_profile_from_pb_success_w_routing_single(): assert isinstance(app_profile, AppProfile) assert app_profile._instance is instance assert app_profile.app_profile_id == APP_PROFILE_ID - assert app_profile.description == desctiption + assert app_profile.description == description assert app_profile.routing_policy_type == routing assert app_profile.cluster_id == CLUSTER_ID + assert app_profile.multi_cluster_ids is None assert app_profile.allow_transactional_writes == allow_transactional_writes @@ -290,6 +348,7 @@ def test_app_profile_reload_w_routing_any(): assert app_profile.routing_policy_type == routing assert app_profile.description == description assert app_profile.cluster_id is None + assert app_profile.multi_cluster_ids is None assert app_profile.allow_transactional_writes is None # Perform the method and check the result. @@ -298,6 +357,7 @@ def test_app_profile_reload_w_routing_any(): assert app_profile.routing_policy_type == RoutingPolicyType.SINGLE assert app_profile.description == description_from_server assert app_profile.cluster_id == cluster_id_from_server + assert app_profile.multi_cluster_ids is None assert app_profile.allow_transactional_writes == allow_transactional_writes @@ -394,6 +454,7 @@ def test_app_profile_create_w_routing_any(): assert result.description == description assert result.allow_transactional_writes is False assert result.cluster_id is None + assert result.multi_cluster_ids is None def test_app_profile_create_w_routing_single(): @@ -454,6 +515,7 @@ def test_app_profile_create_w_routing_single(): assert result.description == description assert result.allow_transactional_writes == allow_writes assert result.cluster_id == CLUSTER_ID + assert result.multi_cluster_ids is None def test_app_profile_create_w_wrong_routing_policy(): @@ -540,6 +602,82 @@ def test_app_profile_update_w_routing_any(): ) +def test_app_profile_update_w_routing_any_multi_cluster_ids(): + from google.longrunning import operations_pb2 + from google.protobuf.any_pb2 import Any + from google.cloud.bigtable_admin_v2.types import ( + bigtable_instance_admin as messages_v2_pb2, + ) + from google.cloud.bigtable.enums import RoutingPolicyType + from google.cloud.bigtable_admin_v2.services.bigtable_instance_admin import ( + BigtableInstanceAdminClient, + ) + from google.protobuf import field_mask_pb2 + + credentials = _make_credentials() + client = _make_client(project=PROJECT, credentials=credentials, admin=True) + instance = client.instance(INSTANCE_ID) + + routing = RoutingPolicyType.SINGLE + description = "to routing policy single" + allow_writes = True + app_profile = _make_app_profile( + APP_PROFILE_ID, + instance, + routing_policy_type=routing, + description=description, + cluster_id=CLUSTER_ID, + allow_transactional_writes=allow_writes, + multi_cluster_ids=[CLUSTER_ID, CLUSTER_ID_2], + ) + + # Create response_pb + metadata = messages_v2_pb2.UpdateAppProfileMetadata() + type_url = "type.googleapis.com/{}".format( + messages_v2_pb2.UpdateAppProfileMetadata._meta._pb.DESCRIPTOR.full_name + ) + response_pb = operations_pb2.Operation( + name=OP_NAME, + metadata=Any(type_url=type_url, value=metadata._pb.SerializeToString()), + ) + + # Patch the stub used by the API method. + instance_api = mock.create_autospec(BigtableInstanceAdminClient) + # Mock api calls + instance_api.app_profile_path.return_value = ( + "projects/project/instances/instance-id/appProfiles/app-profile-id" + ) + + client._instance_admin_client = instance_api + + # Perform the method and check the result. + ignore_warnings = True + expected_request_update_mask = field_mask_pb2.FieldMask( + paths=["description", "single_cluster_routing"] + ) + + expected_request = { + "request": { + "app_profile": app_profile._to_pb(), + "update_mask": expected_request_update_mask, + "ignore_warnings": ignore_warnings, + } + } + + instance_api.update_app_profile.return_value = response_pb + app_profile._instance._client._instance_admin_client = instance_api + result = app_profile.update(ignore_warnings=ignore_warnings) + actual_request = client._instance_admin_client.update_app_profile.call_args_list[ + 0 + ].kwargs + + assert actual_request == expected_request + assert ( + result.metadata.type_url + == "type.googleapis.com/google.bigtable.admin.v2.UpdateAppProfileMetadata" + ) + + def test_app_profile_update_w_routing_single(): from google.longrunning import operations_pb2 from google.protobuf.any_pb2 import Any