Skip to content

Commit

Permalink
Merge pull request #167 from getindata/kedro_update_b
Browse files Browse the repository at this point in the history
Update Kedro to 0.18.1
  • Loading branch information
blazejpalkus-gid committed Aug 1, 2022
2 parents 7392e23 + ca334db commit b0e11f1
Show file tree
Hide file tree
Showing 17 changed files with 616 additions and 441 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
- name: Initialize kedro spaceflights project
run: |
pip install . 'kedro<0.18'
pip install .
kedro new --starter spaceflights --config tests/e2e/starter-config.yml --verbose
- name: Install project dependencies
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,5 @@ venv.bak/
.mypy_cache/

docs/_build

spaceflights/
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Added e2e tests github action for pull requests with kubeflow setup in gcp
- Added support for extra volumes per node
- Refactored configuration classes to Pydantic
- Add support for `kedro>=0.18.1,<0.19`

## [0.6.4] - 2022-06-01

Expand Down
14 changes: 4 additions & 10 deletions kedro_kubeflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,7 @@ def init(ctx, kfp_url: str, with_github_actions: bool):
)
@click.pass_context
def mlflow_start(ctx, kubeflow_run_id: str, output: str):
import mlflow
from kedro_mlflow.framework.context import get_mlflow_config
import mlflow # NOQA

token = AuthHandler().obtain_id_token()
if token:
Expand All @@ -320,13 +319,9 @@ def mlflow_start(ctx, kubeflow_run_id: str, output: str):

try:
kedro_context = ctx.obj["context_helper"].context
mlflow_conf = get_mlflow_config(kedro_context)
mlflow_conf.setup(kedro_context)
mlflow_conf = kedro_context.mlflow
except AttributeError:
kedro_session = ctx.obj["context_helper"].session
with kedro_session:
mlflow_conf = get_mlflow_config(kedro_session)
mlflow_conf.setup()
raise click.ClickException("Could not read MLFlow config")

run = mlflow.start_run(
experiment_id=mlflow_conf.experiment.experiment_id, nested=False
Expand All @@ -340,8 +335,7 @@ def mlflow_start(ctx, kubeflow_run_id: str, output: str):
@kubeflow_group.command(hidden=True)
@click.argument("pvc_name", type=str)
def delete_pipeline_volume(pvc_name: str):
import kubernetes.client
import kubernetes.config
import kubernetes.config # NOQA

kubernetes.config.load_incluster_config()
current_namespace = open(
Expand Down
31 changes: 21 additions & 10 deletions kedro_kubeflow/context_helper.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os
from functools import lru_cache
from pathlib import Path
from typing import Dict, Iterable
from typing import Any, Dict

from kedro import __version__ as kedro_version
from kedro.config import TemplatedConfigLoader
from kedro.framework.session import KedroSession
from semver import VersionInfo

from .config import PluginConfig
Expand All @@ -18,8 +18,23 @@ class EnvTemplatedConfigLoader(TemplatedConfigLoader):
# defaults provided so default variables ${commit_id|dirty} work for some entries
ENV_DEFAULTS = {"commit_id": None, "branch_name": None}

def __init__(self, conf_paths: Iterable[str]):
super().__init__(conf_paths, globals_dict=self.read_env())
def __init__(
self,
conf_source: str,
env: str = None,
runtime_params: Dict[str, Any] = None,
*,
base_env: str = "base",
default_run_env: str = "local"
):
super().__init__(
conf_source,
env=env,
runtime_params=runtime_params,
globals_dict=self.read_env(),
base_env=base_env,
default_run_env=default_run_env,
)

def read_env(self) -> Dict:
config = EnvTemplatedConfigLoader.ENV_DEFAULTS.copy()
Expand Down Expand Up @@ -49,8 +64,6 @@ def project_name(self):
@property
@lru_cache()
def session(self):
from kedro.framework.session import KedroSession

return KedroSession.create(self._metadata.package_name, env=self._env)

@property
Expand All @@ -65,7 +78,7 @@ def context(self):
@lru_cache()
def config(self) -> PluginConfig:
raw = EnvTemplatedConfigLoader(
self.context.config_loader.conf_paths
self.context.config_loader.conf_source
).get(self.CONFIG_FILE_PATTERN)
return PluginConfig(**raw)

Expand Down Expand Up @@ -98,6 +111,4 @@ def project_name(self):

@property
def context(self):
from kedro.framework.context import load_context

return load_context(Path.cwd(), env=self._env)
return self.session.load_context()
3 changes: 2 additions & 1 deletion kedro_kubeflow/generators/one_pod_pipeline_generator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from kedro.framework.context import KedroContext
from kfp import dsl

from ..utils import clean_name
Expand All @@ -19,7 +20,7 @@ class OnePodPipelineGenerator(object):

def __init__(self, config, project_name, context):
self.project_name = project_name
self.context = context
self.context: KedroContext = context
dsl.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = True
self.run_config = config.run_config
self.catalog = context.config_loader.get("catalog*")
Expand Down
10 changes: 6 additions & 4 deletions kedro_kubeflow/generators/pod_per_node_pipeline_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Dict, Set

import kubernetes.client as k8s
from kedro.framework.context import KedroContext
from kedro.pipeline.node import Node
from kfp import dsl

Expand All @@ -22,7 +23,7 @@ class PodPerNodePipelineGenerator(object):

def __init__(self, config, project_name, context):
self.project_name = project_name
self.context = context
self.context: KedroContext = context
dsl.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = True
self.run_config = config.run_config
self.catalog = context.config_loader.get("catalog*")
Expand All @@ -42,12 +43,13 @@ def generate_pipeline(self, pipeline, image, image_pull_policy):
@maybe_add_params(self.context.params)
def convert_kedro_pipeline_to_kfp() -> None:
"""Convert from a Kedro pipeline into a kfp container graph."""

from kedro.framework.project import pipelines # NOQA

dsl.get_pipeline_conf().set_ttl_seconds_after_finished(
self.run_config.ttl
)
node_dependencies = self.context.pipelines.get(
pipeline
).node_dependencies
node_dependencies = pipelines[pipeline].node_dependencies
with create_pipeline_exit_handler(
pipeline,
image,
Expand Down
2 changes: 1 addition & 1 deletion kedro_kubeflow/generators/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import fsspec
import kubernetes.client as k8s
from fsspec.implementations.local import LocalFileSystem
from kfp import dsl
from kfp.compiler._k8s_helper import sanitize_k8s_name

Expand Down Expand Up @@ -153,7 +154,6 @@ def customize_op(op, image_pull_policy, run_config: RunConfig):


def is_local_fs(filepath):
from fsspec.implementations.local import LocalFileSystem

file_open = fsspec.open(filepath)
return isinstance(file_open.fs, LocalFileSystem)
3 changes: 2 additions & 1 deletion kedro_kubeflow/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@

from kedro_kubeflow.utils import is_mlflow_enabled

from .auth import AuthHandler


class MlflowIapAuthHook:
"""Allows authentication trough IAP proxy the same way as kubeflow pipelines"""

@hook_impl
def after_catalog_created(self, catalog: DataCatalog, **kwargs) -> None:
from .auth import AuthHandler

token = AuthHandler().obtain_id_token()
if token:
Expand Down
2 changes: 1 addition & 1 deletion kedro_kubeflow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ def clean_name(name: str) -> str:

def is_mlflow_enabled() -> bool:
try:
import kedro_mlflow # NOQA
import mlflow # NOQA
from kedro_mlflow.framework.context import get_mlflow_config # NOQA

return True
except ImportError:
Expand Down
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

# Runtime Requirements.
INSTALL_REQUIRES = [
"kedro>=0.16,<0.18",
"click<8.0",
"kedro>=0.18.1, <0.19.0",
"click>=8.0.4",
"kfp>=1.8.12,<2.0",
"tabulate>=0.8.7",
"semver~=2.10",
Expand All @@ -17,7 +17,7 @@

# Dev Requirements
EXTRA_REQUIRE = {
"mlflow": ["kedro-mlflow>=0.4.1,<0.8.0"],
"mlflow": ["kedro-mlflow~=0.11.1"],
"tests": [
"pytest>=5.4.0, <8.0.0",
"pytest-cov>=2.8.0, <4.0.0",
Expand Down
File renamed without changes.
5 changes: 1 addition & 4 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,9 @@ def test_init_with_github_actions(self, cwd):
assert "kedro kubeflow upload-pipeline" in content
assert "kedro kubeflow schedule" in content

@patch("kedro_mlflow.framework.context.get_mlflow_config")
@patch("mlflow.start_run")
@patch("mlflow.set_tag")
def test_mlflow_start(
self, set_tag_mock, start_run_mock, get_mlflow_config_mock
):
def test_mlflow_start(self, set_tag_mock, start_run_mock):
context_helper = MagicMock(ContextHelper)
config = dict(context_helper=context_helper)
runner = CliRunner()
Expand Down
7 changes: 5 additions & 2 deletions tests/test_context_helper.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import unittest
from pathlib import Path
from unittest.mock import MagicMock, Mock, patch

from kedro.framework.session import KedroSession
Expand Down Expand Up @@ -57,8 +58,10 @@ def test_config(self):
class TestEnvTemplatedConfigLoader(unittest.TestCase):
@staticmethod
def get_config():
config_path = [os.path.dirname(os.path.abspath(__file__))]
loader = EnvTemplatedConfigLoader(config_path)
config_path = str(
Path(os.path.dirname(os.path.abspath(__file__))) / "conf"
)
loader = EnvTemplatedConfigLoader(config_path, default_run_env="base")
return loader.get("test_config.yml")

def test_loader_with_defaults(self):
Expand Down
11 changes: 10 additions & 1 deletion tests/test_generator_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,17 @@
from kedro_kubeflow.generators.utils import is_local_fs


def gcsfs_is_missing():
try:
import gcsfs # NOQA

return False
except ImportError:
return True


@unittest.skipIf(gcsfs_is_missing(), "Package gcsfs is not installed")
class TestGeneratorUtils(unittest.TestCase):
def test_is_local(self):

assert is_local_fs("data/test/file.txt") is True
assert is_local_fs("gs://test-bucket/file.txt") is False
Loading

0 comments on commit b0e11f1

Please sign in to comment.