This repository has been archived by the owner on Oct 4, 2023. It is now read-only.

Merge pull request #422 from StanfordBioinformatics/environment_used
LOOM_DEFAULT_DOCKER_REGISTRY setting, record container info
nhammond committed Jul 15, 2017
2 parents ad4a6e0 + cb20b46 · commit cdc91f7
Showing 15 changed files with 125 additions and 86 deletions.
loomengine/client/playbooks/vars/common.yml (5 additions, 1 deletion)
@@ -6,7 +6,11 @@ container_settings_home: /loom-settings
 
 server_name: "{{lookup('env', 'LOOM_SERVER_NAME')}}"
 log_level: "{{lookup('env', 'LOOM_LOG_LEVEL')|upper}}"
-loom_docker_image: "{{lookup('env', 'LOOM_DOCKER_IMAGE')}}"
+loom_default_docker_registry: "{{lookup('env', 'LOOM_DEFAULT_DOCKER_REGISTRY')}}"
+
+default_registry: "{{lookup('env', 'LOOM_DEFAULT_DOCKER_REGISTRY')}}"
+raw_image: "{{lookup('env', 'LOOM_DOCKER_IMAGE')}}"
+loom_docker_image: "{% if default_registry %}{% if not ('.' in raw_image.split('/')[0]) %}{{default_registry}}/{% endif %}{% endif %}{{raw_image}}"
 
 admin_files_dir: "{{lookup('env', 'LOOM_ADMIN_FILES_DIR')}}"
 loom_playbook_dir: "{{lookup('env', 'LOOM_PLAYBOOK_DIR')}}"
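
Note: the new loom_docker_image expression prepends LOOM_DEFAULT_DOCKER_REGISTRY only when the configured image does not already name a registry host, which it detects by looking for a '.' in the first path component. The standalone Python sketch below mirrors that Jinja2 logic for illustration only; it is not part of this commit, and the sample image names are hypothetical.

def qualify_image(raw_image, default_registry):
    # Mirrors the Jinja2 expression above: prepend the default registry
    # unless the first path component already looks like a registry host.
    if default_registry and '.' not in raw_image.split('/')[0]:
        return '%s/%s' % (default_registry, raw_image)
    return raw_image

# An unqualified image picks up the default registry.
assert (qualify_image('loomengine/loom:0.4.0', 'registry.hub.docker.com')
        == 'registry.hub.docker.com/loomengine/loom:0.4.0')
# An image that already names a registry host is left untouched.
assert (qualify_image('gcr.io/my-project/loom:0.4.0', 'registry.hub.docker.com')
        == 'gcr.io/my-project/loom:0.4.0')
# With no default registry configured, the image passes through unchanged.
assert qualify_image('loomengine/loom:0.4.0', '') == 'loomengine/loom:0.4.0'
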
loomengine/client/playbooks/vars/gcloud_worker.yml (6 additions, 0 deletions)
@@ -13,7 +13,13 @@ gce_email: "{{lookup('env','LOOM_GCE_EMAIL')}}"
 gce_project: "{{lookup('env','LOOM_GCE_PROJECT')}}"
 instance_image: "{{lookup('env','LOOM_GCLOUD_WORKER_INSTANCE_IMAGE')}}"
 log_level: "{{lookup('env','LOOM_LOG_LEVEL')}}"
+
+default_registry: "{{lookup('env', 'LOOM_DEFAULT_DOCKER_REGISTRY')}}"
+raw_image: "{{lookup('env', 'LOOM_DOCKER_IMAGE')}}"
+loom_docker_image: "{% if default_registry %}{% if not ('.' in raw_image.split('/')[0]) %}{{default_registry}}/{% endif %}{% endif %}{{raw_image}}"
+
 loom_docker_image: "{{lookup('env','LOOM_DOCKER_IMAGE')}}"
+loom_default_docker_registry: "{{lookup('env','LOOM_DEFAULT_DOCKER_REGISTRY')}}"
 memory: "{{lookup('env','LOOM_TASK_ATTEMPT_MEMORY')|default('3.75',true)}}"
 network: "{{lookup('env','LOOM_GCLOUD_WORKER_NETWORK')}}"
 scratch_disk_mount_point: "{{lookup('env','LOOM_STORAGE_ROOT')}}"
loomengine/client/settings/gcloud.conf (2 additions, 1 deletion)
@@ -14,7 +14,8 @@ LOOM_STORAGE_ROOT: /loom-data
 LOOM_ANSIBLE_HOST_KEY_CHECKING: False
 LOOM_ANSIBLE_INVENTORY: gce_wrapper.py
 
-LOOM_DOCKER_IMAGE: registry.hub.docker.com/loomengine/loom:0.4.0
+LOOM_DOCKER_IMAGE: loomengine/loom:0.4.0
+LOOM_DEFAULT_DOCKER_REGISTRY: registry.hub.docker.com
 
 LOOM_HTTP_PORT: 80
 LOOM_HTTPS_PORT: 443
loomengine/client/settings/local.conf (2 additions, 1 deletion)
@@ -14,7 +14,8 @@ LOOM_STORAGE_ROOT: ~/loom-data
 LOOM_ANSIBLE_INVENTORY: localhost,
 LOOM_ANSIBLE_HOST_KEY_CHECKING: False
 
-LOOM_DOCKER_IMAGE: registry.hub.docker.com/loomengine/loom:0.4.0
+LOOM_DOCKER_IMAGE: loomengine/loom:0.4.0
+LOOM_DEFAULT_DOCKER_REGISTRY: registry.hub.docker.com
 
 LOOM_HTTP_PORT: 80
 LOOM_HTTP_PORT_ENABLED: True
loomengine/master/api/async.py (8 additions, 8 deletions)
@@ -37,9 +37,9 @@ def _run_with_delay(task_function, args, kwargs):
     task_function.delay(*args, **kwargs)
 
 @shared_task
-def _postprocess_run(run_uuid, request):
+def _postprocess_run(run_uuid, context):
     from api.models import Run
-    Run.postprocess(run_uuid, request)
+    Run.postprocess(run_uuid, context)
 
 def postprocess_run(*args, **kwargs):
     if get_setting('TEST_NO_POSTPROCESS'):
@@ -49,11 +49,11 @@ def postprocess_run(*args, **kwargs):
     return _run_with_delay(_postprocess_run, args, kwargs)
 
 @shared_task
-def _run_task(task_uuid, request):
+def _run_task(task_uuid, context):
     # If task has been run before, old TaskAttempt will be rendered inactive
     from api.models.tasks import Task
     task = Task.objects.get(uuid=task_uuid)
-    task_attempt = task.create_and_activate_attempt(request)
+    task_attempt = task.create_and_activate_attempt(context)
     if get_setting('TEST_NO_RUN_TASK_ATTEMPT'):
         logger.debug('Skipping async._run_execute_task_attempt_playbook because'\
                      'TEST_NO_RUN_TASK_ATTEMPT is True')
@@ -194,10 +194,10 @@ def _run_cleanup_task_playbook(task_attempt):
     return subprocess.Popen(cmd_list, env=env, stderr=subprocess.STDOUT)
 
 @shared_task
-def _finish_task_attempt(task_attempt_uuid, request):
+def _finish_task_attempt(task_attempt_uuid, context):
     from api.models.tasks import TaskAttempt
     task_attempt = TaskAttempt.objects.get(uuid=task_attempt_uuid)
-    task_attempt.finish(request)
+    task_attempt.finish(context)
 
 def finish_task_attempt(*args, **kwargs):
     return _run_with_delay(_finish_task_attempt, args, kwargs)
@@ -219,10 +219,10 @@ def kill_task_attempt(*args, **kwargs):
     return _run_with_delay(_kill_task_attempt, args, kwargs)
 
 @shared_task
-def _send_run_notifications(run_uuid, request):
+def _send_run_notifications(run_uuid, context):
     from api.models.runs import Run
     run = Run.objects.get(uuid=run_uuid)
-    run.send_notifications(request)
+    run.send_notifications(context)
 
 def send_run_notifications(*args, **kwargs):
     return _run_with_delay(_send_run_notifications, args, kwargs)
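
Note: throughout async.py the only change is the parameter rename from request to context, but the dispatch pattern these helpers share is worth spelling out. The sketch below is not part of the diff; the import alias and the sample UUID and context values are illustrative assumptions.

# Callers hand over a model UUID plus a plain `context` dict; the Celery task
# re-loads the model inside the worker and forwards the dict unchanged.
from api import async as async_tasks   # import style is an assumption

run_uuid = 'aaaaaaaa-0000-0000-0000-000000000000'   # hypothetical UUID
context = {'server_name': 'loom-server', 'server_url': 'http://localhost'}
async_tasks.postprocess_run(run_uuid, context)
# -> _run_with_delay(_postprocess_run, (run_uuid, context), {})
# -> _postprocess_run.delay(run_uuid, context)
# -> Run.postprocess(run_uuid, context) inside the Celery worker
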
loomengine/master/api/migrations/0001_initial.py (3 additions, 1 deletion)
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Generated by Django 1.11 on 2017-07-14 01:50
+# Generated by Django 1.11 on 2017-07-15 02:52
 from __future__ import unicode_literals
 
 import api.models
@@ -200,7 +200,9 @@ class Migration(migrations.Migration):
                 ('interpreter', models.CharField(max_length=1024)),
                 ('command', models.TextField()),
                 ('environment', jsonfield.fields.JSONField()),
+                ('environment_info', jsonfield.fields.JSONField(blank=True)),
                 ('resources', jsonfield.fields.JSONField(blank=True)),
+                ('resources_info', jsonfield.fields.JSONField(blank=True)),
                 ('last_heartbeat', models.DateTimeField(auto_now=True)),
                 ('datetime_created', models.DateTimeField(default=django.utils.timezone.now, editable=False)),
                 ('datetime_finished', models.DateTimeField(blank=True, null=True)),
loomengine/master/api/models/runs.py (39 additions, 41 deletions)
@@ -114,21 +114,7 @@ def get_output(self, channel):
         assert len(outputs) == 1, 'missing output for channel %s' % channel
         return outputs[0]
 
-    def get_topmost_run(self):
-        try:
-            self.run_request
-        except ObjectDoesNotExist:
-            return self.parent.get_topmost_run()
-        return self
-
-    def is_topmost_run(self):
-        try:
-            self.run_request
-        except ObjectDoesNotExist:
-            return False
-        return True
-
-    def finish(self, request):
+    def finish(self, context):
         if self.has_terminal_status():
             return
         self.setattrs_and_save_with_retries(
@@ -137,14 +123,17 @@ def finish(self, request):
              'status_is_finished': True})
         if self.parent:
             if self.parent._are_children_finished():
-                self.parent.finish(request)
+                self.parent.finish(context)
         else:
             # Send notifications only if topmost run
-            async.send_run_notifications(self.uuid, request)
+            async.send_run_notifications(self.uuid, context)
 
     def _are_children_finished(self):
         return all([step.status_is_finished for step in self.steps.all()])
 
+    def are_tasks_finished(self):
+        return all([task.status_is_finished for task in self.tasks.all()])
+
     @classmethod
     def create_from_template(cls, template, name=None,
                              notification_addresses=[], parent=None):
@@ -249,7 +238,7 @@ def has_terminal_status(self):
             or self.status_is_failed \
             or self.status_is_killed
 
-    def fail(self, request, detail=''):
+    def fail(self, context, detail=''):
         if self.has_terminal_status():
             return
         self.setattrs_and_save_with_retries({
@@ -258,14 +247,14 @@ def fail(self, request, detail=''):
             'status_is_waiting': False})
         self.add_event("Run failed", detail=detail, is_error=True)
         if self.parent:
-            self.parent.fail(request,
+            self.parent.fail(context,
                              detail='Failure in step %s@%s' % (
                                  self.name, self.uuid))
         else:
             # Send kill signal to children
             self._kill_children(detail='Automatically killed due to failure')
             # Send notifications only if topmost run
-            async.send_run_notifications(self.uuid, request)
+            async.send_run_notifications(self.uuid, context)
 
     def kill(self, detail=''):
         if self.has_terminal_status():
@@ -283,18 +272,16 @@ def _kill_children(self, detail=''):
         for task in self.tasks.all():
             task.kill(detail=detail)
 
-    def send_notifications(self, request):
-        assert request is not None, 'Request missing'
-        server_url = '%s://%s' % (request.scheme,
-                                  request.get_host())
-        context = {
-            'server_name': get_setting('SERVER_NAME'),
-            'server_url': server_url,
+    def send_notifications(self, context=None):
+        if not context:
+            context = {}
+        server_url = context.get('server_url')
+        context.update({
             'run_url': '%s/#/runs/%s/' % (server_url, self.uuid),
             'run_api_url': '%s/api/runs/%s/' % (server_url, self.uuid),
             'run_status': self.status,
             'run_name_and_id': '%s@%s' % (self.name, self.uuid[0:8])
-        }
+        })
         notification_addresses = []
         if self.notification_addresses:
             notification_addresses = self.notification_addresses
@@ -343,6 +330,17 @@ def _send_http_notifications(self, urls, context):
         for url in urls:
             requests.post(url, data = data)
 
+    @classmethod
+    def get_notification_context(cls, request):
+        if not request:
+            return {}
+        return {
+            'server_name': get_setting('SERVER_NAME'),
+            'server_url': '%s://%s' % (
+                request.scheme,
+                request.get_host()),
+        }
+
     def set_running_status(self):
         if self.status_is_running and not self.status_is_waiting:
             return
@@ -385,7 +383,7 @@ def _claim_for_postprocessing(self):
                 'concurrent modification')
 
     @classmethod
-    def postprocess(cls, run_uuid, request=None):
+    def postprocess(cls, run_uuid, context=None):
         run = Run.objects.get(uuid=run_uuid)
         if run.postprocessing_status == 'complete':
             # Nothing more to do
@@ -397,21 +395,21 @@ def postprocess(cls, run_uuid, context=None):
             return
 
         try:
-            run._push_all_inputs(request)
+            run._push_all_inputs(context)
             for step in run.steps.all():
-                step.initialize(request)
+                step.initialize(context)
             run.setattrs_and_save_with_retries({
                 'postprocessing_status': 'complete'})
 
         except Exception as e:
             run.setattrs_and_save_with_retries({'postprocessing_status': 'failed'})
-            run.fail(request, detail='Postprocessing failed with error "%s"' % str(e))
+            run.fail(context, detail='Postprocessing failed with error "%s"' % str(e))
             raise
 
-    def initialize(self, request=None):
+    def initialize(self, context=None):
         self.connect_inputs_to_template_data()
         self.create_steps()
-        async.postprocess_run(self.uuid, request)
+        async.postprocess_run(self.uuid, context)
 
     def initialize_inputs(self):
         seen = set()
@@ -503,17 +501,17 @@ def _create_connector(self, io_node, is_source):
         connector.save()
         connector.connect(io_node)
 
-    def _push_all_inputs(self, request):
+    def _push_all_inputs(self, context):
         if get_setting('TEST_NO_PUSH_INPUTS_ON_RUN_CREATION'):
             return
         if self.inputs.exists():
             for input in self.inputs.all():
-                self.push(input.channel, [], request)
+                self.push(input.channel, [], context)
         elif self.is_leaf:
             # Special case: No inputs on leaf node
-            self._push_input_set([], request)
+            self._push_input_set([], context)
 
-    def push(self, channel, data_path, request):
+    def push(self, channel, data_path, context):
         """Called when new data is available at the given data_path
         on the given channel. This will trigger creation of new tasks if 1)
         other input data for those tasks is available, and 2) the task with
@@ -525,12 +523,12 @@ def push(self, channel, data_path, request):
             return
         for input_set in InputCalculator(self.inputs.all(), channel, data_path)\
                 .get_input_sets():
-            self._push_input_set(input_set, request)
+            self._push_input_set(input_set, context)
 
-    def _push_input_set(self, input_set, request):
+    def _push_input_set(self, input_set, context):
         try:
             task = Task.create_from_input_set(input_set, self)
-            async.run_task(task.uuid, request)
+            async.run_task(task.uuid, context)
         except TaskAlreadyExistsException:
             pass

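
Note: the runs.py changes replace the Django request object that previously threaded through postprocessing and notifications with a plain dict built once, up front, by the new Run.get_notification_context() classmethod. A minimal usage sketch follows; it is not part of the commit, and the view-side helper name is hypothetical.

from api.models.runs import Run

def launch_run(run, request):           # hypothetical view-side helper
    # Build the serializable context once from the incoming HTTP request,
    # e.g. {'server_name': ..., 'server_url': 'https://loom.example.org'},
    context = Run.get_notification_context(request)
    # then pass the dict (not the request) down the async chain.
    run.initialize(context=context)     # queues async.postprocess_run
    return context

# When the run later finishes, send_notifications() receives the same dict
# and derives run_url and run_api_url from context['server_url'].
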
loomengine/master/api/models/task_attempts.py (6 additions, 4 deletions)
@@ -24,7 +24,9 @@ class TaskAttempt(BaseModel):
     interpreter = models.CharField(max_length=1024)
     command = models.TextField()
     environment = jsonfield.JSONField()
+    environment_info = jsonfield.JSONField(blank=True)
     resources = jsonfield.JSONField(blank=True)
+    resources_info = jsonfield.JSONField(blank=True)
     last_heartbeat = models.DateTimeField(auto_now=True)
     datetime_created = models.DateTimeField(default=timezone.now,
                                             editable=False)
@@ -56,7 +58,7 @@ def heartbeat(self):
     def get_output(self, channel):
         return self.outputs.get(channel=channel)
 
-    def fail(self, request, detail=''):
+    def fail(self, context, detail=''):
         if self.has_terminal_status():
             return
         self.setattrs_and_save_with_retries(
@@ -65,7 +67,7 @@ def fail(self, request, detail=''):
         self.add_event("TaskAttempt failed", detail=detail, is_error=True)
         try:
             self.active_task.fail(
-                request,
+                context,
                 detail="Child TaskAttempt %s failed" % self.uuid)
         except ObjectDoesNotExist:
             # This attempt is no longer active
@@ -77,7 +79,7 @@ def has_terminal_status(self):
             or self.status_is_failed \
             or self.status_is_killed
 
-    def finish(self, request):
+    def finish(self, context):
         if self.has_terminal_status():
             return
         self.setattrs_and_save_with_retries({
@@ -90,7 +92,7 @@ def finish(self, request):
             # This attempt is no longer active
             # and will be ignored.
             return
-        task.finish(request)
+        task.finish(context)
 
     def add_event(self, event, detail='', is_error=False):
         event = TaskAttemptEvent.objects.create(
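
Note: the new environment_info and resources_info JSON fields on TaskAttempt (mirrored in the initial migration above) give the worker somewhere to record what actually ran, per the commit message. The diff does not show who writes them or with what keys, so the sketch below is purely illustrative; the reporter function and payload keys are hypothetical.

from api.models.task_attempts import TaskAttempt

def record_container_info(task_attempt_uuid, image_digest, cores, memory_gb):
    # Hypothetical worker-side reporter: both fields are free-form JSON.
    task_attempt = TaskAttempt.objects.get(uuid=task_attempt_uuid)
    task_attempt.environment_info = {'docker_image_id': image_digest}
    task_attempt.resources_info = {'cores': cores, 'memory': memory_gb}
    task_attempt.save()
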
