Skip to content

Commit

Permalink
Feat/add create and list commands (#39)
Browse files Browse the repository at this point in the history
* feat: add command list to list pipelines

* feat: add command create to cli and folder structure reorg

* enh: renamed pipelines_deployer.py -> pipeline_deployer.py

* test: update tests

* doc: update readme

* enh: factorize get config paths
  • Loading branch information
julesbertrand committed Sep 26, 2023
1 parent d01d60c commit 9b973bf
Show file tree
Hide file tree
Showing 12 changed files with 236 additions and 95 deletions.
34 changes: 30 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
- [Folder Structure](#folder-structure)
- [CLI: Deploying a Pipeline](#cli-deploying-a-pipeline)
- [CLI: Checking Pipelines are valid](#cli-checking-pipelines-are-valid)
- [CLI Options](#cli-options)
- [CLI: Other commands](#cli-other-commands)
- [`create`](#create)
- [`list`](#list)
- [CLI: Options](#cli-options)


## Why this tool?
Expand Down Expand Up @@ -234,7 +237,26 @@ To validate all pipelines in the `vertex/pipelines` folder:
vertex-deployer check --all
```

### CLI Options

### CLI: Other commands

#### `create`

You can create all files needed for a pipeline using the `create` command:
```bash
vertex-deployer create my_new_pipeline --config-type py
```

This will create a `my_new_pipeline.py` file in the `vertex/pipelines` folder and a `vertex/config/my_new_pipeline/` folder with multiple config files in it.

#### `list`

You can list all pipelines in the `vertex/pipelines` folder using the `list` command:
```bash
vertex-deployer list --with-configs
```

### CLI: Options

```bash
vertex-deployer --help
Expand All @@ -258,10 +280,14 @@ vertex-deployer --log-level DEBUG deploy ...
│ ├─ __init__.py
│ ├─ cli.py
│ ├─ constants.py
│ ├─ models.py
│ ├─ pipeline_checks.py
│ ├─ pipeline_deployer.py
│ └─ utils.py
│ └─ utils
│ ├─ config.py
│ ├─ exceptions.py
│ ├─ logging.py
│ ├─ models.py
│ └─ utils.py
├─ tests/
├─ example
| ├─ example.env
Expand Down
78 changes: 72 additions & 6 deletions deployer/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,24 @@
from typing_extensions import Annotated

from deployer.constants import (
CONFIG_ROOT_PATH,
DEFAULT_LOCAL_PACKAGE_PATH,
DEFAULT_TAGS,
PIPELINE_MINIMAL_TEMPLATE,
PIPELINE_ROOT_PATH,
PYTHON_CONFIG_TEMPLATE,
)
from deployer.pipeline_checks import Pipelines
from deployer.pipelines_deployer import VertexPipelineDeployer
from deployer.utils import (
LoguruLevel,
import_pipeline_from_dir,
from deployer.pipeline_deployer import VertexPipelineDeployer
from deployer.utils.config import (
ConfigType,
list_config_filepaths,
load_config,
load_vertex_settings,
)
from deployer.utils.logging import LoguruLevel
from deployer.utils.utils import (
import_pipeline_from_dir,
make_enum_from_python_package_dir,
)

Expand Down Expand Up @@ -122,7 +129,7 @@ def deploy(
),
] = DEFAULT_LOCAL_PACKAGE_PATH,
):
"""Deploy and manage Vertex AI Pipelines."""
"""Compile, upload, run and schedule pipelines."""
vertex_settings = load_vertex_settings(env_file=env_file)

pipeline_func = import_pipeline_from_dir(PIPELINE_ROOT_PATH, pipeline_name.value)
Expand Down Expand Up @@ -192,7 +199,7 @@ def check(
),
] = None,
):
"""Check that all pipelines are valid.
"""Check that pipelines are valid.
Checking that a pipeline is valid includes:
Expand Down Expand Up @@ -241,3 +248,62 @@ def check(
for config_filepath in pipeline.config_paths:
log_message += f" - {config_filepath.name}\n"
logger.opt(ansi=True).success(log_message)


@app.command(name="list")
def list_pipelines(
    with_configs: Annotated[
        bool,
        typer.Option(
            # NOTE: the short flags must not carry trailing whitespace:
            # click keeps it in the option name and the flag becomes unusable.
            "--with-configs / --no-configs", "-wc / -nc", help="Whether to list config files."
        ),
    ] = False
):
    """List all pipelines.

    Pipeline names come from the `PipelineName` enum (built from the modules
    found in `PIPELINE_ROOT_PATH`). With `--with-configs`, the config files
    found under `CONFIG_ROOT_PATH/<pipeline_name>/` are listed for each
    pipeline as well.
    """
    # Named `list_pipelines` to avoid shadowing the `list` builtin;
    # the CLI command is still exposed as `list` via the decorator.
    log_msg = "Available pipelines:\n"
    if len(PipelineName.__members__) == 0:
        # The enum is empty when no python module was found at the root path.
        log_msg += (
            "<yellow>No pipeline found. Please check that the pipeline root path is"
            f" correct ('{PIPELINE_ROOT_PATH}')</yellow>"
        )
    else:
        for pipeline_name in PipelineName.__members__.values():
            log_msg += f"- {pipeline_name.value}\n"

            if with_configs:
                config_filepaths = list_config_filepaths(CONFIG_ROOT_PATH, pipeline_name.value)
                if len(config_filepaths) == 0:
                    log_msg += "  <yellow>- No config file found</yellow>\n"
                for config_filepath in config_filepaths:
                    log_msg += f"  - {config_filepath.name}\n"

    logger.opt(ansi=True).info(log_msg)


@app.command(no_args_is_help=True)
def create(
    pipeline_name: Annotated[
        str,
        typer.Argument(..., help="The name of the pipeline to create."),
    ],
    config_type: Annotated[
        ConfigType,
        typer.Option("--config-type", "-ct", help="The type of the config to create."),
    ] = ConfigType.json,
):
    """Create files structure for a new pipeline.

    Creates:
    - `PIPELINE_ROOT_PATH/<pipeline_name>.py` from the minimal pipeline template;
    - `CONFIG_ROOT_PATH/<pipeline_name>/` with empty `test`, `dev` and `prod`
      config files (pre-filled from a template for python configs).

    Raises:
        FileExistsError: if the pipeline file or a config file/folder
            already exists (`exist_ok=False` — never overwrite silently).
    """
    logger.info(f"Creating pipeline {pipeline_name}")

    pipeline_filepath = Path(PIPELINE_ROOT_PATH) / f"{pipeline_name}.py"
    pipeline_filepath.touch(exist_ok=False)
    pipeline_filepath.write_text(PIPELINE_MINIMAL_TEMPLATE.format(pipeline_name=pipeline_name))

    config_dirpath = Path(CONFIG_ROOT_PATH) / pipeline_name
    config_dirpath.mkdir(exist_ok=False)
    for config_name in ["test", "dev", "prod"]:
        # Use `.value` explicitly: formatting a (str, Enum) member directly
        # yields 'ConfigType.json' instead of 'json' on Python >= 3.11.
        config_filepath = config_dirpath / f"{config_name}.{config_type.value}"
        config_filepath.touch(exist_ok=False)
        if config_type == ConfigType.py:
            config_filepath.write_text(PYTHON_CONFIG_TEMPLATE)

    logger.info(f"Pipeline {pipeline_name} created with configs in {config_dirpath}")
17 changes: 17 additions & 0 deletions deployer/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,20 @@
DEFAULT_TAGS = ["latest"]

TEMP_LOCAL_PACKAGE_PATH = ".vertex-deployer-temp"


# Scaffold for a new pipeline module, written by the `create` command.
# `{pipeline_name}` is substituted via `str.format`; blank lines keep the
# generated file PEP 8 compliant (two blank lines before a top-level def).
PIPELINE_MINIMAL_TEMPLATE = """import kfp.dsl


@kfp.dsl.pipeline(name="{pipeline_name}")
def pipeline():
    pass
"""

# Scaffold for a new python config file, written by the `create` command.
# Written verbatim (no `.format` call), so the literal braces are safe.
PYTHON_CONFIG_TEMPLATE = """from kfp.dsl import Artifact, Dataset, Input, Output, Metrics

parameter_values = {}
input_artifacts = {}
"""
15 changes: 6 additions & 9 deletions deployer/pipeline_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
PIPELINE_ROOT_PATH,
TEMP_LOCAL_PACKAGE_PATH,
)
from deployer.models import CustomBaseModel, create_model_from_pipeline
from deployer.pipelines_deployer import VertexPipelineDeployer
from deployer.utils import (
disable_logger,
from deployer.pipeline_deployer import VertexPipelineDeployer
from deployer.utils.config import list_config_filepaths, load_config
from deployer.utils.logging import disable_logger
from deployer.utils.models import CustomBaseModel, create_model_from_pipeline
from deployer.utils.utils import (
import_pipeline_from_dir,
load_config,
make_enum_from_python_package_dir,
)

Expand All @@ -41,10 +41,7 @@ class Pipeline(CustomBaseModel):
def populate_config_names(cls, data: Any) -> Any:
"""Populate config names before validation"""
if data.get("config_paths") is None:
configs_dirpath = Path(CONFIG_ROOT_PATH) / data["pipeline_name"]
data["config_paths"] = []
for config_type in ["py", "json"]:
data["config_paths"] += list(configs_dirpath.glob(f"*.{config_type}"))
data["config_paths"] = list_config_filepaths(CONFIG_ROOT_PATH, data["pipeline_name"])
return data

@computed_field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
from requests import HTTPError

from deployer.constants import DEFAULT_LOCAL_PACKAGE_PATH, DEFAULT_SCHEDULER_TIMEZONE
from deployer.exceptions import MissingGoogleArtifactRegistryHostError, TagNotFoundError
from deployer.utils.exceptions import (
MissingGoogleArtifactRegistryHostError,
TagNotFoundError,
)


class VertexPipelineDeployer:
Expand Down
90 changes: 25 additions & 65 deletions deployer/utils.py → deployer/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,10 @@
from enum import Enum
from pathlib import Path

from kfp.components import graph_component
from loguru import logger
from pydantic import ValidationError
from pydantic_settings import BaseSettings, SettingsConfigDict

from deployer.exceptions import UnsupportedConfigFileError


class LoguruLevel(str, Enum): # noqa: D101
TRACE = "TRACE"
DEBUG = "DEBUG"
INFO = "INFO"
SUCCESS = "SUCCESS"
WARNING = "WARNING"
ERROR = "ERROR"
CRITICAL = "CRITICAL"


def make_enum_from_python_package_dir(dir_path: Path) -> type[Enum]:
    """Create an Enum of file names without extension from a directory of python modules.

    Args:
        dir_path (Path): Directory containing the python modules.

    Returns:
        type[Enum]: An Enum class (named 'PipelineNames') whose members map each
            module stem to itself; `__init__` is excluded.

    Raises:
        FileNotFoundError: If `dir_path` does not exist.
    """
    # Note: the function returns an Enum *class*, hence `type[Enum]`.
    if not (dir_path_ := Path(dir_path)).exists():
        raise FileNotFoundError(f"Directory {dir_path_} not found.")
    file_paths = dir_path_.glob("*.py")
    enum_dict = {x.stem: x.stem for x in file_paths if x.stem != "__init__"}
    FileNamesEnum = Enum("PipelineNames", enum_dict)
    return FileNamesEnum


def import_pipeline_from_dir(dirpath: Path, pipeline_name: str) -> graph_component.GraphComponent:
    """Import the `pipeline` object from the module `<dirpath>/<pipeline_name>.py`.

    Args:
        dirpath (Path): Path of the directory containing the pipeline modules,
            relative to the current working directory (e.g. 'vertex/pipelines').
        pipeline_name (str): Name of the pipeline module to import.

    Returns:
        graph_component.GraphComponent: The module's `pipeline` attribute.

    Raises:
        ModuleNotFoundError: If the module does not exist.
        ImportError: If the module fails to import, or has no `pipeline` attribute.
    """
    # Path() drops a leading './' when splitting into parts, so './a/b' and
    # 'a/b' both resolve to the module path 'a.b'. (The previous
    # `dirpath.startswith(".")` / `dirpath[1:]` hack assumed a str despite the
    # Path annotation, and turned './a/b' into '/a/b' — a broken module path.)
    parent_module = ".".join(Path(dirpath).parts)
    module_path = f"{parent_module}.{pipeline_name}"

    try:
        pipeline_module = importlib.import_module(module_path)
    except ModuleNotFoundError:
        # A missing module is self-explanatory: propagate unchanged.
        raise
    except Exception as e:
        raise ImportError(
            f"Error while importing pipeline from {module_path}: {e.__repr__()}"
        ) from e

    try:
        pipeline: graph_component.GraphComponent | None = pipeline_module.pipeline
    except AttributeError as e:
        raise ImportError(
            f"Pipeline {module_path}:pipeline not found. "
            "Please check that the pipeline is correctly defined and named."
        ) from e

    logger.debug(f"Pipeline {module_path} imported successfully.")

    return pipeline
from deployer.utils.exceptions import UnsupportedConfigFileError


class VertexPipelinesSettings(BaseSettings): # noqa: D101
Expand Down Expand Up @@ -86,6 +35,30 @@ def load_vertex_settings(env_file: Path | None = None) -> VertexPipelinesSetting
return settings


class ConfigType(str, Enum):
    """Config file formats supported for pipeline configs."""

    json = "json"
    py = "py"


def list_config_filepaths(config_root_path: Path | str, pipeline_name: str) -> list[Path]:
    """List the config filepaths for a pipeline.

    Globs `<config_root_path>/<pipeline_name>/*.<ext>` for every extension
    declared in `ConfigType`.

    Args:
        config_root_path (Path | str): The root path of the configs.
        pipeline_name (str): The name of the pipeline.

    Returns:
        list[Path]: A list of `Path` objects representing the config filepaths.
    """
    configs_dirpath = Path(config_root_path) / pipeline_name
    config_filepaths = [
        x
        # Use `.value` explicitly: formatting a (str, Enum) member directly
        # yields '*.ConfigType.json' instead of '*.json' on Python >= 3.11.
        for config_type in ConfigType.__members__.values()
        for x in configs_dirpath.glob(f"*.{config_type.value}")
    ]
    return config_filepaths


def load_config(config_filepath: Path) -> tuple[dict | None, dict | None]:
"""Load the parameter values and input artifacts from a config file.
Expand Down Expand Up @@ -159,16 +132,3 @@ def _load_config_python(config_filepath: Path) -> tuple[dict | None, dict | None
)

return parameter_values, input_artifacts


class disable_logger(object):
"""Context manager to disable a loguru logger."""

def __init__(self, name: str) -> None: # noqa: D107
self.name = name

def __enter__(self) -> None: # noqa: D105
logger.disable(self.name)

def __exit__(self, exc_type, exc_val, exc_tb) -> None: # noqa: D105
logger.enable(self.name)
File renamed without changes.
26 changes: 26 additions & 0 deletions deployer/utils/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from enum import Enum

from loguru import logger


class LoguruLevel(str, Enum):
    """Log levels accepted by the loguru logger, from most to least verbose."""

    TRACE = "TRACE"
    DEBUG = "DEBUG"
    INFO = "INFO"
    SUCCESS = "SUCCESS"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


class disable_logger:
    """Context manager that temporarily mutes a loguru logger.

    The logger identified by ``name`` is disabled on entry and re-enabled on
    exit, whether or not the body raised.
    """

    def __init__(self, name: str) -> None:
        """Store the name of the logger to mute."""
        self.name = name

    def __enter__(self) -> None:
        """Disable the logger."""
        logger.disable(self.name)

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Re-enable the logger."""
        logger.enable(self.name)
4 changes: 2 additions & 2 deletions deployer/models.py → deployer/utils/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class CustomBaseModel(BaseModel):
model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)


def convert_artifact_type_to_str(annotation: type) -> type:
def _convert_artifact_type_to_str(annotation: type) -> type:
"""Convert a kfp.dsl.Artifact type to a string.
This is mandatory for type checking, as kfp.dsl.Artifact types should be passed as strings
Expand All @@ -32,7 +32,7 @@ def create_model_from_pipeline(
"""Create a Pydantic model from pipeline parameters."""
pipeline_signature = signature(pipeline.pipeline_func)
pipeline_typing = {
p.name: convert_artifact_type_to_str(p.annotation)
p.name: _convert_artifact_type_to_str(p.annotation)
for p in pipeline_signature.parameters.values()
}

Expand Down
Loading

0 comments on commit 9b973bf

Please sign in to comment.