Skip to content

Commit

Permalink
Merge branch 'feature/ssh-into-sessions' of github.com:SwissDataScien…
Browse files Browse the repository at this point in the history
…ceCenter/renku-python into feature/ssh-into-sessions
  • Loading branch information
Panaetius committed Feb 22, 2023
2 parents 4f55723 + 18d33e8 commit 6d7b64a
Show file tree
Hide file tree
Showing 14 changed files with 173 additions and 58 deletions.
7 changes: 4 additions & 3 deletions renku/command/checks/__init__.py
Expand Up @@ -17,7 +17,7 @@
# limitations under the License.
"""Define repository checks for :program:`renku doctor`."""

from .activities import check_migrated_activity_ids
from .activities import check_activity_dates, check_migrated_activity_ids
from .datasets import (
check_dataset_files_outside_datadir,
check_dataset_old_metadata_location,
Expand All @@ -30,7 +30,7 @@
from .project import check_project_id_group
from .storage import check_lfs_info
from .validate_shacl import check_datasets_structure, check_project_structure
from .workflow import check_activity_catalog, check_modification_date
from .workflow import check_activity_catalog, check_plan_modification_date

# Checks will be executed in the order as they are listed in __all__. They are mostly used in ``doctor`` command to
# inspect broken things. The order of operations matters when fixing issues, so, don't sort this list.
Expand All @@ -48,5 +48,6 @@
"check_missing_files",
"check_project_id_group",
"check_project_structure",
"check_modification_date",
"check_plan_modification_date",
"check_activity_dates",
)
58 changes: 58 additions & 0 deletions renku/command/checks/activities.py
Expand Up @@ -71,3 +71,61 @@ def check_migrated_activity_ids(fix, activity_gateway: IActivityGateway, **_):
)

return False, problems


@inject.autoparams("activity_gateway")
def check_activity_dates(fix, activity_gateway: IActivityGateway, **_):
    """Check activities have correct start/end/delete dates.

    Args:
        fix(bool): Whether to fix found issues.
        activity_gateway(IActivityGateway): Injected ActivityGateway.
        _: keyword arguments.

    Returns:
        Tuple[bool, Optional[str]]: Tuple of whether there are activities with invalid dates and a string
            describing the problem.
    """
    invalid_activities = []

    for activity in activity_gateway.get_all_activities(include_deleted=True):
        plan = activity.association.plan
        # An activity is invalid if it started before its plan was created, ended before it
        # started, or was invalidated before it ended.
        if (
            activity.started_at_time < plan.date_created
            or activity.ended_at_time < activity.started_at_time
            or (activity.invalidated_at and activity.invalidated_at < activity.ended_at_time)
        ):
            invalid_activities.append(activity)

    if not invalid_activities:
        return True, None
    if not fix:
        ids = [a.id for a in invalid_activities]
        message = (
            WARNING
            + "The following activities have incorrect start, end, or delete dates (use 'renku doctor --fix' to fix them):"
            + "\n\t"
            + "\n\t".join(ids)
        )
        return False, message

    fix_activity_dates(activities=invalid_activities)
    project_context.database.commit()
    communication.info("Activity dates were fixed")

    return True, None


def fix_activity_dates(activities):
    """Fix activities' start/end/delete dates.

    Clamps each timestamp so the chronological order
    plan-creation <= start <= end <= invalidation always holds.
    """
    for activity in activities:
        earliest = activity.association.plan.date_created
        activity.unfreeze()

        # Start cannot precede the plan's creation.
        if activity.started_at_time < earliest:
            activity.started_at_time = earliest

        # End cannot precede the (possibly adjusted) start.
        if activity.ended_at_time < activity.started_at_time:
            activity.ended_at_time = activity.started_at_time

        # Invalidation, when set, cannot precede the (possibly adjusted) end.
        if activity.invalidated_at and activity.invalidated_at < activity.ended_at_time:
            activity.invalidated_at = activity.ended_at_time

        activity.freeze()
8 changes: 5 additions & 3 deletions renku/command/checks/workflow.py
Expand Up @@ -16,7 +16,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Checks needed to determine integrity of workflows."""

from datetime import timedelta
from typing import List, Optional, Tuple, cast

from renku.command.command_builder import inject
Expand Down Expand Up @@ -64,7 +64,7 @@ def check_activity_catalog(fix, force, **_) -> Tuple[bool, Optional[str]]:


@inject.autoparams("plan_gateway")
def check_modification_date(fix, plan_gateway: IPlanGateway, **_) -> Tuple[bool, Optional[str]]:
def check_plan_modification_date(fix, plan_gateway: IPlanGateway, **_) -> Tuple[bool, Optional[str]]:
"""Check if all plans have modification date set for them.
Args:
Expand All @@ -88,7 +88,7 @@ def check_modification_date(fix, plan_gateway: IPlanGateway, **_) -> Tuple[bool,
ids = [plan.id for plan in to_be_processed]
message = (
WARNING
+ "The following workflows have incorrect modification date (use 'renku doctor --fix' to fix them).:\n\t"
+ "The following workflows have incorrect modification date (use 'renku doctor --fix' to fix them):\n\t"
+ "\n\t".join(ids)
)
return False, message
Expand Down Expand Up @@ -124,4 +124,6 @@ def fix_plan_dates(plans: List[AbstractPlan], plan_gateway):
plan.unfreeze()
plan.date_modified = plan.date_created
plan.date_created = creation_date
if plan.date_removed and plan.date_removed < plan.date_created:
plan.date_removed = plan.date_created + timedelta(seconds=1)
plan.freeze()
23 changes: 10 additions & 13 deletions renku/core/migration/m_0009__new_metadata_storage.py
Expand Up @@ -655,19 +655,16 @@ def _process_datasets(

for dataset in datasets:
dataset, tags = convert_dataset(dataset=dataset, revision=revision)
if is_last_commit:
datasets_provenance.update_during_migration(
dataset,
commit_sha=revision,
date=date,
tags=tags,
replace=True,
preserve_identifiers=preserve_identifiers,
)
else:
datasets_provenance.update_during_migration(
dataset, commit_sha=revision, date=date, tags=tags, preserve_identifiers=preserve_identifiers
)

datasets_provenance.update_during_migration(
dataset,
commit_sha=revision,
date=date,
tags=tags,
replace=True if is_last_commit else False,
preserve_identifiers=preserve_identifiers,
)

for dataset in deleted_datasets:
dataset, _ = convert_dataset(dataset=dataset, revision=revision)
datasets_provenance.update_during_migration(
Expand Down
95 changes: 74 additions & 21 deletions renku/core/migration/m_0010__metadata_fixes.py
Expand Up @@ -16,6 +16,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Various metadata migrations for v10."""

import io
import json
import os
Expand All @@ -26,6 +27,7 @@

import zstandard as zstd

from renku.command.checks.activities import fix_activity_dates
from renku.command.checks.workflow import fix_plan_dates
from renku.command.command_builder import inject
from renku.core.interface.activity_gateway import IActivityGateway
Expand Down Expand Up @@ -53,9 +55,12 @@ def migrate(migration_context: MigrationContext):
if MigrationType.WORKFLOWS in migration_context.options.type:
migrate_activity_ids()
fix_plan_times()
fix_activity_times()

migrate_remote_entity_ids()
fix_dataset_date_modified()
fix_dataset_image_ids()
fix_removed_plans()

# NOTE: Rebuild all workflow catalogs since ids and times have changed
communication.echo("Rebuilding workflow metadata")
Expand All @@ -76,8 +81,8 @@ def migrate_old_metadata_namespaces():
header = int.from_bytes(file.read(4), "little")
file.seek(0)
if header == zstd.MAGIC_NUMBER:
with decompressor.stream_reader(file) as zfile:
data = json.load(zfile)
with decompressor.stream_reader(file) as compressed_file:
data = json.load(compressed_file)
compressed = True
else:
data = json.load(file)
Expand All @@ -99,7 +104,7 @@ def migrate_old_metadata_namespaces():


def nested_update(data: Dict[str, Any], target_key: str, transforms: List[Tuple[str, str]]) -> None:
"""Update a key's value based on tranformations (from, to) in a deeply nested dictionary."""
"""Update a key's value based on transformations (from, to) in a deeply nested dictionary."""
for k in list(data.keys()):
value = data[k]
if isinstance(value, str) and k == target_key:
Expand Down Expand Up @@ -232,19 +237,10 @@ def migrate_project_template_data(project_gateway: IProjectGateway):
project_context.database.commit()


@inject.autoparams("activity_gateway", "plan_gateway")
def fix_plan_times(activity_gateway: IActivityGateway, plan_gateway: IPlanGateway):
@inject.autoparams("plan_gateway")
def fix_plan_times(plan_gateway: IPlanGateway):
"""Add timezone to plan invalidations."""
database = project_context.database

plans: List[AbstractPlan] = plan_gateway.get_all_plans()
all_activities = activity_gateway.get_all_activities()
activity_map: Dict[str, Activity] = {}

for activity in all_activities:
plan_id = activity.association.plan.id
if plan_id not in activity_map or activity.started_at_time < activity_map[plan_id].started_at_time:
activity_map[plan_id] = activity

for plan in plans:
plan.unfreeze()
Expand All @@ -255,24 +251,41 @@ def fix_plan_times(activity_gateway: IActivityGateway, plan_gateway: IPlanGatewa
plan.date_removed = None

if plan.date_removed is not None:
if plan.date_removed < plan.date_created:
# NOTE: Fix invalidation times set before creation date on plans
plan.date_removed = plan.date_created
if plan.date_removed.tzinfo is None:
# NOTE: There was a bug that caused date_removed to be set without timezone (as UTC time)
# so we patch in the timezone here
plan.date_removed = plan.date_removed.replace(microsecond=0).astimezone(timezone.utc)
if plan.id in activity_map and plan.date_created > activity_map[plan.id].started_at_time:
plan.date_created = activity_map[plan.id].started_at_time
if plan.date_removed < plan.date_created:
# NOTE: Fix invalidation times set before creation date on plans
plan.date_removed = plan.date_created
plan.freeze()

fix_plan_dates(plans=plans, plan_gateway=plan_gateway)
database.commit()
project_context.database.commit()


@inject.autoparams("activity_gateway")
def fix_activity_times(activity_gateway: IActivityGateway):
    """Make sure activities have valid start/end/delete dates."""
    # Include deleted activities so their invalidation dates get fixed as well.
    all_activities = activity_gateway.get_all_activities(include_deleted=True)
    fix_activity_dates(activities=all_activities)
    project_context.database.commit()


@inject.autoparams("dataset_gateway")
def fix_dataset_date_modified(dataset_gateway: IDatasetGateway):
"""Change date_created and date_modified to have correct semantics."""

    def fix_creation_date(dataset):
        """Check creation date to make sure that it's after project's creation date."""
        if dataset.date_created and dataset.date_created < project_context.project.date_created:
            try:
                # Use the earliest file-addition date as a better estimate of the real creation date.
                dataset.date_created = min([f.date_added for f in dataset.files])
            except (ValueError, TypeError):
                # No files (``min`` of an empty sequence raises ValueError) or a file without a
                # comparable date (TypeError): fall back to the project's creation date.
                dataset.date_created = project_context.project.date_created
            else:
                # The earliest file date may still predate the project; clamp it if so.
                if dataset.date_created < project_context.project.date_created:
                    dataset.date_created = project_context.project.date_created

tails = dataset_gateway.get_provenance_tails()

for dataset_tail in tails:
Expand All @@ -281,6 +294,7 @@ def fix_dataset_date_modified(dataset_gateway: IDatasetGateway):
previous_modification_date = local_now()

while dataset.derived_from is not None:
fix_creation_date(dataset)
modification_date = dataset.date_removed or dataset.date_created

if modification_date is not None:
Expand All @@ -294,8 +308,9 @@ def fix_dataset_date_modified(dataset_gateway: IDatasetGateway):
found_datasets.append(dataset)
dataset = dataset_gateway.get_by_id(dataset.derived_from.value)

fix_creation_date(dataset)
# NOTE: first dataset in chain
modification_date = dataset.date_created or dataset.date_published
modification_date = dataset.date_published or dataset.date_created
if modification_date is not None:
dataset.unfreeze()
dataset.date_modified = modification_date
Expand All @@ -308,3 +323,41 @@ def fix_dataset_date_modified(dataset_gateway: IDatasetGateway):
child.freeze()

project_context.database.commit()


@inject.autoparams("dataset_gateway")
def fix_dataset_image_ids(dataset_gateway: IDatasetGateway):
    """Remove dashes from dataset image IDs."""
    for tail in dataset_gateway.get_provenance_tails():
        # Walk each derivation chain from its tail back to the first dataset version.
        current = tail
        while current is not None:
            if current.images:
                for image in current.images:
                    image.id = image.id.replace("-", "")

                # Flag the mutation explicitly so the persistence layer stores the change.
                current._p_changed = True

            if current.derived_from:
                current = dataset_gateway.get_by_id(current.derived_from.value)
            else:
                current = None

    project_context.database.commit()


@inject.autoparams("plan_gateway")
def fix_removed_plans(plan_gateway: IPlanGateway):
    """Create a derivative if a removed plan doesn't have one."""
    for plan in plan_gateway.get_all_plans():
        if not plan.date_removed or plan.derived_from is not None:
            continue

        # Record the deletion on a fresh derivative so the removal becomes part of the
        # plan's derivation chain.
        successor = plan.derive()
        successor.date_modified = plan.date_modified
        successor.delete(when=plan.date_removed)
        plan_gateway.add(successor)

        # The original version is no longer the deleted one.
        plan.unfreeze()
        plan.date_removed = None
        plan.freeze()

    project_context.database.commit()
5 changes: 3 additions & 2 deletions renku/core/util/datetime8601.py
Expand Up @@ -75,6 +75,7 @@ def _set_to_local_timezone(value):
return value.replace(tzinfo=local_tz)


def local_now(remove_microseconds: bool = True) -> datetime:
    """Return current datetime in local timezone."""
    now = datetime.now(timezone.utc).astimezone()
    if remove_microseconds:
        now = now.replace(microsecond=0)
    return now
6 changes: 4 additions & 2 deletions renku/core/util/git.py
Expand Up @@ -379,7 +379,7 @@ def get_entity_from_revision(
Entity: The Entity for the given path and revision.
"""
from renku.domain_model.entity import Collection, Entity
from renku.domain_model.entity import NON_EXISTING_ENTITY_CHECKSUM, Collection, Entity

def get_directory_members(absolute_path: Path) -> List[Entity]:
"""Return first-level files/directories in a directory."""
Expand Down Expand Up @@ -410,7 +410,9 @@ def get_directory_members(absolute_path: Path) -> List[Entity]:
checksum = repository.get_object_hash(revision=revision, path=path)
# NOTE: If object was not found at a revision it's either removed or exists in a different revision; keep the
# entity and use revision as checksum
checksum = checksum or revision or "HEAD"
if isinstance(revision, str) and revision == "HEAD":
revision = repository.head.commit.hexsha
checksum = checksum or revision or NON_EXISTING_ENTITY_CHECKSUM
id = Entity.generate_id(checksum=checksum, path=path)

absolute_path = repository.path / path
Expand Down
1 change: 0 additions & 1 deletion renku/data/shacl_shape.json
Expand Up @@ -1635,7 +1635,6 @@
"datatype": {
"@id": "xsd:string"
},
"minCount": 1,
"maxCount": 1
},
{
Expand Down
5 changes: 3 additions & 2 deletions renku/domain_model/workflow/plan.py
Expand Up @@ -153,14 +153,14 @@ def is_derivation(self) -> bool:
"""Return if an ``AbstractPlan`` has correct derived_from."""
raise NotImplementedError()

def delete(self):
def delete(self, when: datetime = local_now()):
"""Mark a plan as deleted.
NOTE: Don't call this function for deleting plans since it doesn't delete the whole plan derivatives chain. Use
renku.core.workflow.plan::remove_plan instead.
"""
self.unfreeze()
self.date_removed = local_now()
self.date_removed = when
self.freeze()


Expand Down Expand Up @@ -323,6 +323,7 @@ def derive(self, creator: Optional[Person] = None) -> "Plan":
derived.keywords = copy.deepcopy(self.keywords)
derived.outputs = self.outputs.copy()
derived.success_codes = self.success_codes.copy()
derived.creators = self.creators.copy()
derived.assign_new_id()

if creator and hasattr(creator, "email") and not any(c for c in self.creators if c.email == creator.email):
Expand Down

0 comments on commit 6d7b64a

Please sign in to comment.