Skip to content

Commit

Permalink
Integrating composite triggers with the DeploymentTrigger YAML repr…
Browse files Browse the repository at this point in the history
…esentation (#12413)
  • Loading branch information
chrisguidry committed Mar 26, 2024
1 parent df31924 commit e173697
Show file tree
Hide file tree
Showing 9 changed files with 588 additions and 134 deletions.
11 changes: 9 additions & 2 deletions src/prefect/cli/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
from typing import Any, Dict, List, Optional, Tuple
from uuid import UUID

from prefect._internal.pydantic import HAS_PYDANTIC_V2

if HAS_PYDANTIC_V2:
import pydantic.v1 as pydantic
else:
import pydantic

import typer
import typer.core
import yaml
Expand Down Expand Up @@ -54,7 +61,7 @@
_save_deployment_to_prefect_file,
)
from prefect.deployments.steps.core import run_steps
from prefect.events.schemas import DeploymentTrigger
from prefect.events.schemas import DeploymentTrigger, DeploymentTriggerTypes
from prefect.exceptions import ObjectNotFound
from prefect.flows import load_flow_from_entrypoint
from prefect.settings import (
Expand Down Expand Up @@ -1597,7 +1604,7 @@ def _initialize_deployment_triggers(
triggers = []
for i, spec in enumerate(triggers_spec, start=1):
spec.setdefault("name", f"{deployment_name}__automation_{i}")
triggers.append(DeploymentTrigger(**spec))
triggers.append(pydantic.parse_obj_as(DeploymentTriggerTypes, spec))

return triggers

Expand Down
4 changes: 2 additions & 2 deletions src/prefect/deployments/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def fast_flow():
create_minimal_deployment_schedule,
normalize_to_minimal_deployment_schedules,
)
from prefect.events.schemas import DeploymentTrigger
from prefect.events.schemas import DeploymentTrigger, DeploymentTriggerTypes
from prefect.exceptions import (
ObjectNotFound,
PrefectHTTPStatusError,
Expand Down Expand Up @@ -175,7 +175,7 @@ class Config:
"The path to the entrypoint for the workflow, relative to the `path`."
),
)
triggers: List[DeploymentTrigger] = Field(
triggers: List[DeploymentTriggerTypes] = Field(
default_factory=list,
description="The triggers that should cause this deployment to run.",
)
Expand Down
20 changes: 19 additions & 1 deletion src/prefect/events/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
from .schemas import Event, RelatedResource, Resource
from .schemas import (
Event,
RelatedResource,
Resource,
Trigger,
ResourceTrigger,
EventTrigger,
MetricTrigger,
CompositeTrigger,
CompoundTrigger,
SequenceTrigger,
)
from .utilities import emit_event

__all__ = [
"Event",
"Resource",
"RelatedResource",
"emit_event",
"Trigger",
"ResourceTrigger",
"EventTrigger",
"MetricTrigger",
"CompositeTrigger",
"CompoundTrigger",
"SequenceTrigger",
]
271 changes: 200 additions & 71 deletions src/prefect/events/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,14 +453,88 @@ class ExistingAutomation(Automation):
id: UUID = Field(..., description="The ID of this automation")


class AutomationCreateFromTrigger(PrefectBaseModel):
# Schemas for defining triggers within a Prefect deployment YAML. This is a separate
# parallel hierarchy for representing triggers so that they can also include the
# information necessary to create an automation.
#
# These triggers should follow the validation rules of the main Trigger class hierarchy
# as closely as possible (because otherwise users will get validation errors creating
# triggers), but we can be more liberal with the defaults here to make it simpler to
# create them from YAML.


class DeploymentTrigger(PrefectBaseModel, abc.ABC):
"""
Base class describing a set of criteria that must be satisfied in order to trigger
an automation.
"""

class Config:
extra = Extra.ignore

# Fields from Automation

name: Optional[str] = Field(
None, description="The name to give to the automation created for this trigger."
)
description: str = Field("", description="A longer description of this automation")
enabled: bool = Field(True, description="Whether this automation will be evaluated")

# from ResourceTrigger
# Fields from Trigger

type: str

# Fields from Deployment

parameters: Optional[Dict[str, Any]] = Field(
None,
description=(
"The parameters to pass to the deployment, or None to use the "
"deployment's default parameters"
),
)

_deployment_id: Optional[UUID] = PrivateAttr(default=None)

def set_deployment_id(self, deployment_id: UUID):
self._deployment_id = deployment_id

def owner_resource(self) -> Optional[str]:
return f"prefect.deployment.{self._deployment_id}"

def actions(self) -> List[RunDeployment]:
assert self._deployment_id
return [
RunDeployment(
parameters=self.parameters,
deployment_id=self._deployment_id,
)
]

def as_automation(self) -> Automation:
if not self.name:
raise ValueError("name is required")

return Automation(
name=self.name,
description=self.description,
enabled=self.enabled,
trigger=self.as_trigger(),
actions=self.actions(),
owner_resource=self.owner_resource(),
)

@abc.abstractmethod
def as_trigger(self) -> Trigger:
...


class DeploymentResourceTrigger(DeploymentTrigger, abc.ABC):
"""
Base class for triggers that may filter by the labels of resources.
"""

type: str

match: ResourceSpecification = Field(
default_factory=lambda: ResourceSpecification(__root__={}),
Expand All @@ -471,19 +545,14 @@ class AutomationCreateFromTrigger(PrefectBaseModel):
description="Labels for related resources which this trigger will match.",
)

# from both EventTrigger and MetricTrigger

posture: Posture = Field(
Posture.Reactive,
description=(
"The posture of this trigger, either Reactive, Proactive, or Metric. "
"Reactive triggers respond to the _presence_ of the expected events, while "
"Proactive triggers respond to the _absence_ of those expected events. "
"Metric triggers periodically evaluate the configured metric query."
),
)
class DeploymentEventTrigger(DeploymentResourceTrigger):
"""
A trigger that fires based on the presence or absence of events within a given
period of time.
"""

# from EventTrigger
type: Literal["event"] = "event"

after: Set[str] = Field(
default_factory=set,
Expand Down Expand Up @@ -514,6 +583,14 @@ class AutomationCreateFromTrigger(PrefectBaseModel):
"evaluate the trigger for each flow."
),
)
posture: Literal[Posture.Reactive, Posture.Proactive] = Field( # type: ignore[valid-type]
Posture.Reactive,
description=(
"The posture of this trigger, either Reactive or Proactive. Reactive "
"triggers respond to the _presence_ of the expected events, while "
"Proactive triggers respond to the _absence_ of those expected events."
),
)
threshold: int = Field(
1,
description=(
Expand All @@ -533,73 +610,125 @@ class AutomationCreateFromTrigger(PrefectBaseModel):
),
)

# from MetricTrigger
@validator("within")
def enforce_minimum_within(
cls, value: timedelta, values, config, field: ModelField
):
minimum = field.field_info.extra["minimum"]
if value.total_seconds() < minimum:
raise ValueError("The minimum within is 0 seconds")
return value

metric: Optional[MetricTriggerQuery] = Field(
None,
description="The metric query to evaluate for this trigger. ",
)
@root_validator(skip_on_failure=True)
def enforce_minimum_within_for_proactive_triggers(cls, values: Dict[str, Any]):
posture: Optional[Posture] = values.get("posture")
within: Optional[timedelta] = values.get("within")

def as_automation(self) -> Automation:
assert self.name

if self.posture == Posture.Metric:
trigger = MetricTrigger(
type="metric",
match=self.match,
match_related=self.match_related,
posture=self.posture,
metric=self.metric,
)
else:
trigger = EventTrigger(
match=self.match,
match_related=self.match_related,
after=self.after,
expect=self.expect,
for_each=self.for_each,
posture=self.posture,
threshold=self.threshold,
within=self.within,
)
if posture == Posture.Proactive:
if not within or within == timedelta(0):
values["within"] = timedelta(seconds=10.0)
elif within < timedelta(seconds=10.0):
raise ValueError(
"The minimum within for Proactive triggers is 10 seconds"
)

return Automation(
name=self.name,
description=self.description,
enabled=self.enabled,
trigger=trigger,
actions=self.actions(),
owner_resource=self.owner_resource(),
return values

def as_trigger(self) -> Trigger:
return EventTrigger(
match=self.match,
match_related=self.match_related,
after=self.after,
expect=self.expect,
for_each=self.for_each,
posture=self.posture,
threshold=self.threshold,
within=self.within,
)

def owner_resource(self) -> Optional[str]:
return None

def actions(self) -> List[ActionTypes]:
raise NotImplementedError
class DeploymentMetricTrigger(DeploymentResourceTrigger):
"""
A trigger that fires based on the results of a metric query.
"""

type: Literal["metric"] = "metric"

class DeploymentTrigger(AutomationCreateFromTrigger):
_deployment_id: Optional[UUID] = PrivateAttr(default=None)
parameters: Optional[Dict[str, Any]] = Field(
None,
description=(
"The parameters to pass to the deployment, or None to use the "
"deployment's default parameters"
),
posture: Literal[Posture.Metric] = Field( # type: ignore[valid-type]
Posture.Metric,
description="Periodically evaluate the configured metric query.",
)

def set_deployment_id(self, deployment_id: UUID):
self._deployment_id = deployment_id
metric: MetricTriggerQuery = Field(
...,
description="The metric query to evaluate for this trigger. ",
)

def owner_resource(self) -> Optional[str]:
return f"prefect.deployment.{self._deployment_id}"
def as_trigger(self) -> Trigger:
return MetricTrigger(
match=self.match,
match_related=self.match_related,
posture=self.posture,
metric=self.metric,
)

def actions(self) -> List[RunDeployment]:
assert self._deployment_id
return [
RunDeployment(
parameters=self.parameters,
deployment_id=self._deployment_id,
)
]

class DeploymentCompositeTrigger(DeploymentTrigger, abc.ABC):
"""
Requires some number of triggers to have fired within the given time period.
"""

type: Literal["compound", "sequence"]
triggers: List["TriggerTypes"]
within: Optional[timedelta]


class DeploymentCompoundTrigger(DeploymentCompositeTrigger):
"""A composite trigger that requires some number of triggers to have
fired within the given time period"""

type: Literal["compound"] = "compound"
require: Union[int, Literal["any", "all"]]

@root_validator
def validate_require(cls, values: Dict[str, Any]) -> Dict[str, Any]:
require = values.get("require")

if isinstance(require, int):
if require < 1:
raise ValueError("required must be at least 1")
if require > len(values["triggers"]):
raise ValueError(
"required must be less than or equal to the number of triggers"
)

return values

def as_trigger(self) -> Trigger:
return CompoundTrigger(
require=self.require,
triggers=self.triggers,
within=self.within,
)


class DeploymentSequenceTrigger(DeploymentCompositeTrigger):
"""A composite trigger that requires some number of triggers to have fired
within the given time period in a specific order"""

type: Literal["sequence"] = "sequence"

def as_trigger(self) -> Trigger:
return SequenceTrigger(
triggers=self.triggers,
within=self.within,
)


# Concrete deployment trigger types
DeploymentTriggerTypes: TypeAlias = Union[
DeploymentEventTrigger,
DeploymentMetricTrigger,
DeploymentCompoundTrigger,
DeploymentSequenceTrigger,
]
Loading

0 comments on commit e173697

Please sign in to comment.