Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrating composite triggers with the DeploymentTrigger YAML representation #12413

Merged
merged 5 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/prefect/cli/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
from typing import Any, Dict, List, Optional, Tuple
from uuid import UUID

from prefect._internal.pydantic import HAS_PYDANTIC_V2

if HAS_PYDANTIC_V2:
import pydantic.v1 as pydantic
else:
import pydantic

import typer
import typer.core
import yaml
Expand Down Expand Up @@ -54,7 +61,7 @@
_save_deployment_to_prefect_file,
)
from prefect.deployments.steps.core import run_steps
from prefect.events.schemas import DeploymentTrigger
from prefect.events.schemas import DeploymentTrigger, DeploymentTriggerTypes
from prefect.exceptions import ObjectNotFound
from prefect.flows import load_flow_from_entrypoint
from prefect.settings import (
Expand Down Expand Up @@ -1597,7 +1604,7 @@ def _initialize_deployment_triggers(
triggers = []
for i, spec in enumerate(triggers_spec, start=1):
spec.setdefault("name", f"{deployment_name}__automation_{i}")
triggers.append(DeploymentTrigger(**spec))
triggers.append(pydantic.parse_obj_as(DeploymentTriggerTypes, spec))

return triggers

Expand Down
4 changes: 2 additions & 2 deletions src/prefect/deployments/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def fast_flow():
create_minimal_deployment_schedule,
normalize_to_minimal_deployment_schedules,
)
from prefect.events.schemas import DeploymentTrigger
from prefect.events.schemas import DeploymentTrigger, DeploymentTriggerTypes
from prefect.exceptions import (
ObjectNotFound,
PrefectHTTPStatusError,
Expand Down Expand Up @@ -175,7 +175,7 @@ class Config:
"The path to the entrypoint for the workflow, relative to the `path`."
),
)
triggers: List[DeploymentTrigger] = Field(
triggers: List[DeploymentTriggerTypes] = Field(
default_factory=list,
description="The triggers that should cause this deployment to run.",
)
Expand Down
20 changes: 19 additions & 1 deletion src/prefect/events/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
from .schemas import Event, RelatedResource, Resource
from .schemas import (
Event,
RelatedResource,
Resource,
Trigger,
ResourceTrigger,
EventTrigger,
MetricTrigger,
CompositeTrigger,
CompoundTrigger,
SequenceTrigger,
)
from .utilities import emit_event

__all__ = [
"Event",
"Resource",
"RelatedResource",
"emit_event",
"Trigger",
"ResourceTrigger",
"EventTrigger",
"MetricTrigger",
"CompositeTrigger",
"CompoundTrigger",
"SequenceTrigger",
]
271 changes: 200 additions & 71 deletions src/prefect/events/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,14 +453,88 @@ class ExistingAutomation(Automation):
id: UUID = Field(..., description="The ID of this automation")


class AutomationCreateFromTrigger(PrefectBaseModel):
# Schemas for defining triggers within a Prefect deployment YAML. This is a separate
# parallel hierarchy for representing triggers so that they can also include the
# information necessary to create an automation.
#
# These triggers should follow the validation rules of the main Trigger class hierarchy
# as closely as possible (because otherwise users will get validation errors creating
# triggers), but we can be more liberal with the defaults here to make it simpler to
# create them from YAML.


class DeploymentTrigger(PrefectBaseModel, abc.ABC):
"""
Base class describing a set of criteria that must be satisfied in order to trigger
an automation.
"""

class Config:
extra = Extra.ignore

# Fields from Automation

name: Optional[str] = Field(
None, description="The name to give to the automation created for this trigger."
)
description: str = Field("", description="A longer description of this automation")
enabled: bool = Field(True, description="Whether this automation will be evaluated")

# from ResourceTrigger
# Fields from Trigger

type: str

# Fields from Deployment

parameters: Optional[Dict[str, Any]] = Field(
None,
description=(
"The parameters to pass to the deployment, or None to use the "
"deployment's default parameters"
),
)

_deployment_id: Optional[UUID] = PrivateAttr(default=None)

def set_deployment_id(self, deployment_id: UUID):
self._deployment_id = deployment_id

def owner_resource(self) -> Optional[str]:
return f"prefect.deployment.{self._deployment_id}"

def actions(self) -> List[RunDeployment]:
assert self._deployment_id
return [
RunDeployment(
parameters=self.parameters,
deployment_id=self._deployment_id,
)
]

def as_automation(self) -> Automation:
if not self.name:
raise ValueError("name is required")

return Automation(
name=self.name,
description=self.description,
enabled=self.enabled,
trigger=self.as_trigger(),
actions=self.actions(),
owner_resource=self.owner_resource(),
)

@abc.abstractmethod
def as_trigger(self) -> Trigger:
...


class DeploymentResourceTrigger(DeploymentTrigger, abc.ABC):
"""
Base class for triggers that may filter by the labels of resources.
"""

type: str

match: ResourceSpecification = Field(
default_factory=lambda: ResourceSpecification(__root__={}),
Expand All @@ -471,19 +545,14 @@ class AutomationCreateFromTrigger(PrefectBaseModel):
description="Labels for related resources which this trigger will match.",
)

# from both EventTrigger and MetricTrigger

posture: Posture = Field(
Posture.Reactive,
description=(
"The posture of this trigger, either Reactive, Proactive, or Metric. "
"Reactive triggers respond to the _presence_ of the expected events, while "
"Proactive triggers respond to the _absence_ of those expected events. "
"Metric triggers periodically evaluate the configured metric query."
),
)
class DeploymentEventTrigger(DeploymentResourceTrigger):
"""
A trigger that fires based on the presence or absence of events within a given
period of time.
"""

# from EventTrigger
type: Literal["event"] = "event"

after: Set[str] = Field(
default_factory=set,
Expand Down Expand Up @@ -514,6 +583,14 @@ class AutomationCreateFromTrigger(PrefectBaseModel):
"evaluate the trigger for each flow."
),
)
posture: Literal[Posture.Reactive, Posture.Proactive] = Field( # type: ignore[valid-type]
Posture.Reactive,
description=(
"The posture of this trigger, either Reactive or Proactive. Reactive "
"triggers respond to the _presence_ of the expected events, while "
"Proactive triggers respond to the _absence_ of those expected events."
),
)
threshold: int = Field(
1,
description=(
Expand All @@ -533,73 +610,125 @@ class AutomationCreateFromTrigger(PrefectBaseModel):
),
)

# from MetricTrigger
@validator("within")
def enforce_minimum_within(
cls, value: timedelta, values, config, field: ModelField
):
minimum = field.field_info.extra["minimum"]
if value.total_seconds() < minimum:
raise ValueError("The minimum within is 0 seconds")
return value

metric: Optional[MetricTriggerQuery] = Field(
None,
description="The metric query to evaluate for this trigger. ",
)
@root_validator(skip_on_failure=True)
def enforce_minimum_within_for_proactive_triggers(cls, values: Dict[str, Any]):
posture: Optional[Posture] = values.get("posture")
within: Optional[timedelta] = values.get("within")

def as_automation(self) -> Automation:
assert self.name

if self.posture == Posture.Metric:
trigger = MetricTrigger(
type="metric",
match=self.match,
match_related=self.match_related,
posture=self.posture,
metric=self.metric,
)
else:
trigger = EventTrigger(
match=self.match,
match_related=self.match_related,
after=self.after,
expect=self.expect,
for_each=self.for_each,
posture=self.posture,
threshold=self.threshold,
within=self.within,
)
if posture == Posture.Proactive:
if not within or within == timedelta(0):
values["within"] = timedelta(seconds=10.0)
elif within < timedelta(seconds=10.0):
raise ValueError(
"The minimum within for Proactive triggers is 10 seconds"
)

return Automation(
name=self.name,
description=self.description,
enabled=self.enabled,
trigger=trigger,
actions=self.actions(),
owner_resource=self.owner_resource(),
return values

def as_trigger(self) -> Trigger:
return EventTrigger(
match=self.match,
match_related=self.match_related,
after=self.after,
expect=self.expect,
for_each=self.for_each,
posture=self.posture,
threshold=self.threshold,
within=self.within,
)

def owner_resource(self) -> Optional[str]:
return None

def actions(self) -> List[ActionTypes]:
raise NotImplementedError
class DeploymentMetricTrigger(DeploymentResourceTrigger):
"""
A trigger that fires based on the results of a metric query.
"""

type: Literal["metric"] = "metric"

class DeploymentTrigger(AutomationCreateFromTrigger):
_deployment_id: Optional[UUID] = PrivateAttr(default=None)
parameters: Optional[Dict[str, Any]] = Field(
None,
description=(
"The parameters to pass to the deployment, or None to use the "
"deployment's default parameters"
),
posture: Literal[Posture.Metric] = Field( # type: ignore[valid-type]
Posture.Metric,
description="Periodically evaluate the configured metric query.",
)

def set_deployment_id(self, deployment_id: UUID):
self._deployment_id = deployment_id
metric: MetricTriggerQuery = Field(
...,
description="The metric query to evaluate for this trigger. ",
)

def owner_resource(self) -> Optional[str]:
return f"prefect.deployment.{self._deployment_id}"
def as_trigger(self) -> Trigger:
return MetricTrigger(
match=self.match,
match_related=self.match_related,
posture=self.posture,
metric=self.metric,
)

def actions(self) -> List[RunDeployment]:
assert self._deployment_id
return [
RunDeployment(
parameters=self.parameters,
deployment_id=self._deployment_id,
)
]

class DeploymentCompositeTrigger(DeploymentTrigger, abc.ABC):
"""
Requires some number of triggers to have fired within the given time period.
"""

type: Literal["compound", "sequence"]
triggers: List["TriggerTypes"]
within: Optional[timedelta]


class DeploymentCompoundTrigger(DeploymentCompositeTrigger):
"""A composite trigger that requires some number of triggers to have
fired within the given time period"""

type: Literal["compound"] = "compound"
require: Union[int, Literal["any", "all"]]

@root_validator
def validate_require(cls, values: Dict[str, Any]) -> Dict[str, Any]:
require = values.get("require")

if isinstance(require, int):
if require < 1:
raise ValueError("required must be at least 1")
if require > len(values["triggers"]):
raise ValueError(
"required must be less than or equal to the number of triggers"
)

return values

def as_trigger(self) -> Trigger:
return CompoundTrigger(
require=self.require,
triggers=self.triggers,
within=self.within,
)


class DeploymentSequenceTrigger(DeploymentCompositeTrigger):
"""A composite trigger that requires some number of triggers to have fired
within the given time period in a specific order"""

type: Literal["sequence"] = "sequence"

def as_trigger(self) -> Trigger:
return SequenceTrigger(
triggers=self.triggers,
within=self.within,
)


# Concrete deployment trigger types
DeploymentTriggerTypes: TypeAlias = Union[
DeploymentEventTrigger,
DeploymentMetricTrigger,
DeploymentCompoundTrigger,
DeploymentSequenceTrigger,
]
Loading