Skip to content

Commit

Permalink
feat: App Profile multi cluster routing support with specified cluste…
Browse files Browse the repository at this point in the history
…r ids (#549)

* feat: App Profile multi cluster routing support with specified cluster ids

Add the `multi_cluster_ids` parameter to AppProfile.
It is used when the routing policy type is set to "any".

Add tests, docstrings.
Fix some typos.

Closes internal issue 213627978

* Add system test for app profile with multi cluster ids.

* Specify longer timeout
  • Loading branch information
Mariatta committed Apr 14, 2022
1 parent 4e50278 commit a0ed5b5
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 9 deletions.
18 changes: 16 additions & 2 deletions google/cloud/bigtable/app_profile.py
Expand Up @@ -59,6 +59,11 @@ class AppProfile(object):
when routing_policy_type is
ROUTING_POLICY_TYPE_SINGLE.
:type: multi_cluster_ids: list
:param: multi_cluster_ids: (Optional) The set of clusters to route to.
The order is ignored; clusters will be tried in order of distance.
If left empty, all clusters are eligible.
:type: allow_transactional_writes: bool
:param: allow_transactional_writes: (Optional) If true, allow
transactional writes for
Expand All @@ -72,13 +77,15 @@ def __init__(
routing_policy_type=None,
description=None,
cluster_id=None,
multi_cluster_ids=None,
allow_transactional_writes=None,
):
self.app_profile_id = app_profile_id
self._instance = instance
self.routing_policy_type = routing_policy_type
self.description = description
self.cluster_id = cluster_id
self.multi_cluster_ids = multi_cluster_ids
self.allow_transactional_writes = allow_transactional_writes

@property
Expand Down Expand Up @@ -184,13 +191,17 @@ def _update_from_pb(self, app_profile_pb):
self.routing_policy_type = None
self.allow_transactional_writes = None
self.cluster_id = None

self.multi_cluster_ids = None
self.description = app_profile_pb.description

routing_policy_type = None
if app_profile_pb._pb.HasField("multi_cluster_routing_use_any"):
routing_policy_type = RoutingPolicyType.ANY
self.allow_transactional_writes = False
if app_profile_pb.multi_cluster_routing_use_any.cluster_ids:
self.multi_cluster_ids = (
app_profile_pb.multi_cluster_routing_use_any.cluster_ids
)
else:
routing_policy_type = RoutingPolicyType.SINGLE
self.cluster_id = app_profile_pb.single_cluster_routing.cluster_id
Expand All @@ -215,7 +226,9 @@ def _to_pb(self):

if self.routing_policy_type == RoutingPolicyType.ANY:
multi_cluster_routing_use_any = (
instance.AppProfile.MultiClusterRoutingUseAny()
instance.AppProfile.MultiClusterRoutingUseAny(
cluster_ids=self.multi_cluster_ids
)
)
else:
single_cluster_routing = instance.AppProfile.SingleClusterRouting(
Expand Down Expand Up @@ -312,6 +325,7 @@ def update(self, ignore_warnings=None):
``routing_policy_type``
``description``
``cluster_id``
``multi_cluster_ids``
``allow_transactional_writes``
For example:
Expand Down
7 changes: 7 additions & 0 deletions google/cloud/bigtable/instance.py
Expand Up @@ -711,6 +711,7 @@ def app_profile(
routing_policy_type=None,
description=None,
cluster_id=None,
multi_cluster_ids=None,
allow_transactional_writes=None,
):
"""Factory to create AppProfile associated with this instance.
Expand Down Expand Up @@ -742,6 +743,11 @@ def app_profile(
when routing_policy_type is
ROUTING_POLICY_TYPE_SINGLE.
:type: multi_cluster_ids: list
:param: multi_cluster_ids: (Optional) The set of clusters to route to.
The order is ignored; clusters will be tried in order of distance.
If left empty, all clusters are eligible.
:type: allow_transactional_writes: bool
:param: allow_transactional_writes: (Optional) If true, allow
transactional writes for
Expand All @@ -756,6 +762,7 @@ def app_profile(
routing_policy_type=routing_policy_type,
description=description,
cluster_id=cluster_id,
multi_cluster_ids=multi_cluster_ids,
allow_transactional_writes=allow_transactional_writes,
)

Expand Down
101 changes: 100 additions & 1 deletion tests/system/test_instance_admin.py
Expand Up @@ -24,6 +24,7 @@ def _create_app_profile_helper(
routing_policy_type,
description=None,
cluster_id=None,
multi_cluster_ids=None,
allow_transactional_writes=None,
ignore_warnings=None,
):
Expand All @@ -33,14 +34,15 @@ def _create_app_profile_helper(
routing_policy_type=routing_policy_type,
description=description,
cluster_id=cluster_id,
multi_cluster_ids=multi_cluster_ids,
allow_transactional_writes=allow_transactional_writes,
)
assert app_profile.allow_transactional_writes == allow_transactional_writes

app_profile.create(ignore_warnings=ignore_warnings)

# Load a different app_profile objec form the server and
# verrify that it is the same
# verify that it is the same
alt_app_profile = instance.app_profile(app_profile_id)
alt_app_profile.reload()

Expand All @@ -50,6 +52,7 @@ def _create_app_profile_helper(
app_profile.description == alt_app_profile.description
assert not app_profile.allow_transactional_writes
assert not alt_app_profile.allow_transactional_writes
assert app_profile.multi_cluster_ids == alt_app_profile.multi_cluster_ids

return app_profile

Expand All @@ -67,6 +70,7 @@ def _modify_app_profile_helper(
routing_policy_type,
description=None,
cluster_id=None,
multi_cluster_ids=None,
allow_transactional_writes=None,
ignore_warnings=None,
):
Expand All @@ -75,6 +79,7 @@ def _modify_app_profile_helper(
routing_policy_type=routing_policy_type,
description=description,
cluster_id=cluster_id,
multi_cluster_ids=multi_cluster_ids,
allow_transactional_writes=allow_transactional_writes,
)

Expand All @@ -87,6 +92,7 @@ def _modify_app_profile_helper(
assert alt_profile.description == description
assert alt_profile.routing_policy_type == routing_policy_type
assert alt_profile.cluster_id == cluster_id
assert alt_profile.multi_cluster_ids == multi_cluster_ids
assert alt_profile.allow_transactional_writes == allow_transactional_writes


Expand Down Expand Up @@ -395,6 +401,99 @@ def test_instance_create_w_two_clusters(
_delete_app_profile_helper(app_profile)


def test_instance_create_app_profile_create_with_multi_cluster_ids(
admin_client,
unique_suffix,
admin_instance_populated,
admin_cluster,
location_id,
instance_labels,
instances_to_delete,
skip_on_emulator,
):
alt_instance_id = f"dif{unique_suffix}"
instance = admin_client.instance(
alt_instance_id,
instance_type=enums.Instance.Type.PRODUCTION,
labels=instance_labels,
)

serve_nodes = 1

alt_cluster_id_1 = f"{alt_instance_id}-c1"
cluster_1 = instance.cluster(
alt_cluster_id_1,
location_id=location_id,
serve_nodes=serve_nodes,
default_storage_type=enums.StorageType.HDD,
)

alt_cluster_id_2 = f"{alt_instance_id}-c2"
location_id_2 = "us-central1-f"
cluster_2 = instance.cluster(
alt_cluster_id_2,
location_id=location_id_2,
serve_nodes=serve_nodes,
default_storage_type=enums.StorageType.HDD,
)
operation = instance.create(clusters=[cluster_1, cluster_2])
instances_to_delete.append(instance)
operation.result(timeout=240) # Ensure the operation completes.

# Create a new instance and make sure it is the same.
instance_alt = admin_client.instance(alt_instance_id)
instance_alt.reload()

assert instance == instance_alt
assert instance.display_name == instance_alt.display_name
assert instance.type_ == instance_alt.type_

clusters, failed_locations = instance_alt.list_clusters()
assert failed_locations == []
alt_cluster_1, alt_cluster_2 = sorted(clusters, key=lambda x: x.name)

assert cluster_1.location_id == alt_cluster_1.location_id
assert cluster_2.location_id == alt_cluster_2.location_id

# Test create app profile with multi_cluster_routing policy
app_profiles_to_delete = []
description = "routing policy-multi"
app_profile_id_1 = "app_profile_id_1"
routing = enums.RoutingPolicyType.ANY

multi_cluster_ids = [alt_cluster_id_1, alt_cluster_id_2]
app_profile_1 = _create_app_profile_helper(
app_profile_id_1,
instance,
routing_policy_type=routing,
description=description,
ignore_warnings=True,
multi_cluster_ids=multi_cluster_ids,
)
assert len(app_profile_1.multi_cluster_ids) == len(multi_cluster_ids)
assert app_profile_1.multi_cluster_ids == multi_cluster_ids

# remove a cluster from the multi_cluster_ids
app_profile_1.multi_cluster_ids.pop()
app_profile_1.update()

assert len(app_profile_1.multi_cluster_ids) == 1
assert app_profile_1.multi_cluster_ids == [alt_cluster_id_1]

# add a cluster from the multi_cluster_ids
app_profile_1.multi_cluster_ids.append(alt_cluster_id_2)
app_profile_1.update()

assert len(app_profile_1.multi_cluster_ids) == 2
assert app_profile_1.multi_cluster_ids == [alt_cluster_id_1, alt_cluster_id_2]

app_profiles_to_delete.append(app_profile_1)

# # Test delete app profiles
for app_profile in app_profiles_to_delete:
_delete_app_profile_helper(app_profile)


def test_instance_create_w_two_clusters_cmek(
admin_client,
unique_suffix,
Expand Down

0 comments on commit a0ed5b5

Please sign in to comment.