#8 Auto-dataset via custom runner - tested and polished
marrrcin committed Aug 2, 2022
1 parent 3f18d3c commit 05b7706
Showing 9 changed files with 34 additions and 125 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
# Changelog

## [Unreleased]
- Add auto-dataset creation, making intermediate datasets transparent to the end user (no need to explicitly add them to the Data Catalog) (#8)

## [0.5.0] - 2022-07-13

119 changes: 8 additions & 111 deletions docs/source/03_getting_started/01_quickstart.md
@@ -101,8 +101,8 @@ kedro-vertexai
```

### Adjusting Data Catalog to be compatible with Vertex AI
This change enforces raw data existence in the image. Also, one of the limitations of running the Kedro pipeline on Vertex AI (and not in a local environment) is the inability to use `MemoryDataSet`s, as the pipeline nodes do not share memory, so every artifact should be stored as a file
in a location that can be accessed by the service (e.g. a GCS bucket). The `spaceflights` demo configures datasets to output into the local `data` folder, so let's change the behaviour by creating a temporary GCS bucket (referred to as `STAGING_BUCKET`) and modifying `conf/base/catalog.yml`:
This change enforces raw input data existence in the image. While running locally, every intermediate dataset is stored as a `MemoryDataSet`. When running in Vertex AI Pipelines there is no shared memory, so the Kedro-VertexAI plugin automatically handles intermediate dataset serialization - every intermediate dataset will be stored (as a compressed cloudpickle file) in the GCS bucket specified in the `vertexai.yml` config under the `run_config.root` key.
The adjusted `catalog.yml` should look like this:

```yaml
companies:
@@ -119,117 +119,14 @@ shuttles:
type: pandas.ExcelDataSet
filepath: data/01_raw/shuttles.xlsx
layer: raw
load_args:
engine: openpyxl

model_input_table:
type: pandas.CSVDataSet
filepath: gs://STAGING_BUCKET/${run_id}/03_primary/model_input_table.csv
layer: primary

### catalog entries required starter version <= 0.17.6
preprocessed_companies:
type: pandas.CSVDataSet
filepath: gs://STAGING_BUCKET/${run_id}/02_intermediate/preprocessed_companies.csv
layer: intermediate

preprocessed_shuttles:
type: pandas.CSVDataSet
filepath: gs://STAGING_BUCKET/${run_id}/02_intermediate/preprocessed_shuttles.csv
layer: intermediate

X_train:
type: pickle.PickleDataSet
filepath: gs://STAGING_BUCKET/${run_id}/05_model_input/X_train.pickle
layer: model_input

y_train:
type: pickle.PickleDataSet
filepath: gs://STAGING_BUCKET/${run_id}/05_model_input/y_train.pickle
layer: model_input

X_test:
type: pickle.PickleDataSet
filepath: gs://STAGING_BUCKET/${run_id}/05_model_input/X_test.pickle
layer: model_input

y_test:
type: pickle.PickleDataSet
filepath: gs://STAGING_BUCKET/${run_id}/05_model_input/y_test.pickle
layer: model_input

regressor:
type: pickle.PickleDataSet
filepath: gs://STAGING_BUCKET/${run_id}/06_models/regressor.pickle
versioned: true
layer: models

### catalog entries required for starter version >= 0.17.7
data_processing.preprocessed_companies:
type: pandas.CSVDataSet
filepath: gs://STAGING_BUCKET/${run_id}/02_intermediate/preprocessed_companies.csv
layer: intermediate

data_processing.preprocessed_shuttles:
type: pandas.CSVDataSet
filepath: gs://STAGING_BUCKET/${run_id}/02_intermediate/preprocessed_shuttles.csv
layer: intermediate

data_science.active_modelling_pipeline.X_train:
type: pickle.PickleDataSet
filepath: gs://STAGING_BUCKET/${run_id}/05_model_input/X_train.pickle
layer: model_input

data_science.active_modelling_pipeline.y_train:
type: pickle.PickleDataSet
filepath: gs://STAGING_BUCKET/${run_id}/05_model_input/y_train.pickle
layer: model_input

data_science.active_modelling_pipeline.X_test:
type: pickle.PickleDataSet
filepath: gs://STAGING_BUCKET/${run_id}/05_model_input/X_test.pickle
layer: model_input

data_science.active_modelling_pipeline.y_test:
type: pickle.PickleDataSet
filepath: gs://STAGING_BUCKET/${run_id}/05_model_input/y_test.pickle
layer: model_input

data_science.active_modelling_pipeline.regressor:
type: pickle.PickleDataSet
filepath: gs://STAGING_BUCKET/${run_id}/06_models/regressor.pickle
versioned: true
layer: models

data_science.candidate_modelling_pipeline.X_train:
type: pickle.PickleDataSet
filepath: gs://STAGING_BUCKET/${run_id}/05_model_input/X_train.pickle
layer: model_input

data_science.candidate_modelling_pipeline.y_train:
type: pickle.PickleDataSet
filepath: gs://STAGING_BUCKET/${run_id}/05_model_input/y_train.pickle
layer: model_input

data_science.candidate_modelling_pipeline.X_test:
type: pickle.PickleDataSet
filepath: gs://STAGING_BUCKET/${run_id}/05_model_input/X_test.pickle
layer: model_input

data_science.candidate_modelling_pipeline.y_test:
type: pickle.PickleDataSet
filepath: gs://STAGING_BUCKET/${run_id}/05_model_input/y_test.pickle
layer: model_input

data_science.candidate_modelling_pipeline.regressor:
type: pickle.PickleDataSet
filepath: gs://STAGING_BUCKET/${run_id}/06_models/regressor.pickle
versioned: true
layer: models
```

All intermediate and output data will be stored in a location following this pattern:
```
gs://<run_config.root from vertexai.yml>/kedro-vertexai-temp/<vertex ai job name>/*.bin
```
```

We're investigating ways to stop enforcing explicit data catalog definitions for intermediate datasets; follow the issue here: [https://github.com/getindata/kedro-vertexai/issues/8](https://github.com/getindata/kedro-vertexai/issues/8).
Of course, if you want to use intermediate/output data and store it in a location of your choice, add it to the catalog, as in the sketch below. Be aware that you cannot use local paths - use `gs://` paths instead.
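
For example, an explicit entry for one of the intermediate datasets could look like the sketch below. It mirrors one of the entries that is no longer required (shown in the removed catalog section above); `STAGING_BUCKET` and the dataset name are placeholders to adapt to your project:

```yaml
# Hypothetical explicit catalog entry for an intermediate dataset (optional now);
# STAGING_BUCKET is a placeholder for your own GCS bucket.
preprocessed_companies:
  type: pandas.CSVDataSet
  filepath: gs://STAGING_BUCKET/${run_id}/02_intermediate/preprocessed_companies.csv
  layer: intermediate
```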

### Disable telemetry or ensure consent
The latest versions of the Kedro starters come with `kedro-telemetry` installed, which by default prompts the user to allow or deny data collection. Before submitting the job to Vertex AI Pipelines you have two options:
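
For reference, a minimal sketch of one possible approach - assuming `kedro-telemetry`'s standard consent file - is to keep a `.telemetry` file in the project root so the interactive prompt never appears:

```yaml
# .telemetry (project root) - sketch only, assuming kedro-telemetry's consent file;
# set consent to true if you agree to the data collection.
consent: false
```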
@@ -268,7 +268,7 @@ Configuration generated in /Users/getindata/vertex-ai-plugin-demo/conf/base/vert

Then adjust the `conf/base/vertexai.yaml` (a minimal sketch follows the list below), especially:
* `image:` key should point to the full image name (like `remote.repo.url.com/vertex-ai-plugin-demo:latest` if you pushed the image under this name).
* `root:` key should point to the GCS bucket that will be used internally by Vertex AI, e.g. `your_bucket_name/subfolder-for-vertexai`
* `root:` key should point to the GCS bucket that will be used internally by Vertex AI and the plugin itself, e.g. `your_bucket_name/subfolder-for-vertexai`
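
A minimal sketch of these two keys with placeholder values (the rest of the generated configuration stays as-is):

```yaml
# conf/base/vertexai.yaml - only the keys discussed above are shown;
# replace the values with your own image name and bucket path.
run_config:
  image: remote.repo.url.com/vertex-ai-plugin-demo:latest
  root: your_bucket_name/subfolder-for-vertexai
```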

Finally, everything is set to run the pipeline on Vertex AI Pipelines. Execute the `run-once` command:

13 changes: 9 additions & 4 deletions kedro_vertexai/cli.py
Expand Up @@ -7,8 +7,8 @@
from click import ClickException, Context

from .client import VertexAIPipelinesClient
from .config import PluginConfig
from .constants import VERTEXAI_RUN_ID_TAG
from .config import PluginConfig, RunConfig
from .constants import KEDRO_VERTEXAI_BLOB_TEMP_DIR_NAME, VERTEXAI_RUN_ID_TAG
from .context_helper import ContextHelper
from .data_models import PipelineResult
from .utils import materialize_dynamic_configuration, store_parameters_in_yaml
@@ -99,16 +99,21 @@ def run_once(
"""Deploy pipeline as a single run within given experiment
Config can be specified in kubeflow.yml as well."""
context_helper = ctx.obj["context_helper"]
config = context_helper.config.run_config
config: RunConfig = context_helper.config.run_config
client: VertexAIPipelinesClient = context_helper.vertexai_client

client.run_once(
run = client.run_once(
pipeline=pipeline,
image=image if image else config.image,
image_pull_policy=config.image_pull_policy,
parameters=format_params(params),
)

click.echo(
f"Intermediate data datasets will be stored in{os.linesep}"
f"gs://{config.root.strip('/')}/{KEDRO_VERTEXAI_BLOB_TEMP_DIR_NAME}/{run['displayName']}/*.bin"
)

if wait_for_completion:
result: PipelineResult = client.wait_for_completion(
timeout_seconds
1 change: 1 addition & 0 deletions kedro_vertexai/client.py
@@ -100,6 +100,7 @@ def run_once(
network=self.run_config.network.vpc,
)
self.log.debug("Run created %s", str(run))

return run

def _generate_run_name(self, config: PluginConfig): # noqa
2 changes: 2 additions & 0 deletions kedro_vertexai/constants.py
@@ -1,6 +1,8 @@
KEDRO_VERTEXAI_DISABLE_CONFIG_HOOK = "KEDRO_VERTEXAI_DISABLE_CONFIG_HOOK"
VERTEXAI_RUN_ID_TAG = "vertexai_run_id"
VERTEXAI_JOB_NAME_TAG = "vertexai_job_name"
KEDRO_GLOBALS_PATTERN = "KEDRO_GLOBALS_PATTERN"
KEDRO_VERTEXAI_BLOB_TEMP_DIR_NAME = "kedro-vertexai-temp"
KEDRO_VERTEXAI_RUNNER_CONFIG = "KEDRO_VERTEXAI_RUNNER_CONFIG"
KEDRO_CONFIG_RUN_ID = "KEDRO_CONFIG_RUN_ID"
KEDRO_CONFIG_JOB_NAME = "KEDRO_CONFIG_JOB_NAME"
2 changes: 2 additions & 0 deletions kedro_vertexai/generator.py
@@ -22,6 +22,7 @@

from kedro_vertexai.config import KedroVertexAIRunnerConfig, RunConfig
from kedro_vertexai.constants import (
KEDRO_CONFIG_JOB_NAME,
KEDRO_CONFIG_RUN_ID,
KEDRO_GLOBALS_PATTERN,
KEDRO_VERTEXAI_DISABLE_CONFIG_HOOK,
@@ -174,6 +175,7 @@ def _build_kfp_ops(
[
f"{KEDRO_VERTEXAI_DISABLE_CONFIG_HOOK}={'true' if CONFIG_HOOK_DISABLED else 'false'}",
f"{KEDRO_CONFIG_RUN_ID}={dsl.PIPELINE_JOB_ID_PLACEHOLDER}",
f"{KEDRO_CONFIG_JOB_NAME}={dsl.PIPELINE_JOB_NAME_PLACEHOLDER}",
f"{KEDRO_VERTEXAI_RUNNER_CONFIG}='{runner_config.json()}'",
self._globals_env(),
f"kedro run -e {self.context.env}",
10 changes: 9 additions & 1 deletion kedro_vertexai/hooks.py
@@ -5,7 +5,12 @@
from kedro.config import ConfigLoader
from kedro.framework.hooks import hook_impl

from kedro_vertexai.constants import KEDRO_CONFIG_RUN_ID, VERTEXAI_RUN_ID_TAG
from kedro_vertexai.constants import (
KEDRO_CONFIG_JOB_NAME,
KEDRO_CONFIG_RUN_ID,
VERTEXAI_JOB_NAME_TAG,
VERTEXAI_RUN_ID_TAG,
)
from kedro_vertexai.context_helper import EnvTemplatedConfigLoader
from kedro_vertexai.runtime_config import CONFIG_HOOK_DISABLED
from kedro_vertexai.utils import is_mlflow_enabled
@@ -22,6 +27,9 @@ def before_node_run(self) -> None:
if run_id := os.getenv(KEDRO_CONFIG_RUN_ID, None):
mlflow.set_tag(VERTEXAI_RUN_ID_TAG, run_id)

if job_name := os.getenv(KEDRO_CONFIG_JOB_NAME, None):
mlflow.set_tag(VERTEXAI_JOB_NAME_TAG, job_name)


if not CONFIG_HOOK_DISABLED:

4 changes: 2 additions & 2 deletions kedro_vertexai/vertex_ai/runner.py
@@ -8,7 +8,7 @@

from kedro_vertexai.config import KedroVertexAIRunnerConfig
from kedro_vertexai.constants import (
KEDRO_CONFIG_RUN_ID,
KEDRO_CONFIG_JOB_NAME,
KEDRO_VERTEXAI_RUNNER_CONFIG,
)
from kedro_vertexai.vertex_ai.datasets import KedroVertexAIRunnerDataset
@@ -46,5 +46,5 @@ def create_default_data_set(self, ds_name: str) -> AbstractDataSet:
return KedroVertexAIRunnerDataset(
self.runner_config.storage_root,
ds_name,
os.environ.get(KEDRO_CONFIG_RUN_ID),
os.environ.get(KEDRO_CONFIG_JOB_NAME),
)
7 changes: 0 additions & 7 deletions tests/test_runner_and_dataset.py
@@ -6,8 +6,6 @@
from unittest.mock import patch
from uuid import uuid4

import numpy as np
import pandas as pd
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import node, pipeline

@@ -91,11 +89,6 @@ def __init__(self, data):
self.data = data

for obj, comparer in [
(
pd.DataFrame(np.random.rand(1000, 3), columns=["a", "b", "c"]),
lambda a, b: a.equals(b),
),
(np.random.rand(100, 100), lambda a, b: np.equal(a, b).all()),
(
["just", "a", "list"],
lambda a, b: all(a[i] == b[i] for i in range(len(a))),
