diff --git a/CHANGELOG.md b/CHANGELOG.md
index 44d00307..af6d07ea 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](http://semver.org/).
 
 ### Changed
 - test files no longer use a test class.
+- `upload_artifacts` now uploads files in subdirectories of `artifact_dir`, preserving the relative paths.
 
 ### Removed
 - Removed unneeded creds file generation.
diff --git a/scriptworker/task.py b/scriptworker/task.py
index b263320e..c5ca424e 100644
--- a/scriptworker/task.py
+++ b/scriptworker/task.py
@@ -4,7 +4,6 @@
 import aiohttp.hdrs
 import arrow
 import asyncio
-import glob
 import logging
 import mimetypes
 import os
@@ -16,9 +15,9 @@
 import taskcluster.exceptions
 from taskcluster.async import Queue
 
-from scriptworker.exceptions import ScriptWorkerRetryException
+from scriptworker.exceptions import ScriptWorkerRetryException, ScriptWorkerTaskException
 from scriptworker.log import get_log_fhs, get_log_filenames, log_errors, read_stdout
-from scriptworker.utils import raise_future_exceptions, retry_async
+from scriptworker.utils import filepaths_in_dir, raise_future_exceptions, retry_async
 
 log = logging.getLogger(__name__)
 
@@ -114,20 +113,18 @@ def guess_content_type(path):
     return content_type or "application/binary"
 
 
-async def create_artifact(context, path, storage_type='s3', expires=None,
-                          content_type=None):
+async def create_artifact(context, path, target_path, storage_type='s3',
+                          expires=None, content_type=None):
     """Create an artifact and upload it.
     This should support s3 and azure out of the box; we'll need some tweaking
     if we want to support redirect/error artifacts.
     """
     temp_queue = get_temp_queue(context)
-    filename = os.path.basename(path)
     payload = {
         "storageType": storage_type,
         "expires": expires or get_expiration_arrow(context).isoformat(),
         "contentType": content_type or guess_content_type(path),
     }
-    target_path = "public/env/{}".format(filename)
     args = [context.claim_task['status']['taskId'], context.claim_task['runId'],
             target_path, payload]
     tc_response = await temp_queue.createArtifact(*args)
@@ -160,17 +157,54 @@ async def retry_create_artifact(*args, **kwargs):
     )
 
 
+def _update_upload_file_list(file_list, upload_config):
+    """Helper function to update `file_list` with `upload_config`, while
+    making sure that only one file will be uploaded per `target_path`.
+    """
+    target_path = upload_config['target_path']
+    value = file_list.setdefault(target_path, upload_config)
+    if value != upload_config:
+        raise ScriptWorkerTaskException(
+            "Conflict in upload_artifacts target_paths: {} and {} are both {}!".format(
+                value['path'], upload_config['path'], target_path
+            ),
+            exit_code=STATUSES['malformed-payload']
+        )
+
+
 async def upload_artifacts(context):
     """Upload the task logs and any files in `artifact_dir`.
-    Currently we do not support recursing into subdirectories.
+    Files in subdirectories are uploaded with their relative paths preserved.
""" - files = dict([(k, None) for k in glob.glob(os.path.join(context.config['artifact_dir'], '*'))]) - files.update(dict([(k, 'text/plain') for k in get_log_filenames(context)])) + file_list = {} + for target_path in filepaths_in_dir(context.config['artifact_dir']): + path = os.path.join(context.config['artifact_dir'], target_path) + if not target_path.startswith('public/'): + target_path = 'public/{}'.format(target_path) + upload_config = { + 'path': path, + 'target_path': target_path, + 'content_type': None, + } + _update_upload_file_list(file_list, upload_config) + + for path in get_log_filenames(context): + target_path = 'public/logs/{}'.format(os.path.basename(path)) + upload_config = { + 'path': path, + 'target_path': target_path, + 'content_type': 'text/plain' + } + _update_upload_file_list(file_list, upload_config) tasks = [] - for path, content_type in files.items(): + for upload_config in file_list.values(): tasks.append( asyncio.ensure_future( - retry_create_artifact(context, path, content_type=content_type) + retry_create_artifact( + context, upload_config['path'], + target_path=upload_config['target_path'], + content_type=upload_config['content_type'] + ) ) ) await raise_future_exceptions(tasks) diff --git a/tests/test_task.py b/tests/test_task.py index 45d029bc..672496a1 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -9,7 +9,7 @@ import os import pytest from scriptworker.context import Context -from scriptworker.exceptions import ScriptWorkerRetryException +from scriptworker.exceptions import ScriptWorkerRetryException, ScriptWorkerTaskException import scriptworker.task as task import scriptworker.log as log import sys @@ -150,11 +150,11 @@ async def test_reclaim_task_non_409(context, successful_queue): @pytest.mark.asyncio async def test_upload_artifacts(context): args = [] - os.makedirs(context.config['artifact_dir']) + os.makedirs(os.path.join(context.config['artifact_dir'], 'public')) os.makedirs(context.config['log_dir']) paths = list(log.get_log_filenames(context)) + [ os.path.join(context.config['artifact_dir'], 'one'), - os.path.join(context.config['artifact_dir'], 'two'), + os.path.join(context.config['artifact_dir'], 'public/two'), ] for path in paths: touch(path) @@ -168,6 +168,20 @@ async def foo(_, path, **kwargs): assert sorted(args) == sorted(paths) +def test_bad_update_upload_file_list(): + with pytest.raises(ScriptWorkerTaskException): + task._update_upload_file_list( + {'existing_key': { + 'path': 'one', + 'target_path': 'existing_key', + }}, + { + 'path': 'two', + 'target_path': 'existing_key' + } + ) + + @pytest.mark.asyncio async def test_create_artifact(context, fake_session, successful_queue): path = os.path.join(context.config['artifact_dir'], "one.txt") @@ -177,7 +191,7 @@ async def test_create_artifact(context, fake_session, successful_queue): expires = arrow.utcnow().isoformat() with mock.patch('scriptworker.task.get_temp_queue') as p: p.return_value = successful_queue - await task.create_artifact(context, path, expires=expires) + await task.create_artifact(context, path, "public/env/one.txt", expires=expires) assert successful_queue.info == [ "createArtifact", ('taskId', 'runId', "public/env/one.txt", { "storageType": "s3", @@ -198,7 +212,7 @@ async def test_create_artifact_retry(context, fake_session_500, successful_queue with pytest.raises(ScriptWorkerRetryException): with mock.patch('scriptworker.task.get_temp_queue') as p: p.return_value = successful_queue - await task.create_artifact(context, path, expires=expires) + await 
task.create_artifact(context, path, "public/env/one.log", expires=expires) context.session.close()
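
Note: this patch imports `filepaths_in_dir` from `scriptworker.utils`, but the helper's implementation is not shown in the diff. As a rough sketch of the behavior `upload_artifacts` assumes (the real implementation may differ), it walks a directory recursively and returns each file's path relative to that directory:

    import os

    def filepaths_in_dir(path):
        """Return paths of all files under `path`, relative to `path`."""
        filepaths = []
        for root, _, filenames in os.walk(path):
            for filename in filenames:
                # e.g. artifact_dir/foo/bar.log -> 'foo/bar.log'
                filepaths.append(os.path.relpath(os.path.join(root, filename), path))
        return filepaths

Given that, a file at `artifact_dir/foo/bar.log` is uploaded to `public/foo/bar.log`, files already under `artifact_dir/public/` keep their paths as-is, and two files mapping to the same `target_path` trigger the `malformed-payload` conflict raised by `_update_upload_file_list`.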