From bc8c4161b360e2b3158d9e9a4a2117c2242be9e7 Mon Sep 17 00:00:00 2001 From: Mohammad Alisafaee Date: Mon, 7 Nov 2022 23:07:05 +0100 Subject: [PATCH] models --- renku/command/graph.py | 25 +++-- renku/command/schema/activity.py | 30 +++++- renku/command/schema/workflow_file.py | 52 ++++++++++ renku/core/workflow/execute.py | 14 ++- renku/core/workflow/workflow_file.py | 36 ++++--- renku/domain_model/provenance/activity.py | 54 +++++++++- renku/domain_model/workflow/plan.py | 14 +-- renku/domain_model/workflow/workflow_file.py | 102 +++++++++++++++++++ renku/ui/cli/graph.py | 2 +- tests/fixtures/domain_models.py | 2 +- 10 files changed, 285 insertions(+), 46 deletions(-) create mode 100644 renku/command/schema/workflow_file.py create mode 100644 renku/domain_model/workflow/workflow_file.py diff --git a/renku/command/graph.py b/renku/command/graph.py index 6a482b46fa..336f92366f 100644 --- a/renku/command/graph.py +++ b/renku/command/graph.py @@ -21,11 +21,12 @@ from typing import Dict, List, Set, Union from renku.command.command_builder.command import Command, inject -from renku.command.schema.activity import ActivitySchema +from renku.command.schema.activity import ActivitySchema, WorkflowFileActivityCollectionSchema from renku.command.schema.composite_plan import CompositePlanSchema from renku.command.schema.dataset import DatasetSchema, DatasetTagSchema from renku.command.schema.plan import PlanSchema from renku.command.schema.project import ProjectSchema +from renku.command.schema.workflow_file import WorkflowFileCompositePlanSchema, WorkflowFilePlanSchema from renku.command.view_model.graph import GraphViewModel from renku.core import errors from renku.core.interface.activity_gateway import IActivityGateway @@ -37,9 +38,10 @@ from renku.core.util.urls import get_host from renku.domain_model.dataset import Dataset, DatasetTag from renku.domain_model.project import Project -from renku.domain_model.provenance.activity import Activity +from renku.domain_model.provenance.activity import Activity, WorkflowFileActivityCollection from renku.domain_model.workflow.composite_plan import CompositePlan from renku.domain_model.workflow.plan import AbstractPlan, Plan +from renku.domain_model.workflow.workflow_file import WorkflowFileCompositePlan, WorkflowFilePlan try: import importlib_resources @@ -153,9 +155,14 @@ def get_graph_for_all_objects( """ project = project_gateway.get_project() # NOTE: Include deleted activities when exporting graph - objects: List[Union[Project, Dataset, DatasetTag, Activity, AbstractPlan]] = activity_gateway.get_all_activities( - include_deleted=True - ) + objects: List[Union[Project, Dataset, DatasetTag, Activity, AbstractPlan, WorkflowFileActivityCollection]] + + objects = activity_gateway.get_all_activities(include_deleted=True) + + workflow_file_executions = [ + a for a in activity_gateway.get_all_activity_collections() if isinstance(a, WorkflowFileActivityCollection) + ] + objects.extend(workflow_file_executions) processed_plans = set() @@ -184,7 +191,8 @@ def get_graph_for_all_objects( def _convert_entities_to_graph( - entities: List[Union[Project, Dataset, DatasetTag, Activity, AbstractPlan]], project: Project + entities: List[Union[Project, Dataset, DatasetTag, Activity, AbstractPlan, WorkflowFileActivityCollection]], + project: Project, ) -> List[Dict]: """Convert entities to JSON-LD graph. @@ -201,8 +209,11 @@ def _convert_entities_to_graph( Dataset: DatasetSchema, DatasetTag: DatasetTagSchema, Activity: ActivitySchema, + WorkflowFilePlan: WorkflowFilePlanSchema, Plan: PlanSchema, + WorkflowFileCompositePlan: WorkflowFileCompositePlanSchema, CompositePlan: CompositePlanSchema, + WorkflowFileActivityCollection: WorkflowFileActivityCollectionSchema, } processed_plans = set() @@ -222,7 +233,7 @@ def _convert_entities_to_graph( schema = next(s for t, s in schemas.items() if isinstance(entity, t)) graph.extend(schema(flattened=True).dump(entity)) - if not isinstance(entity, Activity): + if not isinstance(entity, (Activity, WorkflowFileActivityCollection)): continue # NOTE: mark activity plans as processed diff --git a/renku/command/schema/activity.py b/renku/command/schema/activity.py index fe4d9141db..ea67a18600 100644 --- a/renku/command/schema/activity.py +++ b/renku/command/schema/activity.py @@ -24,7 +24,14 @@ from renku.command.schema.calamus import JsonLDSchema, Nested, fields, oa, prov, renku, schema from renku.command.schema.entity import CollectionSchema, EntitySchema from renku.command.schema.plan import PlanSchema -from renku.domain_model.provenance.activity import Activity, Association, Generation, Usage +from renku.command.schema.workflow_file import WorkflowFileCompositePlanSchema, WorkflowFilePlanSchema +from renku.domain_model.provenance.activity import ( + Activity, + Association, + Generation, + Usage, + WorkflowFileActivityCollection, +) from renku.domain_model.provenance.parameter import ParameterValue @@ -63,7 +70,7 @@ class Meta: agent = Nested(prov.agent, [SoftwareAgentSchema, PersonSchema]) id = fields.Id() - plan = Nested(prov.hadPlan, PlanSchema) + plan = Nested(prov.hadPlan, [PlanSchema, WorkflowFilePlanSchema, WorkflowFileCompositePlanSchema]) @pre_dump def _pre_dump(self, obj, **kwargs): @@ -164,3 +171,22 @@ class Meta: def _pre_dump(self, obj, **kwargs): """Pre-dump hook.""" return _fix_id(obj) + + +class WorkflowFileActivityCollectionSchema(JsonLDSchema): + """WorkflowFileActivityCollection schema.""" + + class Meta: + """Meta class.""" + + rdf_type = prov.Activity + model = WorkflowFileActivityCollection + unknown = EXCLUDE + + activities = Nested(schema.hasPart, ActivitySchema, many=True) + agents = Nested(prov.wasAssociatedWith, [PersonSchema, SoftwareAgentSchema], many=True) + association = Nested(prov.qualifiedAssociation, AssociationSchema) + ended_at_time = fields.DateTime(prov.endedAtTime, add_value_types=True) + id = fields.Id() + project_id = fields.IRI(renku.hasActivityCollection, reverse=True) # TODO + started_at_time = fields.DateTime(prov.startedAtTime, add_value_types=True) diff --git a/renku/command/schema/workflow_file.py b/renku/command/schema/workflow_file.py new file mode 100644 index 0000000000..9fe63548c6 --- /dev/null +++ b/renku/command/schema/workflow_file.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2018-2022 - Swiss Data Science Center (SDSC) +# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and +# Eidgenössische Technische Hochschule Zürich (ETHZ). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Represent workflow file run templates.""" + +import marshmallow + +from renku.command.schema.calamus import fields, prov, renku, schema +from renku.command.schema.composite_plan import CompositePlanSchema +from renku.command.schema.plan import PlanSchema +from renku.domain_model.workflow.workflow_file import WorkflowFileCompositePlan, WorkflowFilePlan + +MAX_GENERATED_NAME_LENGTH = 25 + + +class WorkflowFilePlanSchema(PlanSchema): + """WorkflowFilePlan schema.""" + + class Meta: + """Meta class.""" + + rdf_type = [prov.Plan, schema.Action, schema.CreativeWork, renku.Plan, renku.WorkflowFilePlan] + model = WorkflowFilePlan + unknown = marshmallow.EXCLUDE + + +class WorkflowFileCompositePlanSchema(CompositePlanSchema): + """Plan schema.""" + + class Meta: + """Meta class.""" + + rdf_type = [prov.Plan, schema.Action, schema.CreativeWork, renku.CompositePlan, renku.WorkflowFileCompositePlan] + model = WorkflowFileCompositePlan + unknown = marshmallow.EXCLUDE + + path = fields.String(prov.atLocation) + plans = fields.Nested(renku.hasSubprocess, WorkflowFilePlanSchema, many=True) diff --git a/renku/core/workflow/execute.py b/renku/core/workflow/execute.py index 70a036d6a4..56fbe28731 100644 --- a/renku/core/workflow/execute.py +++ b/renku/core/workflow/execute.py @@ -37,8 +37,9 @@ from renku.core.workflow.plan_factory import delete_indirect_files_list from renku.core.workflow.value_resolution import ValueResolver from renku.domain_model.project_context import project_context -from renku.domain_model.provenance.activity import Activity, ActivityCollection +from renku.domain_model.provenance.activity import Activity, ActivityCollection, WorkflowFileActivityCollection from renku.domain_model.workflow.plan import AbstractPlan +from renku.domain_model.workflow.workflow_file import WorkflowFileCompositePlan if TYPE_CHECKING: from networkx import DiGraph @@ -51,6 +52,7 @@ def execute_workflow_graph( plan_gateway: IPlanGateway, provider="cwltool", config=None, + workflow_file_plan: Optional[WorkflowFileCompositePlan] = None, ): """Execute a Run with/without subprocesses. @@ -78,6 +80,8 @@ def execute_workflow_graph( if config: config = safe_read_yaml(config) + # TODO: Set start and end time for each step -> Create story + started_at_time = local_now() execute(dag=dag, basedir=project_context.path, provider=provider, config=config) @@ -91,7 +95,6 @@ def execute_workflow_graph( original_plan = plan_gateway.get_by_id(plan.id) # NOTE: Workflow files don't have an original plan - # TODO: Make sure that plan ID is consistent or we create new plans everytime that we execute a workflow file. if not original_plan: original_plan = plan @@ -105,7 +108,12 @@ def execute_workflow_graph( activity_gateway.add(activity) activities.append(activity) - if len(activities) > 1: + if workflow_file_plan: + activity_collection = WorkflowFileActivityCollection.from_activities( + activities=activities, plan=workflow_file_plan + ) + activity_gateway.add_activity_collection(activity_collection) + elif len(activities) > 1: activity_collection = ActivityCollection(activities=activities) activity_gateway.add_activity_collection(activity_collection) diff --git a/renku/core/workflow/workflow_file.py b/renku/core/workflow/workflow_file.py index 7827d4c3a2..a987292bb3 100644 --- a/renku/core/workflow/workflow_file.py +++ b/renku/core/workflow/workflow_file.py @@ -27,13 +27,14 @@ import bashlex from bashlex.ast import node as BashlexNode +from renku.command.command_builder import inject from renku.command.view_model.composite_plan import CompositePlanViewModel from renku.core import errors +from renku.core.interface.project_gateway import IProjectGateway from renku.core.util import communication from renku.core.util.os import safe_read_yaml -from renku.domain_model.workflow.composite_plan import CompositePlan from renku.domain_model.workflow.parameter import CommandInput, CommandOutput, CommandParameter, MappedIOStream -from renku.domain_model.workflow.plan import WorkflowFilePlan +from renku.domain_model.workflow.workflow_file import WorkflowFileCompositePlan, WorkflowFilePlan class WorkflowFile: @@ -56,7 +57,9 @@ def __init__( @classmethod def load(cls, path: Union[Path, str]) -> WorkflowFile: """Read content from a given path.""" - path = Path(path).resolve() + # NOTE: Path should be relative to project's root + # TODO: Validate path + path = Path(path) data = safe_read_yaml(path=path) return cls( @@ -82,19 +85,23 @@ def load_from_string(cls, data: str) -> WorkflowFile: keywords=yaml_data.get("keywords"), ) - def to_plan(self) -> CompositePlan: + @inject.autoparams("project_gateway") + def to_plan(self, project_gateway: IProjectGateway) -> WorkflowFileCompositePlan: """Convert a workflow file to a CompositePlan.""" - return CompositePlan( - # derived_from: Optional[str] = None, # TODO: Is this needed!? - description=self.description, - id=CompositePlan.generate_id(), + project_id = project_gateway.get_project().id + + return WorkflowFileCompositePlan( date_created=None, + derived_from=None, # TODO: Create a chain when executing WFF + description=self.description, + id=WorkflowFileCompositePlan.generate_id(path=self.path), keywords=self.keywords.copy(), # links: Optional[List[ParameterLink]] = None, # TODO # mappings: Optional[List[ParameterMapping]] = None, # TODO name=self.name, - plans=[s.to_plan() for s in self.steps], - # project_id: Optional[str] = None, # TODO: What should we use here!? + path=self.path, + plans=[s.to_plan(path=self.path, project_id=project_id) for s in self.steps], + project_id=project_id, ) @@ -143,20 +150,21 @@ def from_workflow_step(cls, data: Dict[str, Any]) -> Step: keywords=data.get("keywords"), ) - def to_plan(self) -> WorkflowFilePlan: + def to_plan(self, path: Path, project_id: str) -> WorkflowFilePlan: """Convert a step to a WorkflowFilePlan.""" - id = WorkflowFilePlan.generate_id() # TODO: Generate a consistent ID + id = WorkflowFilePlan.generate_id(path=path, name=self.name) return WorkflowFilePlan( parameters=[p.to_command_parameter(plan_id=id) for p in self.parameters], command=self.command, + derived_from=None, # TODO: Create a chain when executing WFF description=self.description, id=id, inputs=[i.to_command_input(plan_id=id) for i in self.inputs], date_created=None, keywords=self.keywords.copy(), name=self.name, - # project_id=, # TODO + project_id=project_id, outputs=[o.to_command_output(plan_id=id) for o in self.outputs], success_codes=self.success_codes.copy(), # annotations=, # TODO @@ -576,6 +584,6 @@ def run_workflow_file(path: Union[Path, str]) -> CompositePlanViewModel: workflow = WorkflowFile.load(path=path).to_plan() graph = ExecutionGraph(workflows=[workflow], virtual_links=True) - execute_workflow_graph(dag=graph.workflow_graph, provider="local") + execute_workflow_graph(dag=graph.workflow_graph, provider="local", workflow_file_plan=workflow) return CompositePlanViewModel.from_composite_plan(workflow) diff --git a/renku/domain_model/provenance/activity.py b/renku/domain_model/provenance/activity.py index f66a395da6..5c284e1234 100644 --- a/renku/domain_model/provenance/activity.py +++ b/renku/domain_model/provenance/activity.py @@ -33,6 +33,7 @@ from renku.domain_model.provenance.annotation import Annotation from renku.domain_model.provenance.parameter import ParameterValue from renku.domain_model.workflow.plan import Plan +from renku.domain_model.workflow.workflow_file import WorkflowFileCompositePlan from renku.infrastructure.database import Persistent from renku.infrastructure.immutable import Immutable from renku.infrastructure.repository import Repository @@ -42,10 +43,10 @@ class Association: """Assign responsibility to an agent for an activity.""" - def __init__(self, *, agent: Union[Person, SoftwareAgent], id: str, plan: Plan): + def __init__(self, *, agent: Union[Person, SoftwareAgent], id: str, plan: Union[Plan, WorkflowFileCompositePlan]): self.agent: Union[Person, SoftwareAgent] = agent self.id: str = id - self.plan: Plan = plan + self.plan: Union[Plan, WorkflowFileCompositePlan] = plan @staticmethod def generate_id(activity_id: str) -> str: @@ -138,7 +139,6 @@ def from_plan( ended_at_time: datetime, annotations: List[Annotation] = None, id: Optional[str] = None, - update_commits=False, ): """Convert a ``Plan`` to a ``Activity``.""" from renku.core.plugin.pluginmanager import get_plugin_manager @@ -270,9 +270,53 @@ class ActivityCollection(Persistent): def __init__(self, *, activities: List[Activity], id: str = None): self.activities: List[Activity] = activities or [] - self.id: str = id or ActivityCollection.generate_id() + self.id: str = id or self.generate_id() @staticmethod def generate_id() -> str: - """Generate an identifier for an activity.""" + """Generate an identifier for an activity collection.""" return f"/activity-collection/{uuid4().hex}" + + +class WorkflowFileActivityCollection(ActivityCollection): + """Represent activities of a workflow file execution.""" + + def __init__( + self, + *, + activities: List[Activity], + agents: List[Union[Person, SoftwareAgent]], + association: Association, + ended_at_time: datetime, + id: str = None, + invalidated_at: Optional[datetime] = None, + started_at_time: datetime, + ): + super().__init__(activities=activities, id=id) + + self.agents: List[Union[Person, SoftwareAgent]] = agents + self.association: Association = association + self.ended_at_time: datetime = ended_at_time + self.invalidated_at: Optional[datetime] = invalidated_at + self.started_at_time: datetime = started_at_time + + @classmethod + def from_activities(cls, plan: WorkflowFileCompositePlan, activities: List[Activity]): + """Create an instance from a list of ``Activity``.""" + id = cls.generate_id() + association = Association(agent=activities[0].association.agent, id=Association.generate_id(id), plan=plan) + + return cls( + activities=activities, + agents=activities[0].agents.copy(), + association=association, + ended_at_time=max(a.ended_at_time for a in activities), + id=id, + invalidated_at=None, + started_at_time=min(a.started_at_time for a in activities), + ) + + @staticmethod + def generate_id() -> str: + """Generate an identifier.""" + return f"/workflow-file-activity-collection/{uuid4().hex}" diff --git a/renku/domain_model/workflow/plan.py b/renku/domain_model/workflow/plan.py index 8f620d57b6..74a3667090 100644 --- a/renku/domain_model/workflow/plan.py +++ b/renku/domain_model/workflow/plan.py @@ -84,7 +84,7 @@ def deleted(self) -> bool: return self.invalidated_at is not None @staticmethod - def generate_id(uuid: Optional[str] = None) -> str: + def generate_id(*, uuid: Optional[str] = None, **_) -> str: """Generate an identifier for Plan.""" uuid = uuid or uuid4().hex return f"/plans/{uuid}" @@ -381,18 +381,6 @@ def get_field_by_id(self, id: str) -> Union[CommandInput, CommandOutput, Command raise errors.ParameterError(f"Parameter {id} not found on plan {self.id}.") -class WorkflowFilePlan(Plan): - """Represent a Plan that is converted from a workflow file.""" - - # def to_argv(self, with_streams: bool = False, quote_string: bool = False) -> List[Any]: - # """Convert a Plan into argv list.""" - # return [self.command] - - def set_parameters_from_strings(self, params_strings: List[str]) -> None: - """Set parameters by parsing parameters strings.""" - raise NotImplementedError - - class PlanDetailsJson(marshmallow.Schema): """Serialize a plan to a response object.""" diff --git a/renku/domain_model/workflow/workflow_file.py b/renku/domain_model/workflow/workflow_file.py new file mode 100644 index 0000000000..4ef79214f4 --- /dev/null +++ b/renku/domain_model/workflow/workflow_file.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2017-2022 - Swiss Data Science Center (SDSC) +# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and +# Eidgenössische Technische Hochschule Zürich (ETHZ). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Models to represent a workflow file run templates.""" + +from __future__ import annotations + +import hashlib +from datetime import datetime +from pathlib import Path +from typing import List, Optional, Union + +from renku.domain_model.provenance.agent import Person +from renku.domain_model.provenance.annotation import Annotation +from renku.domain_model.workflow.composite_plan import CompositePlan +from renku.domain_model.workflow.parameter import ParameterLink, ParameterMapping +from renku.domain_model.workflow.plan import Plan + + +class WorkflowFileCompositePlan(CompositePlan): + """A workflow file composite plan.""" + + def __init__( + self, + *, + annotations: Optional[List[Annotation]] = None, + creators: Optional[List[Person]] = None, + date_created: Optional[datetime] = None, + derived_from: Optional[str] = None, + description: Optional[str] = None, + id: str, + invalidated_at: Optional[datetime] = None, + keywords: Optional[List[str]] = None, + links: Optional[List[ParameterLink]] = None, + mappings: Optional[List[ParameterMapping]] = None, + name: str, + path: Union[Path, str], + plans: List[WorkflowFilePlan], + project_id: Optional[str] = None, + ): + super().__init__( + annotations=annotations, + creators=creators, + date_created=date_created, + derived_from=derived_from, + description=description, + id=id, + invalidated_at=invalidated_at, + keywords=keywords, + links=links, + mappings=mappings, + name=name, + plans=plans, + project_id=project_id, + ) + + self.path: str = str(path) + + @staticmethod + def generate_id(path: Union[Path, str] = None, **_) -> str: + """Generate an identifier for Plan.""" + assert path, "Path is needed to generate id for WorkflowFileCompositePlan" + + key = str(path).encode("utf-8") + uuid = hashlib.sha256(key).hexdigest()[:32] + return CompositePlan.generate_id(uuid=uuid) + + +class WorkflowFilePlan(Plan): + """Represent a Plan that is converted from a workflow file.""" + + @staticmethod + def generate_id(path: Union[Path, str] = None, name: str = None, **_) -> str: + """Generate an identifier for Plan.""" + assert path, "Path is needed to generate id for WorkflowFilePlan" + assert name, "Name is needed to generate id for WorkflowFilePlan" + + key = f"{path}::{name}".encode("utf-8") + uuid = hashlib.sha256(key).hexdigest()[:32] + return Plan.generate_id(uuid=uuid) + + # def to_argv(self, with_streams: bool = False, quote_string: bool = False) -> List[Any]: + # """Convert a Plan into argv list.""" + # return [self.command] + + def set_parameters_from_strings(self, params_strings: List[str]) -> None: + """Set parameters by parsing parameters strings.""" + raise NotImplementedError diff --git a/renku/ui/cli/graph.py b/renku/ui/cli/graph.py index 1ff0c1e4a2..1b7c894439 100644 --- a/renku/ui/cli/graph.py +++ b/renku/ui/cli/graph.py @@ -115,7 +115,7 @@ def graph(): default="HEAD", help="Limit graph to changes done in revision (or range of revisions like 'A..B').", ) -@click.option("--full", is_flag=True, help="Generate full graph for project. Overrides --revision.") +@click.option("-f", "--full", is_flag=True, help="Generate full graph for project. Overrides --revision.") @click.option("--strict", is_flag=True, default=False, help="Validate triples before output.") @click.option( "--no-indent", is_flag=True, default=False, help="Format without indentation/pretty-printing (only for JSON-LD)." diff --git a/tests/fixtures/domain_models.py b/tests/fixtures/domain_models.py index 45b4885f9c..789b739380 100644 --- a/tests/fixtures/domain_models.py +++ b/tests/fixtures/domain_models.py @@ -133,7 +133,7 @@ def _create_composite(name="my-composite", num_steps=2): for i in range(num_steps): steps.append(plan_model(name=f"my-plan-{i}", identifier=uuid4().hex)) return CompositePlan( - id=CompositePlan.generate_id("2ecdb9651ea745a4a419272f2451457c"), + id=CompositePlan.generate_id(uuid="2ecdb9651ea745a4a419272f2451457c"), name=name, plans=steps, date_created=datetime.fromisoformat("2022-07-12T16:29:14+02:00"),