feat(scheduler) prepend [namespace] to Scheduler log message for better traceability
helgi committed Jul 14, 2016
1 parent 2bb7724 commit edb0383a6446d048d2a9b14512534caccd74d2d1
Showing with 40 additions and 30 deletions.
+40 −30 rootfs/scheduler/__init__.py
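The change replaces direct logger.info()/logger.debug() calls with a new self.log() helper that prepends the application's namespace to every message. An illustrative before-and-after of the resulting controller log lines (the app name "myapp", process "web", and release "v23" are invented for the example):

    # before: namespace woven into each message by hand
    # Deployment web with release v23 already exists under Namespace myapp. Stopping deploy
    # after: namespace prepended as a uniform tag
    # [myapp]: Deployment web with release v23 already exists. Stopping deploy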
@@ -97,6 +97,16 @@ def __init__(self):
self.url = settings.SCHEDULER_URL
self.session = get_session()

+def log(self, namespace, message, level=logging.INFO):
+    """Logs a message in the context of this application.
+
+    This prefixes log messages with a namespace "tag".
+    When it's seen, the message (usually an application event of some
+    sort, such as releasing or scaling) will be considered as "belonging"
+    to the application instead of the controller and will be handled accordingly.
+    """
+    logger.log(level, "[{}]: {}".format(namespace, message))
+
def deploy(self, namespace, name, image, command, **kwargs): # noqa
"""Scale RC or Deployment depending on what's requested"""
self.deploy_timeout = kwargs.get('deploy_timeout', 120)
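For reference, a minimal standalone sketch of how the new helper behaves (logger name and messages invented; the real method lives on the scheduler class above):

    import logging

    logging.basicConfig(format='%(message)s', level=logging.INFO)
    logger = logging.getLogger('scheduler')

    def log(namespace, message, level=logging.INFO):
        # mirror the helper above: prepend the namespace "tag"
        logger.log(level, "[{}]: {}".format(namespace, message))

    log('myapp', 'scaled web to 3 replicas')            # -> [myapp]: scaled web to 3 replicas
    log('myapp', 'manifest used: ...', logging.DEBUG)   # dropped at INFO level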
@@ -124,7 +134,7 @@ def deploy_deployment(self, namespace, name, image, command, **kwargs): # noqa
}
# this depends on the deployment object having the latest information
if deployment['spec']['template']['metadata']['labels'] == labels:
-logger.info('Deployment {} with release {} already exists under Namespace {}. Stopping deploy'.format(name, version, namespace)) # noqa
+self.log(namespace, 'Deployment {} with release {} already exists. Stopping deploy'.format(name, version)) # noqa
return
except KubeException:
# create the initial deployment object (and the first revision)
@@ -160,7 +170,7 @@ def deploy_rc(self, namespace, name, image, command, **kwargs): # noqa
# If an RC already exists then stop processing of the deploy
try:
self.get_rc(namespace, name)
-logger.info('RC {} already exists under Namespace {}. Stopping deploy'.format(name, namespace)) # noqa
+self.log(namespace, 'RC {} already exists. Stopping deploy'.format(name)) # noqa
return
except KubeHTTPException:
# make replicas 0 so scaling handles the work
@@ -173,7 +183,7 @@ def deploy_rc(self, namespace, name, image, command, **kwargs): # noqa
desired = int(old_rc["spec"]["replicas"])
else:
desired = kwargs['replicas']
-logger.info('No prior RC could be found for {}-{}'.format(namespace, app_type))
+self.log(namespace, 'No prior RC could be found for {}'.format(app_type))

# see if application or global deploy batches are defined
batches = kwargs.get('deploy_batches', None)
@@ -186,14 +196,14 @@ def deploy_rc(self, namespace, name, image, command, **kwargs): # noqa
new_name = new_rc["metadata"]["name"]
for batch in batches:
count += batch
-logger.info('scaling release {} to {} out of final {}'.format(
+self.log(namespace, 'scaling release {} to {} out of final {}'.format(
new_name, count, desired
))
self._scale_rc(namespace, new_name, count)

if old_rc:
old_name = old_rc["metadata"]["name"]
-logger.info('scaling old release {} from original {} to {}'.format(
+self.log(namespace, 'scaling old release {} from original {} to {}'.format(
old_name, desired, (desired-count))
)
self._scale_rc(namespace, old_name, (desired-count))
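To make the batch loop above concrete, a hypothetical trace with desired=4 and deploy_batches=[1, 3] (namespace and RC names invented) prints the messages the scheduler would now log:

    desired, count = 4, 0
    for batch in [1, 3]:
        count += batch
        print('[myapp]: scaling release myapp-web-v24 to {} out of final {}'.format(count, desired))
        print('[myapp]: scaling old release myapp-web-v23 from original {} to {}'.format(desired, desired - count))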
@@ -435,7 +445,7 @@ def _build_pod_manifest(self, namespace, name, image, **kwargs):

def run(self, namespace, name, image, entrypoint, command, **kwargs):
"""Run a one-off command."""
-logger.info('run {}, img {}, entrypoint {}, cmd "{}"'.format(
+self.log(namespace, 'run {}, img {}, entrypoint {}, cmd "{}"'.format(
name, image, entrypoint, command)
)

@@ -840,7 +850,7 @@ def _wait_until_pods_terminate(self, namespace, labels, current, desired):

timeout = settings.KUBERNETES_POD_TERMINATION_GRACE_PERIOD_SECONDS
delta = current - desired
logger.info("waiting for {} pods in {} namespace to be terminated ({}s timeout)".format(delta, namespace, timeout)) # noqa
self.log(namespace, "waiting for {} pods to be terminated ({}s timeout)".format(delta, timeout)) # noqa
for waited in range(timeout):
pods = self.get_pods(namespace, labels=labels).json()
count = len(pods['items'])
@@ -859,11 +869,11 @@ def _wait_until_pods_terminate(self, namespace, labels, current, desired):
break

if waited > 0 and (waited % 10) == 0:
logger.info("waited {}s and {} pods out of {} are fully terminated".format(waited, (delta - count), delta)) # noqa
self.log(namespace, "waited {}s and {} pods out of {} are fully terminated".format(waited, (delta - count), delta)) # noqa

time.sleep(1)

logger.info("{} pods in namespace {} are terminated".format(delta, namespace))
self.log(namespace, "{} pods are terminated".format(delta))

def _wait_until_pods_are_ready(self, namespace, containers, labels, desired): # noqa
# If desired is 0 then there is no ready state to check on
@@ -881,10 +891,10 @@ def _wait_until_pods_are_ready(self, namespace, containers, labels, desired): #
# this is to account for kubernetes having readiness check report as failure until
# the initial delay period is up
delay = int(container['readinessProbe'].get('initialDelaySeconds', 50))
logger.info("adding {}s on to the original {}s timeout to account for the initial delay specified in the readiness probe".format(delay, timeout)) # noqa
self.log(namespace, "adding {}s on to the original {}s timeout to account for the initial delay specified in the readiness probe".format(delay, timeout)) # noqa
timeout += delay

logger.info("waiting for {} pods in {} namespace to be in services ({}s timeout)".format(desired, namespace, timeout)) # noqa
self.log(namespace, "waiting for {} pods to be in services ({}s timeout)".format(desired, timeout)) # noqa

# Ensure the minimum desired number of pods are available
waited = 0
@@ -916,30 +926,30 @@ def _wait_until_pods_are_ready(self, namespace, containers, labels, desired): #
break

if waited > 0 and (waited % 10) == 0:
logger.info("waited {}s and {} pods are in service".format(waited, count))
self.log(namespace, "waited {}s and {} pods are in service".format(waited, count))

# increase wait time without dealing with jitters from above code
waited += 1
time.sleep(1)

# timed out
if waited > timeout:
-logger.info('timed out ({}s) waiting for pods to come up in namespace {}'.format(timeout, namespace)) # noqa
+self.log(namespace, 'timed out ({}s) waiting for pods to come up in namespace {}'.format(timeout, namespace)) # noqa

logger.info("{} out of {} pods in namespace {} are in service".format(count, desired, namespace)) # noqa
self.log(namespace, "{} out of {} pods are in service".format(count, desired)) # noqa

def _scale_rc(self, namespace, name, desired):
rc = self.get_rc(namespace, name).json()

current = int(rc['spec']['replicas'])
if desired == current:
logger.info("Not scaling RC {} in Namespace {} to {} replicas. Already at desired replicas".format(name, namespace, desired)) # noqa
self.log(namespace, "Not scaling RC {} to {} replicas. Already at desired replicas".format(name, desired)) # noqa
return
elif desired != rc['spec']['replicas']: # RC needs new replica count
# Set the new desired replica count
rc['spec']['replicas'] = desired

logger.info("scaling RC {} in Namespace {} from {} to {} replicas".format(name, namespace, current, desired)) # noqa
self.log(namespace, "scaling RC {} from {} to {} replicas".format(name, current, desired)) # noqa

self.update_rc(namespace, name, rc)
self._wait_until_rc_is_updated(namespace, name)
@@ -994,7 +1004,7 @@ def create_rc(self, namespace, name, image, command, **kwargs):
resp,
'create ReplicationController "{}" in Namespace "{}"', name, namespace
)
-logger.debug('manifest used: {}'.format(ruamel.yaml.dump(manifest)))
+self.log(namespace, 'manifest used: {}'.format(ruamel.yaml.dump(manifest)), logging.DEBUG) # noqa

self._wait_until_rc_is_updated(namespace, name)

@@ -1008,15 +1018,15 @@ def _wait_until_rc_is_updated(self, namespace, name):
More information is also available at:
https://github.com/kubernetes/kubernetes/blob/master/docs/devel/api-conventions.md#metadata
"""
logger.debug("waiting for ReplicationController {} to get a newer generation (30s timeout)".format(name)) # noqa
self.log(namespace, "waiting for ReplicationController {} to get a newer generation (30s timeout)".format(name), logging.DEBUG) # noqa
for _ in range(30):
try:
rc = self.get_rc(namespace, name).json()
if (
"observedGeneration" in rc["status"] and
rc["status"]["observedGeneration"] >= rc["metadata"]["generation"]
):
logger.debug("ReplicationController {} got a newer generation (30s timeout)".format(name)) # noqa
self.log(namespace, "ReplicationController {} got a newer generation (30s timeout)".format(name), logging.DEBUG) # noqa
break

time.sleep(1)
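The generation check being waited on follows the Kubernetes metadata convention linked in the docstring: a write bumps metadata.generation, and the controller reports the revision it has acted on in status.observedGeneration. A minimal sketch of the shapes involved (values invented):

    rc = {
        'metadata': {'generation': 4},        # spec revision last written by a client
        'status': {'observedGeneration': 4},  # revision the controller has processed
    }
    updated = ('observedGeneration' in rc['status'] and
               rc['status']['observedGeneration'] >= rc['metadata']['generation'])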
@@ -1480,8 +1490,8 @@ def _handle_pod_long_image_pulling(self, reason, pod):
seconds = 60 # time threshold before padding timeout
if (start + timedelta(seconds=seconds)) < datetime.utcnow():
# add 10 minutes to timeout to allow a pull image operation to finish
-logger.info('Kubernetes has been pulling the image for {} seconds'.format(seconds)) # noqa
-logger.info('Increasing timeout by 10 minutes to allow a pull image operation to finish for pods in namespace {}'.format(namespace)) # noqa
+self.log(namespace, 'Kubernetes has been pulling the image for {} seconds'.format(seconds)) # noqa
+self.log(namespace, 'Increasing timeout by 10 minutes to allow a pull image operation to finish for pods') # noqa

# make it so function doesn't do processing again
setattr(self, '_handle_pod_long_image_pulling_applied', True)
@@ -1537,15 +1547,15 @@ def _wait_until_deployment_is_updated(self, namespace, name):
More information is also available at:
https://github.com/kubernetes/kubernetes/blob/master/docs/devel/api-conventions.md#metadata
"""
logger.debug("waiting for Deployment {} to get a newer generation (30s timeout)".format(name)) # noqa
self.log(namespace, "waiting for Deployment {} to get a newer generation (30s timeout)".format(name), logging.DEBUG) # noqa
for _ in range(30):
try:
deploy = self.get_deployment(namespace, name).json()
if (
'observedGeneration' in deploy['status'] and
deploy['status']['observedGeneration'] >= deploy['metadata']['generation']
):
logger.debug("A newer generation was found for Deployment {}".format(name))
self.log(namespace, "A newer generation was found for Deployment {}".format(name), logging.DEBUG) # noqa
break

time.sleep(1)
@@ -1596,8 +1606,8 @@ def update_deployment(self, namespace, name, image, command, **kwargs):
url = self._api("/namespaces/{}/deployments/{}", namespace, name)
response = self.session.put(url, json=manifest)
if unhealthy(response.status_code):
+self.log(namespace, 'template used: {}'.format(json.dumps(manifest, indent=4)), logging.DEBUG) # noqa
raise KubeHTTPException(response, 'update Deployment "{}"', name)
-logger.debug('template used: {}'.format(json.dumps(manifest, indent=4)))

self._wait_until_deployment_is_updated(namespace, name)
self._wait_until_deployment_is_ready(namespace, name, **kwargs)
@@ -1614,7 +1624,7 @@ def create_deployment(self, namespace, name, image, command, **kwargs):
response,
'create Deployment "{}" in Namespace "{}"', name, namespace
)
-logger.debug('template used: {}'.format(json.dumps(manifest, indent=4)))
+self.log(namespace, 'template used: {}'.format(json.dumps(manifest, indent=4)), logging.DEBUG) # noqa

self._wait_until_deployment_is_updated(namespace, name)
self._wait_until_deployment_is_ready(namespace, name, **kwargs)
@@ -1652,12 +1662,12 @@ def _wait_until_deployment_is_ready(self, namespace, name, **kwargs):
# this is to account for kubernetes having readiness check report as failure until
# the initial delay period is up
delay = int(container['readinessProbe'].get('initialDelaySeconds', 50))
logger.info("adding {}s on to the original {}s timeout to account for the initial delay specified in the readiness probe".format(delay, deploy_timeout)) # noqa
self.log(namespace, "adding {}s on to the original {}s timeout to account for the initial delay specified in the readiness probe".format(delay, deploy_timeout)) # noqa
deploy_timeout += delay

# a rough calculation that figures out an overall timeout
timeout = len(batches) * deploy_timeout
-logger.info('This deployment\'s overall timeout is {}s - batch timeout is {}s and there are {} batches to deploy with a total of {} pods'.format(timeout, deploy_timeout, len(batches), replicas)) # noqa
+self.log(namespace, 'This deployment\'s overall timeout is {}s - batch timeout is {}s and there are {} batches to deploy with a total of {} pods'.format(timeout, deploy_timeout, len(batches), replicas)) # noqa

waited = 0
while waited < timeout:
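As a worked example of the rough overall-timeout arithmetic above (all numbers invented): 10 replicas deployed 4 at a time gives 3 batches, and a 120s batch timeout padded with a 50s readiness initial delay gives 170s per batch, so the overall timeout comes to 510s:

    deploy_timeout = 120 + 50   # base batch timeout plus readiness initialDelaySeconds
    batches = [4, 4, 2]         # 10 replicas deployed 4 at a time
    timeout = len(batches) * deploy_timeout
    print(timeout)              # 510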
@@ -1679,7 +1689,7 @@ def _wait_until_deployment_is_ready(self, namespace, name, **kwargs):
# handle errors and bubble up if need be
self._handle_pod_image_errors(pod, reason, message)

logger.info("waited {}s and {} pods are in service".format(waited, availablePods))
self.log(namespace, "waited {}s and {} pods are in service".format(waited, availablePods)) # noqa

waited += 1
time.sleep(1)
@@ -1764,12 +1774,12 @@ def _scale_deployment(self, namespace, name, image, command, **kwargs):
desired = int(kwargs.get('replicas'))
current = int(deployment['spec']['replicas'])
if desired == current:
logger.info("Not scaling Deployment {} in Namespace {} to {} replicas. Already at desired replicas".format(name, namespace, desired)) # noqa
self.log(namespace, "Not scaling Deployment {} to {} replicas. Already at desired replicas".format(name, desired)) # noqa
return
elif desired != current:
# set the previous replicas count so the wait logic can deal with terminating pods
kwargs['previous_replicas'] = current
logger.info("scaling Deployment {} in Namespace {} from {} to {} replicas".format(name, namespace, current, desired)) # noqa
self.log(namespace, "scaling Deployment {} from {} to {} replicas".format(name, current, desired)) # noqa
self.update_deployment(namespace, name, image, command, **kwargs)

def get_replicaset(self, namespace, name):
