Skip to content

Commit

Permalink
First iteration about representing containers in RO-Crates following R…
Browse files Browse the repository at this point in the history
…esearchObject/workflow-run-crate#9 (comment)

Also, common.py has been thinned, moving several declarations to their "natural" places.

This has led to a major code reorganization, which has raised an issue unmarshalling some instances from the working directory state files. So, yaml loader has been taught how to deal with this mismatch.
  • Loading branch information
jmfernandez committed Oct 5, 2023
1 parent 77ce8f3 commit 5291cfc
Show file tree
Hide file tree
Showing 13 changed files with 561 additions and 338 deletions.
9 changes: 8 additions & 1 deletion wfexs_backend/abstract_docker_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
from .common import (
AbsPath,
AnyPath,
Container,
ContainerEngineVersionStr,
ContainerFileNamingMethod,
ContainerLocalConfig,
Expand All @@ -80,6 +79,10 @@
RelPath,
)

from .container import (
Container,
)

DockerLikeManifest: TypeAlias = Mapping[str, Any]
MutableDockerLikeManifest: TypeAlias = MutableMapping[str, Any]

Expand All @@ -94,10 +97,14 @@ class DockerManifestMetadata(TypedDict):
from .container import (
ContainerFactory,
ContainerFactoryException,
DOCKER_URI_PREFIX,
)
from .utils.digests import ComputeDigestFromObject


DOCKER_PROTO = DOCKER_URI_PREFIX + "//"


class AbstractDockerContainerFactory(ContainerFactory):
ACCEPTED_CONTAINER_TYPES = set(
(
Expand Down
4 changes: 1 addition & 3 deletions wfexs_backend/cache_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class CacheMetadataDict(TypedDict):
ContentKind,
DefaultNoLicenceTuple,
LicensedURI,
META_JSON_POSTFIX,
URIWithMetadata,
)

Expand Down Expand Up @@ -144,9 +145,6 @@ class CachedContent(NamedTuple):
fingerprint: "Optional[Fingerprint]" = None


META_JSON_POSTFIX = "_meta.json"


class CacheHandlerException(AbstractWfExSException):
pass

Expand Down
226 changes: 2 additions & 224 deletions wfexs_backend/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,156 +603,10 @@ class LocalWorkflow(NamedTuple):
relPathFiles: "Optional[Sequence[Union[RelPath, URIType]]]" = None


# This skeleton is here only for type mapping reasons
class AbstractWorkflowEngineType(abc.ABC):
@classmethod
@abc.abstractmethod
def MyWorkflowType(cls) -> "WorkflowType":
pass

@property
def workflowType(self) -> "WorkflowType":
return self.MyWorkflowType()

@abc.abstractmethod
def getConfiguredContainerType(self) -> "ContainerType":
pass

@property
def configuredContainerType(self) -> "ContainerType":
return self.getConfiguredContainerType()

@property
@abc.abstractmethod
def engine_url(self) -> "URIType":
pass

@abc.abstractmethod
def _get_engine_version_str(
self, matWfEng: "MaterializedWorkflowEngine"
) -> "WorkflowEngineVersionStr":
"""
It must return a string in the form of
"{symbolic engine name} {version}"
"""
pass

@abc.abstractmethod
def sideContainers(self) -> "Sequence[ContainerTaggedName]":
pass

@abc.abstractmethod
def materialize_containers(
self,
listOfContainerTags: "Sequence[ContainerTaggedName]",
containersDir: "AnyPath",
offline: "bool" = False,
) -> "Tuple[ContainerEngineVersionStr, Sequence[Container], ContainerOperatingSystem, ProcessorArchitecture]":
pass

@abc.abstractmethod
def deploy_containers(
self,
containers_list: "Sequence[Container]",
containersDir: "Optional[AnyPath]" = None,
force: "bool" = False,
) -> "Sequence[Container]":
pass

@property
@abc.abstractmethod
def staged_containers_dir(self) -> "AnyPath":
pass

@abc.abstractmethod
def materializeEngine(
self, localWf: "LocalWorkflow", engineVersion: "Optional[EngineVersion]" = None
) -> "Optional[MaterializedWorkflowEngine]":
pass

@abc.abstractmethod
def identifyWorkflow(
self, localWf: "LocalWorkflow", engineVer: "Optional[EngineVersion]" = None
) -> "Union[Tuple[EngineVersion, LocalWorkflow], Tuple[None, None]]":
"""
This method should return the effective engine version needed
to run it when this workflow engine recognizes the workflow type
"""
pass

@abc.abstractmethod
def materializeWorkflow(
self,
matWorfklowEngine: "MaterializedWorkflowEngine",
consolidatedWorkflowDir: "AbsPath",
offline: "bool" = False,
) -> "Tuple[MaterializedWorkflowEngine, Sequence[ContainerTaggedName]]":
"""
Method to ensure the workflow has been materialized. It returns a
possibly updated materialized workflow engine, as well as the list of containers
For Nextflow it is usually a no-op, but for CWL it requires resolution
"""

pass

@abc.abstractmethod
def launchWorkflow(
self,
matWfEng: "MaterializedWorkflowEngine",
inputs: "Sequence[MaterializedInput]",
environment: "Sequence[MaterializedInput]",
outputs: "Sequence[ExpectedOutput]",
) -> "StagedExecution":
pass

@classmethod
@abc.abstractmethod
def FromStagedSetup(
cls,
staged_setup: "StagedSetup",
cache_dir: "Optional[AnyPath]" = None,
cache_workflow_dir: "Optional[AnyPath]" = None,
cache_workflow_inputs_dir: "Optional[AnyPath]" = None,
local_config: "Optional[EngineLocalConfig]" = None,
config_directory: "Optional[AnyPath]" = None,
) -> "AbstractWorkflowEngineType":
pass


if TYPE_CHECKING:
TRS_Workflow_Descriptor: TypeAlias = str


class WorkflowType(NamedTuple):
"""
engineName: symbolic name of the engine
shortname: short name used in the WfExS-backend configuration files
for the workflow language
name: Textual representation of the workflow language
clazz: Class implementing the engine invocation
uriMatch: The URI patterns used in RO-Crate to identify the workflow type
uriTemplate: The URI template to be used when RO-Crate ComputerLanguage is generated
url: The URL used in RO-Crate to represent the workflow language
trs_descriptor: The string used in GA4GH TRSv2 specification to define this workflow type
rocrate_programming_language: Traditional internal id in RO-Crate implementations used for this workflow type (to be deprecated)
"""

engineName: "str"
shortname: "str"
name: "str"
clazz: "Type[AbstractWorkflowEngineType]"
uriMatch: "Sequence[Union[Pattern[str], URIType]]"
uriTemplate: "URIType"
url: "URIType"
trs_descriptor: "TRS_Workflow_Descriptor"
rocrate_programming_language: "str"

@classmethod
def _value_fixes(cls) -> "Mapping[str, Optional[str]]":
return {"shortname": "trs_descriptor"}


class RepoType(enum.Enum):
Git = "git"
Raw = "raw"
Expand Down Expand Up @@ -844,52 +698,8 @@ def __repr__(self) -> "str":

DEFAULT_CONTAINER_TYPE = ContainerType.Singularity


@dataclass
class Container(ContainerTaggedName):
"""
origTaggedName: Symbolic name or identifier of the container
(including tag) which appears in the workflow.
type: Container type
registries:
taggedName: Symbolic name or identifier of the container (including tag)
localPath: The full local path to the container file (it can be None)
signature: Signature (aka file fingerprint) of the container
(sha256 or similar). It could be None outside Singularity solutions.
fingerprint: Server fingerprint of the container.
Mainly from docker registries.
metadataLocalPath: The full local path to the container metadata file (it can be None)
"""

taggedName: "URIType" = cast("URIType", "")
architecture: "Optional[ProcessorArchitecture]" = None
operatingSystem: "Optional[ContainerOperatingSystem]" = None
localPath: "Optional[AbsPath]" = None
signature: "Optional[Fingerprint]" = None
fingerprint: "Optional[Fingerprint]" = None
metadataLocalPath: "Optional[AbsPath]" = None


class MaterializedWorkflowEngine(NamedTuple):
"""
instance: Instance of the workflow engine
version: Version of the engine to be used
fingerprint: Fingerprint of the engine to be used (it could be the version)
engine_path: Absolute path to the fetched engine
workflow: Instance of LocalWorkflow
containers_path: Where the containers are going to be available for offline-execute
containers: List of Container instances (needed by workflow)
operational_containers: List of Container instances (needed by engine)
"""

instance: "AbstractWorkflowEngineType"
version: "EngineVersion"
fingerprint: "Union[Fingerprint, str]"
engine_path: "EnginePath"
workflow: "LocalWorkflow"
containers_path: "Optional[AbsPath]" = None
containers: "Optional[Sequence[Container]]" = None
operational_containers: "Optional[Sequence[Container]]" = None
# Postfix of metadata files (generated by the instances)
META_JSON_POSTFIX: "Final[str]" = "_meta.json"


class AbstractWfExSException(Exception):
Expand Down Expand Up @@ -983,38 +793,6 @@ class CratableItem(enum.IntFlag):
NoCratableItem = CratableItem(0)


class ExportItem(NamedTuple):
type: "ExportItemType"
block: "Optional[str]" = None
name: "Optional[Union[SymbolicParamName, SymbolicOutputName]]" = None


# The description of an export action
class ExportAction(NamedTuple):
action_id: "SymbolicName"
plugin_id: "SymbolicName"
what: "Sequence[ExportItem]"
context_name: "Optional[SymbolicName]"
setup: "Optional[SecurityContextConfig]"
preferred_scheme: "Optional[str]"
preferred_id: "Optional[str]"
licences: "Sequence[str]" = []


class MaterializedExportAction(NamedTuple):
"""
The description of an export action which was materialized, so
a permanent identifier was obtained, along with some metadata
"""

action: "ExportAction"
elems: "Sequence[AnyContent]"
pids: "Sequence[URIWithMetadata]"
when: "datetime.datetime" = datetime.datetime.now(
tz=datetime.timezone.utc
).astimezone()


class StagedExecution(NamedTuple):
"""
The description of the execution of a workflow, giving the relative directory of the output
Expand Down

0 comments on commit 5291cfc

Please sign in to comment.