Fix discarded datasets when importing history from file sources using tasks #14989

Merged
merged 7 commits on Nov 17, 2022
2 changes: 1 addition & 1 deletion lib/galaxy/managers/model_stores.py
@@ -168,7 +168,6 @@ def write_history_to(self, request: WriteHistoryTo):

def import_model_store(self, request: ImportModelStoreTaskRequest):
import_options = ImportOptions(
discarded_data=ImportDiscardedDataType.FORCE,
allow_library_creation=request.for_library,
)
history_id = request.history_id
@@ -186,6 +185,7 @@ def import_model_store(self, request: ImportModelStoreTaskRequest):
self._app,
galaxy_user,
import_options,
model_store_format=request.model_store_format,
)
new_history = history is None and not request.for_library
if new_history:
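In effect, the Celery task handler no longer forces discarded-data handling and instead forwards the format declared on the request. A minimal, self-contained sketch of that flow; ImportRequest and the handler below are illustrative stand-ins, not the real Galaxy classes:

from dataclasses import dataclass
from typing import Optional


@dataclass
class ImportRequest:
    # Stand-in for ImportModelStoreTaskRequest with the new optional field.
    history_id: Optional[int]
    source_uri: str
    for_library: bool
    model_store_format: Optional[str] = None


def import_model_store(request: ImportRequest) -> str:
    # The declared format travels with the source URI; discarded-dataset
    # handling is left to the store defaults instead of being forced.
    fmt = request.model_store_format or "auto-detected"
    return f"import {request.source_uri} as {fmt}"


print(import_model_store(ImportRequest(None, "gxfiles://posix_test/history.rocrate.zip", False, "rocrate.zip")))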
33 changes: 24 additions & 9 deletions lib/galaxy/model/store/__init__.py
@@ -1,6 +1,7 @@
import abc
import contextlib
import datetime
import logging
import os
import shutil
import tarfile
@@ -54,6 +55,7 @@
add_object_to_session,
get_object_session,
)
from galaxy.model.tags import GalaxyTagHandler
from galaxy.objectstore import ObjectStore
from galaxy.schema.bco import (
BioComputeObjectCore,
@@ -78,6 +80,7 @@
get_contributors,
write_to_file,
)
from galaxy.schema.schema import ModelStoreFormat
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.util import (
FILENAME_VALID_CHARS,
@@ -101,6 +104,7 @@
if TYPE_CHECKING:
from galaxy.managers.workflows import WorkflowContentsManager

log = logging.getLogger(__name__)

ObjectKeyType = Union[str, int]

@@ -134,6 +138,7 @@ class StoreAppProtocol(Protocol):
datatypes_registry: Registry
object_store: ObjectStore
security: IdEncodingHelper
tag_handler: GalaxyTagHandler
model: GalaxyModelMapping
file_sources: ConfiguredFileSources
workflow_contents_manager: "WorkflowContentsManager"
@@ -623,9 +628,18 @@ def handle_dataset_object_edit(dataset_instance, dataset_attrs):
else:
# Need a user to run library jobs to generate metadata...
pass
self.app.datatypes_registry.set_external_metadata_tool.regenerate_imported_metadata_if_needed(
dataset_instance, history, **regenerate_kwds
)
if self.app.datatypes_registry.set_external_metadata_tool:
self.app.datatypes_registry.set_external_metadata_tool.regenerate_imported_metadata_if_needed(
dataset_instance, history, **regenerate_kwds
)
else:
# Try to set metadata directly. @mvdbeek thinks we should only record the datasets
try:
if dataset_instance.has_metadata_files:
dataset_instance.datatype.set_meta(dataset_instance)
except Exception:
log.debug(f"Metadata setting failed on {dataset_instance}", exc_info=True)
dataset_instance.dataset.state = dataset_instance.dataset.states.FAILED_METADATA

if model_class == "HistoryDatasetAssociation":
if object_key in dataset_attrs:
@@ -2646,7 +2660,7 @@ def source_to_import_store(
app: StoreAppProtocol,
galaxy_user: Optional[model.User],
import_options: Optional[ImportOptions],
model_store_format: Optional[str] = None,
model_store_format: Optional[ModelStoreFormat] = None,
) -> ModelImportStore:
if isinstance(source, dict):
if model_store_format is not None:
@@ -2662,6 +2676,7 @@
else:
source_uri: str = str(source)
delete = False
tag_handler = app.tag_handler.create_tag_handler_session()
if source_uri.startswith("file://"):
source_uri = source_uri[len("file://") :]
if "://" in source_uri:
@@ -2680,21 +2695,21 @@
)
elif os.path.isdir(target_path):
model_import_store = get_import_model_store_for_directory(
target_path, import_options=import_options, app=app, user=galaxy_user
target_path, import_options=import_options, app=app, user=galaxy_user, tag_handler=tag_handler
)
else:
model_store_format = model_store_format or "tgz"
if model_store_format in ["tar.gz", "tgz", "tar"]:
model_store_format = model_store_format or ModelStoreFormat.TGZ
if ModelStoreFormat.is_compressed(model_store_format):
try:
temp_dir = mkdtemp()
target_dir = CompressedFile(target_path).extract(temp_dir)
finally:
if delete:
os.remove(target_path)
model_import_store = get_import_model_store_for_directory(
target_dir, import_options=import_options, app=app, user=galaxy_user
target_dir, import_options=import_options, app=app, user=galaxy_user, tag_handler=tag_handler
)
elif model_store_format in ["bag.gz", "bag.tar", "bag.zip"]:
elif ModelStoreFormat.is_bag(model_store_format):
model_import_store = BagArchiveImportModelStore(
target_path, import_options=import_options, app=app, user=galaxy_user
)
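The net effect in source_to_import_store is that format dispatch now goes through the ModelStoreFormat helpers instead of hard-coded extension lists, with tgz as the default. A condensed sketch of the branch order, assuming a Galaxy development environment where the schema module is importable; pick_importer and the returned labels are placeholders for the real store constructors:

from typing import Optional

from galaxy.schema.schema import ModelStoreFormat


def pick_importer(model_store_format: Optional[ModelStoreFormat] = None) -> str:
    model_store_format = model_store_format or ModelStoreFormat.TGZ
    if ModelStoreFormat.is_compressed(model_store_format):
        # Compressed archives (tar.gz, tgz, tar, rocrate.zip) are extracted to a
        # temporary directory and imported as a directory store.
        return "directory import store over extracted archive"
    if ModelStoreFormat.is_bag(model_store_format):
        return "BagArchiveImportModelStore"
    return "format-specific import store"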
8 changes: 8 additions & 0 deletions lib/galaxy/schema/schema.py
@@ -1305,6 +1305,14 @@ class ModelStoreFormat(str, Enum):
ROCRATE_ZIP = "rocrate.zip"
BCO_JSON = "bco.json"

@classmethod
def is_compressed(cls, value: "ModelStoreFormat"):
return value in [cls.TAR_DOT_GZ, cls.TGZ, cls.TAR, cls.ROCRATE_ZIP]

@classmethod
def is_bag(cls, value: "ModelStoreFormat"):
return value in [cls.BAG_DOT_TAR, cls.BAG_DOT_TGZ, cls.BAG_DOT_ZIP]


class StoreContentSource(Model):
store_content_uri: Optional[str]
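A brief usage example of the two new helpers, assuming a Galaxy development environment where this module is importable; the bag member is taken from the classmethod body above:

from galaxy.schema.schema import ModelStoreFormat

fmt = ModelStoreFormat("rocrate.zip")  # parse a user-supplied format string
assert ModelStoreFormat.is_compressed(fmt)
assert not ModelStoreFormat.is_bag(fmt)
assert ModelStoreFormat.is_bag(ModelStoreFormat.BAG_DOT_ZIP)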
2 changes: 2 additions & 0 deletions lib/galaxy/schema/tasks.py
@@ -10,6 +10,7 @@
BcoGenerationParametersMixin,
DatasetSourceType,
HistoryContentType,
ModelStoreFormat,
StoreExportPayload,
WriteStoreToPayload,
)
@@ -88,6 +89,7 @@ class ImportModelStoreTaskRequest(BaseModel):
history_id: Optional[int]
source_uri: str
for_library: bool
model_store_format: Optional[ModelStoreFormat]


class MaterializeDatasetInstanceTaskRequest(BaseModel):
1 change: 1 addition & 0 deletions lib/galaxy/webapps/galaxy/services/histories.py
@@ -281,6 +281,7 @@ def create_from_store_async(
user=trans.async_request_user,
source_uri=source_uri,
for_library=False,
model_store_format=payload.model_store_format,
)
result = import_model_store.delay(request=request)
return async_task_summary(result)
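For reference, a hypothetical request body for this async create-from-store endpoint; the field names follow StoreContentSource and the model_store_format handling above, and the URI is only an example:

payload = {
    "store_content_uri": "gxfiles://posix_test/history.rocrate.zip",  # remote model store to import
    "model_store_format": "rocrate.zip",  # forwarded into the import task request
}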
37 changes: 33 additions & 4 deletions lib/galaxy_test/base/populators.py
@@ -477,17 +477,23 @@ def create_from_store_raw_async(self, payload: Dict[str, Any]) -> Response:
return create_response

def create_from_store(
self, store_dict: Optional[Dict[str, Any]] = None, store_path: Optional[str] = None
self,
store_dict: Optional[Dict[str, Any]] = None,
store_path: Optional[str] = None,
model_store_format: Optional[str] = None,
) -> Dict[str, Any]:
payload = _store_payload(store_dict=store_dict, store_path=store_path)
payload = _store_payload(store_dict=store_dict, store_path=store_path, model_store_format=model_store_format)
create_response = self.create_from_store_raw(payload)
api_asserts.assert_status_code_is_ok(create_response)
return create_response.json()

def create_from_store_async(
self, store_dict: Optional[Dict[str, Any]] = None, store_path: Optional[str] = None
self,
store_dict: Optional[Dict[str, Any]] = None,
store_path: Optional[str] = None,
model_store_format: Optional[str] = None,
) -> Dict[str, Any]:
payload = _store_payload(store_dict=store_dict, store_path=store_path)
payload = _store_payload(store_dict=store_dict, store_path=store_path, model_store_format=model_store_format)
create_response = self.create_from_store_raw_async(payload)
create_response.raise_for_status()
return create_response.json()
@@ -1223,7 +1229,9 @@ def is_ready():

def wait_on_task(self, async_task_response: Response):
task_id = async_task_response.json()["id"]
return self.wait_on_task_id(task_id)

def wait_on_task_id(self, task_id: str):
def state():
state_response = self._get(f"tasks/{task_id}/state")
state_response.raise_for_status()
@@ -1324,6 +1332,27 @@ def new_page_payload(
)
return request

def export_history_to_uri_async(
self, history_id: str, target_uri: str, model_store_format: str = "tgz", include_files: bool = True
):
url = f"histories/{history_id}/write_store"
download_response = self._post(
url,
dict(target_uri=target_uri, include_files=include_files, model_store_format=model_store_format),
json=True,
)
api_asserts.assert_status_code_is_ok(download_response)
task_ok = self.wait_on_task(download_response)
assert task_ok, f"Task: Writing history to {target_uri} task failed"

def import_history_from_uri_async(self, target_uri: str, model_store_format: str):
import_async_response = self.create_from_store_async(
store_path=target_uri, model_store_format=model_store_format
)
task_id = import_async_response["id"]
task_ok = self.wait_on_task_id(task_id)
assert task_ok, f"Task: Import history from {target_uri} failed"


class GalaxyInteractorHttpMixin:
galaxy_interactor: ApiTestInteractor
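Taken together, the new populator helpers give integration tests a one-call export/import round trip through a configured file source. A usage sketch inside a populator-equipped test case; history_id and the gxfiles URI are assumptions:

target_uri = "gxfiles://posix_test/history.rocrate.zip"
self.dataset_populator.export_history_to_uri_async(history_id, target_uri, model_store_format="rocrate.zip")
self.dataset_populator.import_history_from_uri_async(target_uri, model_store_format="rocrate.zip")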
47 changes: 46 additions & 1 deletion test/integration/test_history_import_export.py
@@ -1,4 +1,5 @@
import tarfile
from uuid import uuid4

from galaxy.model.unittest_utils.store_fixtures import (
deferred_hda_model_store_dict,
@@ -12,6 +13,7 @@
DatasetCollectionPopulator,
DatasetPopulator,
)
from galaxy_test.driver.integration_setup import PosixFileSourceSetup
from galaxy_test.driver.integration_util import IntegrationTestCase


@@ -29,10 +31,17 @@ def setUp(self):
self._set_up_populators()


class TestImportExportHistoryViaTasksIntegration(ImportExportTests, IntegrationTestCase, UsesCeleryTasks):
class TestImportExportHistoryViaTasksIntegration(
ImportExportTests, IntegrationTestCase, UsesCeleryTasks, PosixFileSourceSetup
):
task_based = True
framework_tool_and_types = True

@classmethod
def handle_galaxy_config_kwds(cls, config):
PosixFileSourceSetup.handle_galaxy_config_kwds(config, cls)
UsesCeleryTasks.handle_galaxy_config_kwds(config)

def setUp(self):
super().setUp()
self._set_up_populators()
@@ -48,6 +57,42 @@ def test_import_from_model_store_async(self):
"task based import history",
)

def test_import_model_store_from_file_source_async_with_format(self):
history_name = f"for_export_format_async_{uuid4()}"
history_id = self.dataset_populator.setup_history_for_export_testing(history_name)
# Add bam dataset to test metadata generation on import
self.dataset_populator.new_dataset(
history_id, content=open(self.test_data_resolver.get_filename("1.bam"), "rb"), file_type="bam", wait=True
)
model_store_format = "rocrate.zip"
target_uri = f"gxfiles://posix_test/history.{model_store_format}"

self.dataset_populator.export_history_to_uri_async(history_id, target_uri, model_store_format)
self.dataset_populator.import_history_from_uri_async(target_uri, model_store_format)

last_history = self._get("histories?limit=1").json()
assert len(last_history) == 1
imported_history = last_history[0]
imported_history_id = imported_history["id"]
assert imported_history_id != history_id
assert imported_history["name"] == history_name
self.dataset_populator.wait_for_history(imported_history_id)
history_contents = self.dataset_populator.get_history_contents(imported_history_id)
assert len(history_contents) == 3
# Only deleted datasets should appear as "discarded"
for dataset in history_contents:
if dataset["deleted"] is True:
assert dataset["state"] == "discarded"
else:
assert dataset["state"] == "ok"
# Check metadata generation
if dataset["extension"] == "bam":
imported_bam_details = self.dataset_populator.get_history_dataset_details(
imported_history_id, dataset_id=dataset["id"]
)
bai_metadata = imported_bam_details["meta_files"][0]
assert bai_metadata["file_type"] == "bam_index"


class TestImportExportHistoryContentsViaTasksIntegration(IntegrationTestCase, UsesCeleryTasks):
dataset_populator: DatasetPopulator
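For orientation, the per-dataset assertions in the new test inspect history-content entries shaped roughly like the following; the values are hypothetical and only the keys used by the test are shown:

example_entry = {
    "id": "f2db41e1fa331b3e",  # encoded dataset id (hypothetical)
    "deleted": False,          # deleted datasets are expected to import as "discarded"
    "state": "ok",             # non-deleted datasets should come back "ok"
    "extension": "bam",        # triggers the metadata (bam_index) check
}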