Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding an account-level event subscriber #12808

Merged
merged 9 commits into from
Apr 23, 2024
28 changes: 28 additions & 0 deletions src/prefect/events/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,3 +571,31 @@ def __init__(
)

self._api_key = api_key


class PrefectCloudAccountEventSubscriber(PrefectCloudEventSubscriber):
def __init__(
self,
api_url: Optional[str] = None,
api_key: Optional[str] = None,
filter: Optional["EventFilter"] = None,
reconnection_attempts: int = 10,
):
"""
Args:
api_url: The base URL for a Prefect Cloud workspace
api_key: The API of an actor with the manage_events scope
reconnection_attempts: When the client is disconnected, how many times
the client should attempt to reconnect
"""
api_url, api_key = _get_api_url_and_key(api_url, api_key)

account_api_url, _, _ = api_url.partition("/workspaces/")
chrisguidry marked this conversation as resolved.
Show resolved Hide resolved

super().__init__(
api_url=account_api_url,
filter=filter,
reconnection_attempts=reconnection_attempts,
)

self._api_key = api_key
84 changes: 58 additions & 26 deletions tests/events/client/test_events_subscriber.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from typing import Type
from typing import Optional, Type

import pytest
from websockets.exceptions import ConnectionClosedError

from prefect.events import Event, get_events_subscriber
from prefect.events.clients import PrefectCloudEventSubscriber, PrefectEventSubscriber
from prefect.events.clients import (
PrefectCloudAccountEventSubscriber,
PrefectCloudEventSubscriber,
PrefectEventSubscriber,
)
from prefect.events.filters import EventFilter, EventNameFilter
from prefect.settings import (
PREFECT_API_KEY,
Expand Down Expand Up @@ -69,11 +73,24 @@ async def test_constructs_cloud_client(cloud_settings):
def pytest_generate_tests(metafunc: pytest.Metafunc):
fixtures = set(metafunc.fixturenames)

cloud_subscribers = [
(
PrefectCloudEventSubscriber,
"/accounts/A/workspaces/W/events/out",
"my-token",
),
(PrefectCloudAccountEventSubscriber, "/accounts/A/events/out", "my-token"),
]
subscribers = [
# The base subscriber for OSS will just use the API URL, which is set to a
# Cloud URL here, but it would usually be just /events/out
(PrefectEventSubscriber, "/accounts/A/workspaces/W/events/out", None),
] + cloud_subscribers

if "Subscriber" in fixtures:
metafunc.parametrize(
"Subscriber",
[PrefectEventSubscriber, PrefectCloudEventSubscriber],
)
metafunc.parametrize("Subscriber,socket_path,token", subscribers)
elif "CloudSubscriber" in fixtures:
metafunc.parametrize("CloudSubscriber,socket_path,token", cloud_subscribers)


@pytest.fixture(autouse=True)
Expand All @@ -89,48 +106,52 @@ def api_setup(events_cloud_api_url: str):

async def test_subscriber_can_connect_with_defaults(
Subscriber: Type[PrefectEventSubscriber],
events_cloud_api_url: str,
socket_path: str,
token: Optional[str],
example_event_1: Event,
example_event_2: Event,
recorder: Recorder,
puppeteer: Puppeteer,
):
puppeteer.token = "my-token" if Subscriber == PrefectCloudEventSubscriber else None
puppeteer.token = token
puppeteer.outgoing_events = [example_event_1, example_event_2]

async with Subscriber() as subscriber:
async for event in subscriber:
recorder.events.append(event)

assert recorder.connections == 1
assert recorder.path == "/accounts/A/workspaces/W/events/out"
assert recorder.path == socket_path
assert recorder.events == [example_event_1, example_event_2]
assert recorder.token == puppeteer.token
assert subscriber._filter
assert recorder.filter == subscriber._filter


async def test_cloud_subscriber_complains_without_api_url_and_key(
events_cloud_api_url: str,
CloudSubscriber: Type[PrefectCloudEventSubscriber],
socket_path: str,
token: Optional[str],
example_event_1: Event,
example_event_2: Event,
recorder: Recorder,
puppeteer: Puppeteer,
):
with temporary_settings(updates={PREFECT_API_KEY: "", PREFECT_API_URL: ""}):
with pytest.raises(ValueError, match="must be provided or set"):
PrefectCloudEventSubscriber()
CloudSubscriber()


async def test_subscriber_can_connect_and_receive_one_event(
Subscriber: Type[PrefectEventSubscriber],
events_cloud_api_url: str,
socket_path: str,
token: Optional[str],
example_event_1: Event,
example_event_2: Event,
recorder: Recorder,
puppeteer: Puppeteer,
):
puppeteer.token = "my-token" if Subscriber == PrefectCloudEventSubscriber else None
puppeteer.token = token
puppeteer.outgoing_events = [example_event_1, example_event_2]

filter = EventFilter(event=EventNameFilter(name=["example.event"]))
Expand All @@ -143,21 +164,22 @@ async def test_subscriber_can_connect_and_receive_one_event(
recorder.events.append(event)

assert recorder.connections == 1
assert recorder.path == "/accounts/A/workspaces/W/events/out"
assert recorder.path == socket_path
assert recorder.events == [example_event_1, example_event_2]
assert recorder.token == puppeteer.token
assert recorder.filter == filter


async def test_subscriber_specifying_negative_reconnects_gets_error(
Subscriber: Type[PrefectEventSubscriber],
events_cloud_api_url: str,
socket_path: str,
token: Optional[str],
example_event_1: Event,
example_event_2: Event,
recorder: Recorder,
puppeteer: Puppeteer,
):
puppeteer.token = "my-token" if Subscriber == PrefectCloudEventSubscriber else None
puppeteer.token = token
puppeteer.outgoing_events = [example_event_1, example_event_2]

filter = EventFilter(event=EventNameFilter(name=["example.event"]))
Expand All @@ -172,6 +194,9 @@ async def test_subscriber_specifying_negative_reconnects_gets_error(


async def test_subscriber_raises_on_invalid_auth_with_soft_denial(
CloudSubscriber: Type[PrefectCloudEventSubscriber],
socket_path: str,
token: Optional[str],
events_cloud_api_url: str,
example_event_1: Event,
example_event_2: Event,
Expand All @@ -184,7 +209,7 @@ async def test_subscriber_raises_on_invalid_auth_with_soft_denial(
filter = EventFilter(event=EventNameFilter(name=["example.event"]))

with pytest.raises(Exception, match="Unable to authenticate"):
subscriber = PrefectCloudEventSubscriber(
subscriber = CloudSubscriber(
events_cloud_api_url,
"bogus",
filter=filter,
Expand All @@ -193,12 +218,15 @@ async def test_subscriber_raises_on_invalid_auth_with_soft_denial(
await subscriber.__aenter__()

assert recorder.connections == 1
assert recorder.path == "/accounts/A/workspaces/W/events/out"
assert recorder.path == socket_path
assert recorder.token == "bogus"
assert recorder.events == []


async def test_cloud_subscriber_raises_on_invalid_auth_with_hard_denial(
CloudSubscriber: Type[PrefectCloudEventSubscriber],
socket_path: str,
token: Optional[str],
events_cloud_api_url: str,
example_event_1: Event,
example_event_2: Event,
Expand All @@ -212,7 +240,7 @@ async def test_cloud_subscriber_raises_on_invalid_auth_with_hard_denial(
filter = EventFilter(event=EventNameFilter(name=["example.event"]))

with pytest.raises(Exception, match="Unable to authenticate"):
subscriber = PrefectCloudEventSubscriber(
subscriber = CloudSubscriber(
events_cloud_api_url,
"bogus",
filter=filter,
Expand All @@ -221,19 +249,21 @@ async def test_cloud_subscriber_raises_on_invalid_auth_with_hard_denial(
await subscriber.__aenter__()

assert recorder.connections == 1
assert recorder.path == "/accounts/A/workspaces/W/events/out"
assert recorder.path == socket_path
assert recorder.token == "bogus"
assert recorder.events == []


async def test_subscriber_reconnects_on_hard_disconnects(
Subscriber: Type[PrefectEventSubscriber],
socket_path: str,
token: Optional[str],
example_event_1: Event,
example_event_2: Event,
recorder: Recorder,
puppeteer: Puppeteer,
):
puppeteer.token = "my-token" if Subscriber == PrefectCloudEventSubscriber else None
puppeteer.token = token
puppeteer.outgoing_events = [example_event_1, example_event_2]
puppeteer.hard_disconnect_after = example_event_1.id

Expand All @@ -252,12 +282,14 @@ async def test_subscriber_reconnects_on_hard_disconnects(

async def test_subscriber_gives_up_after_so_many_attempts(
Subscriber: Type[PrefectEventSubscriber],
socket_path: str,
token: Optional[str],
example_event_1: Event,
example_event_2: Event,
recorder: Recorder,
puppeteer: Puppeteer,
):
puppeteer.token = "my-token" if Subscriber == PrefectCloudEventSubscriber else None
puppeteer.token = token
puppeteer.outgoing_events = [example_event_1, example_event_2]
puppeteer.hard_disconnect_after = example_event_1.id

Expand All @@ -277,19 +309,19 @@ async def test_subscriber_gives_up_after_so_many_attempts(

async def test_subscriber_skips_duplicate_events(
Subscriber: Type[PrefectEventSubscriber],
socket_path: str,
token: Optional[str],
example_event_1: Event,
example_event_2: Event,
recorder: Recorder,
puppeteer: Puppeteer,
):
puppeteer.token = "my-token" if Subscriber == PrefectCloudEventSubscriber else None
puppeteer.token = token
puppeteer.outgoing_events = [example_event_1, example_event_1, example_event_2]

filter = EventFilter(event=EventNameFilter(name=["example.event"]))

async with Subscriber(
filter=filter,
) as subscriber:
async with Subscriber(filter=filter) as subscriber:
async for event in subscriber:
recorder.events.append(event)

Expand Down
Loading