diff --git a/loomengine/client/playbooks/vars/common.yml b/loomengine/client/playbooks/vars/common.yml index 6221fb88..35ca279f 100644 --- a/loomengine/client/playbooks/vars/common.yml +++ b/loomengine/client/playbooks/vars/common.yml @@ -6,7 +6,11 @@ container_settings_home: /loom-settings server_name: "{{lookup('env', 'LOOM_SERVER_NAME')}}" log_level: "{{lookup('env', 'LOOM_LOG_LEVEL')|upper}}" -loom_docker_image: "{{lookup('env', 'LOOM_DOCKER_IMAGE')}}" +loom_default_docker_registry: "{{lookup('env', 'LOOM_DEFAULT_DOCKER_REGISTRY')}}" + +default_registry: "{{lookup('env', 'LOOM_DEFAULT_DOCKER_REGISTRY')}}" +raw_image: "{{lookup('env', 'LOOM_DOCKER_IMAGE')}}" +loom_docker_image: "{% if default_registry %}{% if not ('.' in raw_image.split('/')[0]) %}{{default_registry}}/{% endif %}{% endif %}{{raw_image}}" admin_files_dir: "{{lookup('env', 'LOOM_ADMIN_FILES_DIR')}}" loom_playbook_dir: "{{lookup('env', 'LOOM_PLAYBOOK_DIR')}}" diff --git a/loomengine/client/playbooks/vars/gcloud_worker.yml b/loomengine/client/playbooks/vars/gcloud_worker.yml index cb6ea3cf..3ae5097e 100644 --- a/loomengine/client/playbooks/vars/gcloud_worker.yml +++ b/loomengine/client/playbooks/vars/gcloud_worker.yml @@ -13,7 +13,12 @@ gce_email: "{{lookup('env','LOOM_GCE_EMAIL')}}" gce_project: "{{lookup('env','LOOM_GCE_PROJECT')}}" instance_image: "{{lookup('env','LOOM_GCLOUD_WORKER_INSTANCE_IMAGE')}}" log_level: "{{lookup('env','LOOM_LOG_LEVEL')}}" + +default_registry: "{{lookup('env', 'LOOM_DEFAULT_DOCKER_REGISTRY')}}" +raw_image: "{{lookup('env', 'LOOM_DOCKER_IMAGE')}}" +loom_docker_image: "{% if default_registry %}{% if not ('.' 
in raw_image.split('/')[0]) %}{{default_registry}}/{% endif %}{% endif %}{{raw_image}}" + -loom_docker_image: "{{lookup('env','LOOM_DOCKER_IMAGE')}}" +loom_default_docker_registry: "{{lookup('env','LOOM_DEFAULT_DOCKER_REGISTRY')}}" memory: "{{lookup('env','LOOM_TASK_ATTEMPT_MEMORY')|default('3.75',true)}}" network: "{{lookup('env','LOOM_GCLOUD_WORKER_NETWORK')}}" scratch_disk_mount_point: "{{lookup('env','LOOM_STORAGE_ROOT')}}" diff --git a/loomengine/client/settings/gcloud.conf b/loomengine/client/settings/gcloud.conf index ad65bba6..8cb25a93 100644 --- a/loomengine/client/settings/gcloud.conf +++ b/loomengine/client/settings/gcloud.conf @@ -14,7 +14,8 @@ LOOM_STORAGE_ROOT: /loom-data LOOM_ANSIBLE_HOST_KEY_CHECKING: False LOOM_ANSIBLE_INVENTORY: gce_wrapper.py -LOOM_DOCKER_IMAGE: registry.hub.docker.com/loomengine/loom:0.4.0 +LOOM_DOCKER_IMAGE: loomengine/loom:0.4.0 +LOOM_DEFAULT_DOCKER_REGISTRY: registry.hub.docker.com LOOM_HTTP_PORT: 80 LOOM_HTTPS_PORT: 443 diff --git a/loomengine/client/settings/local.conf b/loomengine/client/settings/local.conf index adce064b..df779768 100644 --- a/loomengine/client/settings/local.conf +++ b/loomengine/client/settings/local.conf @@ -14,7 +14,8 @@ LOOM_STORAGE_ROOT: ~/loom-data LOOM_ANSIBLE_INVENTORY: localhost, LOOM_ANSIBLE_HOST_KEY_CHECKING: False -LOOM_DOCKER_IMAGE: registry.hub.docker.com/loomengine/loom:0.4.0 +LOOM_DOCKER_IMAGE: loomengine/loom:0.4.0 +LOOM_DEFAULT_DOCKER_REGISTRY: registry.hub.docker.com LOOM_HTTP_PORT: 80 LOOM_HTTP_PORT_ENABLED: True diff --git a/loomengine/master/api/async.py b/loomengine/master/api/async.py index b1de03bf..4edff0ec 100644 --- a/loomengine/master/api/async.py +++ b/loomengine/master/api/async.py @@ -37,9 +37,9 @@ def _run_with_delay(task_function, args, kwargs): task_function.delay(*args, **kwargs) @shared_task -def _postprocess_run(run_uuid, request): +def _postprocess_run(run_uuid, context): from api.models import Run - Run.postprocess(run_uuid, request) + Run.postprocess(run_uuid, 
context) def postprocess_run(*args, **kwargs): if get_setting('TEST_NO_POSTPROCESS'): @@ -49,11 +49,11 @@ def postprocess_run(*args, **kwargs): return _run_with_delay(_postprocess_run, args, kwargs) @shared_task -def _run_task(task_uuid, request): +def _run_task(task_uuid, context): # If task has been run before, old TaskAttempt will be rendered inactive from api.models.tasks import Task task = Task.objects.get(uuid=task_uuid) - task_attempt = task.create_and_activate_attempt(request) + task_attempt = task.create_and_activate_attempt(context) if get_setting('TEST_NO_RUN_TASK_ATTEMPT'): logger.debug('Skipping async._run_execute_task_attempt_playbook because'\ 'TEST_NO_RUN_TASK_ATTEMPT is True') @@ -194,10 +194,10 @@ def _run_cleanup_task_playbook(task_attempt): return subprocess.Popen(cmd_list, env=env, stderr=subprocess.STDOUT) @shared_task -def _finish_task_attempt(task_attempt_uuid, request): +def _finish_task_attempt(task_attempt_uuid, context): from api.models.tasks import TaskAttempt task_attempt = TaskAttempt.objects.get(uuid=task_attempt_uuid) - task_attempt.finish(request) + task_attempt.finish(context) def finish_task_attempt(*args, **kwargs): return _run_with_delay(_finish_task_attempt, args, kwargs) @@ -219,10 +219,10 @@ def kill_task_attempt(*args, **kwargs): return _run_with_delay(_kill_task_attempt, args, kwargs) @shared_task -def _send_run_notifications(run_uuid, request): +def _send_run_notifications(run_uuid, context): from api.models.runs import Run run = Run.objects.get(uuid=run_uuid) - run.send_notifications(request) + run.send_notifications(context) def send_run_notifications(*args, **kwargs): return _run_with_delay(_send_run_notifications, args, kwargs) diff --git a/loomengine/master/api/migrations/0001_initial.py b/loomengine/master/api/migrations/0001_initial.py index e76ed6ef..9263cfd1 100644 --- a/loomengine/master/api/migrations/0001_initial.py +++ b/loomengine/master/api/migrations/0001_initial.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- 
-# Generated by Django 1.11 on 2017-07-14 01:50 +# Generated by Django 1.11 on 2017-07-15 02:52 from __future__ import unicode_literals import api.models @@ -200,7 +200,9 @@ class Migration(migrations.Migration): ('interpreter', models.CharField(max_length=1024)), ('command', models.TextField()), ('environment', jsonfield.fields.JSONField()), + ('environment_info', jsonfield.fields.JSONField(blank=True)), ('resources', jsonfield.fields.JSONField(blank=True)), + ('resources_info', jsonfield.fields.JSONField(blank=True)), ('last_heartbeat', models.DateTimeField(auto_now=True)), ('datetime_created', models.DateTimeField(default=django.utils.timezone.now, editable=False)), ('datetime_finished', models.DateTimeField(blank=True, null=True)), diff --git a/loomengine/master/api/models/runs.py b/loomengine/master/api/models/runs.py index d30daa0d..17a9703d 100644 --- a/loomengine/master/api/models/runs.py +++ b/loomengine/master/api/models/runs.py @@ -114,21 +114,7 @@ def get_output(self, channel): assert len(outputs) == 1, 'missing output for channel %s' % channel return outputs[0] - def get_topmost_run(self): - try: - self.run_request - except ObjectDoesNotExist: - return self.parent.get_topmost_run() - return self - - def is_topmost_run(self): - try: - self.run_request - except ObjectDoesNotExist: - return False - return True - - def finish(self, request): + def finish(self, context): if self.has_terminal_status(): return self.setattrs_and_save_with_retries( @@ -137,14 +123,17 @@ def finish(self, request): 'status_is_finished': True}) if self.parent: if self.parent._are_children_finished(): - self.parent.finish(request) + self.parent.finish(context) else: # Send notifications only if topmost run - async.send_run_notifications(self.uuid, request) + async.send_run_notifications(self.uuid, context) def _are_children_finished(self): return all([step.status_is_finished for step in self.steps.all()]) + def are_tasks_finished(self): + return all([task.status_is_finished for 
task in self.tasks.all()]) + @classmethod def create_from_template(cls, template, name=None, notification_addresses=[], parent=None): @@ -249,7 +238,7 @@ def has_terminal_status(self): or self.status_is_failed \ or self.status_is_killed - def fail(self, request, detail=''): + def fail(self, context, detail=''): if self.has_terminal_status(): return self.setattrs_and_save_with_retries({ @@ -258,14 +247,14 @@ def fail(self, request, detail=''): 'status_is_waiting': False}) self.add_event("Run failed", detail=detail, is_error=True) if self.parent: - self.parent.fail(request, + self.parent.fail(context, detail='Failure in step %s@%s' % ( self.name, self.uuid)) else: # Send kill signal to children self._kill_children(detail='Automatically killed due to failure') # Send notifications only if topmost run - async.send_run_notifications(self.uuid, request) + async.send_run_notifications(self.uuid, context) def kill(self, detail=''): if self.has_terminal_status(): @@ -283,18 +272,16 @@ def _kill_children(self, detail=''): for task in self.tasks.all(): task.kill(detail=detail) - def send_notifications(self, request): - assert request is not None, 'Request missing' - server_url = '%s://%s' % (request.scheme, - request.get_host()) - context = { - 'server_name': get_setting('SERVER_NAME'), - 'server_url': server_url, + def send_notifications(self, context=None): + if not context: + context = {} + server_url = context.get('server_url') + context.update({ 'run_url': '%s/#/runs/%s/' % (server_url, self.uuid), 'run_api_url': '%s/api/runs/%s/' % (server_url, self.uuid), 'run_status': self.status, 'run_name_and_id': '%s@%s' % (self.name, self.uuid[0:8]) - } + }) notification_addresses = [] if self.notification_addresses: notification_addresses = self.notification_addresses @@ -343,6 +330,17 @@ def _send_http_notifications(self, urls, context): for url in urls: requests.post(url, data = data) + @classmethod + def get_notification_context(cls, request): + if not request: + return {} + 
return { + 'server_name': get_setting('SERVER_NAME'), + 'server_url': '%s://%s' % ( + request.scheme, + request.get_host()), + } + def set_running_status(self): if self.status_is_running and not self.status_is_waiting: return @@ -385,7 +383,7 @@ def _claim_for_postprocessing(self): 'concurrent modification') @classmethod - def postprocess(cls, run_uuid, request=None): + def postprocess(cls, run_uuid, context=None): run = Run.objects.get(uuid=run_uuid) if run.postprocessing_status == 'complete': # Nothing more to do @@ -397,21 +395,21 @@ def postprocess(cls, run_uuid, request=None): return try: - run._push_all_inputs(request) + run._push_all_inputs(context) for step in run.steps.all(): - step.initialize(request) + step.initialize(context) run.setattrs_and_save_with_retries({ 'postprocessing_status': 'complete'}) except Exception as e: run.setattrs_and_save_with_retries({'postprocessing_status': 'failed'}) - run.fail(request, detail='Postprocessing failed with error "%s"' % str(e)) + run.fail(context, detail='Postprocessing failed with error "%s"' % str(e)) raise - def initialize(self, request=None): + def initialize(self, context=None): self.connect_inputs_to_template_data() self.create_steps() - async.postprocess_run(self.uuid, request) + async.postprocess_run(self.uuid, context) def initialize_inputs(self): seen = set() @@ -503,17 +501,17 @@ def _create_connector(self, io_node, is_source): connector.save() connector.connect(io_node) - def _push_all_inputs(self, request): + def _push_all_inputs(self, context): if get_setting('TEST_NO_PUSH_INPUTS_ON_RUN_CREATION'): return if self.inputs.exists(): for input in self.inputs.all(): - self.push(input.channel, [], request) + self.push(input.channel, [], context) elif self.is_leaf: # Special case: No inputs on leaf node - self._push_input_set([], request) + self._push_input_set([], context) - def push(self, channel, data_path, request): + def push(self, channel, data_path, context): """Called when new data is available at 
the given data_path on the given channel. This will trigger creation of new tasks if 1) other input data for those tasks is available, and 2) the task with @@ -525,12 +523,12 @@ def push(self, channel, data_path, request): return for input_set in InputCalculator(self.inputs.all(), channel, data_path)\ .get_input_sets(): - self._push_input_set(input_set, request) + self._push_input_set(input_set, context) - def _push_input_set(self, input_set, request): + def _push_input_set(self, input_set, context): try: task = Task.create_from_input_set(input_set, self) - async.run_task(task.uuid, request) + async.run_task(task.uuid, context) except TaskAlreadyExistsException: pass diff --git a/loomengine/master/api/models/task_attempts.py b/loomengine/master/api/models/task_attempts.py index be171e0e..28f9ebf2 100644 --- a/loomengine/master/api/models/task_attempts.py +++ b/loomengine/master/api/models/task_attempts.py @@ -24,7 +24,9 @@ class TaskAttempt(BaseModel): interpreter = models.CharField(max_length=1024) command = models.TextField() environment = jsonfield.JSONField() + environment_info = jsonfield.JSONField(blank=True) resources = jsonfield.JSONField(blank=True) + resources_info = jsonfield.JSONField(blank=True) last_heartbeat = models.DateTimeField(auto_now=True) datetime_created = models.DateTimeField(default=timezone.now, editable=False) @@ -56,7 +58,7 @@ def heartbeat(self): def get_output(self, channel): return self.outputs.get(channel=channel) - def fail(self, request, detail=''): + def fail(self, context, detail=''): if self.has_terminal_status(): return self.setattrs_and_save_with_retries( @@ -65,7 +67,7 @@ def fail(self, request, detail=''): self.add_event("TaskAttempt failed", detail=detail, is_error=True) try: self.active_task.fail( - request, + context, detail="Child TaskAttempt %s failed" % self.uuid) except ObjectDoesNotExist: # This attempt is no longer active @@ -77,7 +79,7 @@ def has_terminal_status(self): or self.status_is_failed \ or 
self.status_is_killed - def finish(self, request): + def finish(self, context): if self.has_terminal_status(): return self.setattrs_and_save_with_retries({ @@ -90,7 +92,7 @@ def finish(self, request): # This attempt is no longer active # and will be ignored. return - task.finish(request) + task.finish(context) def add_event(self, event, detail='', is_error=False): event = TaskAttemptEvent.objects.create( diff --git a/loomengine/master/api/models/tasks.py b/loomengine/master/api/models/tasks.py index c8b4ccbd..9cc6e254 100644 --- a/loomengine/master/api/models/tasks.py +++ b/loomengine/master/api/models/tasks.py @@ -29,13 +29,11 @@ class Task(BaseModel): command = models.TextField(blank=True) environment = jsonfield.JSONField() resources = jsonfield.JSONField(blank=True) - run = models.ForeignKey('Run', related_name='tasks', on_delete=models.CASCADE, null=True, # null for testing only blank=True) - task_attempt = models.OneToOneField('TaskAttempt', related_name='active_task', on_delete=models.CASCADE, @@ -81,7 +79,7 @@ def is_unresponsive(self): # has passed, we have probably missed 2 heartbeats return (timezone.now() - last_heartbeat).total_seconds() > timeout - def fail(self, request, detail=''): + def fail(self, context, detail=''): if self.has_terminal_status(): return self.setattrs_and_save_with_retries( @@ -90,24 +88,26 @@ def fail(self, request, detail=''): 'status_is_waiting': False}) self.add_event("Task failed", detail=detail, is_error=True) self._kill_children(detail=detail) - self.run.fail(request, detail='Task %s failed' % self.uuid) + self.run.fail(context, detail='Task %s failed' % self.uuid) def has_terminal_status(self): return self.status_is_finished \ or self.status_is_failed \ or self.status_is_killed - def finish(self, request): + def finish(self, context): if self.has_terminal_status(): return self.setattrs_and_save_with_retries( { 'datetime_finished': timezone.now(), 'status_is_finished': True, 'status_is_running': False, - 
'status_is_waiting': False}) - self.run.finish(request) + 'status_is_waiting': False, + }) + if self.run.are_tasks_finished(): + self.run.finish(context) for output in self.outputs.all(): - output.push_data(self.data_path, request) + output.push_data(self.data_path, context) for task_attempt in self.all_task_attempts.all(): task_attempt.cleanup() @@ -169,7 +169,7 @@ def create_from_input_set(cls, input_set, run): run.set_running_status() return task - def create_and_activate_attempt(self, request): + def create_and_activate_attempt(self, context): try: task_attempt = TaskAttempt.create_from_task(self) self.setattrs_and_save_with_retries({ @@ -178,7 +178,7 @@ def create_and_activate_attempt(self, request): 'status_is_waiting': False}) return task_attempt except Exception as e: - task.fail(request, detail='Error creating TaskAttempt: "%s"' % str(e)) + self.fail(context, detail='Error creating TaskAttempt: "%s"' % str(e)) raise def get_input_context(self): @@ -253,7 +253,7 @@ class TaskOutput(DataChannel): blank=True) as_channel = models.CharField(max_length=255, null=True, blank=True) - def push_data(self, data_path, request): + def push_data(self, data_path, context): # Copy data from the TaskAttemptOutput to the TaskOutput # From there, it is already connected to downstream runs. 
attempt_output = self.task.task_attempt.get_output(self.channel) @@ -265,7 +265,7 @@ def push_data(self, data_path, request): run_output = self.task.run.get_output(self.channel) data_root = run_output.data_node for input in data_root.downstream_run_inputs.all(): - input.run.push(input.channel, data_path, request) + input.run.push(input.channel, data_path, context) class TaskEvent(BaseModel): diff --git a/loomengine/master/api/serializers/runs.py b/loomengine/master/api/serializers/runs.py index 7e7d420f..fa5a7537 100644 --- a/loomengine/master/api/serializers/runs.py +++ b/loomengine/master/api/serializers/runs.py @@ -214,7 +214,8 @@ def create(self, validated_data): s.save() run.initialize_inputs() run.initialize_outputs() - run.initialize(self.context.get('request')) + run.initialize(Run.get_notification_context( + self.context.get('request'))) return run @classmethod diff --git a/loomengine/master/api/serializers/task_attempts.py b/loomengine/master/api/serializers/task_attempts.py index 00586602..a43ec4f1 100644 --- a/loomengine/master/api/serializers/task_attempts.py +++ b/loomengine/master/api/serializers/task_attempts.py @@ -105,7 +105,9 @@ class Meta: 'interpreter', 'command', 'environment', + 'environment_info', 'resources', + 'resources_info', 'events' ] @@ -123,7 +125,9 @@ class Meta: events = TaskAttemptEventSerializer( many=True, allow_null=True, required=False) resources = serializers.JSONField(required=False) + resources_info = serializers.JSONField(required=False) environment = serializers.JSONField(required=False) + environment_info = serializers.JSONField(required=False) datetime_created = serializers.DateTimeField(required=False, format='iso-8601') datetime_finished = serializers.DateTimeField(required=False, format='iso-8601') last_heartbeat = serializers.DateTimeField(required=False, format='iso-8601') @@ -141,7 +145,9 @@ def update(self, instance, validated_data): status_is_finished = validated_data.pop('status_is_finished', None) 
status_is_failed = validated_data.pop('status_is_failed', None) status_is_running = validated_data.pop('status_is_running', None) - + environment_info = validated_data.pop('environment_info', None) + resources_info = validated_data.pop('resources_info', None) + attributes = {} if status_is_finished is not None: attributes['status_is_finished'] = status_is_finished @@ -149,6 +155,10 @@ def update(self, instance, validated_data): attributes['status_is_failed'] = status_is_failed if status_is_running is not None: attributes['status_is_running'] = status_is_running + if environment_info is not None: + attributes['environment_info'] = environment_info + if resources_info is not None: + attributes['resources_info'] = resources_info instance = instance.setattrs_and_save_with_retries(attributes) return instance @@ -195,7 +205,9 @@ class Meta: events = TaskAttemptEventSerializer( many=True, allow_null=True, required=False, write_only=True) resources = serializers.JSONField(required=False, write_only=True) + resources_info = serializers.JSONField(required=False, write_only=True) environment = serializers.JSONField(required=False, write_only=True) + environment_info = serializers.JSONField(required=False, write_only=True) last_heartbeat = serializers.DateTimeField( required=False, format='iso-8601', write_only=True) status_is_finished = serializers.BooleanField(required=False, write_only=True) diff --git a/loomengine/master/api/test/models/test_tasks.py b/loomengine/master/api/test/models/test_tasks.py index 7d1a88db..13b905fe 100644 --- a/loomengine/master/api/test/models/test_tasks.py +++ b/loomengine/master/api/test/models/test_tasks.py @@ -55,7 +55,7 @@ def testCreate(self): def testCreateAttempt(self): task = get_task() - task_attempt = task.create_and_activate_attempt() + task_attempt = task.create_and_activate_attempt({}) self.assertEqual(task_attempt.command, task.command) def testGetInputContext(self): @@ -71,7 +71,7 @@ class TestTaskAttempt(TestCase): def 
setUp(self): self.task = get_task() - self.task_attempt = self.task.create_and_activate_attempt() + self.task_attempt = self.task.create_and_activate_attempt({}) def testCreateAttempt(self): self.assertEqual(self.task_attempt.outputs.first().channel, diff --git a/loomengine/master/api/views.py b/loomengine/master/api/views.py index 356f5fa1..ae8f075d 100644 --- a/loomengine/master/api/views.py +++ b/loomengine/master/api/views.py @@ -162,15 +162,16 @@ def create_log_file(self, request, uuid=None): serializer_class=rest_framework.serializers.Serializer) def fail(self, request, uuid): task_attempt = self._get_task_attempt(request, uuid) - task_attempt.fail(request) + task_attempt.fail(models.Run.get_notification_context(request)) return JsonResponse({}, status=201) @detail_route(methods=['post'], url_path='finish', serializer_class=rest_framework.serializers.Serializer) def finish(self, request, uuid=None): task_attempt = self._get_task_attempt(request, uuid) - async.finish_task_attempt(task_attempt.uuid, - request) + async.finish_task_attempt( + task_attempt.uuid, + models.Run.get_notification_context(request)) return JsonResponse({}, status=201) @detail_route(methods=['post'], url_path='events', @@ -199,6 +200,7 @@ def get_task_execution_settings(self, request, uuid=None): 'WORKING_DIR': task_attempt.get_working_dir(), 'STDOUT_LOG_FILE': task_attempt.get_stdout_log_file(), 'STDERR_LOG_FILE': task_attempt.get_stderr_log_file(), + 'DEFAULT_DOCKER_REGISTRY': get_setting('DEFAULT_DOCKER_REGISTRY'), 'HEARTBEAT_INTERVAL_SECONDS': get_setting('TASKRUNNER_HEARTBEAT_INTERVAL_SECONDS'), }, status=200) diff --git a/loomengine/master/master/settings.py b/loomengine/master/master/settings.py index 22111261..9a96cab7 100644 --- a/loomengine/master/master/settings.py +++ b/loomengine/master/master/settings.py @@ -84,6 +84,8 @@ def to_list(value): PRESERVE_ALL = to_boolean(os.getenv('LOOM_PRESERVE_ALL', 'False')) MAXIMUM_TASK_RETRIES = os.getenv('LOOM_MAXIMUM_TASK_RETRIES', '2') 
+DEFAULT_DOCKER_REGISTRY = os.getenv('LOOM_DEFAULT_DOCKER_REGISTRY', 'docker.io') + # GCP settings GCE_EMAIL = os.getenv('GCE_EMAIL') GCE_PROJECT = os.getenv('GCE_PROJECT', '') diff --git a/loomengine/worker/task_execution_manager.py b/loomengine/worker/task_execution_manager.py index 87e45159..fbe4aceb 100755 --- a/loomengine/worker/task_execution_manager.py +++ b/loomengine/worker/task_execution_manager.py @@ -99,12 +99,15 @@ def _init_loggers(self): utils_logger = get_stdout_logger(loomengine.utils.__name__, log_level) def _init_task_attempt(self): - self.task_attempt = self.connection.get_task_attempt(self.settings['TASK_ATTEMPT_ID']) + self.task_attempt = self.connection.get_task_attempt( + self.settings['TASK_ATTEMPT_ID']) if self.task_attempt is None: - raise TaskAttemptNotFoundError('TaskAttempt ID "%s" not found' % self.settings['TASK_ATTEMPT_ID']) + raise TaskAttemptNotFoundError( + 'TaskAttempt ID "%s" not found' % self.settings['TASK_ATTEMPT_ID']) def _get_settings(self): - settings = self.connection.get_task_attempt_settings(self.settings['TASK_ATTEMPT_ID']) + settings = self.connection.get_task_attempt_settings( + self.settings['TASK_ATTEMPT_ID']) if settings is None: raise WorkerSettingsError('Worker settings not found') return settings @@ -235,11 +238,12 @@ def _try_to_pull_image(self): try: self._pull_image() - image_id = self.docker_client.inspect_image(self._get_docker_image())['Id'] - self._set_image_id(image_id) + container_info = self.docker_client.inspect_image( + self._get_docker_image()) + self._save_environment_info(container_info) self.logger.info( 'Pulled image %s and received image id %s' % ( - self._get_docker_image(), image_id)) + self._get_docker_image(), container_info['Id'])) except Exception as e: self._fail(detail='Pulling Docker image failed with error %s' % str(e)) raise @@ -255,8 +259,12 @@ def _pull_image(self): def _get_docker_image(self): # Tag is required. Otherwise docker-py pull will download all tags. 
docker_image = self.task_attempt['environment']['docker_image'] - if not ':' in docker_image: - docker_image = docker_image + ':latest' + if not '.' in docker_image.split('/')[0]: + default_registry = self.settings.get('DEFAULT_DOCKER_REGISTRY', None) + if default_registry: + docker_image = '%s/%s' % (default_registry, docker_image) + if not '@' in docker_image and not ':' in docker_image.split('/')[-1]: + docker_image += ':latest' return docker_image def _parse_docker_output(self, data): @@ -424,10 +432,10 @@ def _set_container_id(self, container_id): {'container_id': container_id} ) - def _set_image_id(self, image_id): + def _save_environment_info(self, container_info): self.connection.update_task_attempt( self.settings['TASK_ATTEMPT_ID'], - {'image_id': image_id} + {'environment_info': container_info} ) def _event(self, event, detail='', is_error=False):