bigtable: Handle dev instances and use storage utilization in metric scaler #3119

Merged: 16 commits, May 1, 2020
73 changes: 55 additions & 18 deletions bigtable/metricscaler/metricscaler.py
@@ -21,6 +21,7 @@

from google.cloud import bigtable
from google.cloud import monitoring_v3
from google.cloud.bigtable import enums
from google.cloud.monitoring_v3 import query

PROJECT = os.environ['GCLOUD_PROJECT']
@@ -39,12 +40,29 @@ def get_cpu_load():
metric_type='bigtable.googleapis.com/'
'cluster/cpu_load',
minutes=5)
-    time_series = list(cpu_query)
-    recent_time_series = time_series[0]
-    return recent_time_series.points[0].value.double_value
+    cpu = next(cpu_query.iter())
+    return cpu.points[0].value.double_value
# [END bigtable_cpu]


def get_storage_utilization():
"""Returns the most recent Cloud Bigtable storage utilization measurement.

Returns:
float: The most recent Cloud Bigtable storage utilization metric
"""
# [START bigtable_metric_scaler_storage_utilization]
client = monitoring_v3.MetricServiceClient()
utilization_query = query.Query(client,
project=PROJECT,
metric_type='bigtable.googleapis.com/'
'cluster/storage_utilization',
minutes=5)
utilization = next(utilization_query.iter())
return utilization.points[0].value.double_value
# [END bigtable_metric_scaler_storage_utilization]
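Both helpers share one query pattern: build a `query.Query` over the last few minutes of a cluster metric and read the newest point. A minimal standalone sketch of that pattern (the `latest_metric_value` helper and its parameters are illustrative, not part of this PR):

from google.cloud import monitoring_v3
from google.cloud.monitoring_v3 import query


def latest_metric_value(project_id, metric_type, minutes=5):
    """Return the newest point of a metric over the last `minutes`."""
    client = monitoring_v3.MetricServiceClient()
    metric_query = query.Query(client,
                               project=project_id,
                               metric_type=metric_type,
                               minutes=minutes)
    # iter() yields TimeSeries; the API returns points newest-first.
    series = next(metric_query.iter())
    return series.points[0].value.double_value

With a helper like this, both functions reduce to a single call with the appropriate 'bigtable.googleapis.com/cluster/...' metric type.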


def scale_bigtable(bigtable_instance, bigtable_cluster, scale_up):
"""Scales the number of Cloud Bigtable nodes up or down.

@@ -79,6 +97,9 @@ def scale_bigtable(bigtable_instance, bigtable_cluster, scale_up):
instance = bigtable_client.instance(bigtable_instance)
instance.reload()

if instance.type_ == enums.Instance.Type.DEVELOPMENT:
raise ValueError("Development instances cannot be scaled.")

cluster = instance.cluster(bigtable_cluster)
cluster.reload()
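The new guard makes scale_bigtable fail fast on development instances, which run a single node and cannot be resized. The check in isolation, as a hedged sketch (the `assert_scalable` helper is illustrative, not part of this PR):

from google.cloud import bigtable
from google.cloud.bigtable import enums


def assert_scalable(project_id, instance_id):
    """Raise if the instance's node count cannot be changed."""
    client = bigtable.Client(project=project_id, admin=True)
    instance = client.instance(instance_id)
    instance.reload()  # fetch instance metadata, including type_
    if instance.type_ == enums.Instance.Type.DEVELOPMENT:
        raise ValueError("Development instances cannot be scaled.")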

@@ -104,33 +125,43 @@ def scale_bigtable(bigtable_instance, bigtable_cluster, scale_up):


def main(
-        bigtable_instance,
-        bigtable_cluster,
-        high_cpu_threshold,
-        low_cpu_threshold,
-        short_sleep,
-        long_sleep):
+        bigtable_instance,
+        bigtable_cluster,
+        high_cpu_threshold,
+        low_cpu_threshold,
+        high_storage_threshold,
+        short_sleep,
+        long_sleep
+):
"""Main loop runner that autoscales Cloud Bigtable.

    Args:
        bigtable_instance (str): Cloud Bigtable instance ID to autoscale
        bigtable_cluster (str): Cloud Bigtable cluster ID to autoscale
        high_cpu_threshold (float): If CPU is higher than this, scale up.
        low_cpu_threshold (float): If CPU is lower than this, scale down.
        high_storage_threshold (float): If storage utilization is higher
            than this, scale up.
        short_sleep (int): Seconds to sleep after no scaling operation
        long_sleep (int): Seconds to sleep after the number of nodes is
            changed
    """
    cluster_cpu = get_cpu_load()
+    cluster_storage = get_storage_utilization()
    print('Detected cpu of {}'.format(cluster_cpu))
-    if cluster_cpu > high_cpu_threshold:
-        scale_bigtable(bigtable_instance, bigtable_cluster, True)
-        time.sleep(long_sleep)
-    elif cluster_cpu < low_cpu_threshold:
-        scale_bigtable(bigtable_instance, bigtable_cluster, False)
-        time.sleep(long_sleep)
-    else:
-        print('CPU within threshold, sleeping.')
-        time.sleep(short_sleep)
+    print('Detected storage utilization of {}'.format(cluster_storage))
+    try:
+        if (cluster_cpu > high_cpu_threshold
+                or cluster_storage > high_storage_threshold):
+            scale_bigtable(bigtable_instance, bigtable_cluster, True)
+            time.sleep(long_sleep)
+        elif cluster_cpu < low_cpu_threshold:
+            if cluster_storage < high_storage_threshold:
+                scale_bigtable(bigtable_instance, bigtable_cluster, False)
+                time.sleep(long_sleep)
+        else:
+            print('CPU within threshold, sleeping.')
+            time.sleep(short_sleep)
+    except Exception as e:
+        print('Error during scaling: {}'.format(e))


if __name__ == '__main__':
@@ -150,6 +181,11 @@ def main(
'--low_cpu_threshold',
help='If Cloud Bigtable CPU usage is below this threshold, scale down',
default=0.2)
parser.add_argument(
'--high_storage_threshold',
help='If Cloud Bigtable storage utilization is above this threshold, '
'scale up',
default=0.6)
parser.add_argument(
'--short_sleep',
help='How long to sleep in seconds between checking metrics after no '
@@ -168,5 +204,6 @@ def main(
args.bigtable_cluster,
float(args.high_cpu_threshold),
float(args.low_cpu_threshold),
float(args.high_storage_threshold),
int(args.short_sleep),
int(args.long_sleep))
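For reference, the equivalent direct call with the sample's default thresholds (the instance and cluster IDs here are hypothetical, and the sleep values are illustrative):

main('my-instance', 'my-cluster',
     high_cpu_threshold=0.6,
     low_cpu_threshold=0.2,
     high_storage_threshold=0.6,
     short_sleep=5,
     long_sleep=10)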
85 changes: 75 additions & 10 deletions bigtable/metricscaler/metricscaler_test.py
@@ -16,25 +16,24 @@

import os
import time

-import random
+import uuid

import pytest
from google.cloud import bigtable
from google.cloud.bigtable import enums
from mock import patch

from metricscaler import get_cpu_load
from metricscaler import get_storage_utilization
from metricscaler import main
from metricscaler import scale_bigtable

PROJECT = os.environ['GCLOUD_PROJECT']
BIGTABLE_ZONE = os.environ['BIGTABLE_ZONE']
SIZE_CHANGE_STEP = 3
INSTANCE_ID_FORMAT = 'metric-scale-test-{}'
-INSTANCE_ID_RANGE = 10000
-BIGTABLE_INSTANCE = INSTANCE_ID_FORMAT.format(
-    random.randrange(INSTANCE_ID_RANGE))
+BIGTABLE_INSTANCE = INSTANCE_ID_FORMAT.format(str(uuid.uuid4())[:10])
+BIGTABLE_DEV_INSTANCE = INSTANCE_ID_FORMAT.format(str(uuid.uuid4())[:10])


# System tests to verify API calls succeed
@@ -44,6 +43,10 @@ def test_get_cpu_load():
assert float(get_cpu_load()) > 0.0


def test_get_storage_utilization():
assert float(get_storage_utilization()) > 0.0


@pytest.fixture()
def instance():
cluster_id = BIGTABLE_INSTANCE
@@ -68,6 +71,29 @@ def instance():
instance.delete()


@pytest.fixture()
def dev_instance():
cluster_id = BIGTABLE_DEV_INSTANCE

client = bigtable.Client(project=PROJECT, admin=True)

storage_type = enums.StorageType.SSD
development = enums.Instance.Type.DEVELOPMENT
labels = {'dev-label': 'dev-label'}
instance = client.instance(BIGTABLE_DEV_INSTANCE,
instance_type=development,
labels=labels)

if not instance.exists():
cluster = instance.cluster(cluster_id, location_id=BIGTABLE_ZONE,
default_storage_type=storage_type)
instance.create(clusters=[cluster])

yield

instance.delete()
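One caveat on both fixtures: instance.create() returns a long-running operation, and yielding before it completes can race the first API call in the test. A hedged sketch of the safer pattern (the timeout value is illustrative):

    operation = instance.create(clusters=[cluster])
    operation.result(timeout=480)  # block until the instance is ready

    yield

    instance.delete()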


def test_scale_bigtable(instance):
bigtable_client = bigtable.Client(admin=True)

@@ -103,31 +129,70 @@ def test_scale_bigtable(instance):
raise


# Unit test for logic
def test_handle_dev_instance(capsys, dev_instance):
with pytest.raises(ValueError):
scale_bigtable(BIGTABLE_DEV_INSTANCE, BIGTABLE_DEV_INSTANCE, True)


@patch('time.sleep')
@patch('metricscaler.get_storage_utilization')
@patch('metricscaler.get_cpu_load')
@patch('metricscaler.scale_bigtable')
def test_main(scale_bigtable, get_cpu_load, sleep):
def test_main(scale_bigtable, get_cpu_load, get_storage_utilization, sleep):
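    # @patch decorators apply bottom-up, so the mocks arrive in reverse
    # order of the decorator stack: scale_bigtable first, sleep last.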
SHORT_SLEEP = 5
LONG_SLEEP = 10

# Test okay CPU, okay storage utilization
get_cpu_load.return_value = 0.5
get_storage_utilization.return_value = 0.5

-    main(BIGTABLE_INSTANCE, BIGTABLE_INSTANCE, 0.6, 0.3, SHORT_SLEEP,
+    main(BIGTABLE_INSTANCE, BIGTABLE_INSTANCE, 0.6, 0.3, 0.6, SHORT_SLEEP,
LONG_SLEEP)
scale_bigtable.assert_not_called()
scale_bigtable.reset_mock()

# Test high CPU, okay storage utilization
get_cpu_load.return_value = 0.7
-    main(BIGTABLE_INSTANCE, BIGTABLE_INSTANCE, 0.6, 0.3, SHORT_SLEEP,
+    get_storage_utilization.return_value = 0.5
+    main(BIGTABLE_INSTANCE, BIGTABLE_INSTANCE, 0.6, 0.3, 0.6, SHORT_SLEEP,
LONG_SLEEP)
scale_bigtable.assert_called_once_with(BIGTABLE_INSTANCE,
BIGTABLE_INSTANCE, True)
scale_bigtable.reset_mock()

# Test low CPU, okay storage utilization
+    get_storage_utilization.return_value = 0.5
    get_cpu_load.return_value = 0.2
-    main(BIGTABLE_INSTANCE, BIGTABLE_INSTANCE, 0.6, 0.3, SHORT_SLEEP,
+    main(BIGTABLE_INSTANCE, BIGTABLE_INSTANCE, 0.6, 0.3, 0.6, SHORT_SLEEP,
LONG_SLEEP)
scale_bigtable.assert_called_once_with(BIGTABLE_INSTANCE,
BIGTABLE_INSTANCE, False)
scale_bigtable.reset_mock()

# Test okay CPU, high storage utilization
get_cpu_load.return_value = 0.5
get_storage_utilization.return_value = 0.7

main(BIGTABLE_INSTANCE, BIGTABLE_INSTANCE, 0.6, 0.3, 0.6, SHORT_SLEEP,
LONG_SLEEP)
scale_bigtable.assert_called_once_with(BIGTABLE_INSTANCE,
BIGTABLE_INSTANCE, True)
scale_bigtable.reset_mock()

# Test high CPU, high storage utilization
get_cpu_load.return_value = 0.7
get_storage_utilization.return_value = 0.7
main(BIGTABLE_INSTANCE, BIGTABLE_INSTANCE, 0.6, 0.3, 0.6, SHORT_SLEEP,
LONG_SLEEP)
scale_bigtable.assert_called_once_with(BIGTABLE_INSTANCE,
BIGTABLE_INSTANCE, True)
scale_bigtable.reset_mock()

# Test low CPU, high storage utilization
get_cpu_load.return_value = 0.2
get_storage_utilization.return_value = 0.7
main(BIGTABLE_INSTANCE, BIGTABLE_INSTANCE, 0.6, 0.3, 0.6, SHORT_SLEEP,
LONG_SLEEP)
scale_bigtable.assert_called_once_with(BIGTABLE_INSTANCE,
BIGTABLE_INSTANCE, True)
scale_bigtable.reset_mock()
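
The scenarios above are naturally table-driven, so they could also be collapsed with pytest.mark.parametrize. A hedged sketch of that alternative (`test_main_parametrized` is hypothetical, not part of this PR):

import pytest
from mock import patch

from metricscaler import main


@pytest.mark.parametrize('cpu,storage,expected', [
    (0.5, 0.5, None),   # both within thresholds: no scaling call
    (0.7, 0.5, True),   # high CPU: scale up
    (0.2, 0.5, False),  # low CPU, storage OK: scale down
    (0.5, 0.7, True),   # high storage: scale up
    (0.7, 0.7, True),   # both high: scale up
    (0.2, 0.7, True),   # low CPU but high storage: scale up
])
@patch('time.sleep')
@patch('metricscaler.get_storage_utilization')
@patch('metricscaler.get_cpu_load')
@patch('metricscaler.scale_bigtable')
def test_main_parametrized(scale_bigtable, get_cpu_load,
                           get_storage_utilization, sleep,
                           cpu, storage, expected):
    get_cpu_load.return_value = cpu
    get_storage_utilization.return_value = storage
    main('test-instance', 'test-cluster', 0.6, 0.3, 0.6, 5, 10)
    if expected is None:
        scale_bigtable.assert_not_called()
    else:
        scale_bigtable.assert_called_once_with(
            'test-instance', 'test-cluster', expected)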