Skip to content

Commit

Permalink
Merge pull request #15140 from davelopez/22.05_backport_#14989_and_#1…
Browse files Browse the repository at this point in the history
…5090

[22.05] Backport fixes #14989 and #15090
  • Loading branch information
mvdbeek committed Dec 8, 2022
2 parents fa21013 + e180490 commit 0775ba2
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 43 deletions.
3 changes: 2 additions & 1 deletion lib/galaxy/files/uris.py
Expand Up @@ -47,13 +47,14 @@ def stream_url_to_file(
file_sources: Optional["ConfiguredFileSources"] = None,
prefix: str = "gx_file_stream",
dir: Optional[str] = None,
user_context=None,
) -> str:
temp_name: str
if file_sources and file_sources.looks_like_uri(path):
file_source_path = file_sources.get_file_source_path(path)
with tempfile.NamedTemporaryFile(prefix=prefix, delete=False, dir=dir) as temp:
temp_name = temp.name
file_source_path.file_source.realize_to(file_source_path.path, temp_name)
file_source_path.file_source.realize_to(file_source_path.path, temp_name, user_context=user_context)
elif path.startswith("base64://"):
with tempfile.NamedTemporaryFile(prefix=prefix, delete=False, dir=dir) as temp:
temp_name = temp.name
Expand Down
68 changes: 51 additions & 17 deletions lib/galaxy/managers/model_stores.py
Expand Up @@ -3,7 +3,9 @@
from galaxy import model
from galaxy.exceptions import RequestParameterInvalidException
from galaxy.jobs.manager import JobManager
from galaxy.managers.context import ProvidesUserContext
from galaxy.managers.histories import HistoryManager
from galaxy.managers.users import UserManager
from galaxy.model.scoped_session import galaxy_scoped_session
from galaxy.model.store import (
ImportDiscardedDataType,
Expand All @@ -29,6 +31,28 @@
)


class ModelStoreUserContext(ProvidesUserContext):
def __init__(self, app: MinimalManagerApp, user: model.User) -> None:
self._app = app
self._user = user

@property
def app(self):
return self._app

@property
def url_builder(self):
raise NotImplementedError("URL builder not available in ModelStore context.")

def get_user(self):
return self._user

def set_user(self, user):
raise NotImplementedError("Cannot change user from ModelStore context.")

user = property(get_user, set_user)


class ModelStoreManager:
def __init__(
self,
Expand All @@ -37,12 +61,14 @@ def __init__(
sa_session: galaxy_scoped_session,
job_manager: JobManager,
short_term_storage_monitor: ShortTermStorageMonitor,
user_manager: UserManager,
):
self._app = app
self._sa_session = sa_session
self._job_manager = job_manager
self._history_manager = history_manager
self._short_term_storage_monitor = short_term_storage_monitor
self._user_manager = user_manager

def setup_history_export_job(self, request: SetupHistoryExportJob):
history_id = request.history_id
Expand Down Expand Up @@ -112,9 +138,13 @@ def write_invocation_to(self, request: WriteInvocationTo):
model_store_format = request.model_store_format
export_files = "symlink" if request.include_files else None
target_uri = request.target_uri
with model.store.get_export_store_factory(self._app, model_store_format, export_files=export_files)(
target_uri
) as export_store:
user_context = self._build_user_context(request.user.user_id)
with model.store.get_export_store_factory(
self._app,
model_store_format,
export_files=export_files,
user_context=user_context,
)(target_uri) as export_store:
invocation = self._sa_session.query(model.WorkflowInvocation).get(request.invocation_id)
export_store.export_workflow_invocation(
invocation, include_hidden=request.include_hidden, include_deleted=request.include_deleted
Expand All @@ -124,9 +154,10 @@ def write_history_content_to(self, request: WriteHistoryContentTo):
model_store_format = request.model_store_format
export_files = "symlink" if request.include_files else None
target_uri = request.target_uri
with model.store.get_export_store_factory(self._app, model_store_format, export_files=export_files)(
target_uri
) as export_store:
user_context = self._build_user_context(request.user.user_id)
with model.store.get_export_store_factory(
self._app, model_store_format, export_files=export_files, user_context=user_context
)(target_uri) as export_store:
if request.content_type == HistoryContentType.dataset:
hda = self._sa_session.query(model.HistoryDatasetAssociation).get(request.content_id)
export_store.add_dataset(hda)
Expand All @@ -140,34 +171,31 @@ def write_history_to(self, request: WriteHistoryTo):
model_store_format = request.model_store_format
export_files = "symlink" if request.include_files else None
target_uri = request.target_uri
with model.store.get_export_store_factory(self._app, model_store_format, export_files=export_files)(
target_uri
) as export_store:
user_context = self._build_user_context(request.user.user_id)
with model.store.get_export_store_factory(
self._app, model_store_format, export_files=export_files, user_context=user_context
)(target_uri) as export_store:
history = self._history_manager.by_id(request.history_id)
export_store.export_history(
history, include_hidden=request.include_hidden, include_deleted=request.include_deleted
)

def import_model_store(self, request: ImportModelStoreTaskRequest):
import_options = ImportOptions(
discarded_data=ImportDiscardedDataType.FORCE,
allow_library_creation=request.for_library,
)
history_id = request.history_id
if history_id:
history = self._sa_session.query(model.History).get(history_id)
else:
history = None
user_id = request.user.user_id
if user_id:
galaxy_user = self._sa_session.query(model.User).get(user_id)
else:
galaxy_user = None
user_context = self._build_user_context(request.user.user_id)
model_import_store = source_to_import_store(
request.source_uri,
self._app,
galaxy_user,
import_options,
model_store_format=request.model_store_format,
user_context=user_context,
)
new_history = history is None and not request.for_library
if new_history:
Expand All @@ -183,6 +211,11 @@ def import_model_store(self, request: ImportModelStoreTaskRequest):
)
return object_tracker

def _build_user_context(self, user_id: int):
user = self._user_manager.by_id(user_id)
user_context = ModelStoreUserContext(self._app, user)
return user_context


def create_objects_from_store(
app: MinimalManagerApp,
Expand All @@ -195,12 +228,13 @@ def create_objects_from_store(
discarded_data=ImportDiscardedDataType.FORCE,
allow_library_creation=for_library,
)
user_context = ModelStoreUserContext(app, galaxy_user) if galaxy_user is not None else None
model_import_store = source_to_import_store(
payload.store_content_uri or payload.store_dict,
app=app,
galaxy_user=galaxy_user,
import_options=import_options,
model_store_format=payload.model_store_format,
user_context=user_context,
)
new_history = history is None and not for_library
if new_history:
Expand Down
63 changes: 47 additions & 16 deletions lib/galaxy/model/store/__init__.py
@@ -1,6 +1,7 @@
import abc
import contextlib
import datetime
import logging
import os
import shutil
import tarfile
Expand Down Expand Up @@ -38,7 +39,10 @@
ObjectNotFound,
RequestParameterInvalidException,
)
from galaxy.files import ConfiguredFileSources
from galaxy.files import (
ConfiguredFileSources,
ProvidesUserFileSourcesUserContext,
)
from galaxy.files.uris import stream_url_to_file
from galaxy.model.mapping import GalaxyModelMapping
from galaxy.model.metadata import MetadataCollection
Expand All @@ -47,7 +51,9 @@
add_object_to_session,
get_object_session,
)
from galaxy.model.tags import GalaxyTagHandler
from galaxy.objectstore import ObjectStore
from galaxy.schema.schema import ModelStoreFormat
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.util import (
FILENAME_VALID_CHARS,
Expand All @@ -64,6 +70,8 @@
)
from ... import model

log = logging.getLogger(__name__)

ObjectKeyType = Union[str, int]

ATTRS_FILENAME_HISTORY = "history_attrs.txt"
Expand Down Expand Up @@ -96,6 +104,7 @@ class StoreAppProtocol(Protocol):
datatypes_registry: Registry
object_store: ObjectStore
security: IdEncodingHelper
tag_handler: GalaxyTagHandler
model: GalaxyModelMapping
file_sources: ConfiguredFileSources

Expand Down Expand Up @@ -582,9 +591,18 @@ def handle_dataset_object_edit(dataset_instance):
pass
if not self.import_options.allow_edit:
# external import, metadata files need to be regenerated (as opposed to extended metadata dataset import)
self.app.datatypes_registry.set_external_metadata_tool.regenerate_imported_metadata_if_needed(
dataset_instance, history, **regenerate_kwds
)
if self.app.datatypes_registry.set_external_metadata_tool:
self.app.datatypes_registry.set_external_metadata_tool.regenerate_imported_metadata_if_needed(
dataset_instance, history, **regenerate_kwds
)
else:
# Try to set metadata directly. @mvdbeek thinks we should only record the datasets
try:
if dataset_instance.has_metadata_files:
dataset_instance.datatype.set_meta(dataset_instance)
except Exception:
log.debug(f"Metadata setting failed on {dataset_instance}", exc_info=True)
dataset_instance.dataset.state = dataset_instance.dataset.states.FAILED_METADATA

if model_class == "HistoryDatasetAssociation":
if object_key in dataset_attrs:
Expand Down Expand Up @@ -1589,7 +1607,8 @@ def __init__(
export_files: Optional[str] = None,
strip_metadata_files: bool = True,
serialize_jobs: bool = True,
):
user_context=None,
) -> None:
"""
:param export_directory: path to export directory. Will be created if it does not exist.
:param app: Galaxy App or app-like object. Must be provided if `for_edit` and/or `serialize_dataset_objects` are True
Expand All @@ -1613,6 +1632,7 @@ def __init__(
sessionless = True
security = IdEncodingHelper(id_secret="randomdoesntmatter")

self.user_context = ProvidesUserFileSourcesUserContext(user_context)
self.file_sources = file_sources
self.serialize_jobs = serialize_jobs
self.sessionless = sessionless
Expand Down Expand Up @@ -2140,7 +2160,7 @@ def _finalize(self):
file_source_path = self.file_sources.get_file_source_path(self.file_source_uri)
file_source = file_source_path.file_source
assert os.path.exists(self.out_file)
file_source.write_from(file_source_path.path, self.out_file)
file_source.write_from(file_source_path.path, self.out_file, user_context=self.user_context)
shutil.rmtree(self.temp_output_dir)


Expand Down Expand Up @@ -2179,16 +2199,22 @@ def _finalize(self):
file_source_path = self.file_sources.get_file_source_path(self.file_source_uri)
file_source = file_source_path.file_source
assert os.path.exists(rval)
file_source.write_from(file_source_path.path, rval)
file_source.write_from(file_source_path.path, rval, user_context=self.user_context)
shutil.rmtree(self.temp_output_dir)


def get_export_store_factory(app, download_format: str, export_files=None) -> Callable[[str], ModelExportStore]:
def get_export_store_factory(
app,
download_format: str,
export_files=None,
user_context=None,
) -> Callable[[str], ModelExportStore]:
export_store_class: Union[Type[TarModelExportStore], Type[BagArchiveModelExportStore]]
export_store_class_kwds = {
"app": app,
"export_files": export_files,
"serialize_dataset_objects": False,
"user_context": user_context,
}
if download_format in ["tar.gz", "tgz"]:
export_store_class = TarModelExportStore
Expand Down Expand Up @@ -2237,10 +2263,11 @@ def imported_store_for_metadata(directory, object_store=None):
def source_to_import_store(
source: Union[str, dict],
app: StoreAppProtocol,
galaxy_user: Optional[model.User],
import_options: Optional[ImportOptions],
model_store_format: Optional[str] = None,
model_store_format: Optional[ModelStoreFormat] = None,
user_context=None,
) -> ModelImportStore:
galaxy_user = user_context.user if user_context else None
if isinstance(source, dict):
if model_store_format is not None:
raise Exception(
Expand All @@ -2255,10 +2282,14 @@ def source_to_import_store(
else:
source_uri: str = str(source)
delete = False
tag_handler = app.tag_handler.create_tag_handler_session()
if source_uri.startswith("file://"):
source_uri = source_uri[len("file://") :]
if "://" in source_uri:
source_uri = stream_url_to_file(source_uri, app.file_sources, prefix="gx_import_model_store")
user_context = ProvidesUserFileSourcesUserContext(user_context)
source_uri = stream_url_to_file(
source_uri, app.file_sources, prefix="gx_import_model_store", user_context=user_context
)
delete = True
target_path = source_uri
if target_path.endswith(".json"):
Expand All @@ -2273,21 +2304,21 @@ def source_to_import_store(
)
elif os.path.isdir(target_path):
model_import_store = get_import_model_store_for_directory(
target_path, import_options=import_options, app=app, user=galaxy_user
target_path, import_options=import_options, app=app, user=galaxy_user, tag_handler=tag_handler
)
else:
model_store_format = model_store_format or "tgz"
if model_store_format in ["tar.gz", "tgz", "tar"]:
model_store_format = model_store_format or ModelStoreFormat.TGZ
if ModelStoreFormat.is_compressed(model_store_format):
try:
temp_dir = mkdtemp()
target_dir = CompressedFile(target_path).extract(temp_dir)
finally:
if delete:
os.remove(target_path)
model_import_store = get_import_model_store_for_directory(
target_dir, import_options=import_options, app=app, user=galaxy_user
target_dir, import_options=import_options, app=app, user=galaxy_user, tag_handler=tag_handler
)
elif model_store_format in ["bag.gz", "bag.tar", "bag.zip"]:
elif ModelStoreFormat.is_bag(model_store_format):
model_import_store = BagArchiveImportModelStore(
target_path, import_options=import_options, app=app, user=galaxy_user
)
Expand Down
8 changes: 8 additions & 0 deletions lib/galaxy/schema/schema.py
Expand Up @@ -1291,6 +1291,14 @@ class ModelStoreFormat(str, Enum):
BAG_DOT_TAR = "bag.tar"
BAG_DOT_TGZ = "bag.tgz"

@classmethod
def is_compressed(cls, value: "ModelStoreFormat"):
return value in [cls.TAR_DOT_GZ, cls.TGZ, cls.TAR]

@classmethod
def is_bag(cls, value: "ModelStoreFormat"):
return value in [cls.BAG_DOT_TAR, cls.BAG_DOT_TGZ, cls.BAG_DOT_ZIP]


class StoreContentSource(Model):
store_content_uri: Optional[str]
Expand Down
2 changes: 2 additions & 0 deletions lib/galaxy/schema/tasks.py
Expand Up @@ -8,6 +8,7 @@
from .schema import (
DatasetSourceType,
HistoryContentType,
ModelStoreFormat,
StoreExportPayload,
WriteStoreToPayload,
)
Expand Down Expand Up @@ -82,6 +83,7 @@ class ImportModelStoreTaskRequest(BaseModel):
history_id: Optional[int]
source_uri: str
for_library: bool
model_store_format: Optional[ModelStoreFormat]


class MaterializeDatasetInstanceTaskRequest(BaseModel):
Expand Down
1 change: 1 addition & 0 deletions lib/galaxy/webapps/galaxy/services/histories.py
Expand Up @@ -278,6 +278,7 @@ def create_from_store_async(
user=trans.async_request_user,
source_uri=source_uri,
for_library=False,
model_store_format=payload.model_store_format,
)
result = import_model_store.delay(request=request)
return async_task_summary(result)
Expand Down

0 comments on commit 0775ba2

Please sign in to comment.