Skip to content

Commit fd6690a

Browse files
authored
Merge cfa7ad7 into 925d545
2 parents 925d545 + cfa7ad7 commit fd6690a

File tree

6 files changed

+414
-53
lines changed

6 files changed

+414
-53
lines changed

examples/prefect/deploy_flow.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
"""Example usage of deploy function"""
2+
3+
from datetime import timedelta
4+
from os import environ
5+
6+
from prefect import flow
7+
from prefect.artifacts import create_table_artifact
8+
9+
from osw.utils.workflow import DeployConfig, DeployParam, deploy, tags_str_to_list
10+
11+
# Set environment variables
12+
environ["PREFECT_DEPLOYMENT_NAME"] = "osw-python-deploy-example"
13+
environ["PREFECT_DEPLOYMENT_DESCRIPTION"] = "Deployment of notify_teams.py"
14+
environ["PREFECT_DEPLOYMENT_VERSION"] = "0.0.1"
15+
environ["PREFECT_DEPLOYMENT_TAGS"] = "osw-python,example-deploy-flow"
16+
environ["PREFECT_DEPLOYMENT_INTERVAL_MIN"] = "1"
17+
# environ["PREFECT_DEPLOYMENT_CRON"] = "* * * * *"
18+
19+
20+
@flow(log_prints=True)
21+
def example_flow_to_deploy():
22+
"""Example flow to be deployed"""
23+
print(f"Execution of example: {example_flow_to_deploy.__name__}!")
24+
# set example table artifact
25+
create_table_artifact(
26+
key="example-table",
27+
table=[
28+
{"name": "Alice", "age": 24},
29+
{"name": "Bob", "age": 25},
30+
],
31+
description="Example table artifact",
32+
)
33+
34+
35+
if __name__ == "__main__":
36+
"""Deploy the example flow"""
37+
# Example using environment variables
38+
deploy(
39+
DeployParam(
40+
deployments=[
41+
DeployConfig(
42+
flow=example_flow_to_deploy,
43+
name=environ.get("PREFECT_DEPLOYMENT_NAME"),
44+
description=environ.get("PREFECT_DEPLOYMENT_DESCRIPTION"),
45+
version=environ.get("PREFECT_DEPLOYMENT_VERSION"),
46+
tags=tags_str_to_list(environ.get("PREFECT_DEPLOYMENT_TAGS")),
47+
interval=timedelta(
48+
minutes=int(environ.get("PREFECT_DEPLOYMENT_INTERVAL_MIN"))
49+
), # either interval or cron
50+
# cron=environ.get("PREFECT_DEPLOYMENT_CRON"),
51+
)
52+
],
53+
# remove_existing_deployments=True,
54+
)
55+
)
56+
57+
# Clear secret environment variables
58+
del environ["PREFECT_DEPLOYMENT_NAME"]
59+
del environ["PREFECT_DEPLOYMENT_DESCRIPTION"]
60+
del environ["PREFECT_DEPLOYMENT_VERSION"]
61+
del environ["PREFECT_DEPLOYMENT_TAGS"]
62+
del environ["PREFECT_DEPLOYMENT_INTERVAL_MIN"]
63+
# del environ["PREFECT_DEPLOYMENT_CRON"]

examples/prefect/notify_teams.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
"""Example of sending notifications to MS Teams on prefect flow failures"""
2+
3+
from os import environ
4+
5+
from prefect import flow
6+
from pydantic import SecretStr
7+
8+
from osw.utils.workflow import NotifyTeams, NotifyTeamsParam
9+
10+
# Prerequisite: Set environment variable TEAMS_WEBHOOK_URL
11+
# in CLI: export TEAMS_WEBHOOK_URL="https://prod..."
12+
# in python uncomment below, DO NOT PUSH SECRETS TO GIT
13+
14+
# environ["TEAMS_WEBHOOK_URL"] = "https://prod..."
15+
16+
17+
# Decorator must be configured with on_failure argument
18+
@flow(
19+
# Microsoft Teams notification on failure ->
20+
# on_failure use `notify_teams` function without brackets as list element
21+
on_failure=[
22+
NotifyTeams(
23+
NotifyTeamsParam(
24+
teams_webhook_url=SecretStr(environ.get("TEAMS_WEBHOOK_URL")),
25+
# OPTIONAL, will be empty if no deploment is assigned
26+
deployment_name="osw-python-notify-teams-example",
27+
)
28+
).notify_teams
29+
],
30+
log_prints=True,
31+
)
32+
def example_error_flow():
33+
"""Test flow that always fails"""
34+
35+
raise ValueError(
36+
"oops! LOREM IPSUM DOLOR SIT AMET CONSECTETUR ADIPISICING ELIT " * 1
37+
)
38+
39+
40+
if __name__ == "__main__":
41+
example_error_flow()
42+
# Clear secret environment variable
43+
del environ["TEAMS_WEBHOOK_URL"]

setup.cfg

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ dev =
111111

112112
# Add here test requirements (semicolon/line-separated)
113113
testing =
114+
pytest-asyncio
115+
prefect
114116
setuptools
115117
pytest
116118
pytest-cov
@@ -147,6 +149,8 @@ norecursedirs =
147149
build
148150
.tox
149151
testpaths = tests
152+
asyncio_default_fixture_loop_scope = function
153+
150154
# Use pytest markers to select/deselect specific tests
151155
# markers =
152156
# slow: mark tests as slow (deselect with '-m "not slow"')

src/osw/utils/prefect.py

Lines changed: 0 additions & 53 deletions
This file was deleted.

src/osw/utils/workflow.py

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
"""Prefect utils as support for OpenSemanticWorld."""
2+
3+
import asyncio
4+
import re
5+
import sys
6+
from datetime import timedelta
7+
from importlib.metadata import version
8+
from inspect import signature
9+
from typing import Any, Dict, Iterable, List, Optional, Union
10+
11+
from packaging.specifiers import SpecifierSet
12+
from prefect import Flow, serve
13+
from prefect.blocks.notifications import MicrosoftTeamsWebhook
14+
from prefect.client.schemas.objects import FlowRun
15+
from prefect.settings import PREFECT_API_URL
16+
from prefect.states import State
17+
from pydantic import SecretStr
18+
from pydantic.v1 import BaseModel
19+
20+
21+
# ------------------------------ NOTIFICATIONS ---------------------
22+
class NotifyTeamsParam(BaseModel):
23+
"""Parameter set for notifying Microsoft Teams using class NotifyTeams"""
24+
25+
teams_webhook_url: SecretStr
26+
"""Microsoft Teams webhook URL containing a secret"""
27+
deployment_name: Optional[str] = None
28+
"""Deployment name to be displayed in the notification"""
29+
30+
# allow arbitrary types for compatibility with pydantic v1
31+
class Config:
32+
arbitrary_types_allowed = True
33+
34+
35+
class NotifyTeams(NotifyTeamsParam):
36+
"""Notify Microsoft Teams channel using a webhook"""
37+
38+
def __init__(self, notify_teams_param: NotifyTeamsParam):
39+
# super().__init__(**notify_teams_param.model_dump()) # pydantic v2
40+
super().__init__(**notify_teams_param.dict()) # pydantic v1
41+
42+
def notify_teams(
43+
self,
44+
flow,
45+
flow_run: FlowRun,
46+
state: State,
47+
):
48+
49+
host_url = str(PREFECT_API_URL.value()).replace("/api", "")
50+
51+
_flow_run = f"**🚨Flow Run: [{flow.name} > {flow_run.name}]({host_url}/flow-runs/flow-run/{flow_run.id}) ❗{state.name}❗**\n\n" # noqa
52+
53+
if flow_run.deployment_id is not None:
54+
# Assigned deployment found
55+
deployment_url = (
56+
f"{host_url}/deployments/deployment/{flow_run.deployment_id}"
57+
)
58+
if self.deployment_name == "" or self.deployment_name is None:
59+
_deployment = f"🚀 Deployment: _[{flow_run.deployment_id}]({deployment_url})_\n\n" # noqa
60+
else:
61+
_deployment = f"🚀 Deployment: _[{self.deployment_name}]({deployment_url})_\n\n" # noqa
62+
else:
63+
# No deployment assigned
64+
_deployment = "🚀 Deployment: _Just flow, no deployment_\n\n"
65+
66+
_ts = f"🕑 Timestamp: _{flow_run.state.timestamp.strftime('%Y-%m-%d %H:%M:%S %Z')}_\n\n" # noqa
67+
if flow_run.tags != []:
68+
_tags = f"🏷️ Tags: _#{' #'.join(flow_run.tags)}_\n\n"
69+
else:
70+
_tags = ""
71+
72+
if state.message is None:
73+
_message = "No message provided."
74+
else:
75+
_message = f"📜 Message:\n\n_`{state.message}`_"
76+
77+
MicrosoftTeamsWebhook(
78+
url=str(self.teams_webhook_url.get_secret_value())
79+
).notify(body=(_flow_run + _deployment + _ts + _tags + _message))
80+
81+
82+
# ------------------------------- DEPLOYMENTS -------------------------------
83+
def tags_str_to_list(tags: str) -> List[str]:
84+
"""Remove tags whitespaces, newlines, tabs, empty strings, split comma"""
85+
return list(filter(None, re.sub(r"\s+", "", tags).split(",")))
86+
87+
88+
# def filter_arguments(func, args_dict):
89+
# """Filter arguments for a function based on its signature"""
90+
# sig = signature(func)
91+
# valid_params = sig.parameters
92+
# filtered_args = {k: v for k, v in args_dict.items() if k in valid_params}
93+
# return filtered_args
94+
95+
96+
def match_func_model_args(func, model: BaseModel) -> dict:
97+
"""Match function arguments with model attributes"""
98+
valid_params = set(signature(func).parameters)
99+
# model_attrs = model.model_dump().items() # pydantic v2
100+
model_attrs = model.dict().items() # pydantic v1
101+
matched_args = {k: v for k, v in model_attrs if k in valid_params}
102+
return matched_args
103+
104+
105+
class DeployConfig(BaseModel):
106+
"""Prefect deployment configuration"""
107+
108+
flow: Flow # to be excluded in `flow.to_deployment()` function
109+
# Union instead of | for compatibility with pydantic v1, python < 3.10
110+
name: Union[str, None] = None
111+
description: Union[str, None] = None
112+
interval: Union[
113+
Iterable[Union[int, float, timedelta]], int, float, timedelta, None
114+
] = None
115+
cron: Union[Iterable[str], str, None] = None
116+
version: Union[str, None] = None
117+
tags: Union[List[str], None] = None
118+
rrule: Union[Iterable[str], str, None] = None
119+
paused: Union[bool, None] = None
120+
is_schedule_active: Union[bool, None] = None
121+
parameters: Union[dict, None] = None
122+
enforce_parameter_schema: bool = False
123+
work_pool_name: Union[str, None] = None
124+
work_queue_name: Union[str, None] = None
125+
job_variables: Union[Dict[str, Any], None] = None
126+
deployment_id: Union[str, None] = None
127+
128+
class Config:
129+
arbitrary_types_allowed = True
130+
131+
132+
class DeployParam(BaseModel):
133+
"""Parameter set for deploying flows as deployments"""
134+
135+
deployments: List[DeployConfig]
136+
"""List of deployments to be served"""
137+
# TODO: Implement remove_existing_deployments
138+
remove_existing_deployments: Optional[bool] = False
139+
"""Will remove existing deployments of the specified flows/software"""
140+
# TODO: Add parameter for OSW support in next version
141+
142+
143+
async def _deploy(param: DeployParam):
144+
"""programmatic deployment supported in newer prefect versions
145+
This should become part of osw-python
146+
"""
147+
148+
deployments = []
149+
150+
for deploy_config in param.deployments:
151+
flow: Flow = deploy_config.flow
152+
# Set deployment name if not provided
153+
if deploy_config.name is None or deploy_config.name == "":
154+
deploy_config.name = flow.name + "-deployment"
155+
156+
# Match valid args of flow.to_deployment and deploy_config
157+
kwargs = match_func_model_args(func=flow.to_deployment, model=deploy_config)
158+
# Set config via matching flow.to_deployment arguments
159+
config = await flow.to_deployment(**kwargs)
160+
await config.apply() # returns the deployment_uuid
161+
162+
deployments.append(config)
163+
164+
if version("prefect") in SpecifierSet(">=3.0"):
165+
print(f"prefect version IF: {version('prefect')}")
166+
# return deployments
167+
await serve(*deployments)
168+
else:
169+
print(f"prefect version ELSE: {version('prefect')}")
170+
await serve(*deployments)
171+
172+
173+
def deploy(param: DeployParam):
174+
"""Function to serve configured flows as deployments by python version."""
175+
if sys.version_info >= (3, 11):
176+
print(f"python version IF: {sys.version_info}")
177+
# python >= 3.11
178+
with asyncio.Runner() as runner:
179+
runner.run(_deploy(param))
180+
else:
181+
# python < 3.11
182+
print(f"python version ELSE: {sys.version_info}")
183+
asyncio.run(_deploy(param))
184+
185+
186+
# # ------------------------------- TEST -------------------------------
187+
# from prefect import flow
188+
189+
190+
# @flow
191+
# def osw_python_test_flow_to_deploy():
192+
# """Example flow to be deployed"""
193+
# print(f"Execution of example: {osw_python_test_flow_to_deploy.__name__}!")
194+
195+
196+
# if __name__ == "__main__":
197+
# deploy(
198+
# DeployParam(
199+
# deployments=[
200+
# DeployConfig(
201+
# flow=osw_python_test_flow_to_deploy,
202+
# name="osw-python-deployment-test",
203+
# description="Deployment of osw-python test flow",
204+
# version="0.0.1",
205+
# tags=["osw-python", "example-deploy-flow"],
206+
# )
207+
# ],
208+
# # remove_existing_deployments=True,
209+
# )
210+
# )

0 commit comments

Comments
 (0)