ref(scheduler): split the scheduler code into individual, modular resources (#1016)

This adds a `resources` package that contains each Kubernetes resource as its own class.
Each resource is self-registering via the `ResourceRegistry` metaclass, which lets the base `Resource` class know which resources are available and expose them up into `KubeHTTPClient`.
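
A minimal sketch of that self-registration pattern, assuming illustrative attribute names (the real base class lives in `rootfs/scheduler/resources/__resource.py` and may differ in detail):

```python
# Sketch only: illustrates the metaclass-based registry described above.
class ResourceRegistry(type):
    """Records every Resource subclass as it is defined."""
    resources = {}

    def __new__(mcs, name, bases, attrs):
        cls = super().__new__(mcs, name, bases, attrs)
        if bases:  # skip the abstract base Resource itself
            ResourceRegistry.resources[name.lower()] = cls
        return cls


class Resource(metaclass=ResourceRegistry):
    # defaults for resources served from the core API endpoint
    api_prefix = 'api'
    api_version = 'v1'

    def __init__(self, url):
        self.url = url


class Pod(Resource):
    short_name = 'po'


class Service(Resource):
    short_name = 'svc'


print(sorted(ResourceRegistry.resources))  # ['pod', 'service']
```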

Each resource can specify its own API prefix and version, which allows us to support resources moving between API endpoints (see HPA, a new resource).
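
For example, a resource served from outside the core API group could override those defaults; a hypothetical sketch (the `Resource` stub stands in for the base class above, and the attribute and method names are illustrative):

```python
class Resource:  # stand-in for the base class sketched above
    api_prefix = 'api'
    api_version = 'v1'


class HorizontalPodAutoscaler(Resource):
    # HPAs are not served from the core api/v1 endpoint, so this resource
    # overrides the prefix and version it targets.
    api_prefix = 'apis'
    api_version = 'autoscaling/v1'
    short_name = 'hpa'

    def api(self, namespace):
        # -> /apis/autoscaling/v1/namespaces/<ns>/horizontalpodautoscalers
        return '/{}/{}/namespaces/{}/horizontalpodautoscalers'.format(
            self.api_prefix, self.api_version, namespace)
```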
To make it easy for resources to interact with one another, each resource and the `KubeHTTPClient` can access other resources via calls such as `self.hpa.create()` or `self.pod.get()`. The short form works (if a resource defines one), and singular and plural names both resolve via some intelligent mapping, as sketched below.
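
A rough illustration of that kind of name mapping, assuming a simple dictionary registry (the real lookup inside `KubeHTTPClient` may be implemented differently):

```python
class Pod:
    short_name = 'po'

    def __init__(self, url):
        self.url = url


class Service:
    short_name = 'svc'

    def __init__(self, url):
        self.url = url


REGISTRY = {'pod': Pod, 'service': Service}


class KubeHTTPClient:
    def __init__(self, url):
        self.url = url

    def __getattr__(self, name):
        # resolve 'pod', 'pods', 'po', 'service', 'svc', ... to a resource
        for long_name, cls in REGISTRY.items():
            if name in {long_name, long_name + 's', cls.short_name}:
                return cls(self.url)
        raise AttributeError(name)


client = KubeHTTPClient('https://10.0.0.1:443')
assert type(client.pod) is type(client.pods) is type(client.po) is Pod
assert isinstance(client.svc, Service) and isinstance(client.service, Service)
```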

A `version()` method was added to let resources find out which Kubernetes version they are dealing with.
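
That lets a resource branch on the cluster version; a hypothetical example (the helper below, the parsed-version return type, and the exact cut-over release are all assumptions):

```python
from packaging.version import parse


def hpa_api_version(k8s_version):
    """Pick the HPA API group based on what version() reports."""
    # Older clusters served HPAs from extensions/v1beta1; newer ones serve
    # them from autoscaling/v1 (the exact boundary is not asserted here).
    if k8s_version < parse('1.3'):
        return 'extensions/v1beta1'
    return 'autoscaling/v1'


print(hpa_api_version(parse('1.3.5')))  # autoscaling/v1
```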

Closes #875
helgi committed Aug 26, 2016
1 parent 899e008 commit 7306202c34d67218c9080f8545de660da32723ff
Showing with 2,320 additions and 1,786 deletions.
  1. +3 −3 rootfs/api/models/__init__.py
  2. +22 −22 rootfs/api/models/app.py
  3. +5 −5 rootfs/api/models/certificate.py
  4. +1 −1 rootfs/api/models/config.py
  5. +11 −11 rootfs/api/models/release.py
  6. +4 −4 rootfs/api/tests/test_app.py
  7. +4 −6 rootfs/api/tests/test_app_settings.py
  8. +4 −4 rootfs/api/tests/test_domain.py
  9. +2 −2 rootfs/api/tests/test_pods.py
  10. +146 −1,598 rootfs/scheduler/__init__.py
  11. +16 −0 rootfs/scheduler/exceptions.py
  12. +111 −29 rootfs/scheduler/mock.py
  13. +10 −0 rootfs/scheduler/resources/__init__.py
  14. +34 −0 rootfs/scheduler/resources/__resource.py
  15. +344 −0 rootfs/scheduler/resources/deployment.py
  16. +154 −0 rootfs/scheduler/resources/horizontalpodautoscaler.py
  17. +62 −0 rootfs/scheduler/resources/namespace.py
  18. +27 −0 rootfs/scheduler/resources/node.py
  19. +709 −0 rootfs/scheduler/resources/pod.py
  20. +29 −0 rootfs/scheduler/resources/replicaset.py
  21. +135 −0 rootfs/scheduler/resources/replicationcontroller.py
  22. +115 −0 rootfs/scheduler/resources/secret.py
  23. +88 −0 rootfs/scheduler/resources/service.py
  24. +3 −2 rootfs/scheduler/tests/__init__.py
  25. +20 −23 rootfs/scheduler/tests/test_deployments.py
  26. +203 −0 rootfs/scheduler/tests/test_horizontalpodautoscaler.py
  27. +5 −5 rootfs/scheduler/tests/test_namespaces.py
  28. +5 −14 rootfs/scheduler/tests/test_nodes.py
  29. +10 −10 rootfs/scheduler/tests/test_replicationcontrollers.py
  30. +13 −22 rootfs/scheduler/tests/test_scheduler.py
  31. +13 −13 rootfs/scheduler/tests/test_secrets.py
  32. +12 −12 rootfs/scheduler/tests/test_services.py
@@ -47,12 +47,12 @@ class Meta:
@property
def _scheduler(self):
mod = importlib.import_module(settings.SCHEDULER_MODULE)
return mod.SchedulerClient()
return mod.SchedulerClient(settings.SCHEDULER_URL)

def _fetch_service_config(self, app):
try:
# Get the service from k8s to attach the domain correctly
svc = self._scheduler.get_service(app, app).json()
svc = self._scheduler.svc.get(app, app).json()
except KubeException as e:
raise ServiceUnavailable('Could not fetch Kubernetes Service {}'.format(app)) from e

@@ -93,7 +93,7 @@ def _save_service_config(self, app, component, data):

# Update the k8s service for the application with new service information
try:
self._scheduler.update_service(app, app, svc)
self._scheduler.svc.update(app, app, svc)
except KubeException as e:
raise ServiceUnavailable('Could not update Kubernetes Service {}'.format(app)) from e

@@ -104,7 +104,7 @@ def save(self, *args, **kwargs):
self.release_set.latest()
except Release.DoesNotExist:
try:
if self._scheduler.get_namespace(self.id).status_code == 200:
if self._scheduler.ns.get(self.id).status_code == 200:
# Namespace already exists
err = "{} already exists as a namespace in this kuberenetes setup".format(self.id) # noqa
self.log(err, logging.INFO)
@@ -203,18 +203,18 @@ def create(self, *args, **kwargs): # noqa
self.log('creating Namespace {} and services'.format(namespace), level=logging.DEBUG)
# Create essential resources
try:
self._scheduler.get_namespace(namespace)
self._scheduler.ns.get(namespace)
except KubeException:
self._scheduler.create_namespace(namespace)
self._scheduler.ns.create(namespace)

try:
self._scheduler.get_service(namespace, service)
self._scheduler.svc.get(namespace, service)
except KubeException:
self._scheduler.create_service(namespace, service)
self._scheduler.svc.create(namespace, service)
except KubeException as e:
# Blow it all away only if something horrible happens
try:
self._scheduler.delete_namespace(namespace)
self._scheduler.ns.delete(namespace)
except KubeException as e:
# Just feed into the item below
raise ServiceUnavailable('Could not delete the Namespace in Kubernetes') from e
@@ -234,12 +234,12 @@ def delete(self, *args, **kwargs):
"""Delete this application including all containers"""
self.log("deleting environment")
try:
self._scheduler.delete_namespace(self.id)
self._scheduler.ns.delete(self.id)

# wait 30 seconds for termination
for _ in range(30):
try:
self._scheduler.get_namespace(self.id)
self._scheduler.ns.get(self.id)
except KubeException:
break
except KubeException as e:
@@ -264,7 +264,7 @@ def restart(self, **kwargs): # noqa
desired = 0
labels = self._scheduler_filter(**kwargs)
# fetch RS (which represent Deployments)
controllers = self._scheduler.get_replicasets(kwargs['id'], labels=labels)
controllers = self._scheduler.rs.get(kwargs['id'], labels=labels)

for controller in controllers.json()['items']:
desired += controller['spec']['replicas']
@@ -275,7 +275,7 @@ def restart(self, **kwargs): # noqa
try:
tasks = [
functools.partial(
self._scheduler.delete_pod,
self._scheduler.pod.delete,
self.id,
pod['name']
) for pod in self.list_pods(**kwargs)
@@ -577,7 +577,7 @@ def _check_deployment_in_progress(self, deploys, force_deploy=False):
for scale_type, kwargs in deploys.items():
# Is there an existing deployment in progress?
name = self._get_job_id(scale_type)
in_progress, deploy_okay = self._scheduler.deployment_in_progress(
in_progress, deploy_okay = self._scheduler.deployment.in_progress(
self.id, name, kwargs.get("deploy_timeout"), kwargs.get("deploy_batches"),
kwargs.get("replicas"), kwargs.get("tags")
)
@@ -768,9 +768,9 @@ def list_pods(self, *args, **kwargs):

# in case a singular pod is requested
if 'name' in kwargs:
pods = [self._scheduler.get_pod(self.id, kwargs['name']).json()]
pods = [self._scheduler.pod.get(self.id, kwargs['name']).json()]
else:
pods = self._scheduler.get_pods(self.id, labels=labels).json()['items']
pods = self._scheduler.pod.get(self.id, labels=labels).json()['items']

data = []
for p in pods:
@@ -779,14 +779,14 @@ def list_pods(self, *args, **kwargs):
if labels['type'] == 'run':
continue

state = str(self._scheduler.pod_state(p))
state = str(self._scheduler.pod.state(p))

# follows kubectl convention - these are hidden unless show-all is set
if state in ['down', 'crashed']:
continue

# hide pod if it is past the graceful termination period
if self._scheduler.pod_deleted(p):
if self._scheduler.pod.deleted(p):
continue

item = Pod()
@@ -862,9 +862,9 @@ def maintenance_mode(self, mode):

try:
service['metadata']['annotations']['router.deis.io/maintenance'] = str(mode).lower()
self._scheduler.update_service(self.id, self.id, data=service)
self._scheduler.svc.update(self.id, self.id, data=service)
except KubeException as e:
self._scheduler.update_service(self.id, self.id, data=old_service)
self._scheduler.svc.update(self.id, self.id, data=old_service)
raise ServiceUnavailable(str(e)) from e

def routable(self, routable):
@@ -876,9 +876,9 @@ def routable(self, routable):

try:
service['metadata']['labels']['router.deis.io/routable'] = str(routable).lower()
self._scheduler.update_service(self.id, self.id, data=service)
self._scheduler.svc.update(self.id, self.id, data=service)
except KubeException as e:
self._scheduler.update_service(self.id, self.id, data=old_service)
self._scheduler.svc.update(self.id, self.id, data=old_service)
raise ServiceUnavailable(str(e)) from e

def _update_application_service(self, namespace, app_type, port, routable=False, annotations={}): # noqa
@@ -907,10 +907,10 @@ def _update_application_service(self, namespace, app_type, port, routable=False,
# port 80 is the only one we care about right now
service['spec']['ports'][pos]['targetPort'] = int(port)

self._scheduler.update_service(namespace, namespace, data=service)
self._scheduler.svc.update(namespace, namespace, data=service)
except Exception as e:
# Fix service to old port and app type
self._scheduler.update_service(namespace, namespace, data=old_service)
self._scheduler.svc.update(namespace, namespace, data=old_service)
raise KubeException(str(e)) from e

def whitelist(self, whitelist):
@@ -922,6 +922,6 @@ def whitelist(self, whitelist):
try:
addresses = ",".join(address for address in whitelist)
service['metadata']['annotations']['router.deis.io/whitelist'] = addresses
self._scheduler.update_service(self.id, self.id, data=service)
self._scheduler.svc.update(self.id, self.id, data=service)
except KubeException as e:
raise ServiceUnavailable(str(e)) from e
@@ -183,14 +183,14 @@ def attach_in_kubernetes(self, domain):
'tls.key': self.key
}

secret = self._scheduler.get_secret(namespace, name).json()['data']
secret = self._scheduler.secret.get(namespace, name).json()['data']
except KubeException:
self._scheduler.create_secret(namespace, name, data)
self._scheduler.secret.create(namespace, name, data)
else:
# update cert secret to the TLS Ingress format if required
if secret != data:
try:
self._scheduler.update_secret(namespace, name, data)
self._scheduler.secret.update(namespace, name, data)
except KubeException as e:
msg = 'There was a problem updating the certificate secret ' \
'{} for {}'.format(name, namespace)
@@ -225,8 +225,8 @@ def detach(self, *args, **kwargs):
if len(self.domains) == 0:
try:
# We raise an exception when a secret doesn't exist
self._scheduler.get_secret(namespace, name)
self._scheduler.delete_secret(namespace, name)
self._scheduler.secret.get(namespace, name)
self._scheduler.secret.delete(namespace, name)
except KubeException as e:
raise ServiceUnavailable("Could not delete certificate secret {} for application {}".format(name, namespace)) from e # noqa

@@ -94,7 +94,7 @@ def set_tags(self, previous_config):
return

# Get all nodes with label selectors
nodes = self._scheduler.get_nodes(labels=self.tags).json()
nodes = self._scheduler.node.get(labels=self.tags).json()
if nodes['items']:
return

@@ -259,7 +259,7 @@ def cleanup_old(self): # noqa
# Cleanup controllers
labels = {'heritage': 'deis'}
controller_removal = []
controllers = self._scheduler.get_rcs(self.app.id, labels=labels).json()
controllers = self._scheduler.rc.get(self.app.id, labels=labels).json()
for controller in controllers['items']:
current_version = controller['metadata']['labels']['version']
# skip the latest release
@@ -286,20 +286,20 @@ def cleanup_old(self): # noqa
'app': self.app.id,
'type': 'env',
}
secrets = self._scheduler.get_secrets(self.app.id, labels=labels).json()
secrets = self._scheduler.secret.get(self.app.id, labels=labels).json()
for secret in secrets['items']:
current_version = secret['metadata']['labels']['version']
# skip the latest release
if current_version == latest_version:
continue

self._scheduler.delete_secret(self.app.id, secret['metadata']['name'])
self._scheduler.secret.delete(self.app.id, secret['metadata']['name'])

# Remove stray pods
labels = {'heritage': 'deis'}
pods = self._scheduler.get_pods(self.app.id, labels=labels).json()
pods = self._scheduler.pod.get(self.app.id, labels=labels).json()
for pod in pods['items']:
if self._scheduler.pod_deleted(pod):
if self._scheduler.pod.deleted(pod):
continue

current_version = pod['metadata']['labels']['version']
@@ -308,7 +308,7 @@ def cleanup_old(self): # noqa
continue

try:
self._scheduler.delete_pod(self.app.id, pod['metadata']['name'])
self._scheduler.pod.delete(self.app.id, pod['metadata']['name'])
except KubeHTTPException as e:
# Sometimes k8s will manage to remove the pod from under us
if e.response.status_code == 404:
@@ -329,7 +329,7 @@ def _cleanup_deployment_secrets_and_configs(self, namespace):
# Find all ReplicaSets
versions = []
labels = {'heritage': 'deis', 'app': namespace}
replicasets = self._scheduler.get_replicasets(namespace, labels=labels).json()
replicasets = self._scheduler.rs.get(namespace, labels=labels).json()
for replicaset in replicasets['items']:
if (
'version' not in replicaset['metadata']['labels'] or
@@ -348,9 +348,9 @@ def _cleanup_deployment_secrets_and_configs(self, namespace):
'version__notin': versions
}
self.app.log('Cleaning up orphaned env var secrets for application {}'.format(namespace), level=logging.DEBUG) # noqa
secrets = self._scheduler.get_secrets(namespace, labels=labels).json()
secrets = self._scheduler.secret.get(namespace, labels=labels).json()
for secret in secrets['items']:
self._scheduler.delete_secret(namespace, secret['metadata']['name'])
self._scheduler.secret.delete(namespace, secret['metadata']['name'])

def _delete_release_in_scheduler(self, namespace, version):
"""
@@ -368,14 +368,14 @@ def _delete_release_in_scheduler(self, namespace, version):
# see if the app config has deploy timeout preference, otherwise use global
deploy_timeout = self.config.values.get('DEIS_DEPLOY_TIMEOUT', settings.DEIS_DEPLOY_TIMEOUT) # noqa

controllers = self._scheduler.get_rcs(namespace, labels=labels).json()
controllers = self._scheduler.rc.get(namespace, labels=labels).json()
for controller in controllers['items']:
self._scheduler.cleanup_release(namespace, controller, deploy_timeout)

# remove secret that contains env vars for the release
try:
secret_name = "{}-{}-env".format(namespace, version)
self._scheduler.delete_secret(namespace, secret_name)
self._scheduler.secret.delete(namespace, secret_name)
except KubeHTTPException:
pass

@@ -373,22 +373,22 @@ def test_app_exists_in_kubernetes(self, mock_requests):

def test_app_create_failure_kubernetes_create(self, mock_requests):
"""
Create an app but have scheduler.create_service fail with an exception
Create an app but have scheduler.svc.create fail with an exception
"""
with mock.patch('scheduler.KubeHTTPClient.create_service') as mock_kube:
with mock.patch('scheduler.resources.service.Service.create') as mock_kube:
mock_kube.side_effect = KubeException('Boom!')
response = self.client.post('/v2/apps')
self.assertEqual(response.status_code, 503, response.data)

def test_app_delete_failure_kubernetes_destroy(self, mock_requests):
"""
Create an app and then delete but have scheduler.delete_namespace
Create an app and then delete but have scheduler.ns.delete
fail with an exception
"""
# create
app_id = self.create_app()

with mock.patch('scheduler.KubeHTTPClient.delete_namespace') as mock_kube:
with mock.patch('scheduler.resources.namespace.Namespace.delete') as mock_kube:
# delete
mock_kube.side_effect = KubeException('Boom!')
response = self.client.delete('/v2/apps/{}'.format(app_id))
@@ -72,10 +72,8 @@ def test_settings_routable(self, mock_requests):
Create an application with the routable flag turned on or off
"""
# create app, expecting routable to be true
body = {'id': 'myid'}
response = self.client.post('/v2/apps', body)
self.assertEqual(response.status_code, 201, response.data)
app = App.objects.get(id='myid')
app_id = self.create_app()
app = App.objects.get(id=app_id)
self.assertTrue(app.appsettings_set.latest().routable)
# Set routable to false
response = self.client.post(
@@ -163,8 +161,8 @@ def test_kubernetes_service_failure(self, mock_requests):
"""
app_id = self.create_app()

# scheduler.update_service exception
with mock.patch('scheduler.KubeHTTPClient.update_service') as mock_kube:
# scheduler.svc.update exception
with mock.patch('scheduler.resources.service.Service.update') as mock_kube:
mock_kube.side_effect = KubeException('Boom!')
addresses = ["2.3.4.5"]
url = '/v2/apps/{}/whitelist'.format(app_id)
@@ -325,16 +325,16 @@ def test_kubernetes_service_failure(self):
"""
app_id = self.create_app()

# scheduler.get_service exception
with mock.patch('scheduler.KubeHTTPClient.get_service') as mock_kube:
# scheduler.svc.get exception
with mock.patch('scheduler.resources.service.Service.get') as mock_kube:
mock_kube.side_effect = KubeException('Boom!')
domain = 'foo.com'
url = '/v2/apps/{}/domains'.format(app_id)
response = self.client.post(url, {'domain': domain})
self.assertEqual(response.status_code, 503, response.data)

# scheduler.update_service exception
with mock.patch('scheduler.KubeHTTPClient.update_service') as mock_kube:
# scheduler.svc.update exception
with mock.patch('scheduler.resources.service.Service.update') as mock_kube:
domain = 'foo.com'
url = '/v2/apps/{}/domains'.format(app_id)
response = self.client.post(url, {'domain': domain})
@@ -792,8 +792,8 @@ def test_list_pods_failure(self, mock_requests):

app_id = self.create_app()

with mock.patch('scheduler.KubeHTTPClient.get_pod') as kube_pod:
with mock.patch('scheduler.KubeHTTPClient.get_pods') as kube_pods:
with mock.patch('scheduler.resources.pod.Pod.get') as kube_pod:
with mock.patch('scheduler.resources.pod.Pod.get') as kube_pods:
kube_pod.side_effect = KubeException('boom!')
kube_pods.side_effect = KubeException('boom!')
url = "/v2/apps/{app_id}/pods".format(**locals())
