Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for autoscaling #509

Merged
merged 7 commits into from Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/snippets.py
Expand Up @@ -401,6 +401,25 @@ def test_bigtable_update_cluster():
assert cluster.serve_nodes == 4


def test_bigtable_cluster_disable_autoscaling():
# [START bigtable_api_cluster_disable_autoscaling]
from google.cloud.bigtable import Client

client = Client(admin=True)
instance = client.instance(INSTANCE_ID)
# Create a cluster with autoscaling enabled
cluster = instance.cluster(
CLUSTER_ID, min_serve_nodes=1, max_serve_nodes=2, cpu_utilization_percent=10
)
instance.create(clusters=[cluster])

# Disable autoscaling
cluster.disable_autoscaling(serve_nodes=4)
# [END bigtable_api_cluster_disable_autoscaling]

assert cluster.serve_nodes == 4


def test_bigtable_create_table():
# [START bigtable_api_create_table]
from google.api_core import exceptions
Expand Down
176 changes: 165 additions & 11 deletions google/cloud/bigtable/cluster.py
Expand Up @@ -18,6 +18,7 @@
import re
from google.cloud.bigtable_admin_v2.types import instance
from google.api_core.exceptions import NotFound
from google.protobuf import field_mask_pb2


_CLUSTER_NAME_RE = re.compile(
Expand All @@ -36,6 +37,7 @@ class Cluster(object):
* :meth:`create` itself
* :meth:`update` itself
* :meth:`delete` itself
* :meth:`disable_autoscaling` itself

:type cluster_id: str
:param cluster_id: The ID of the cluster.
Expand All @@ -52,7 +54,9 @@ class Cluster(object):
https://cloud.google.com/bigtable/docs/locations

:type serve_nodes: int
:param serve_nodes: (Optional) The number of nodes in the cluster.
:param serve_nodes: (Optional) The number of nodes in the cluster for manual scaling. If any of the
autoscaling configuration are specified, then the autoscaling
configuration will take precedent.

:type default_storage_type: int
:param default_storage_type: (Optional) The type of storage
Expand Down Expand Up @@ -85,6 +89,27 @@ class Cluster(object):
:data:`google.cloud.bigtable.enums.Cluster.State.CREATING`.
:data:`google.cloud.bigtable.enums.Cluster.State.RESIZING`.
:data:`google.cloud.bigtable.enums.Cluster.State.DISABLED`.

:type min_serve_nodes: int
:param min_serve_nodes: (Optional) The minimum number of nodes to be set in the cluster for autoscaling.
Must be 1 or greater.
If specified, this configuration takes precedence over
``serve_nodes``.
If specified, then
``max_serve_nodes`` and ``cpu_utilization_percent`` must be
specified too.

:type max_serve_nodes: int
:param max_serve_nodes: (Optional) The maximum number of nodes to be set in the cluster for autoscaling.
If specified, this configuration
takes precedence over ``serve_nodes``. If specified, then
``min_serve_nodes`` and ``cpu_utilization_percent`` must be
specified too.

:param cpu_utilization_percent: (Optional) The CPU utilization target for the cluster's workload for autoscaling.
If specified, this configuration takes precedence over ``serve_nodes``. If specified, then
``min_serve_nodes`` and ``max_serve_nodes`` must be
specified too.
"""

def __init__(
Expand All @@ -96,6 +121,9 @@ def __init__(
default_storage_type=None,
kms_key_name=None,
_state=None,
min_serve_nodes=None,
max_serve_nodes=None,
cpu_utilization_percent=None,
):
self.cluster_id = cluster_id
self._instance = instance
Expand All @@ -104,10 +132,13 @@ def __init__(
self.default_storage_type = default_storage_type
self._kms_key_name = kms_key_name
self._state = _state
self.min_serve_nodes = min_serve_nodes
kolea2 marked this conversation as resolved.
Show resolved Hide resolved
self.max_serve_nodes = max_serve_nodes
self.cpu_utilization_percent = cpu_utilization_percent

@classmethod
def from_pb(cls, cluster_pb, instance):
"""Creates an cluster instance from a protobuf.
"""Creates a cluster instance from a protobuf.

For example:

Expand Down Expand Up @@ -159,6 +190,17 @@ def _update_from_pb(self, cluster_pb):

self.location_id = cluster_pb.location.split("/")[-1]
self.serve_nodes = cluster_pb.serve_nodes

self.min_serve_nodes = (
cluster_pb.cluster_config.cluster_autoscaling_config.autoscaling_limits.min_serve_nodes
)
self.max_serve_nodes = (
cluster_pb.cluster_config.cluster_autoscaling_config.autoscaling_limits.max_serve_nodes
)
self.cpu_utilization_percent = (
cluster_pb.cluster_config.cluster_autoscaling_config.autoscaling_targets.cpu_utilization_percent
)

self.default_storage_type = cluster_pb.default_storage_type
if cluster_pb.encryption_config:
self._kms_key_name = cluster_pb.encryption_config.kms_key_name
Expand Down Expand Up @@ -211,6 +253,42 @@ def kms_key_name(self):
"""str: Customer managed encryption key for the cluster."""
return self._kms_key_name

def _validate_scaling_config(self):
"""Validate auto/manual scaling configuration before creating or updating."""

if (
not self.serve_nodes
and not self.min_serve_nodes
and not self.max_serve_nodes
and not self.cpu_utilization_percent
):
raise ValueError(
"Must specify either serve_nodes or all of the autoscaling configurations (min_serve_nodes, max_serve_nodes, and cpu_utilization_percent)."
)
if self.serve_nodes and (
self.max_serve_nodes or self.min_serve_nodes or self.cpu_utilization_percent
):
raise ValueError(
"Cannot specify both serve_nodes and autoscaling configurations (min_serve_nodes, max_serve_nodes, and cpu_utilization_percent)."
)
if (
(
self.min_serve_nodes
and (not self.max_serve_nodes or not self.cpu_utilization_percent)
)
or (
self.max_serve_nodes
and (not self.min_serve_nodes or not self.cpu_utilization_percent)
)
or (
self.cpu_utilization_percent
and (not self.min_serve_nodes or not self.max_serve_nodes)
)
):
raise ValueError(
"All of autoscaling configurations must be specified at the same time (min_serve_nodes, max_serve_nodes, and cpu_utilization_percent)."
)

def __eq__(self, other):
if not isinstance(other, self.__class__):
return NotImplemented
Expand Down Expand Up @@ -290,7 +368,15 @@ def create(self):
:rtype: :class:`~google.api_core.operation.Operation`
:returns: The long-running operation corresponding to the
create operation.

:raises: :class:`ValueError <exceptions.ValueError>` if the both ``serve_nodes`` and autoscaling configurations
are set at the same time or if none of the ``serve_nodes`` or autoscaling configurations are set
or if the autoscaling configurations are only partially set.

"""

self._validate_scaling_config()

client = self._instance._client
cluster_pb = self._to_pb()

Expand Down Expand Up @@ -323,20 +409,73 @@ def update(self):

before calling :meth:`update`.

If autoscaling is already enabled, manual scaling will be silently ignored.
To disable autoscaling and enable manual scaling, use the :meth:`disable_autoscaling` instead.

:rtype: :class:`Operation`
:returns: The long-running operation corresponding to the
update operation.

"""

client = self._instance._client
# We are passing `None` for third argument location.
# Location is set only at the time of creation of a cluster
# and can not be changed after cluster has been created.
return client.instance_admin_client.update_cluster(
request={
"serve_nodes": self.serve_nodes,
"name": self.name,
"location": None,
}

update_mask_pb = field_mask_pb2.FieldMask()

if self.serve_nodes:
update_mask_pb.paths.append("serve_nodes")

if self.min_serve_nodes:
update_mask_pb.paths.append(
"cluster_config.cluster_autoscaling_config.autoscaling_limits.min_serve_nodes"
)
if self.max_serve_nodes:
update_mask_pb.paths.append(
"cluster_config.cluster_autoscaling_config.autoscaling_limits.max_serve_nodes"
)
if self.cpu_utilization_percent:
update_mask_pb.paths.append(
"cluster_config.cluster_autoscaling_config.autoscaling_targets.cpu_utilization_percent"
)

cluster_pb = self._to_pb()
cluster_pb.name = self.name

return client.instance_admin_client.partial_update_cluster(
request={"cluster": cluster_pb, "update_mask": update_mask_pb}
)

def disable_autoscaling(self, serve_nodes):
"""
Disable autoscaling by specifying the number of nodes.

For example:

.. literalinclude:: snippets.py
:start-after: [START bigtable_api_cluster_disable_autoscaling]
:end-before: [END bigtable_api_cluster_disable_autoscaling]
:dedent: 4

:type serve_nodes: int
:param serve_nodes: The number of nodes in the cluster.
"""

client = self._instance._client

update_mask_pb = field_mask_pb2.FieldMask()

self.serve_nodes = serve_nodes
self.min_serve_nodes = 0
self.max_serve_nodes = 0
self.cpu_utilization_percent = 0

update_mask_pb.paths.append("serve_nodes")
update_mask_pb.paths.append("cluster_config.cluster_autoscaling_config")
cluster_pb = self._to_pb()
cluster_pb.name = self.name

return client.instance_admin_client.partial_update_cluster(
request={"cluster": cluster_pb, "update_mask": update_mask_pb}
)

def delete(self):
Expand Down Expand Up @@ -375,6 +514,7 @@ def _to_pb(self):
location = client.instance_admin_client.common_location_path(
client.project, self.location_id
)

cluster_pb = instance.Cluster(
location=location,
serve_nodes=self.serve_nodes,
Expand All @@ -384,4 +524,18 @@ def _to_pb(self):
cluster_pb.encryption_config = instance.Cluster.EncryptionConfig(
kms_key_name=self._kms_key_name,
)

if self.min_serve_nodes:
cluster_pb.cluster_config.cluster_autoscaling_config.autoscaling_limits.min_serve_nodes = (
self.min_serve_nodes
)
if self.max_serve_nodes:
cluster_pb.cluster_config.cluster_autoscaling_config.autoscaling_limits.max_serve_nodes = (
self.max_serve_nodes
)
if self.cpu_utilization_percent:
cluster_pb.cluster_config.cluster_autoscaling_config.autoscaling_targets.cpu_utilization_percent = (
self.cpu_utilization_percent
)

return cluster_pb
15 changes: 15 additions & 0 deletions google/cloud/bigtable/instance.py
Expand Up @@ -228,6 +228,9 @@ def create(
serve_nodes=None,
default_storage_type=None,
clusters=None,
min_serve_nodes=None,
max_serve_nodes=None,
cpu_utilization_percent=None,
):
"""Create this instance.

Expand Down Expand Up @@ -303,12 +306,18 @@ def create(
location_id=location_id,
serve_nodes=serve_nodes,
default_storage_type=default_storage_type,
min_serve_nodes=None,
max_serve_nodes=None,
cpu_utilization_percent=None,
)
]
elif (
location_id is not None
or serve_nodes is not None
or default_storage_type is not None
or min_serve_nodes is not None
or max_serve_nodes is not None
or cpu_utilization_percent is not None
):
raise ValueError(
"clusters and one of location_id, serve_nodes, \
Expand Down Expand Up @@ -546,6 +555,9 @@ def cluster(
serve_nodes=None,
default_storage_type=None,
kms_key_name=None,
min_serve_nodes=None,
max_serve_nodes=None,
cpu_utilization_percent=None,
):
"""Factory to create a cluster associated with this instance.

Expand Down Expand Up @@ -605,6 +617,9 @@ def cluster(
serve_nodes=serve_nodes,
default_storage_type=default_storage_type,
kms_key_name=kms_key_name,
min_serve_nodes=min_serve_nodes,
max_serve_nodes=max_serve_nodes,
cpu_utilization_percent=cpu_utilization_percent,
)

def list_clusters(self):
Expand Down
33 changes: 33 additions & 0 deletions tests/system/conftest.py
Expand Up @@ -107,6 +107,24 @@ def admin_cluster(admin_instance, admin_cluster_id, location_id, serve_nodes):
)


@pytest.fixture(scope="session")
def admin_cluster_with_autoscaling(
admin_instance,
admin_cluster_id,
location_id,
min_serve_nodes,
max_serve_nodes,
cpu_utilization_percent,
):
return admin_instance.cluster(
admin_cluster_id,
location_id=location_id,
min_serve_nodes=min_serve_nodes,
max_serve_nodes=max_serve_nodes,
cpu_utilization_percent=cpu_utilization_percent,
)


@pytest.fixture(scope="session")
def admin_instance_populated(admin_instance, admin_cluster, in_emulator):
# Emulator does not support instance admin operations (create / delete).
Expand Down Expand Up @@ -170,3 +188,18 @@ def instances_to_delete():

for instance in instances_to_delete:
_helpers.retry_429(instance.delete)()


@pytest.fixture(scope="session")
def min_serve_nodes(in_emulator):
return 1


@pytest.fixture(scope="session")
def max_serve_nodes(in_emulator):
return 8


@pytest.fixture(scope="session")
def cpu_utilization_percent(in_emulator):
return 10