Precreate certain outputs for upload 2.0 API.
This tries to improve the user experience of the rule-based uploader by placing HDAs and HDCAs in the history at the outset, so that the history panel can poll them and we can turn them red if the upload fails.

From Marius' PR review:

> I can see that a job launched in my logs, but it failed and there were no visual indications of this in the UI

Not every HDA can be pre-created; for example, datasets read from a zip file are still expanded on the backend. Likewise, HDCAs that don't define a collection type up front cannot be pre-created (for instance, when the type is inferred from a folder structure). Library outputs aren't pre-created at all in this commit. There is room to pre-create more, but this is an atomic commit as it stands, and it should improve the user experience of the rule-based uploader considerably.
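To make the mechanics concrete, here is a minimal sketch (not part of the commit) of the kind of fetch payload the pre-creation applies to, modeled on the integration tests changed below. The URL, path, and history id are made up; "object_id" is deliberately absent because the API now rejects it in requests and assigns it to the pre-created outputs on the server side.

import json

# Illustrative targets for the upload 2.0 / fetch API (hypothetical values).
targets = [
    {
        # HDCA target: "collection_type" and "name" are known up front,
        # so the collection instance can be pre-created in the history.
        "destination": {"type": "hdca"},
        "collection_type": "list",
        "name": "cool collection",
        "elements": [
            {"src": "url", "url": "https://example.org/1.fastqsanger.gz", "ext": "fastqsanger.gz"},
        ],
    },
    {
        # HDA targets: each element becomes a pre-created HDA, unless
        # "elements_from" forces expansion on the backend (e.g. a zip file).
        "destination": {"type": "hdas"},
        "elements": [
            {"src": "path", "path": "/data/sample.tsv", "ext": "tabular", "dbkey": "?"},
        ],
    },
]

payload = {
    "history_id": "<history id>",
    "targets": json.dumps(targets),
}

The pre-created HDAs and the HDCA get their database ids recorded as "object_id" (on the elements and on the destination, respectively), which data_fetch.py echoes back and output_collect.py then uses to reuse the existing objects instead of creating new ones.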
jmchilton committed Mar 8, 2018
1 parent 54ee573 commit 60f632b
Showing 6 changed files with 142 additions and 34 deletions.
65 changes: 64 additions & 1 deletion lib/galaxy/tools/actions/upload.py
@@ -1,9 +1,12 @@
 import json
 import logging
+import os
 
+from galaxy.dataset_collections.structure import UnitializedTree
 from galaxy.exceptions import RequestParameterMissingException
 from galaxy.tools.actions import upload_common
 from galaxy.util import ExecutionTimer
+from galaxy.util.bunch import Bunch
 from . import ToolAction
 
 log = logging.getLogger(__name__)
@@ -79,7 +82,67 @@ def replace_file_srcs(request_part):
 
         replace_file_srcs(request)
 
+        outputs = []
+        for target in request.get("targets", []):
+            destination = target.get("destination")
+            destination_type = destination.get("type")
+            # Start by just pre-creating HDAs.
+            if destination_type == "hdas":
+                if target.get("elements_from"):
+                    # Dynamic collection required I think.
+                    continue
+                _precreate_fetched_hdas(trans, history, target, outputs)
+
+            if destination_type == "hdca":
+                _precreate_fetched_collection_instance(trans, history, target, outputs)
+
         incoming["request_json"] = json.dumps(request)
         return self._create_job(
-            trans, incoming, tool, None, [], history=history
+            trans, incoming, tool, None, outputs, history=history
         )
+
+
+def _precreate_fetched_hdas(trans, history, target, outputs):
+    for item in target.get("elements", []):
+        name = item.get("name", None)
+        if name is None:
+            src = item.get("src", None)
+            if src == "url":
+                url = item.get("url")
+                if name is None:
+                    name = url.split("/")[-1]
+            elif src == "path":
+                path = item["path"]
+                if name is None:
+                    name = os.path.basename(path)
+
+        file_type = item.get("ext", "auto")
+        dbkey = item.get("dbkey", "?")
+        uploaded_dataset = Bunch(
+            type='file', name=name, file_type=file_type, dbkey=dbkey
+        )
+        data = upload_common.new_upload(trans, '', uploaded_dataset, library_bunch=None, history=history)
+        outputs.append(data)
+        item["object_id"] = data.id
+
+
+def _precreate_fetched_collection_instance(trans, history, target, outputs):
+    collection_type = target.get("collection_type")
+    if not collection_type:
+        # Can't precreate collections of unknown type at this time.
+        return
+
+    name = target.get("name")
+    if not name:
+        return
+
+    collections_service = trans.app.dataset_collections_service
+    collection_type_description = collections_service.collection_type_descriptions.for_collection_type(collection_type)
+    structure = UnitializedTree(collection_type_description)
+    hdca = collections_service.precreate_dataset_collection_instance(
+        trans, history, name, structure=structure
+    )
+    outputs.append(hdca)
+    # Following flush needed for an ID.
+    trans.sa_session.flush()
+    target["destination"]["object_id"] = hdca.id
42 changes: 25 additions & 17 deletions lib/galaxy/tools/actions/upload_common.py
@@ -384,7 +384,7 @@ def _chown(path):
     return json_file_path
 
 
-def create_job(trans, params, tool, json_file_path, data_list, folder=None, history=None, job_params=None):
+def create_job(trans, params, tool, json_file_path, outputs, folder=None, history=None, job_params=None):
     """
     Create the upload job.
     """
@@ -412,21 +412,28 @@ def create_job(trans, params, tool, json_file_path, data_list, folder=None, hist
             job.add_parameter(name, value)
     job.add_parameter('paramfile', dumps(json_file_path))
     object_store_id = None
-    for i, dataset in enumerate(data_list):
-        if folder:
-            job.add_output_library_dataset('output%i' % i, dataset)
+    for i, output_object in enumerate(outputs):
+        output_name = "output%i" % i
+        if hasattr(output_object, "collection"):
+            job.add_output_dataset_collection(output_name, output_object)
+            output_object.job = job
         else:
-            job.add_output_dataset('output%i' % i, dataset)
-        # Create an empty file immediately
-        if not dataset.dataset.external_filename:
-            dataset.dataset.object_store_id = object_store_id
-            try:
-                trans.app.object_store.create(dataset.dataset)
-            except ObjectInvalid:
-                raise Exception('Unable to create output dataset: object store is full')
-            object_store_id = dataset.dataset.object_store_id
-        trans.sa_session.add(dataset)
-        # open( dataset.file_name, "w" ).close()
+            dataset = output_object
+            if folder:
+                job.add_output_library_dataset(output_name, dataset)
+            else:
+                job.add_output_dataset(output_name, dataset)
+            # Create an empty file immediately
+            if not dataset.dataset.external_filename:
+                dataset.dataset.object_store_id = object_store_id
+                try:
+                    trans.app.object_store.create(dataset.dataset)
+                except ObjectInvalid:
+                    raise Exception('Unable to create output dataset: object store is full')
+                object_store_id = dataset.dataset.object_store_id
+
+        trans.sa_session.add(output_object)
+
     job.object_store_id = object_store_id
     job.set_state(job.states.NEW)
     job.set_handler(tool.get_job_handler(None))
@@ -440,8 +447,9 @@ def create_job(trans, params, tool, json_file_path, data_list, folder=None, hist
     trans.app.job_manager.job_queue.put(job.id, job.tool_id)
     trans.log_event("Added job to the job queue, id: %s" % str(job.id), tool_id=job.tool_id)
     output = odict()
-    for i, v in enumerate(data_list):
-        output['output%i' % i] = v
+    for i, v in enumerate(outputs):
+        if not hasattr(v, "collection_type"):
+            output['output%i' % i] = v
     return job, output


3 changes: 3 additions & 0 deletions lib/galaxy/tools/data_fetch.py
@@ -98,6 +98,7 @@ def _resolve_src(item):
         dbkey = item.get("dbkey", "?")
         requested_ext = item.get("ext", "auto")
         info = item.get("info", None)
+        object_id = item.get("object_id", None)
         link_data_only = upload_config.link_data_only
         if "link_data_only" in item:
             # Allow overriding this on a per file basis.
@@ -170,6 +171,8 @@ def _resolve_src(item):
         rval = {"name": name, "filename": path, "dbkey": dbkey, "ext": ext, "link_data_only": link_data_only}
         if info is not None:
             rval["info"] = info
+        if object_id is not None:
+            rval["object_id"] = object_id
         return rval
 
     elements = elements_tree_map(_resolve_src, items)
42 changes: 30 additions & 12 deletions lib/galaxy/tools/parameters/output_collect.py
@@ -218,13 +218,18 @@ def add_elements_to_folder(elements, library_folder):
         elif destination_type == "hdca":
             history = job.history
             assert "collection_type" in unnamed_output_dict
-            name = unnamed_output_dict.get("name", "unnamed collection")
-            collection_type = unnamed_output_dict["collection_type"]
-            collection_type_description = collections_service.collection_type_descriptions.for_collection_type(collection_type)
-            structure = UnitializedTree(collection_type_description)
-            hdca = collections_service.precreate_dataset_collection_instance(
-                trans, history, name, structure=structure
-            )
+            object_id = destination.get("object_id")
+            if object_id:
+                sa_session = tool.app.model.context
+                hdca = sa_session.query(app.model.HistoryDatasetCollectionAssociation).get(int(object_id))
+            else:
+                name = unnamed_output_dict.get("name", "unnamed collection")
+                collection_type = unnamed_output_dict["collection_type"]
+                collection_type_description = collections_service.collection_type_descriptions.for_collection_type(collection_type)
+                structure = UnitializedTree(collection_type_description)
+                hdca = collections_service.precreate_dataset_collection_instance(
+                    trans, history, name, structure=structure
+                )
             filenames = odict.odict()
 
             def add_to_discovered_files(elements, parent_identifiers=[]):
@@ -268,6 +273,12 @@ def collect_elements_for_history(elements):
                         # Create new primary dataset
                         name = fields_match.name or designation
 
+                        hda_id = discovered_file.match.object_id
+                        primary_dataset = None
+                        if hda_id:
+                            sa_session = tool.app.model.context
+                            primary_dataset = sa_session.query(app.model.HistoryDatasetAssociation).get(int(hda_id))
+
                         dataset = job_context.create_dataset(
                             ext=ext,
                             designation=designation,
@@ -276,7 +287,8 @@ def collect_elements_for_history(elements):
                             name=name,
                             filename=discovered_file.path,
                             info=info,
-                            link_data=link_data
+                            link_data=link_data,
+                            primary_data=primary_dataset,
                         )
                         dataset.raw_set_dataset_state('ok')
                         datasets.append(dataset)
@@ -451,14 +463,16 @@ def create_dataset(
             info=None,
             library_folder=None,
             link_data=False,
+            primary_data=None,
     ):
         app = self.app
         sa_session = self.sa_session
 
-        if not library_folder:
-            primary_data = _new_hda(app, sa_session, ext, designation, visible, dbkey, self.permissions)
-        else:
-            primary_data = _new_ldda(self.work_context, name, ext, visible, dbkey, library_folder)
+        if primary_data is None:
+            if not library_folder:
+                primary_data = _new_hda(app, sa_session, ext, designation, visible, dbkey, self.permissions)
+            else:
+                primary_data = _new_ldda(self.work_context, name, ext, visible, dbkey, library_folder)
 
         # Copy metadata from one of the inputs if requested.
         metadata_source = None
@@ -843,6 +857,10 @@ def visible(self):
     def link_data(self):
         return bool(self.as_dict.get("link_data_only", False))
 
+    @property
+    def object_id(self):
+        return self.as_dict.get("object_id", None)
+
 
 class RegexCollectedDatasetMatch(JsonCollectedDatasetMatch):

6 changes: 6 additions & 0 deletions lib/galaxy/webapps/galaxy/api/_fetch_util.py
@@ -37,6 +37,9 @@ def validate_and_normalize_targets(trans, payload):
     for target in targets:
         destination = _get_required_item(target, "destination", "Each target must specify a 'destination'")
         destination_type = _get_required_item(destination, "type", "Each target destination must specify a 'type'")
+        if "object_id" in destination:
+            raise RequestParameterInvalidException("object_id not allowed to appear in the request.")
+
         if destination_type not in VALID_DESTINATION_TYPES:
             template = "Invalid target destination type [%s] encountered, must be one of %s"
             msg = template % (destination_type, VALID_DESTINATION_TYPES)
@@ -63,6 +66,9 @@ def check_src(item):
     payload["check_content"] = trans.app.config.check_upload_content
 
     def check_src(item):
+        if "object_id" in item:
+            raise RequestParameterInvalidException("object_id not allowed to appear in the request.")
+
         # Normalize file:// URLs into paths.
         if item["src"] == "url" and item["url"].startswith("file://"):
             item["src"] = "path"
18 changes: 14 additions & 4 deletions test/integration/test_upload_configuration_options.py
@@ -419,9 +419,14 @@ def test_ftp_fetch(self):
             "destination": {"type": "hdca"},
             "elements": elements,
             "collection_type": "list",
+            "name": "cool collection",
         }
         response = self.fetch_target(target)
         self._assert_status_code_is(response, 200)
+        response_object = response.json()
+        assert "output_collections" in response_object
+        output_collections = response_object["output_collections"]
+        assert len(output_collections) == 1, response_object
         dataset = self.dataset_populator.get_history_dataset_details(self.history_id, hid=2)
         self._check_content(dataset, content)

@@ -787,13 +792,18 @@ def test_fetch_history_compressed_type(self):
             "history_id": self.history_id,  # TODO: Shouldn't be needed :(
             "targets": json.dumps(targets),
         }
-        self.dataset_populator.fetch(payload)
+        fetch_response = self.dataset_populator.fetch(payload)
+        self._assert_status_code_is(fetch_response, 200)
+        outputs = fetch_response.json()["outputs"]
+        assert len(outputs) == 1
+        output = outputs[0]
+        assert output["name"] == "1.fastqsanger.gz"
         contents_response = self.dataset_populator._get_contents_request(self.history_id)
        assert contents_response.status_code == 200
         contents = contents_response.json()
-        assert len(contents) == 1
-        print(contents)
-        contents[0]["extension"] == "fastqsanger.gz"
+        assert len(contents) == 1, contents
+        assert contents[0]["extension"] == "fastqsanger.gz", contents[0]
+        assert contents[0]["name"] == "1.fastqsanger.gz", contents[0]
 
     def test_fetch_recursive_archive_history(self):
         destination = {"type": "hdas"}
