preserve relative paths of files in artifact_dir
`upload_artifacts` now uploads files in subdirectories of
`artifact_dir`, preserving their relative paths.  This makes it possible
to lay out explicitly, on disk, how the uploaded artifacts should be
structured, and allows multiple artifacts to share the same filename.
escapewindow committed Aug 19, 2016
1 parent 3f11636 commit 36700db
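
As an editorial illustration (not part of the commit), here is a minimal sketch of the path mapping the new `upload_artifacts` performs. The `artifact_dir` layout and file names below are hypothetical; in the real code the traversal comes from `scriptworker.utils.filepaths_in_dir`:

```python
import os

# Hypothetical relative paths found under artifact_dir; in scriptworker
# these come from filepaths_in_dir(context.config['artifact_dir']).
artifact_dir = "/builds/work/artifacts"
relative_paths = [
    "public/build/target.zip",    # already under public/, kept as-is
    "mozharness/logs/build.log",  # not under public/, gets a public/ prefix
]

for target_path in relative_paths:
    path = os.path.join(artifact_dir, target_path)
    if not target_path.startswith("public/"):
        target_path = "public/{}".format(target_path)
    print("{} -> {}".format(path, target_path))

# /builds/work/artifacts/public/build/target.zip -> public/build/target.zip
# /builds/work/artifacts/mozharness/logs/build.log -> public/mozharness/logs/build.log
```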
Showing 3 changed files with 65 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](http://semver.org/).

### Changed
- test files no longer use a test class.
- `upload_artifacts` now uploads files in subdirectories of `artifact_dir`, preserving the relative paths.

### Removed
- Removed unneeded creds file generation.
56 changes: 45 additions & 11 deletions scriptworker/task.py
@@ -4,7 +4,6 @@
import aiohttp.hdrs
import arrow
import asyncio
import glob
import logging
import mimetypes
import os
@@ -16,9 +15,9 @@
import taskcluster.exceptions
from taskcluster.async import Queue

from scriptworker.exceptions import ScriptWorkerRetryException
from scriptworker.exceptions import ScriptWorkerRetryException, ScriptWorkerTaskException
from scriptworker.log import get_log_fhs, get_log_filenames, log_errors, read_stdout
from scriptworker.utils import raise_future_exceptions, retry_async
from scriptworker.utils import filepaths_in_dir, raise_future_exceptions, retry_async

log = logging.getLogger(__name__)

@@ -114,20 +113,18 @@ def guess_content_type(path):
return content_type or "application/binary"


async def create_artifact(context, path, storage_type='s3', expires=None,
content_type=None):
async def create_artifact(context, path, target_path, storage_type='s3',
expires=None, content_type=None):
"""Create an artifact and upload it. This should support s3 and azure
out of the box; we'll need some tweaking if we want to support
redirect/error artifacts.
"""
temp_queue = get_temp_queue(context)
filename = os.path.basename(path)
payload = {
"storageType": storage_type,
"expires": expires or get_expiration_arrow(context).isoformat(),
"contentType": content_type or guess_content_type(path),
}
target_path = "public/env/{}".format(filename)
args = [context.claim_task['status']['taskId'], context.claim_task['runId'],
target_path, payload]
tc_response = await temp_queue.createArtifact(*args)
@@ -160,17 +157,54 @@ async def retry_create_artifact(*args, **kwargs):
)


def _update_upload_file_list(file_list, upload_config):
"""Helper function to update `file_list` with upload_config, while
making sure that only one file will be uploaded per `target_path`
"""
target_path = upload_config['target_path']
value = file_list.setdefault(target_path, upload_config)
if value != upload_config:
raise ScriptWorkerTaskException(
"Conflict in upload_artifacts target_paths: {} and {} are both {}!".format(
value['path'], upload_config['path'], target_path
),
exit_code=STATUSES['malformed-payload']
)


async def upload_artifacts(context):
"""Upload the task logs and any files in `artifact_dir`.
Currently we do not support recursing into subdirectories.
"""
files = dict([(k, None) for k in glob.glob(os.path.join(context.config['artifact_dir'], '*'))])
files.update(dict([(k, 'text/plain') for k in get_log_filenames(context)]))
file_list = {}
for target_path in filepaths_in_dir(context.config['artifact_dir']):
path = os.path.join(context.config['artifact_dir'], target_path)
if not target_path.startswith('public/'):
target_path = 'public/{}'.format(target_path)
upload_config = {
'path': path,
'target_path': target_path,
'content_type': None,
}
_update_upload_file_list(file_list, upload_config)

for path in get_log_filenames(context):
target_path = 'public/logs/{}'.format(os.path.basename(path))
upload_config = {
'path': path,
'target_path': target_path,
'content_type': 'text/plain'
}
_update_upload_file_list(file_list, upload_config)
tasks = []
for path, content_type in files.items():
for upload_config in file_list.values():
tasks.append(
asyncio.ensure_future(
retry_create_artifact(context, path, content_type=content_type)
retry_create_artifact(
context, upload_config['path'],
target_path=upload_config['target_path'],
content_type=upload_config['content_type']
)
)
)
await raise_future_exceptions(tasks)
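As a further editorial sketch (also not part of the diff), this shows how the new `_update_upload_file_list` guard reacts when two on-disk files resolve to the same `target_path`; the paths here are hypothetical:

```python
import scriptworker.task as task
from scriptworker.exceptions import ScriptWorkerTaskException

file_list = {}
# Hypothetical collision: artifact_dir/public/foo and artifact_dir/foo
# both map to the target_path "public/foo" after prefixing.
first = {'path': '/builds/work/artifacts/public/foo',
         'target_path': 'public/foo', 'content_type': None}
second = {'path': '/builds/work/artifacts/foo',
          'target_path': 'public/foo', 'content_type': None}

task._update_upload_file_list(file_list, first)       # stored under 'public/foo'
try:
    task._update_upload_file_list(file_list, second)  # same target_path -> error
except ScriptWorkerTaskException as exc:
    print(exc)  # names both conflicting paths and the shared target_path
```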
24 changes: 19 additions & 5 deletions tests/test_task.py
@@ -9,7 +9,7 @@
import os
import pytest
from scriptworker.context import Context
from scriptworker.exceptions import ScriptWorkerRetryException
from scriptworker.exceptions import ScriptWorkerRetryException, ScriptWorkerTaskException
import scriptworker.task as task
import scriptworker.log as log
import sys
@@ -150,11 +150,11 @@ async def test_reclaim_task_non_409(context, successful_queue):
@pytest.mark.asyncio
async def test_upload_artifacts(context):
args = []
os.makedirs(context.config['artifact_dir'])
os.makedirs(os.path.join(context.config['artifact_dir'], 'public'))
os.makedirs(context.config['log_dir'])
paths = list(log.get_log_filenames(context)) + [
os.path.join(context.config['artifact_dir'], 'one'),
os.path.join(context.config['artifact_dir'], 'two'),
os.path.join(context.config['artifact_dir'], 'public/two'),
]
for path in paths:
touch(path)
@@ -168,6 +168,20 @@ async def foo(_, path, **kwargs):
assert sorted(args) == sorted(paths)


def test_bad_update_upload_file_list():
with pytest.raises(ScriptWorkerTaskException):
task._update_upload_file_list(
{'existing_key': {
'path': 'one',
'target_path': 'existing_key',
}},
{
'path': 'two',
'target_path': 'existing_key'
}
)


@pytest.mark.asyncio
async def test_create_artifact(context, fake_session, successful_queue):
path = os.path.join(context.config['artifact_dir'], "one.txt")
@@ -177,7 +191,7 @@ async def test_create_artifact(context, fake_session, successful_queue):
expires = arrow.utcnow().isoformat()
with mock.patch('scriptworker.task.get_temp_queue') as p:
p.return_value = successful_queue
await task.create_artifact(context, path, expires=expires)
await task.create_artifact(context, path, "public/env/one.txt", expires=expires)
assert successful_queue.info == [
"createArtifact", ('taskId', 'runId', "public/env/one.txt", {
"storageType": "s3",
@@ -198,7 +212,7 @@ async def test_create_artifact_retry(context, fake_session_500, successful_queue
with pytest.raises(ScriptWorkerRetryException):
with mock.patch('scriptworker.task.get_temp_queue') as p:
p.return_value = successful_queue
await task.create_artifact(context, path, expires=expires)
await task.create_artifact(context, path, "public/env/one.log", expires=expires)
context.session.close()


