Skip to content
This repository has been archived by the owner on Oct 4, 2023. It is now read-only.

Commit

Permalink
merge from development
Browse files Browse the repository at this point in the history
  • Loading branch information
nhammond committed Jul 14, 2017
2 parents 579dee6 + d265446 commit 7d7ad14
Show file tree
Hide file tree
Showing 15 changed files with 207 additions and 37 deletions.
7 changes: 6 additions & 1 deletion loomengine/client/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ def get_parser(cls, parser=None):
help='ID of data inputs')
parser.add_argument('-n', '--name', metavar='RUN_NAME',
help='run name (default is template name)')
parser.add_argument('--notify', '-e', action='append',
metavar='EMAIL/URL',
help='Recipients of completed run notifications. '\
'Repeat flag for multiple emails or urls')
return parser

@classmethod
Expand All @@ -57,7 +61,8 @@ def _validate_args(cls, args):
def run(self):
run_data = {
'template': self.args.template,
'user_inputs': self._get_inputs()}
'user_inputs': self._get_inputs(),
'notification_addresses': self.args.notify,}
if self.args.name:
run_data['name'] = self.args.name
try:
Expand Down
12 changes: 12 additions & 0 deletions loomengine/client/settings/gcloud.conf
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,15 @@ LOOM_FLOWER_INTERNAL_PORT: 5555
LOOM_FLOWER_CONTAINER_NAME_SUFFIX: -flower

LOOM_SSH_PRIVATE_KEY_NAME: loom_id_rsa

LOOM_EMAIL_HOST:
LOOM_EMAIL_PORT:
LOOM_EMAIL_HOST_USER:
LOOM_EMAIL_HOST_PASSWORD:
LOOM_EMAIL_USE_TLS:
LOOM_EMAIL_USE_SSL:
LOOM_EMAIL_TIMEOUT:
LOOM_EMAIL_SSL_KEYFILE:
LOOM_EMAIL_SSL_CERTFILE:
LOOM_DEFAULT_FROM_EMAIL:
LOOM_NOTIFICATION_ADDRESSES: []
12 changes: 12 additions & 0 deletions loomengine/client/settings/local.conf
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,15 @@ WORKER_TYPE: LOCAL

LOOM_FLOWER_INTERNAL_PORT: 5555
LOOM_FLOWER_CONTAINER_NAME_SUFFIX: -flower

LOOM_EMAIL_HOST:
LOOM_EMAIL_PORT:
LOOM_EMAIL_HOST_USER:
LOOM_EMAIL_HOST_PASSWORD:
LOOM_EMAIL_USE_TLS:
LOOM_EMAIL_USE_SSL:
LOOM_EMAIL_TIMEOUT:
LOOM_EMAIL_SSL_KEYFILE:
LOOM_EMAIL_SSL_CERTFILE:
LOOM_DEFAULT_FROM_EMAIL:
LOOM_NOTIFICATION_ADDRESSES: []
19 changes: 12 additions & 7 deletions loomengine/master/api/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
from django.utils import timezone
import logging
from api import get_setting
import kombu.exceptions
import os
import subprocess
import sys
import threading
import time

Expand Down Expand Up @@ -196,14 +194,12 @@ def _run_cleanup_task_playbook(task_attempt):
return subprocess.Popen(cmd_list, env=env, stderr=subprocess.STDOUT)

@shared_task
def _finish_task_attempt(task_attempt_uuid):
def _finish_task_attempt(task_attempt_uuid, notification_context):
from api.models.tasks import TaskAttempt
task_attempt = TaskAttempt.objects.get(uuid=task_attempt_uuid)
task_attempt.finish()
task_attempt.finish(notification_context)

def finish_task_attempt(task_attempt_uuid):
args = [task_attempt_uuid]
kwargs = {}
def finish_task_attempt(*args, **kwargs):
return _run_with_delay(_finish_task_attempt, args, kwargs)

@shared_task
Expand All @@ -221,3 +217,12 @@ def _kill_task_attempt(task_attempt_uuid, kill_message):

def kill_task_attempt(*args, **kwargs):
return _run_with_delay(_kill_task_attempt, args, kwargs)

@shared_task
def _send_run_notifications(run_uuid, notification_context):
from api.models.runs import Run
run = Run.objects.get(uuid=run_uuid)
run.send_notifications(notification_context)

def send_run_notifications(*args, **kwargs):
return _run_with_delay(_send_run_notifications, args, kwargs)
3 changes: 2 additions & 1 deletion loomengine/master/api/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11 on 2017-07-13 18:16
# Generated by Django 1.11 on 2017-07-14 01:50
from __future__ import unicode_literals

import api.models
Expand Down Expand Up @@ -85,6 +85,7 @@ class Migration(migrations.Migration):
('datetime_finished', models.DateTimeField(blank=True, null=True)),
('environment', jsonfield.fields.JSONField(blank=True, validators=[api.models.validators.validate_environment])),
('resources', jsonfield.fields.JSONField(blank=True, validators=[api.models.validators.validate_resources])),
('notification_addresses', jsonfield.fields.JSONField(blank=True, validators=[api.models.validators.validate_notification_addresses])),
('postprocessing_status', models.CharField(choices=[(b'not_started', b'Not Started'), (b'in_progress', b'In Progress'), (b'complete', b'Complete'), (b'failed', b'Failed')], default=b'not_started', max_length=255)),
('status_is_finished', models.BooleanField(default=False)),
('status_is_failed', models.BooleanField(default=False)),
Expand Down
101 changes: 88 additions & 13 deletions loomengine/master/api/models/runs.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from django.core import mail
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned, \
ValidationError
from django.db import models
from django.dispatch import receiver
from django.template.loader import render_to_string
from django.utils import timezone
from mptt.models import MPTTModel, TreeForeignKey
import jsonfield
import requests

from .base import BaseModel
from api import get_setting
Expand Down Expand Up @@ -57,6 +60,8 @@ class Run(MPTTModel, BaseModel):
resources = jsonfield.JSONField(
blank=True,
validators=[validators.validate_resources])
notification_addresses = jsonfield.JSONField(
blank=True, validators=[validators.validate_notification_addresses])
parent = TreeForeignKey('self', null=True, blank=True,
related_name='steps', db_index=True,
on_delete=models.CASCADE)
Expand Down Expand Up @@ -123,7 +128,7 @@ def is_topmost_run(self):
return False
return True

def finish(self):
def finish(self, notification_context):
if self.has_terminal_status():
return
self.setattrs_and_save_with_retries(
Expand All @@ -132,13 +137,17 @@ def finish(self):
'status_is_finished': True})
if self.parent:
if self.parent._are_children_finished():
self.parent.finish()
self.parent.finish(notification_context)
else:
# Send notifications only if topmost run
async.send_run_notifications(self.uuid, notification_context)

def _are_children_finished(self):
return all([step.status_is_finished for step in self.steps.all()])

@classmethod
def create_from_template(cls, template, name=None, parent=None):
def create_from_template(cls, template, name=None,
notification_addresses=[], parent=None):
if name is None:
name = template.name
if template.is_leaf:
Expand All @@ -148,12 +157,19 @@ def create_from_template(cls, template, name=None, parent=None):
name=name,
command=template.command,
interpreter=template.interpreter,
environment=template.environment,
resources=template.resources,
notification_addresses=notification_addresses,
parent=parent)
else:
run = Run.objects.create(template=template,
is_leaf=template.is_leaf,
name=name,
parent=parent)
run = Run.objects.create(
template=template,
is_leaf=template.is_leaf,
name=name,
environment=template.environment,
resources=template.resources,
notification_addresses=notification_addresses,
parent=parent)
return run

def _connect_input_to_parent(self, input):
Expand Down Expand Up @@ -231,8 +247,8 @@ def has_terminal_status(self):
return self.status_is_finished \
or self.status_is_failed \
or self.status_is_killed
def fail(self, detail=''):

def fail(self, notification_context, detail=''):
if self.has_terminal_status():
return
self.setattrs_and_save_with_retries({
Expand All @@ -241,9 +257,14 @@ def fail(self, detail=''):
'status_is_waiting': False})
self.add_event("Run failed", detail=detail, is_error=True)
if self.parent:
self.parent.fail(detail='Failure in step %s@%s' % (self.name, self.uuid))
self.parent.fail(notification_context,
detail='Failure in step %s@%s' % (
self.name, self.uuid))
else:
# Send kill signal to children
self._kill_children(detail='Automatically killed due to failure')
# Send notifications only if topmost run
async.send_run_notifications(self.uuid, notification_context)

def kill(self, detail=''):
if self.has_terminal_status():
Expand All @@ -261,6 +282,63 @@ def _kill_children(self, detail=''):
for task in self.tasks.all():
task.kill(detail=detail)

def send_notifications(self, notification_context):
notification_context.update({
'run_url': '%s/#/runs/%s/' % (notification_context['server_url'],
self.uuid),
'run_api_url': '%s/api/runs/%s/' % (notification_context['server_url'],
self.uuid),
'run_status': self.status,
'run_name_and_id': '%s@%s' % (self.name, self.uuid[0:8])
})
notification_addresses = []
if self.notification_addresses:
notification_addresses = self.notification_addresses
if get_setting('LOOM_NOTIFICATION_ADDRESSES'):
notification_addresses = notification_addresses\
+ get_setting('LOOM_NOTIFICATION_ADDRESSES')
email_addresses = filter(lambda x: '@' in x, notification_addresses)
urls = filter(lambda x: '@' not in x, notification_addresses)
self._send_email_notifications(email_addresses, notification_context)
self._send_http_notifications(urls, notification_context)

def _send_email_notifications(self, email_addresses, notification_context):
if not email_addresses:
return
text_content = render_to_string('email/notify_run_completed.txt',
notification_context)
html_content = render_to_string('email/notify_run_completed.html',
notification_context)
connection = mail.get_connection()
connection.open()
email = mail.EmailMultiAlternatives(
'Loom run %s@%s is %s' % (self.name, self.uuid[0:8], self.status.lower()),
text_content,
get_setting('DEFAULT_FROM_EMAIL'),
email_addresses,
)
email.attach_alternative(html_content, "text/html")
email.send()
connection.close()

def _send_http_notifications(self, urls, notification_context):
if not urls:
return
data = {
'message': 'Loom run %s is %s' % (
notification_context['run_name_and_id'],
notification_context['run_status']),
'run_uuid': self.uuid,
'run_name': self.name,
'run_status': self.status,
'run_url': notification_context['run_url'],
'run_api_url': notification_context['run_api_url'],
'server_name': notification_context['server_name'],
'server_url': notification_context['server_url'],
}
for url in urls:
requests.post(url, data = data)

def set_running_status(self):
if self.status_is_running and not self.status_is_waiting:
return
Expand Down Expand Up @@ -327,9 +405,6 @@ def postprocess(cls, run_uuid):
raise

def initialize(self):
self.setattrs_and_save_with_retries({
'resources': self.template.resources,
'environment': self.template.environment})
self.connect_inputs_to_template_data()
self.create_steps()
async.postprocess_run(self.uuid)
Expand Down
7 changes: 4 additions & 3 deletions loomengine/master/api/models/task_attempts.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def heartbeat(self):
def get_output(self, channel):
return self.outputs.get(channel=channel)

def fail(self, detail=''):
def fail(self, notification_context, detail=''):
if self.has_terminal_status():
return
self.setattrs_and_save_with_retries(
Expand All @@ -65,6 +65,7 @@ def fail(self, detail=''):
self.add_event("TaskAttempt failed", detail=detail, is_error=True)
try:
self.active_task.fail(
notification_context,
detail="Child TaskAttempt %s failed" % self.uuid)
except ObjectDoesNotExist:
# This attempt is no longer active
Expand All @@ -76,7 +77,7 @@ def has_terminal_status(self):
or self.status_is_failed \
or self.status_is_killed

def finish(self):
def finish(self, notification_context):
if self.has_terminal_status():
return
self.setattrs_and_save_with_retries({
Expand All @@ -89,7 +90,7 @@ def finish(self):
# This attempt is no longer active
# and will be ignored.
return
task.finish()
task.finish(notification_context)

def add_event(self, event, detail='', is_error=False):
event = TaskAttemptEvent.objects.create(
Expand Down
8 changes: 4 additions & 4 deletions loomengine/master/api/models/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def is_unresponsive(self):
# has passed, we have probably missed 2 heartbeats
return (timezone.now() - last_heartbeat).total_seconds() > timeout

def fail(self, detail=''):
def fail(self, notification_context, detail=''):
if self.has_terminal_status():
return
self.setattrs_and_save_with_retries(
Expand All @@ -90,22 +90,22 @@ def fail(self, detail=''):
'status_is_waiting': False})
self.add_event("Task failed", detail=detail, is_error=True)
self._kill_children(detail=detail)
self.run.fail(detail='Task %s failed' % self.uuid)
self.run.fail(notification_context, detail='Task %s failed' % self.uuid)

def has_terminal_status(self):
return self.status_is_finished \
or self.status_is_failed \
or self.status_is_killed

def finish(self):
def finish(self, notification_context):
if self.has_terminal_status():
return
self.setattrs_and_save_with_retries(
{ 'datetime_finished': timezone.now(),
'status_is_finished': True,
'status_is_running': False,
'status_is_waiting': False})
self.run.finish()
self.run.finish(notification_context)
for output in self.outputs.all():
output.push_data(self.data_path)
for task_attempt in self.all_task_attempts.all():
Expand Down
11 changes: 11 additions & 0 deletions loomengine/master/api/models/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,17 @@ def validate_resources(value):
raise ValidationError(
'Invalid value for "memory: "%s". Expected an integer (in GB).')

def validate_notification_addresses(value):
# value should be a list of notification targets,
# either email addresses or http/https URLs.
for target in value:
match = re.match(r'(^\S+@\S+$|^https?://|^HTTPS?://)', target)
if match is None:
raise ValidationError(
'Invalid notification target, must be an email address '\
'or an http/https URL: "%s"' % target)


class OutputParserValidator(object):

@classmethod
Expand Down
Loading

0 comments on commit 7d7ad14

Please sign in to comment.