Skip to content

Commit

Permalink
Monitor implementation, renamed example directory
Browse files Browse the repository at this point in the history
  • Loading branch information
Carlo Antonio Venditti committed May 17, 2022
1 parent d0ab34c commit 11a2a4d
Show file tree
Hide file tree
Showing 18 changed files with 47 additions and 10 deletions.
File renamed without changes.
6 changes: 6 additions & 0 deletions examples/basic_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class BasicMonitor:
    """Example monitor that prints a message when a job starts.

    The single constructor argument is stored on the instance and echoed
    back in the message printed by `job_start`.
    """

    def __init__(self, yup):
        """Store the tunable parameter `yup` on the instance."""
        self.yup = yup

    def job_start(self, pipeline):
        """Print a message containing the stored `yup` value.

        `pipeline` is accepted but not used by this example.
        """
        message = f"I'm printing from a monitor with {self.yup}"
        print(message)
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
4 changes: 4 additions & 0 deletions example/pipelines.yml → examples/pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
on: job_start
config:
pippo: 12333
monitor:
use: BasicMonitor
with:
yup: 'a tunable parameter'


simple_task:
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
36 changes: 27 additions & 9 deletions src/yapp/cli/parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import re
import types
from copy import copy
from collections import defaultdict
from types import MethodType

Expand Down Expand Up @@ -78,9 +79,9 @@ class ConfigParser:
"hooks",
"monitor",
"config",
} # TODO add generic config directly using "config"
# Auxiliary fields, not used for steps definition
config_fields = valid_fields - {"steps", "config"}
}
# Auxiliary fields, all lists
config_fields = valid_fields - {"steps", "config", "monitor"}

def __init__(self, pipeline_name, path="./", pipelines_file="pipelines.yml"):
self.pipeline_name = pipeline_name
Expand All @@ -103,7 +104,8 @@ def load_module(self, module_name):
# Remove eventual trailing ".py" and split at dots
ref = re.sub(r"\.py$", "", module_name).split(".")
# same but snake_case
ref_snake = re.sub(r"\.py$", "", module_name).split(".")
ref_snake = copy(ref)
ref_snake[-1] = camel_to_snake(ref_snake[-1])
# return a possible path for each base_path for both possible names
paths = [os.path.join(*[base_path] + ref) for base_path in self.base_paths]
paths += [
Expand All @@ -113,7 +115,9 @@ def load_module(self, module_name):
for path in paths:
logging.debug("Trying path %s for module %s", path, module_name)
try:
spec = importlib.util.spec_from_file_location(module_name, path + ".py")
# Module name may differ from the original name (camel_to_snake)
name = os.path.split(path)[-1]
spec = importlib.util.spec_from_file_location(name, path + ".py")
except FileNotFoundError:
continue

Expand Down Expand Up @@ -241,7 +245,7 @@ def build_job(self, step, params): # pylint: disable=no-self-use

return job

def build_pipeline(self, pipeline_cfg, inputs=None, outputs=None, hooks=None):
def build_pipeline(self, pipeline_cfg, inputs=None, outputs=None, hooks=None, monitor=None):
"""
Creates pipeline from pipeline and config definition dicts
"""
Expand Down Expand Up @@ -285,7 +289,7 @@ def make_dag(step_list):
hooks = {}

return Pipeline(
jobs, name=self.pipeline_name, inputs=inputs, outputs=outputs, **hooks
jobs, name=self.pipeline_name, inputs=inputs, outputs=outputs, monitor=monitor, **hooks
)

def create_adapter(self, adapter_name: str, params: dict):
Expand Down Expand Up @@ -407,6 +411,16 @@ def build_hooks(self, cfg_hooks):
logging.debug('Parsed hooks: %s"', hooks)
return hooks

def build_monitor(self, cfg_monitor):
    """
    Sets up monitor from `monitor` field in YAML files

    The `use` entry names both the module to load (through
    `self.load_module`) and the attribute to fetch from that module.
    The optional `with` entry holds the keyword arguments passed to it.

    Returns the object produced by calling that attribute, or None when
    no monitor is configured (missing or empty `cfg_monitor`).
    """
    if not cfg_monitor:
        return None

    reference = cfg_monitor["use"]
    module = self.load_module(reference)
    monitor_class = getattr(module, reference)
    # `with` is declared as not required in the validation schema, so a
    # monitor configured with only `use` must still be constructible.
    params = cfg_monitor.get("with") or {}
    return monitor_class(**params)

def do_validation(self, pipelines_yaml: dict):
"""
Performs validation on a dict read from a pipelines.yml file
Expand All @@ -424,7 +438,7 @@ def do_validation(self, pipelines_yaml: dict):
logging.error(
"Configuration errors for pipelines: %s", list(config_errors.keys())
)
if self.pipeline_name in config_errors:
if self.pipeline_name in config_errors or "+all" in config_errors:
raise ConfigurationError(
config_errors, relevant_field=self.pipeline_name
)
Expand Down Expand Up @@ -481,12 +495,16 @@ def parse(self, skip_validation=False):
pipeline_config = pipeline_cfg.get("config", {})
global_config.update(pipeline_config)

cfg_monitor = cfg.get("monitor")
cfg_monitor = pipeline_cfg.get("monitor", cfg_monitor)

# Building objects
inputs = self.build_inputs(cfg["inputs"], global_config)
outputs = self.build_outputs(cfg["outputs"])
hooks = self.build_hooks(cfg["hooks"])
monitor = self.build_monitor(cfg_monitor)
pipeline = self.build_pipeline(
pipeline_cfg, inputs=inputs, outputs=outputs, hooks=hooks
pipeline_cfg, inputs=inputs, outputs=outputs, hooks=hooks, monitor=monitor
)

return pipeline
Expand Down
10 changes: 9 additions & 1 deletion src/yapp/cli/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def check_code_reference(field, value, error):
if not isinstance(value, str) or (
not re.match(r"^[a-zA-Z_.][a-zA-Z0-9_.]*[\.[a-zA-Z_.]+[a-zA-Z0-9_.]*]*$", value)
):
error(field, f"{value} is not a valid reference string")
error(field, f'"{value}" is not a valid reference string')


input_expose_schema = {
Expand Down Expand Up @@ -140,6 +140,14 @@ def check_code_reference(field, value, error):
"required": False,
"type": "dict",
},
"monitor": {
"required": False,
"allow_unknown": False,
"schema": {
"use": {"type": "string", "check_with": check_code_reference},
"with": {"required": False, "type": "dict"},
},
},
}

# used for special `+all` field
Expand Down
1 change: 1 addition & 0 deletions src/yapp/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def __init__(
# If monitor object has it, use the method
if hasattr(monitor, hook_name) and callable(getattr(monitor, hook_name)):
new_hooks.append(getattr(monitor, hook_name))
logging.debug('Adding %s from monitor: %s', hook_name, monitor)
setattr(self, hook_name, new_hooks)

# current job if any
Expand Down

0 comments on commit 11a2a4d

Please sign in to comment.