Skip to content

Commit

Permalink
feat(cli): add a flag to fail on migration errors
Browse files Browse the repository at this point in the history
  • Loading branch information
m-alisafaee committed Sep 21, 2021
1 parent 7579f4f commit 80ea6cf
Show file tree
Hide file tree
Showing 21 changed files with 118 additions and 74 deletions.
7 changes: 5 additions & 2 deletions renku/cli/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@
@click.option(
"-d", "--skip-docker-update", is_flag=True, hidden=True, help="Do not update Dockerfile to current renku version."
)
def migrate(check, skip_template_update, skip_docker_update):
@click.option("-s", "--strict", is_flag=True, hidden=True, help="Abort migrations if an error is raised.")
def migrate(check, skip_template_update, skip_docker_update, strict):
"""Check for migration and migrate to the latest Renku project version."""
status = check_project().build().execute().output

Expand Down Expand Up @@ -107,7 +108,9 @@ def migrate(check, skip_template_update, skip_docker_update):
communicator = ClickCallback()

command = migrate_project().with_communicator(communicator).with_commit()
result = command.build().execute(skip_template_update=skip_template_update, skip_docker_update=skip_docker_update)
result = command.build().execute(
skip_template_update=skip_template_update, skip_docker_update=skip_docker_update, strict=strict
)

result, _, _ = result.output

Expand Down
7 changes: 6 additions & 1 deletion renku/core/commands/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,19 @@ def migrate_project():


def _migrate_project(
force_template_update=False, skip_template_update=False, skip_docker_update=False, skip_migrations=False
force_template_update=False,
skip_template_update=False,
skip_docker_update=False,
skip_migrations=False,
strict=False,
):
"""Migrate all project's entities."""
return migrate(
force_template_update=force_template_update,
skip_template_update=skip_template_update,
skip_docker_update=skip_docker_update,
skip_migrations=skip_migrations,
strict=strict,
)


Expand Down
13 changes: 3 additions & 10 deletions renku/core/commands/providers/renku.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,20 +402,13 @@ def _fetch_dataset(self, client_dispatcher: IClientDispatcher, database_dispatch
self._files_info = files_info

@staticmethod
@inject.autoparams()
def _migrate_project(client_dispatcher: IClientDispatcher):
def _migrate_project():
if is_project_unsupported():
return

client = client_dispatcher.current_client

# NOTE: We are not interested in migrating workflows when importing datasets
previous_migration_type = client.migration_type
client.migration_type = ~MigrationType.WORKFLOWS
try:
communication.disable()
migrate(skip_template_update=True, skip_docker_update=True)
# NOTE: We are not interested in migrating workflows when importing datasets
migrate(skip_template_update=True, skip_docker_update=True, migration_type=~MigrationType.WORKFLOWS)
finally:
client.migration_type = previous_migration_type

communication.enable()
10 changes: 9 additions & 1 deletion renku/core/management/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
from renku.core.management.interface.project_gateway import IProjectGateway
from renku.core.management.migrations.utils import (
OLD_METADATA_PATH,
MigrationContext,
MigrationOptions,
MigrationType,
is_using_temporary_datasets_path,
read_project_version,
)
Expand Down Expand Up @@ -97,6 +100,8 @@ def migrate(
skip_migrations=False,
project_version=None,
max_version=None,
strict=False,
migration_type=MigrationType.ALL,
):
"""Apply all migration files to the project."""
client = client_dispatcher.current_client
Expand Down Expand Up @@ -136,6 +141,9 @@ def migrate(
project_version = project_version or _get_project_version()
n_migrations_executed = 0

migration_options = MigrationOptions(strict=strict, type=migration_type)
migration_context = MigrationContext(client=client, options=migration_options)

version = 1
for version, path in get_migrations():
if max_version and version > max_version:
Expand All @@ -145,7 +153,7 @@ def migrate(
module_name = module.__name__.split(".")[-1]
communication.echo(f"Applying migration {module_name}...")
try:
module.migrate(client)
module.migrate(migration_context)
except (Exception, BaseException) as e:
raise MigrationError("Couldn't execute migration") from e
n_migrations_executed += 1
Expand Down
4 changes: 2 additions & 2 deletions renku/core/management/migrations/m_0003__0_pyld2.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@
from .m_0005__1_pyld2 import migrate_datasets_for_pyld2


def migrate(client):
def migrate(migration_context):
"""Migration function."""
migrate_datasets_for_pyld2(client)
migrate_datasets_for_pyld2(migration_context.client)
6 changes: 3 additions & 3 deletions renku/core/management/migrations/m_0003__1_jsonld.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
from renku.core.models.jsonld import read_yaml, write_yaml


def migrate(client):
def migrate(migration_context):
"""Migration function."""

_migrate_project_metadata(client)
_migrate_datasets_metadata(client)
_migrate_project_metadata(migration_context.client)
_migrate_datasets_metadata(migration_context.client)


def _migrate_project_metadata(client):
Expand Down
3 changes: 2 additions & 1 deletion renku/core/management/migrations/m_0003__2_initial.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@
from renku.core.utils.urls import url_to_string


def migrate(client):
def migrate(migration_context):
"""Migration function."""
client = migration_context.client
_ensure_clean_lock(client)
_do_not_track_lock_file(client)
_migrate_datasets_pre_v0_3(client)
Expand Down
4 changes: 2 additions & 2 deletions renku/core/management/migrations/m_0004__0_pyld2.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@
from .m_0005__1_pyld2 import migrate_datasets_for_pyld2


def migrate(client):
def migrate(migration_context):
"""Migration function."""
migrate_datasets_for_pyld2(client)
migrate_datasets_for_pyld2(migration_context.client)
4 changes: 2 additions & 2 deletions renku/core/management/migrations/m_0004__submodules.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
from renku.core.utils.urls import remove_credentials


def migrate(client):
def migrate(migration_context):
"""Migration function."""
_migrate_submodule_based_datasets(client)
_migrate_submodule_based_datasets(migration_context.client)


@inject.autoparams()
Expand Down
4 changes: 2 additions & 2 deletions renku/core/management/migrations/m_0005__1_pyld2.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
from renku.core.management.migrations.utils import OLD_METADATA_PATH, get_datasets_path


def migrate(client):
def migrate(migration_context):
"""Migration function."""
migrate_datasets_for_pyld2(client)
migrate_datasets_for_pyld2(migration_context.client)


def migrate_datasets_for_pyld2(client):
Expand Down
6 changes: 3 additions & 3 deletions renku/core/management/migrations/m_0005__2_cwl.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@
)


def migrate(client):
def migrate(migration_context):
"""Migration function."""
if MigrationType.WORKFLOWS not in client.migration_type:
if MigrationType.WORKFLOWS not in migration_context.options.type:
return
_migrate_old_workflows(client)
_migrate_old_workflows(migration_context.client)


def _migrate_old_workflows(client):
Expand Down
4 changes: 2 additions & 2 deletions renku/core/management/migrations/m_0006__dataset_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
from renku.core.management.migrations.models.v3 import get_client_datasets


def migrate(client):
def migrate(migration_context):
"""Migration function."""
_fix_dataset_metadata(client)
_fix_dataset_metadata(migration_context.client)


def _fix_dataset_metadata(client):
Expand Down
4 changes: 2 additions & 2 deletions renku/core/management/migrations/m_0007__source_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
from renku.core.management.migrations.utils import generate_dataset_file_url


def migrate(client):
def migrate(migration_context):
"""Migration function."""
_fix_dataset_file_source_and_url(client)
_fix_dataset_file_source_and_url(migration_context.client)


def _fix_dataset_file_source_and_url(client):
Expand Down
4 changes: 2 additions & 2 deletions renku/core/management/migrations/m_0008__dataset_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
from renku.core.management.migrations.models.v8 import get_client_datasets


def migrate(client):
def migrate(migration_context):
"""Migration function."""
_fix_dataset_metadata(client)
_fix_dataset_metadata(migration_context.client)


def _fix_dataset_metadata(client):
Expand Down
22 changes: 16 additions & 6 deletions renku/core/management/migrations/m_0009__new_metadata_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,17 @@
PLAN_CACHE = {}


def migrate(client):
def migrate(migration_context):
"""Migration function."""
client = migration_context.client
committed = _commit_previous_changes(client)
# TODO: set remove=True once the migration to the new metadata is finalized
generate_new_metadata(remove=False, committed=committed)
generate_new_metadata(
remove=False,
committed=committed,
strict=migration_context.options.strict,
migration_type=migration_context.options.type,
)
_remove_dataset_metadata_files(client)
metadata_path = client.renku_path.joinpath(OLD_METADATA_PATH)
metadata_path.unlink()
Expand Down Expand Up @@ -148,6 +154,8 @@ def remove_graph_files(client):

@inject.autoparams()
def generate_new_metadata(
strict,
migration_type: MigrationType,
client_dispatcher: IClientDispatcher,
database_gateway: IDatabaseGateway,
activity_gateway: IActivityGateway,
Expand Down Expand Up @@ -184,15 +192,19 @@ def generate_new_metadata(

try:
# NOTE: Don't migrate workflows for dataset-only migrations
if MigrationType.WORKFLOWS in client.migration_type:
if MigrationType.WORKFLOWS in migration_type:
_process_workflows(client=client, activity_gateway=activity_gateway, commit=commit, remove=remove)
_process_datasets(
client=client, commit=commit, datasets_provenance=datasets_provenance, is_last_commit=is_last_commit
)
except errors.MigrationError:
if strict:
raise
communication.echo("")
communication.warn(f"Cannot process commit '{commit.hexsha}' - Migration failed: {traceback.format_exc()}")
except Exception:
if strict:
raise
communication.echo("")
communication.warn(f"Cannot process commit '{commit.hexsha}' - Exception: {traceback.format_exc()}")

Expand Down Expand Up @@ -656,15 +668,13 @@ def copy_and_migrate_datasets():
project_version = read_project_version()
set_temporary_datasets_path(datasets_path)
communication.disable()
previous_migration_type = client.migration_type
client.migration_type = MigrationType.DATASETS
renku.core.management.migrate.migrate(
project_version=project_version,
skip_template_update=True,
skip_docker_update=True,
max_version=8,
migration_type=MigrationType.DATASETS,
)
client.migration_type = previous_migration_type
finally:
communication.enable()
unset_temporary_datasets_path()
Expand Down
45 changes: 36 additions & 9 deletions renku/core/management/migrations/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import threading
import uuid
from enum import IntFlag
from typing import NamedTuple
from urllib.parse import ParseResult, quote, urljoin, urlparse

import pyld
Expand All @@ -35,6 +36,41 @@
thread_local_storage = threading.local()


class MigrationType(IntFlag):
    """Type of migration that is being executed.

    Members are bit flags, so they can be combined with ``|``, inverted with
    ``~`` and tested with ``in``.
    """

    # Dataset-related migrations.
    DATASETS = 1
    # Workflow-related migrations.
    WORKFLOWS = 2
    # Structural migrations.
    STRUCTURAL = 4
    # Union of every flag above.
    ALL = DATASETS | WORKFLOWS | STRUCTURAL


class MigrationOptions(NamedTuple):
    """Migration options.

    Immutable bundle of the settings that control a migration run.
    """

    # When true, errors raised while migrating are re-raised instead of being
    # reported as warnings.
    strict: bool
    # Which kinds of migrations to execute; defaults to all of them.
    type: MigrationType = MigrationType.ALL


class MigrationContext:
    """Context containing required migration information.

    Bundles the client a migration operates on together with the options that
    control how the migration is executed. Both are exposed read-only.
    """

    def __init__(self, client, options: "MigrationOptions"):
        """Store the client and the options of the current migration run.

        Args:
            client: Client that the migration functions operate on.
            options: Options controlling the migration run.
        """
        # NOTE: The migration type is reachable through ``options.type``; it is
        # deliberately not stored as a separate attribute.
        self._client = client
        self._options = options

    @property
    def client(self):
        """Return migration's client."""
        return self._client

    @property
    def options(self) -> "MigrationOptions":
        """Return migration's options."""
        return self._options


def generate_url_id(client, url_str, url_id):
"""Generate @id field for Url."""
url = url_str or url_id
Expand Down Expand Up @@ -104,15 +140,6 @@ def generate_dataset_file_url(client, filepath):
return project_id.geturl()


class MigrationType(IntFlag):
    """Type of migration that is being executed.

    Each member is a distinct bit so that types can be combined and masked.
    """

    DATASETS = 1  # dataset-related migrations
    WORKFLOWS = 2  # workflow-related migrations
    STRUCTURAL = 4  # structural migrations
    ALL = DATASETS | WORKFLOWS | STRUCTURAL  # every flag above combined


def migrate_types(data):
"""Fix data types."""
type_mapping = {
Expand Down
12 changes: 5 additions & 7 deletions renku/core/management/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Client for handling a local repository."""

import hashlib
import json
import os
Expand All @@ -34,7 +35,6 @@
from renku.core.management.config import RENKU_HOME
from renku.core.management.interface.database_gateway import IDatabaseGateway
from renku.core.management.interface.project_gateway import IProjectGateway
from renku.core.management.migrations.utils import MigrationType
from renku.core.models.enums import ConfigFilter
from renku.core.models.project import Project
from renku.core.utils import communication
Expand Down Expand Up @@ -123,8 +123,6 @@ class RepositoryApiMixin(GitCore):

_remote_cache = attr.ib(factory=dict)

_migration_type = attr.ib(default=MigrationType.ALL)

def __attrs_post_init__(self):
"""Initialize computed attributes."""
#: Configure Renku path.
Expand Down Expand Up @@ -169,14 +167,14 @@ def lock(self):
@property
def migration_type(self):
"""Type of migration that is being executed on this client."""
return self._migration_type
# TODO: Remove
raise NotImplementedError

@migration_type.setter
def migration_type(self, value):
"""Set type of migration."""
if not isinstance(value, MigrationType):
raise ValueError(f"Invalid value for MigrationType: {type(value)}")
self._migration_type = value
# TODO: Remove
raise NotImplementedError

@property
def docker_path(self):
Expand Down

0 comments on commit 80ea6cf

Please sign in to comment.