Skip to content

Commit

Permalink
models
Browse files Browse the repository at this point in the history
  • Loading branch information
m-alisafaee committed Nov 8, 2022
1 parent 7c13ce7 commit bc8c416
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 46 deletions.
25 changes: 18 additions & 7 deletions renku/command/graph.py
Expand Up @@ -21,11 +21,12 @@
from typing import Dict, List, Set, Union

from renku.command.command_builder.command import Command, inject
from renku.command.schema.activity import ActivitySchema
from renku.command.schema.activity import ActivitySchema, WorkflowFileActivityCollectionSchema
from renku.command.schema.composite_plan import CompositePlanSchema
from renku.command.schema.dataset import DatasetSchema, DatasetTagSchema
from renku.command.schema.plan import PlanSchema
from renku.command.schema.project import ProjectSchema
from renku.command.schema.workflow_file import WorkflowFileCompositePlanSchema, WorkflowFilePlanSchema
from renku.command.view_model.graph import GraphViewModel
from renku.core import errors
from renku.core.interface.activity_gateway import IActivityGateway
Expand All @@ -37,9 +38,10 @@
from renku.core.util.urls import get_host
from renku.domain_model.dataset import Dataset, DatasetTag
from renku.domain_model.project import Project
from renku.domain_model.provenance.activity import Activity
from renku.domain_model.provenance.activity import Activity, WorkflowFileActivityCollection
from renku.domain_model.workflow.composite_plan import CompositePlan
from renku.domain_model.workflow.plan import AbstractPlan, Plan
from renku.domain_model.workflow.workflow_file import WorkflowFileCompositePlan, WorkflowFilePlan

try:
import importlib_resources
Expand Down Expand Up @@ -153,9 +155,14 @@ def get_graph_for_all_objects(
"""
project = project_gateway.get_project()
# NOTE: Include deleted activities when exporting graph
objects: List[Union[Project, Dataset, DatasetTag, Activity, AbstractPlan]] = activity_gateway.get_all_activities(
include_deleted=True
)
objects: List[Union[Project, Dataset, DatasetTag, Activity, AbstractPlan, WorkflowFileActivityCollection]]

objects = activity_gateway.get_all_activities(include_deleted=True)

workflow_file_executions = [
a for a in activity_gateway.get_all_activity_collections() if isinstance(a, WorkflowFileActivityCollection)
]
objects.extend(workflow_file_executions)

processed_plans = set()

Expand Down Expand Up @@ -184,7 +191,8 @@ def get_graph_for_all_objects(


def _convert_entities_to_graph(
entities: List[Union[Project, Dataset, DatasetTag, Activity, AbstractPlan]], project: Project
entities: List[Union[Project, Dataset, DatasetTag, Activity, AbstractPlan, WorkflowFileActivityCollection]],
project: Project,
) -> List[Dict]:
"""Convert entities to JSON-LD graph.
Expand All @@ -201,8 +209,11 @@ def _convert_entities_to_graph(
Dataset: DatasetSchema,
DatasetTag: DatasetTagSchema,
Activity: ActivitySchema,
WorkflowFilePlan: WorkflowFilePlanSchema,
Plan: PlanSchema,
WorkflowFileCompositePlan: WorkflowFileCompositePlanSchema,
CompositePlan: CompositePlanSchema,
WorkflowFileActivityCollection: WorkflowFileActivityCollectionSchema,
}

processed_plans = set()
Expand All @@ -222,7 +233,7 @@ def _convert_entities_to_graph(
schema = next(s for t, s in schemas.items() if isinstance(entity, t))
graph.extend(schema(flattened=True).dump(entity))

if not isinstance(entity, Activity):
if not isinstance(entity, (Activity, WorkflowFileActivityCollection)):
continue

# NOTE: mark activity plans as processed
Expand Down
30 changes: 28 additions & 2 deletions renku/command/schema/activity.py
Expand Up @@ -24,7 +24,14 @@
from renku.command.schema.calamus import JsonLDSchema, Nested, fields, oa, prov, renku, schema
from renku.command.schema.entity import CollectionSchema, EntitySchema
from renku.command.schema.plan import PlanSchema
from renku.domain_model.provenance.activity import Activity, Association, Generation, Usage
from renku.command.schema.workflow_file import WorkflowFileCompositePlanSchema, WorkflowFilePlanSchema
from renku.domain_model.provenance.activity import (
Activity,
Association,
Generation,
Usage,
WorkflowFileActivityCollection,
)
from renku.domain_model.provenance.parameter import ParameterValue


Expand Down Expand Up @@ -63,7 +70,7 @@ class Meta:

agent = Nested(prov.agent, [SoftwareAgentSchema, PersonSchema])
id = fields.Id()
plan = Nested(prov.hadPlan, PlanSchema)
plan = Nested(prov.hadPlan, [PlanSchema, WorkflowFilePlanSchema, WorkflowFileCompositePlanSchema])

@pre_dump
def _pre_dump(self, obj, **kwargs):
Expand Down Expand Up @@ -164,3 +171,22 @@ class Meta:
def _pre_dump(self, obj, **kwargs):
"""Pre-dump hook."""
return _fix_id(obj)


class WorkflowFileActivityCollectionSchema(JsonLDSchema):
"""WorkflowFileActivityCollection schema."""

class Meta:
"""Meta class."""

rdf_type = prov.Activity
model = WorkflowFileActivityCollection
unknown = EXCLUDE

activities = Nested(schema.hasPart, ActivitySchema, many=True)
agents = Nested(prov.wasAssociatedWith, [PersonSchema, SoftwareAgentSchema], many=True)
association = Nested(prov.qualifiedAssociation, AssociationSchema)
ended_at_time = fields.DateTime(prov.endedAtTime, add_value_types=True)
id = fields.Id()
project_id = fields.IRI(renku.hasActivityCollection, reverse=True) # TODO
started_at_time = fields.DateTime(prov.startedAtTime, add_value_types=True)
52 changes: 52 additions & 0 deletions renku/command/schema/workflow_file.py
@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
#
# Copyright 2018-2022 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Represent workflow file run templates."""

import marshmallow

from renku.command.schema.calamus import fields, prov, renku, schema
from renku.command.schema.composite_plan import CompositePlanSchema
from renku.command.schema.plan import PlanSchema
from renku.domain_model.workflow.workflow_file import WorkflowFileCompositePlan, WorkflowFilePlan

MAX_GENERATED_NAME_LENGTH = 25


class WorkflowFilePlanSchema(PlanSchema):
"""WorkflowFilePlan schema."""

class Meta:
"""Meta class."""

rdf_type = [prov.Plan, schema.Action, schema.CreativeWork, renku.Plan, renku.WorkflowFilePlan]
model = WorkflowFilePlan
unknown = marshmallow.EXCLUDE


class WorkflowFileCompositePlanSchema(CompositePlanSchema):
"""Plan schema."""

class Meta:
"""Meta class."""

rdf_type = [prov.Plan, schema.Action, schema.CreativeWork, renku.CompositePlan, renku.WorkflowFileCompositePlan]
model = WorkflowFileCompositePlan
unknown = marshmallow.EXCLUDE

path = fields.String(prov.atLocation)
plans = fields.Nested(renku.hasSubprocess, WorkflowFilePlanSchema, many=True)
14 changes: 11 additions & 3 deletions renku/core/workflow/execute.py
Expand Up @@ -37,8 +37,9 @@
from renku.core.workflow.plan_factory import delete_indirect_files_list
from renku.core.workflow.value_resolution import ValueResolver
from renku.domain_model.project_context import project_context
from renku.domain_model.provenance.activity import Activity, ActivityCollection
from renku.domain_model.provenance.activity import Activity, ActivityCollection, WorkflowFileActivityCollection
from renku.domain_model.workflow.plan import AbstractPlan
from renku.domain_model.workflow.workflow_file import WorkflowFileCompositePlan

if TYPE_CHECKING:
from networkx import DiGraph
Expand All @@ -51,6 +52,7 @@ def execute_workflow_graph(
plan_gateway: IPlanGateway,
provider="cwltool",
config=None,
workflow_file_plan: Optional[WorkflowFileCompositePlan] = None,
):
"""Execute a Run with/without subprocesses.
Expand Down Expand Up @@ -78,6 +80,8 @@ def execute_workflow_graph(
if config:
config = safe_read_yaml(config)

# TODO: Set start and end time for each step -> Create story

started_at_time = local_now()

execute(dag=dag, basedir=project_context.path, provider=provider, config=config)
Expand All @@ -91,7 +95,6 @@ def execute_workflow_graph(
original_plan = plan_gateway.get_by_id(plan.id)

# NOTE: Workflow files don't have an original plan
# TODO: Make sure that plan ID is consistent or we create new plans everytime that we execute a workflow file.
if not original_plan:
original_plan = plan

Expand All @@ -105,7 +108,12 @@ def execute_workflow_graph(
activity_gateway.add(activity)
activities.append(activity)

if len(activities) > 1:
if workflow_file_plan:
activity_collection = WorkflowFileActivityCollection.from_activities(
activities=activities, plan=workflow_file_plan
)
activity_gateway.add_activity_collection(activity_collection)
elif len(activities) > 1:
activity_collection = ActivityCollection(activities=activities)
activity_gateway.add_activity_collection(activity_collection)

Expand Down
36 changes: 22 additions & 14 deletions renku/core/workflow/workflow_file.py
Expand Up @@ -27,13 +27,14 @@
import bashlex
from bashlex.ast import node as BashlexNode

from renku.command.command_builder import inject
from renku.command.view_model.composite_plan import CompositePlanViewModel
from renku.core import errors
from renku.core.interface.project_gateway import IProjectGateway
from renku.core.util import communication
from renku.core.util.os import safe_read_yaml
from renku.domain_model.workflow.composite_plan import CompositePlan
from renku.domain_model.workflow.parameter import CommandInput, CommandOutput, CommandParameter, MappedIOStream
from renku.domain_model.workflow.plan import WorkflowFilePlan
from renku.domain_model.workflow.workflow_file import WorkflowFileCompositePlan, WorkflowFilePlan


class WorkflowFile:
Expand All @@ -56,7 +57,9 @@ def __init__(
@classmethod
def load(cls, path: Union[Path, str]) -> WorkflowFile:
"""Read content from a given path."""
path = Path(path).resolve()
# NOTE: Path should be relative to project's root
# TODO: Validate path
path = Path(path)
data = safe_read_yaml(path=path)

return cls(
Expand All @@ -82,19 +85,23 @@ def load_from_string(cls, data: str) -> WorkflowFile:
keywords=yaml_data.get("keywords"),
)

def to_plan(self) -> CompositePlan:
@inject.autoparams("project_gateway")
def to_plan(self, project_gateway: IProjectGateway) -> WorkflowFileCompositePlan:
"""Convert a workflow file to a CompositePlan."""
return CompositePlan(
# derived_from: Optional[str] = None, # TODO: Is this needed!?
description=self.description,
id=CompositePlan.generate_id(),
project_id = project_gateway.get_project().id

return WorkflowFileCompositePlan(
date_created=None,
derived_from=None, # TODO: Create a chain when executing WFF
description=self.description,
id=WorkflowFileCompositePlan.generate_id(path=self.path),
keywords=self.keywords.copy(),
# links: Optional[List[ParameterLink]] = None, # TODO
# mappings: Optional[List[ParameterMapping]] = None, # TODO
name=self.name,
plans=[s.to_plan() for s in self.steps],
# project_id: Optional[str] = None, # TODO: What should we use here!?
path=self.path,
plans=[s.to_plan(path=self.path, project_id=project_id) for s in self.steps],
project_id=project_id,
)


Expand Down Expand Up @@ -143,20 +150,21 @@ def from_workflow_step(cls, data: Dict[str, Any]) -> Step:
keywords=data.get("keywords"),
)

def to_plan(self) -> WorkflowFilePlan:
def to_plan(self, path: Path, project_id: str) -> WorkflowFilePlan:
"""Convert a step to a WorkflowFilePlan."""
id = WorkflowFilePlan.generate_id() # TODO: Generate a consistent ID
id = WorkflowFilePlan.generate_id(path=path, name=self.name)

return WorkflowFilePlan(
parameters=[p.to_command_parameter(plan_id=id) for p in self.parameters],
command=self.command,
derived_from=None, # TODO: Create a chain when executing WFF
description=self.description,
id=id,
inputs=[i.to_command_input(plan_id=id) for i in self.inputs],
date_created=None,
keywords=self.keywords.copy(),
name=self.name,
# project_id=, # TODO
project_id=project_id,
outputs=[o.to_command_output(plan_id=id) for o in self.outputs],
success_codes=self.success_codes.copy(),
# annotations=, # TODO
Expand Down Expand Up @@ -576,6 +584,6 @@ def run_workflow_file(path: Union[Path, str]) -> CompositePlanViewModel:
workflow = WorkflowFile.load(path=path).to_plan()

graph = ExecutionGraph(workflows=[workflow], virtual_links=True)
execute_workflow_graph(dag=graph.workflow_graph, provider="local")
execute_workflow_graph(dag=graph.workflow_graph, provider="local", workflow_file_plan=workflow)

return CompositePlanViewModel.from_composite_plan(workflow)

0 comments on commit bc8c416

Please sign in to comment.