Skip to content

Commit

Permalink
Merge branch 'develop' into feature/2810-minimum-renku-version-property
Browse files Browse the repository at this point in the history
  • Loading branch information
Panaetius committed Apr 27, 2022
2 parents 012e30b + fe1c2c7 commit 239ea7b
Show file tree
Hide file tree
Showing 29 changed files with 868 additions and 411 deletions.
2 changes: 2 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,11 @@
("py:class", "CommandResult"),
("py:class", "CommunicationCallback"),
("py:class", "DynamicProxy"),
("py:class", "IActivityGateway"),
("py:class", "IClientDispatcher"),
("py:class", "IDatabaseDispatcher"),
("py:class", "IDatasetGateway"),
("py:class", "IPlanGateway"),
("py:class", "LocalClient"),
("py:class", "OID_TYPE"),
("py:class", "Path"),
Expand Down
14 changes: 11 additions & 3 deletions helm-chart/renku-core/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,15 @@ spec:
{{- include "certificates.volumes" $ | nindent 8 }}
initContainers:
{{- include "certificates.initContainer" $ | nindent 8 }}
securityContext:
{{- toYaml $.Values.podSecurityContext | nindent 8 }}
containers:
{{ if $.Values.metrics.enabled }}
- name: {{ $.Chart.Name}}-rqmetrics
image: "{{ $.Values.metrics.image.repository }}:{{ $.Values.metrics.image.tag }}"
imagePullPolicy: {{ $.Values.metrics.image.pullPolicy }}
securityContext:
{{- toYaml $.Values.securityContext | nindent 12 }}
env:
- name: RQ_REDIS_HOST
value: {{ $.Values.global.redis.host | quote }}
Expand All @@ -74,9 +78,7 @@ spec:
image: "{{ $version.image.repository }}:{{ $version.image.tag }}"
imagePullPolicy: {{ $version.image.pullPolicy }}
securityContext:
runAsUser: 1000
runAsGroup: 1000
allowPrivilegeEscalation: false
{{- toYaml $.Values.securityContext | nindent 12 }}
args: ["service", "api"]
env:
- name: REDIS_HOST
Expand Down Expand Up @@ -152,6 +154,8 @@ spec:
- name: {{ $.Chart.Name }}-datasets-workers
image: "{{ $version.image.repository }}:{{ $version.image.tag }}"
imagePullPolicy: {{ $version.image.pullPolicy }}
securityContext:
{{- toYaml $.Values.securityContext | nindent 12 }}
args: ["service", "worker"]
env:
- name: REDIS_HOST
Expand Down Expand Up @@ -206,6 +210,8 @@ spec:
- name: {{ $.Chart.Name }}-management-workers
image: "{{ $version.image.repository }}:{{ $version.image.tag }}"
imagePullPolicy: {{ $version.image.pullPolicy }}
securityContext:
{{- toYaml $.Values.securityContext | nindent 12 }}
args: ["service", "worker"]
env:
- name: REDIS_HOST
Expand Down Expand Up @@ -258,6 +264,8 @@ spec:
- name: {{ $.Chart.Name }}-scheduler
image: "{{ $version.image.repository }}:{{ $version.image.tag }}"
imagePullPolicy: {{ $version.image.pullPolicy }}
securityContext:
{{- toYaml $.Values.securityContext | nindent 12 }}
args: ["service", "scheduler"]
env:
- name: REDIS_HOST
Expand Down
8 changes: 8 additions & 0 deletions helm-chart/renku-core/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,11 @@ versions:
repository: renku/renku-core
tag: "v0.16.7"
pullPolicy: IfNotPresent

podSecurityContext:
runAsUser: 1000
runAsGroup: 1000
fsGroup: 100

securityContext:
allowPrivilegeEscalation: false
85 changes: 32 additions & 53 deletions renku/command/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,95 +18,74 @@
"""Renku ``status`` command."""

from collections import defaultdict
from typing import Dict, Set, Tuple
from pathlib import Path
from typing import Dict, Set

from renku.command.command_builder import inject
from renku.command.command_builder.command import Command
from renku.core.interface.activity_gateway import IActivityGateway
from renku.core.interface.client_dispatcher import IClientDispatcher
from renku.core.util.metadata import filter_overridden_activities, get_modified_activities
from renku.core.util.os import get_relative_path_to_cwd, get_relative_paths
from renku.domain_model.entity import Entity
from renku.domain_model.provenance.activity import Activity
from renku.core.workflow.activity import (
get_all_modified_and_deleted_activities_and_entities,
get_downstream_generating_activities,
is_activity_valid,
)


def get_status_command():
"""Show a status of the repository."""
return Command().command(_get_status).require_migration().require_clean().with_database(write=False)
return Command().command(_get_status).require_migration().with_database(write=False)


@inject.autoparams()
def _get_status(client_dispatcher: IClientDispatcher, activity_gateway: IActivityGateway, paths=None):
def get_dependant_activities_from(start_activity):
"""Return a set of activity and all its downstream activities.
Args:
start_activity: Root activity to start from.
Returns:
Root activity and all its downstream activities.
"""
all_activities = activity_gateway.get_downstream_activities(start_activity)
all_activities.add(start_activity)
return all_activities

def _get_status(ignore_deleted: bool, client_dispatcher: IClientDispatcher, paths=None):
def mark_generations_as_stale(activity):
for generation in activity.generations:
generation_path = get_relative_path_to_cwd(client.path / generation.entity.path)
stale_outputs[generation_path].add(usage_path)

client = client_dispatcher.current_client

paths = paths or []
paths = get_relative_paths(base=client.path, paths=paths)
ignore_deleted = ignore_deleted or client.get_value("renku", "update_ignore_delete")

modified, deleted = get_all_modified_and_deleted_activities_and_entities(client.repository)

modified, deleted = _get_modified_paths(activity_gateway=activity_gateway, repository=client.repository)
modified = {(a, e) for a, e in modified if is_activity_valid(a)}
deleted = {(a, e) for a, e in deleted if is_activity_valid(a)}

if not modified and not deleted:
return None, None, None, None

paths = paths or []
paths = get_relative_paths(base=client.path, paths=[Path.cwd() / p for p in paths])

modified_inputs: Set[str] = set()
stale_outputs: Dict[str, Set[str]] = defaultdict(set)
stale_activities: Dict[str, Set[str]] = defaultdict(set)

for start_activity, entity in modified:
usage_path = get_relative_path_to_cwd(client.path / entity.path)

activities = get_dependant_activities_from(start_activity)

if not paths or entity.path in paths: # add all downstream activities
# NOTE: Add all downstream activities if the modified entity is in paths; otherwise, add only activities that
# chain-generate at least one of the paths
generation_paths = [] if not paths or entity.path in paths else paths

activities = get_downstream_generating_activities(
starting_activities={start_activity},
paths=generation_paths,
ignore_deleted=ignore_deleted,
client_path=client.path,
)
if activities:
modified_inputs.add(usage_path)

for activity in activities:
if len(activity.generations) == 0:
stale_activities[activity.id].add(usage_path)
else:
mark_generations_as_stale(activity)
else:
for activity in activities:
if any(g.entity.path in paths for g in activity.generations):
modified_inputs.add(usage_path)
mark_generations_as_stale(activity)

deleted = {get_relative_path_to_cwd(client.path / d) for d in deleted if not paths or d in paths}

return stale_outputs, stale_activities, modified_inputs, deleted


def _get_modified_paths(activity_gateway, repository) -> Tuple[Set[Tuple[Activity, Entity]], Set[str]]:
"""Get modified and deleted usages/inputs of a list of activities.
Args:
activity_gateway: Activity gateway.
repository: Current ``Repository``.
Returns:
Tuple[Set[Tuple[Activity, Entity]], Set[str]]: Tuple of Activities with their modified paths
and deleted paths.
"""
all_activities = activity_gateway.get_all_activities()

relevant_activities = filter_overridden_activities(all_activities)

modified, deleted = get_modified_activities(activities=relevant_activities, repository=repository)
deleted_paths = {e.path for _, e in deleted}
deleted_paths = {get_relative_path_to_cwd(client.path / d) for d in deleted_paths if not paths or d in paths}

return modified, {e.path for _, e in deleted}
return stale_outputs, stale_activities, modified_inputs, deleted_paths
142 changes: 21 additions & 121 deletions renku/command/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,23 @@
# limitations under the License.
"""Renku ``update`` command."""

from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from typing import Optional

from renku.command.command_builder import inject
from renku.command.command_builder.command import Command
from renku.command.workflow import execute_workflow
from renku.core import errors
from renku.core.errors import ParameterError
from renku.core.interface.activity_gateway import IActivityGateway
from renku.core.interface.client_dispatcher import IClientDispatcher
from renku.core.interface.plan_gateway import IPlanGateway
from renku.core.util.metadata import add_activity_if_recent, filter_overridden_activities, get_modified_activities
from renku.core.util.os import get_relative_paths
from renku.core.workflow.activity import sort_activities
from renku.core.workflow.activity import (
get_all_modified_and_deleted_activities_and_entities,
get_downstream_generating_activities,
is_activity_valid,
sort_activities,
)
from renku.core.workflow.concrete_execution_graph import ExecutionGraph
from renku.domain_model.provenance.activity import Activity
from renku.domain_model.workflow.plan import AbstractPlan


def update_command():
Expand All @@ -44,10 +43,10 @@ def update_command():

@inject.autoparams()
def _update(
update_all,
dry_run,
update_all: bool,
dry_run: bool,
ignore_deleted: bool,
client_dispatcher: IClientDispatcher,
activity_gateway: IActivityGateway,
provider: str,
config: Optional[str],
paths=None,
Expand All @@ -60,10 +59,18 @@ def _update(
client = client_dispatcher.current_client

paths = paths or []
paths = get_relative_paths(base=client.path, paths=paths)
paths = get_relative_paths(base=client.path, paths=[Path.cwd() / p for p in paths])

modified_activities, modified_paths = _get_modified_activities_and_paths(client.repository, activity_gateway)
activities = _get_downstream_activities(modified_activities, activity_gateway, paths)
modified, _ = get_all_modified_and_deleted_activities_and_entities(client.repository)
modified_activities = {a for a, _ in modified if is_activity_valid(a)}
modified_paths = {e.path for _, e in modified}

activities = get_downstream_generating_activities(
starting_activities=modified_activities,
paths=paths,
ignore_deleted=ignore_deleted,
client_path=client.path,
)

if len(activities) == 0:
raise errors.NothingToExecuteError()
Expand All @@ -75,110 +82,3 @@ def _update(

graph = ExecutionGraph([a.plan_with_values for a in activities], virtual_links=True)
execute_workflow(dag=graph.workflow_graph, provider=provider, config=config)


@inject.autoparams()
def _is_activity_valid(activity: Activity, plan_gateway: IPlanGateway, client_dispatcher: IClientDispatcher) -> bool:
    """Check whether an activity's inputs still exist and its plan has not been invalidated.

    Args:
        activity(Activity): The Activity to check.
        plan_gateway(IPlanGateway): The injected Plan gateway.
        client_dispatcher(IClientDispatcher): The injected client dispatcher.

    Returns:
        bool: False if any used path is missing under the client path, if the activity's plan or the plan
            returned for the same name is invalidated (or missing), or if the last plan in the
            ``derived_from`` chain is invalidated; True otherwise.
    """
    client = client_dispatcher.current_client

    # NOTE: An activity whose inputs no longer exist on disk cannot be considered valid
    if any(not (client.path / usage.entity.path).exists() for usage in activity.usages):
        return False

    plan = activity.association.plan
    if plan.invalidated_at is not None:
        return False

    # NOTE: Presumably the gateway returns the newest plan registered under this name
    same_name_plan = plan_gateway.get_by_name(plan.name)
    if same_name_plan is None or same_name_plan.invalidated_at is not None:
        return False

    all_plans = plan_gateway.get_all_plans()

    # NOTE: Walk the derivation chain forward until no plan derives from the current one
    successor: Optional[AbstractPlan] = plan
    while successor:
        plan = successor
        successor = None
        for candidate in all_plans:
            if candidate.derived_from is not None and candidate.derived_from == plan.id:
                successor = candidate
                break

    return plan.invalidated_at is None


def _get_modified_activities_and_paths(repository, activity_gateway) -> Tuple[Set[Activity], Set[str]]:
    """Collect non-overridden activities with modified inputs, together with the modified paths.

    Args:
        repository: The current ``Repository``.
        activity_gateway: The injected Activity gateway.

    Returns:
        Tuple[Set[Activity],Set[str]]: The modified activities that pass ``_is_activity_valid`` and the paths
            of all modified entities (paths are not filtered by activity validity).
    """
    candidates = filter_overridden_activities(activity_gateway.get_all_activities())
    modified, _ = get_modified_activities(activities=list(candidates), repository=repository)

    valid_activities = set()
    for activity, _ in modified:
        if _is_activity_valid(activity):
            valid_activities.add(activity)

    modified_paths = {entity.path for _, entity in modified}

    return valid_activities, modified_paths


def _get_downstream_activities(
    starting_activities: Set[Activity], activity_gateway: IActivityGateway, paths: List[str]
) -> List[Activity]:
    """Gather the starting activities and the activities downstream of them.

    Args:
        starting_activities(Set[Activity]): Activities to use as starting/upstream nodes.
        activity_gateway(IActivityGateway): The injected Activity gateway.
        paths(List[str]): Optional generated paths; when given, only chains whose last activity generates one
            of these paths (or something below one of them) are followed.

    Returns:
        List[Activity]: The selected activities, without duplicates.
    """
    selected_by_plan: Dict[str, Set[Activity]] = defaultdict(set)

    def remember(candidate):
        # NOTE: Activities are grouped per plan id; ``add_activity_if_recent`` decides what is kept in a group
        add_activity_if_recent(activity=candidate, activities=selected_by_plan[candidate.association.plan.id])

    def generates_requested_path(candidate):
        for generation in candidate.generations:
            generated = generation.entity.path
            if generated in paths:
                return True
            if any(Path(requested) in Path(generated).parents for requested in paths):
                return True
        return False

    for start in starting_activities:
        chains = activity_gateway.get_downstream_activity_chains(start)

        if not paths:
            remember(start)
        else:
            # NOTE: Treat the starting activity as a one-element chain so it is tested against the paths too
            chains.append((start,))
            chains = [chain for chain in chains if generates_requested_path(chain[-1])]

            # NOTE: The starting activity is kept only if at least one chain reaches a requested path
            if chains:
                remember(start)

        for chain in chains:
            for member in chain:
                if not _is_activity_valid(member):
                    # NOTE: Stop following this chain; everything further downstream depends on an invalid activity
                    break
                remember(member)

    unique_activities = set()
    for group in selected_by_plan.values():
        for member in group:
            unique_activities.add(member)

    return list(unique_activities)
Loading

0 comments on commit 239ea7b

Please sign in to comment.