Merge 85604bf into 87b3aaa
nuwang committed Aug 23, 2020
2 parents 87b3aaa + 85604bf commit 5fed32f
Showing 6 changed files with 151 additions and 19 deletions.
6 changes: 3 additions & 3 deletions cloudman/clusterman/api.py
@@ -32,7 +32,6 @@ def cloudlaunch_url(self):
 
     @property
     def cloudlaunch_token(self):
-        # Always perform internal tasks as the admin user
         token_obj, _ = cl_models.AuthToken.objects.get_or_create(
             name='cloudman', user=self.user)
         return token_obj.key
@@ -165,11 +164,12 @@ def get(self, node_id):
         self.check_permissions('clusternodes.view_clusternode', obj)
         return self.to_api_object(obj)
 
-    def create(self, vm_type=None, zone=None, autoscaler=None):
+    def create(self, vm_type=None, zone=None, autoscaler=None, min_vcpus=0, min_ram=0):
         self.check_permissions('clusternodes.add_clusternode')
         name = "{0}-{1}".format(self.cluster.name, str(uuid.uuid4())[:6])
         template = self.cluster.get_cluster_template()
-        cli_deployment = template.add_node(name, vm_type=vm_type, zone=zone)
+        cli_deployment = template.add_node(name, vm_type=vm_type, zone=zone,
+                                           min_vcpus=min_vcpus, min_ram=min_ram)
         deployment = cl_models.ApplicationDeployment.objects.get(
             pk=cli_deployment.id)
         node = models.CMClusterNode.objects.create(
48 changes: 41 additions & 7 deletions cloudman/clusterman/cluster_templates.py
@@ -1,10 +1,7 @@
 import abc
-import os
-import yaml
 from rest_framework.exceptions import ValidationError
 from .clients.rancher import RancherClient
 from cloudlaunch import models as cl_models
-import subprocess
 
 
 class CMClusterTemplate(object):
@@ -74,8 +71,38 @@ def rancher_client(self):
                              self.rancher_cluster_id,
                              self.rancher_project_id)
 
-    def add_node(self, name, vm_type=None, zone=None):
-        print("Adding node: {0} of type: {1}".format(name, vm_type))
+    def _find_matching_vm_type(self, zone_model=None, default_vm_type=None,
+                               min_vcpus=0, min_ram=0, vm_family=""):
+        """
+        Finds the vm_type that best matches the given criteria. If no
+        criteria are specified, returns the default vm type.
+        :param zone_model:
+        :param default_vm_type:
+        :param min_vcpus:
+        :param min_ram:
+        :param vm_family:
+        :return:
+        """
+        vm_type = default_vm_type or self.cluster.default_vm_type
+        if min_vcpus > 0 or min_ram > 0 or not vm_type.startswith(vm_family):
+            cloud = self.context.cloudlaunch_client.infrastructure.clouds.get(
+                zone_model.region.cloud.id)
+            region = cloud.regions.get(zone_model.region.region_id)
+            zone = region.zones.get(zone_model.zone_id)
+            default_matches = zone.vm_types.list(vm_type_prefix=vm_type)
+            if default_matches:
+                default_match = default_matches[0]
+                min_vcpus = min_vcpus if min_vcpus > int(default_match.vcpus) else default_match.vcpus
+                min_ram = min_ram if min_ram > float(default_match.ram) else default_match.ram
+            candidates = zone.vm_types.list(min_vcpus=min_vcpus, min_ram=min_ram,
+                                            vm_type_prefix=vm_family)
+            if candidates:
+                candidate_type = sorted(candidates, key=lambda x: int(x.vcpus) * float(x.ram))[0]
+                return candidate_type.name
+        return vm_type
+
+    def add_node(self, name, vm_type=None, zone=None, min_vcpus=0, min_ram=0, vm_family=""):
         settings = self.cluster.connection_settings
         zone = zone or self.cluster.default_zone
         deployment_target = cl_models.CloudDeploymentTarget.objects.get(
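The heavy lifting above is delegated to zone.vm_types.list(), which filters the catalog by min_vcpus, min_ram, and a name prefix; the final tie-break simply picks the candidate with the smallest vcpus * ram product. A minimal standalone sketch of that selection rule (VMType and the offerings below are hypothetical stand-ins for the objects returned by zone.vm_types.list()):

    from collections import namedtuple

    VMType = namedtuple('VMType', ['name', 'vcpus', 'ram'])

    def pick_smallest_match(offerings, min_vcpus=0, min_ram=0):
        # Keep only types satisfying the requested minimums, then return the
        # smallest by vcpus * ram, mirroring the sort key used above.
        matches = [vm for vm in offerings
                   if int(vm.vcpus) >= min_vcpus and float(vm.ram) >= min_ram]
        if not matches:
            return None
        return sorted(matches, key=lambda x: int(x.vcpus) * float(x.ram))[0].name

    offerings = [VMType('m5.large', '2', '8'),
                 VMType('m5.24xlarge', '96', '384'),
                 VMType('r5.24xlarge', '96', '768')]

    # A pod asking for 96 vCPUs and ~715 GB of RAM lands on r5.24xlarge.
    print(pick_smallest_match(offerings, min_vcpus=96, min_ram=715))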
@@ -114,15 +141,22 @@ def add_node(self, name, vm_type=None, zone=None):
                 }
             }
         }
 
         params['config_app']['config_cloudlaunch']['vmType'] = \
-            vm_type or self.cluster.default_vm_type
+            self._find_matching_vm_type(
+                zone_model=zone, default_vm_type=vm_type, min_vcpus=min_vcpus,
+                min_ram=min_ram, vm_family=vm_family)
+
+        print("Adding node: {0} of type: {1}".format(
+            name, params['config_app']['config_cloudlaunch']['vmType']))
 
         # Don't use hostname config
         params['config_app']['config_cloudlaunch'].pop('hostnameConfig', None)
         try:
             print("Launching node with settings: {0}".format(params))
             return self.context.cloudlaunch_client.deployments.create(**params)
         except Exception as e:
-            raise ValidationError(str(e))
+            raise ValidationError("Could not launch node: " + str(e))
 
     def remove_node(self, node):
         return self.context.cloudlaunch_client.deployments.tasks.create(
11 changes: 6 additions & 5 deletions cloudman/clusterman/resources.py
@@ -64,7 +64,7 @@ def get_cluster_template(self):
     def _get_default_scaler(self):
         return self.autoscalers.get_or_create_default()
 
-    def scaleup(self, zone_name=None):
+    def scaleup(self, zone_name=None, min_vcpus=None, min_ram=None):
         if zone_name:
             zone = cb_models.Zone.objects.get(name=zone_name)
         else:
@@ -75,10 +75,10 @@ def scaleup(self, zone_name=None):
             for scaler in self.autoscalers.list():
                 if scaler.match(zone=zone):
                     matched = True
-                    scaler.scaleup()
+                    scaler.scaleup(min_vcpus=min_vcpus, min_ram=min_ram)
             if not matched:
                 scaler = self._get_default_scaler()
-                scaler.scaleup()
+                scaler.scaleup(min_vcpus=min_vcpus, min_ram=min_ram)
         else:
             log.debug("Autoscale up signal received but autoscaling is disabled.")

@@ -174,11 +174,12 @@ def match(self, zone=None):
         # matches a scaling signal.
         return zone == self.db_model.zone
 
-    def scaleup(self):
+    def scaleup(self, min_vcpus=0, min_ram=0):
         node_count = self.db_model.nodegroup.count()
         if node_count < self.max_nodes:
             self.cluster.nodes.create(
-                vm_type=self.vm_type, zone=self.zone, autoscaler=self)
+                vm_type=self.vm_type, min_vcpus=min_vcpus, min_ram=min_ram,
+                zone=self.zone, autoscaler=self)
 
     def scaledown(self):
         node_count = self.db_model.nodegroup.count()
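The cluster-level scaleup above fans the signal out to every autoscaler whose zone matches, falling back to the default scaler only when nothing matched; each matched scaler then creates a node only while it is under max_nodes. A toy sketch of that dispatch (Scaler is a hypothetical stand-in for the real autoscaler objects):

    class Scaler:
        def __init__(self, zone):
            self.zone = zone

        def match(self, zone=None):
            return zone == self.zone

        def scaleup(self, min_vcpus=0, min_ram=0):
            print("scale up in", self.zone, "vcpus>=", min_vcpus, "ram>=", min_ram)

    scalers = [Scaler("us-east-1a"), Scaler("us-east-1b")]
    default_scaler = Scaler("default")

    def route_scaleup(zone, min_vcpus=0, min_ram=0):
        matched = False
        for scaler in scalers:
            if scaler.match(zone=zone):
                matched = True
                scaler.scaleup(min_vcpus=min_vcpus, min_ram=min_ram)
        if not matched:
            default_scaler.scaleup(min_vcpus=min_vcpus, min_ram=min_ram)

    route_scaleup("us-east-1b", min_vcpus=96, min_ram=715)  # matched scaler fires
    route_scaleup("eu-west-1a")                             # falls back to default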
@@ -22,7 +22,7 @@ app_config:
     rootStorageType: instance
     staticIP: null
     subnet: null
-    vmType: m2.large
+    vmType: m5.24xlarge
   config_cloudman2:
     clusterPassword: 123456
     cm_boot_image: cloudve/cloudman-boot
96 changes: 94 additions & 2 deletions cloudman/clusterman/tests/test_cluster_api.py
@@ -126,7 +126,7 @@ def test_crud_cluster(self):
         # Assert that the originally created cluster id is the same as the one
         # returned by list
         self.assertEquals(response.data['id'], cluster_id)
-        self.assertEquals(response.data['default_vm_type'], 'm2.large')
+        self.assertEquals(response.data['default_vm_type'], 'm5.24xlarge')
         self.assertEquals(response.data['default_zone']['name'], 'us-east-1b')
 
         # check details
@@ -185,6 +185,7 @@ class LiveServerSingleThread(LiveServerThread):
     """Runs a single threaded server rather than multi threaded. Reverts https://github.com/django/django/pull/7832"""
 
     def _create_server(self):
+        QuietWSGIRequestHandler.handle = QuietWSGIRequestHandler.handle_one_request
         return WSGIServer((self.host, self.port), QuietWSGIRequestHandler, allow_reuse_address=False)


@@ -555,6 +556,55 @@ class CMClusterScaleSignalTests(CMClusterNodeTestBase):
         "groupKey": "{}/{}:{alertname=\"KubeCPUOvercommit\"}"
     }
 
+    SCALE_SIGNAL_DATA_POD_UNSCHEDULABLE = {
+        "receiver": "cloudman",
+        "status": "firing",
+        "alerts": [
+            {
+                "status": "firing",
+                "labels": {
+                    "alertname": "PodNotSchedulable",
+                    "container": "job-container",
+                    "cpus": "96",
+                    "instance": "10.42.0.19:8080",
+                    "job": "kube-state-metrics",
+                    "memory": "768000000000",
+                    "pod": "galaxy-galaxy-1596991139-245-lc4mg",
+                    "severity": "warning"
+                },
+                "annotations": {
+                    "cpus": "96",
+                    "memory": "768000000000",
+                    "message": "Cluster has unschedulable pods due to insufficient CPU or memory"
+                },
+                "startsAt": "2020-08-21T11:47:31.470370261Z",
+                "endsAt": "0001-01-01T00:00:00Z",
+                "generatorURL": "http://prometheus.int/graph?g0.expr=up%7Bjob%3D%22node-exporter%22%2Ctier%21%3D%22ephemeral%22%7D+%3D%3D+0&g0.tab=1"
+            }
+        ],
+        "groupLabels": {
+            "alertname": "PodNotSchedulable"
+        },
+        "commonLabels": {
+            "alertname": "PodNotSchedulable",
+            "container": "job-container",
+            "cpus": "96",
+            "instance": "10.42.0.19:8080",
+            "job": "kube-state-metrics",
+            "memory": "768000000000",
+            "pod": "galaxy-galaxy-1596991139-245-lc4mg",
+            "severity": "warning"
+        },
+        "commonAnnotations": {
+            "cpus": "96",
+            "memory": "768000000000",
+            "message": "Cluster has unschedulable pods due to insufficient CPU or memory"
+        },
+        "externalURL": "http://cloudman-prometheus-alertmanager.cloudman:9093",
+        "version": "4",
+        "groupKey": "{}/{alertname=\"PodNotSchedulable\"}:{alertname=\"PodNotSchedulable\"}"
+    }
+
     fixtures = ['initial_test_data.json']
 
     def _create_cluster_raw(self):
@@ -591,6 +641,13 @@ def _count_cluster_nodes(self, cluster_id):
         self.assertEqual(response.status_code, status.HTTP_200_OK, response.data)
         return response.data['count']
 
+    def _get_cluster_node_vm_types(self, cluster_id):
+        url = reverse('clusterman:node-list', args=[cluster_id])
+        response = self.client.get(url)
+        self.assertEqual(response.status_code, status.HTTP_200_OK, response.data)
+        return [n['deployment']['application_config']['config_cloudlaunch']['vmType']
+                for n in response.data['results']]
+
     def _signal_scaleup(self, cluster_id, data=SCALE_SIGNAL_DATA):
         url = reverse('clusterman:scaleupsignal-list', args=[cluster_id])
         response = self.client.post(url, data, format='json')
@@ -628,6 +685,11 @@ def test_scale_up_default(self):
         count = self._count_cluster_nodes(cluster_id)
         self.assertEqual(count, 1)
 
+        # Ensure that the created node has the correct size
+        vm_types = self._get_cluster_node_vm_types(cluster_id)
+        self.assertEqual(len(vm_types), 1)
+        self.assertTrue("m5.24xlarge" in vm_types)
+
     @responses.activate
     def test_scale_down_default(self):
         # create the parent cluster
@@ -696,7 +758,7 @@ def test_scaling_is_within_bounds(self):
         count = self._count_cluster_nodes(cluster_id)
         self.assertEqual(count, 2)
 
-        # Make sure nodes to not shrink below mininimum
+        # Make sure nodes do not shrink below minimum
         self._signal_scaledown(cluster_id)
         self._signal_scaledown(cluster_id)
         self._signal_scaledown(cluster_id)
@@ -705,6 +767,11 @@
         count = self._count_cluster_nodes(cluster_id)
         self.assertEqual(count, 1)
 
+        # Ensure that the created node has the correct size
+        vm_types = self._get_cluster_node_vm_types(cluster_id)
+        self.assertEqual(len(vm_types), 1)
+        self.assertTrue("m1.medium" in vm_types)
+
     @responses.activate
     def test_scaling_with_manual_nodes(self):
         # create the parent cluster
@@ -794,6 +861,11 @@ def test_scaling_within_zone_group(self):
         count = self._count_cluster_nodes(cluster_id)
         self.assertEqual(count, 1)
 
+        # Ensure that the created node has the correct size
+        vm_types = self._get_cluster_node_vm_types(cluster_id)
+        self.assertEqual(len(vm_types), 1)
+        self.assertTrue("m5.24xlarge" in vm_types)
+
     def _login_as_autoscaling_user(self, impersonate_user=None):
         if impersonate_user:
             call_command('create_autoscale_user', "--impersonate_account",
@@ -891,3 +963,23 @@ def test_create_autoscale_user_impersonate_no_perms(self):
         self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN, response.data)
         count = self._count_cluster_nodes(cluster_id)
         self.assertEqual(count, 0)
+
+    @responses.activate
+    def test_scale_up_unschedulable(self):
+        # create the parent cluster
+        cluster_id = self._create_cluster()
+
+        count = self._count_cluster_nodes(cluster_id)
+        self.assertEqual(count, 0)
+
+        # send autoscale signal with unschedulable pod
+        self._signal_scaleup(cluster_id, data=self.SCALE_SIGNAL_DATA_POD_UNSCHEDULABLE)
+
+        # Ensure that node was created
+        count = self._count_cluster_nodes(cluster_id)
+        self.assertEqual(count, 1)
+
+        # Ensure that the created node has the correct size
+        vm_types = self._get_cluster_node_vm_types(cluster_id)
+        self.assertEqual(len(vm_types), 1)
+        self.assertTrue("r5.24xlarge" in vm_types or "r5d.24xlarge" in vm_types)
7 changes: 6 additions & 1 deletion cloudman/clusterman/views.py
@@ -103,13 +103,18 @@ def perform_create(self, serializer):
         # whose profile contains the relevant cloud credentials, usually an admin
         zone_name = serializer.validated_data.get(
             'commonLabels', {}).get('availability_zone')
+        vcpus = int(serializer.validated_data.get(
+            'commonAnnotations', {}).get('cpus', 0))
+        ram = int(serializer.validated_data.get(
+            'commonAnnotations', {}).get('memory', 0)) / 1024 / 1024 / 1024
         impersonate = (User.objects.filter(
             username=GlobalSettings().settings.autoscale_impersonate).first()
             or User.objects.filter(is_superuser=True).first())
         cmapi = CloudManAPI(CMServiceContext(user=impersonate))
         cluster = cmapi.clusters.get(self.kwargs["cluster_pk"])
         if cluster:
-            return cluster.scaleup(zone_name=zone_name)
+            return cluster.scaleup(zone_name=zone_name, min_vcpus=vcpus,
+                                   min_ram=ram)
         else:
             return None
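The memory annotation arrives as a byte-count string, so the chained division converts it to GB before it is passed to scaleup as min_ram. A quick check with the values carried by the PodNotSchedulable test payload (cpus="96", memory="768000000000"):

    annotations = {"cpus": "96", "memory": "768000000000"}
    vcpus = int(annotations.get("cpus", 0))
    ram_gb = int(annotations.get("memory", 0)) / 1024 / 1024 / 1024
    print(vcpus, round(ram_gb, 2))  # -> 96 715.26

A request for 96 vCPUs and ~715 GB rules out m5.24xlarge (96 vCPUs, 384 GiB) but is satisfied by r5.24xlarge (96 vCPUs, 768 GiB), which is why test_scale_up_unschedulable accepts an r5-family node.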

