Skip to content

Commit

Permalink
bigtable: Handle dev instances and use storage utilization in metric …
Browse files Browse the repository at this point in the history
…scaler (#3119)

* WIP handle development instances in metric scaler

* use storage utilization and tested

* Fix metric queries

* remove tests for low storage util

* cleanup metric query

* EOF new line

* use uuid instead of random

* lint

* fix uuid length

* fix uuid length

* fix uuid length (again)

Co-authored-by: Christopher Wilcox <crwilcox@google.com>
Co-authored-by: Takashi Matsuo <tmatsuo@google.com>
  • Loading branch information
3 people committed May 1, 2020
1 parent 8b569cb commit 298bbc1
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 28 deletions.
73 changes: 55 additions & 18 deletions bigtable/metricscaler/metricscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from google.cloud import bigtable
from google.cloud import monitoring_v3
from google.cloud.bigtable import enums
from google.cloud.monitoring_v3 import query

PROJECT = os.environ['GCLOUD_PROJECT']
Expand All @@ -39,12 +40,29 @@ def get_cpu_load():
metric_type='bigtable.googleapis.com/'
'cluster/cpu_load',
minutes=5)
time_series = list(cpu_query)
recent_time_series = time_series[0]
return recent_time_series.points[0].value.double_value
cpu = next(cpu_query.iter())
return cpu.points[0].value.double_value
# [END bigtable_cpu]


def get_storage_utilization():
    """Return the most recent Cloud Bigtable storage utilization sample.

    Returns:
        float: The most recent Cloud Bigtable storage utilization metric
    """
    # [START bigtable_metric_scaler_storage_utilization]
    metric_client = monitoring_v3.MetricServiceClient()
    storage_query = query.Query(
        metric_client,
        project=PROJECT,
        metric_type='bigtable.googleapis.com/cluster/storage_utilization',
        minutes=5)
    # Take the first (most recent) time series returned by the query.
    latest = next(storage_query.iter())
    return latest.points[0].value.double_value
    # [END bigtable_metric_scaler_storage_utilization]


def scale_bigtable(bigtable_instance, bigtable_cluster, scale_up):
"""Scales the number of Cloud Bigtable nodes up or down.
Expand Down Expand Up @@ -79,6 +97,9 @@ def scale_bigtable(bigtable_instance, bigtable_cluster, scale_up):
instance = bigtable_client.instance(bigtable_instance)
instance.reload()

if instance.type_ == enums.Instance.Type.DEVELOPMENT:
raise ValueError("Development instances cannot be scaled.")

cluster = instance.cluster(bigtable_cluster)
cluster.reload()

Expand All @@ -104,33 +125,43 @@ def scale_bigtable(bigtable_instance, bigtable_cluster, scale_up):


def main(
        bigtable_instance,
        bigtable_cluster,
        high_cpu_threshold,
        low_cpu_threshold,
        high_storage_threshold,
        short_sleep,
        long_sleep
):
    """Main loop runner that autoscales Cloud Bigtable.
    Args:
        bigtable_instance (str): Cloud Bigtable instance ID to autoscale
        bigtable_cluster (str): Cloud Bigtable cluster ID to autoscale
        high_cpu_threshold (float): If CPU is higher than this, scale up.
        low_cpu_threshold (float): If CPU is lower than this, scale down.
        high_storage_threshold (float): If storage is higher than this,
            scale up.
        short_sleep (int): How long to sleep after no operation
        long_sleep (int): How long to sleep after the number of nodes is
            changed
    """
    cluster_cpu = get_cpu_load()
    cluster_storage = get_storage_utilization()
    print('Detected cpu of {}'.format(cluster_cpu))
    print('Detected storage utilization of {}'.format(cluster_storage))
    try:
        if (cluster_cpu > high_cpu_threshold
                or cluster_storage > high_storage_threshold):
            scale_bigtable(bigtable_instance, bigtable_cluster, True)
            time.sleep(long_sleep)
        elif cluster_cpu < low_cpu_threshold:
            # Only scale down when storage is also below the threshold.
            # NOTE(review): when CPU is low but storage is at/above the
            # threshold, this branch neither scales nor sleeps — confirm
            # that an immediate re-check is the intended behavior.
            if cluster_storage < high_storage_threshold:
                scale_bigtable(bigtable_instance, bigtable_cluster, False)
                time.sleep(long_sleep)
        else:
            print('CPU within threshold, sleeping.')
            time.sleep(short_sleep)
    except Exception as e:
        # Bug fix: print() does not interpolate logging-style '%s'
        # arguments; the original printed the literal format string and
        # the exception as two separate values.
        print('Error during scaling: {}'.format(e))


if __name__ == '__main__':
Expand All @@ -150,6 +181,11 @@ def main(
'--low_cpu_threshold',
help='If Cloud Bigtable CPU usage is below this threshold, scale down',
default=0.2)
parser.add_argument(
'--high_storage_threshold',
help='If Cloud Bigtable storage utilization is above this threshold, '
'scale up',
default=0.6)
parser.add_argument(
'--short_sleep',
help='How long to sleep in seconds between checking metrics after no '
Expand All @@ -168,5 +204,6 @@ def main(
args.bigtable_cluster,
float(args.high_cpu_threshold),
float(args.low_cpu_threshold),
float(args.high_storage_threshold),
int(args.short_sleep),
int(args.long_sleep))
85 changes: 75 additions & 10 deletions bigtable/metricscaler/metricscaler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,24 @@

import os
import time

import random
import uuid

import pytest
from google.cloud import bigtable
from google.cloud.bigtable import enums
from mock import patch

from metricscaler import get_cpu_load
from metricscaler import get_storage_utilization
from metricscaler import main
from metricscaler import scale_bigtable

PROJECT = os.environ['GCLOUD_PROJECT']
BIGTABLE_ZONE = os.environ['BIGTABLE_ZONE']
SIZE_CHANGE_STEP = 3
INSTANCE_ID_FORMAT = 'metric-scale-test-{}'
INSTANCE_ID_RANGE = 10000
BIGTABLE_INSTANCE = INSTANCE_ID_FORMAT.format(
random.randrange(INSTANCE_ID_RANGE))
BIGTABLE_INSTANCE = INSTANCE_ID_FORMAT.format(str(uuid.uuid4())[:10])
BIGTABLE_DEV_INSTANCE = INSTANCE_ID_FORMAT.format(str(uuid.uuid4())[:10])


# System tests to verify API calls succeed
Expand All @@ -44,6 +43,10 @@ def test_get_cpu_load():
assert float(get_cpu_load()) > 0.0


def test_get_storage_utilization():
    """System test: the live metric query reports positive utilization."""
    utilization = float(get_storage_utilization())
    assert utilization > 0.0


@pytest.fixture()
def instance():
cluster_id = BIGTABLE_INSTANCE
Expand All @@ -68,6 +71,29 @@ def instance():
instance.delete()


@pytest.fixture()
def dev_instance():
    """Provide a DEVELOPMENT-type Bigtable instance; delete it on teardown.

    The instance is created only if it does not already exist; the single
    cluster reuses the instance ID and the configured zone.
    """
    admin_client = bigtable.Client(project=PROJECT, admin=True)
    instance = admin_client.instance(
        BIGTABLE_DEV_INSTANCE,
        instance_type=enums.Instance.Type.DEVELOPMENT,
        labels={'dev-label': 'dev-label'})

    if not instance.exists():
        new_cluster = instance.cluster(
            BIGTABLE_DEV_INSTANCE,
            location_id=BIGTABLE_ZONE,
            default_storage_type=enums.StorageType.SSD)
        instance.create(clusters=[new_cluster])

    yield

    instance.delete()


def test_scale_bigtable(instance):
bigtable_client = bigtable.Client(admin=True)

Expand Down Expand Up @@ -103,31 +129,70 @@ def test_scale_bigtable(instance):
raise


# Unit test for logic
def test_handle_dev_instance(capsys, dev_instance):
    """scale_bigtable must refuse to resize a development instance."""
    dev_id = BIGTABLE_DEV_INSTANCE
    with pytest.raises(ValueError):
        scale_bigtable(dev_id, dev_id, True)


@patch('time.sleep')
@patch('metricscaler.get_storage_utilization')
@patch('metricscaler.get_cpu_load')
@patch('metricscaler.scale_bigtable')
def test_main(scale_bigtable, get_cpu_load, get_storage_utilization, sleep):
    """Unit test: main() scales up, down, or not at all from mocked metrics.

    Patches are applied bottom-up, so the innermost @patch
    ('metricscaler.scale_bigtable') binds to the first parameter.
    This body removes the diff residue that left old 6-argument main()
    calls interleaved with the new 7-argument ones.
    """
    SHORT_SLEEP = 5
    LONG_SLEEP = 10

    # Test okay CPU, okay storage utilization -> no scaling
    get_cpu_load.return_value = 0.5
    get_storage_utilization.return_value = 0.5
    main(BIGTABLE_INSTANCE, BIGTABLE_INSTANCE, 0.6, 0.3, 0.6, SHORT_SLEEP,
         LONG_SLEEP)
    scale_bigtable.assert_not_called()
    scale_bigtable.reset_mock()

    # Test high CPU, okay storage utilization -> scale up
    get_cpu_load.return_value = 0.7
    get_storage_utilization.return_value = 0.5
    main(BIGTABLE_INSTANCE, BIGTABLE_INSTANCE, 0.6, 0.3, 0.6, SHORT_SLEEP,
         LONG_SLEEP)
    scale_bigtable.assert_called_once_with(BIGTABLE_INSTANCE,
                                           BIGTABLE_INSTANCE, True)
    scale_bigtable.reset_mock()

    # Test low CPU, okay storage utilization -> scale down
    get_storage_utilization.return_value = 0.5
    get_cpu_load.return_value = 0.2
    main(BIGTABLE_INSTANCE, BIGTABLE_INSTANCE, 0.6, 0.3, 0.6, SHORT_SLEEP,
         LONG_SLEEP)
    scale_bigtable.assert_called_once_with(BIGTABLE_INSTANCE,
                                           BIGTABLE_INSTANCE, False)
    scale_bigtable.reset_mock()

    # Test okay CPU, high storage utilization -> scale up
    get_cpu_load.return_value = 0.5
    get_storage_utilization.return_value = 0.7
    main(BIGTABLE_INSTANCE, BIGTABLE_INSTANCE, 0.6, 0.3, 0.6, SHORT_SLEEP,
         LONG_SLEEP)
    scale_bigtable.assert_called_once_with(BIGTABLE_INSTANCE,
                                           BIGTABLE_INSTANCE, True)
    scale_bigtable.reset_mock()

    # Test high CPU, high storage utilization -> scale up
    get_cpu_load.return_value = 0.7
    get_storage_utilization.return_value = 0.7
    main(BIGTABLE_INSTANCE, BIGTABLE_INSTANCE, 0.6, 0.3, 0.6, SHORT_SLEEP,
         LONG_SLEEP)
    scale_bigtable.assert_called_once_with(BIGTABLE_INSTANCE,
                                           BIGTABLE_INSTANCE, True)
    scale_bigtable.reset_mock()

    # Test low CPU, high storage utilization -> scale up (storage wins)
    get_cpu_load.return_value = 0.2
    get_storage_utilization.return_value = 0.7
    main(BIGTABLE_INSTANCE, BIGTABLE_INSTANCE, 0.6, 0.3, 0.6, SHORT_SLEEP,
         LONG_SLEEP)
    scale_bigtable.assert_called_once_with(BIGTABLE_INSTANCE,
                                           BIGTABLE_INSTANCE, True)
    scale_bigtable.reset_mock()

0 comments on commit 298bbc1

Please sign in to comment.