Skip to content

Commit

Permalink
Adding an account-level event subscriber (#12808)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisguidry committed Apr 23, 2024
1 parent e0ab0f0 commit f68d6d4
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 26 deletions.
28 changes: 28 additions & 0 deletions src/prefect/events/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,3 +571,31 @@ def __init__(
)

self._api_key = api_key


class PrefectCloudAccountEventSubscriber(PrefectCloudEventSubscriber):
def __init__(
self,
api_url: Optional[str] = None,
api_key: Optional[str] = None,
filter: Optional["EventFilter"] = None,
reconnection_attempts: int = 10,
):
"""
Args:
api_url: The base URL for a Prefect Cloud workspace
api_key: The API of an actor with the manage_events scope
reconnection_attempts: When the client is disconnected, how many times
the client should attempt to reconnect
"""
api_url, api_key = _get_api_url_and_key(api_url, api_key)

account_api_url, _, _ = api_url.partition("/workspaces/")

super().__init__(
api_url=account_api_url,
filter=filter,
reconnection_attempts=reconnection_attempts,
)

self._api_key = api_key
84 changes: 58 additions & 26 deletions tests/events/client/test_events_subscriber.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from typing import Type
from typing import Optional, Type

import pytest
from websockets.exceptions import ConnectionClosedError

from prefect.events import Event, get_events_subscriber
from prefect.events.clients import PrefectCloudEventSubscriber, PrefectEventSubscriber
from prefect.events.clients import (
PrefectCloudAccountEventSubscriber,
PrefectCloudEventSubscriber,
PrefectEventSubscriber,
)
from prefect.events.filters import EventFilter, EventNameFilter
from prefect.settings import (
PREFECT_API_KEY,
Expand Down Expand Up @@ -69,11 +73,24 @@ async def test_constructs_cloud_client(cloud_settings):
def pytest_generate_tests(metafunc: pytest.Metafunc):
fixtures = set(metafunc.fixturenames)

cloud_subscribers = [
(
PrefectCloudEventSubscriber,
"/accounts/A/workspaces/W/events/out",
"my-token",
),
(PrefectCloudAccountEventSubscriber, "/accounts/A/events/out", "my-token"),
]
subscribers = [
# The base subscriber for OSS will just use the API URL, which is set to a
# Cloud URL here, but it would usually be just /events/out
(PrefectEventSubscriber, "/accounts/A/workspaces/W/events/out", None),
] + cloud_subscribers

if "Subscriber" in fixtures:
metafunc.parametrize(
"Subscriber",
[PrefectEventSubscriber, PrefectCloudEventSubscriber],
)
metafunc.parametrize("Subscriber,socket_path,token", subscribers)
elif "CloudSubscriber" in fixtures:
metafunc.parametrize("CloudSubscriber,socket_path,token", cloud_subscribers)


@pytest.fixture(autouse=True)
Expand All @@ -89,48 +106,52 @@ def api_setup(events_cloud_api_url: str):

async def test_subscriber_can_connect_with_defaults(
Subscriber: Type[PrefectEventSubscriber],
events_cloud_api_url: str,
socket_path: str,
token: Optional[str],
example_event_1: Event,
example_event_2: Event,
recorder: Recorder,
puppeteer: Puppeteer,
):
puppeteer.token = "my-token" if Subscriber == PrefectCloudEventSubscriber else None
puppeteer.token = token
puppeteer.outgoing_events = [example_event_1, example_event_2]

async with Subscriber() as subscriber:
async for event in subscriber:
recorder.events.append(event)

assert recorder.connections == 1
assert recorder.path == "/accounts/A/workspaces/W/events/out"
assert recorder.path == socket_path
assert recorder.events == [example_event_1, example_event_2]
assert recorder.token == puppeteer.token
assert subscriber._filter
assert recorder.filter == subscriber._filter


async def test_cloud_subscriber_complains_without_api_url_and_key(
events_cloud_api_url: str,
CloudSubscriber: Type[PrefectCloudEventSubscriber],
socket_path: str,
token: Optional[str],
example_event_1: Event,
example_event_2: Event,
recorder: Recorder,
puppeteer: Puppeteer,
):
with temporary_settings(updates={PREFECT_API_KEY: "", PREFECT_API_URL: ""}):
with pytest.raises(ValueError, match="must be provided or set"):
PrefectCloudEventSubscriber()
CloudSubscriber()


async def test_subscriber_can_connect_and_receive_one_event(
Subscriber: Type[PrefectEventSubscriber],
events_cloud_api_url: str,
socket_path: str,
token: Optional[str],
example_event_1: Event,
example_event_2: Event,
recorder: Recorder,
puppeteer: Puppeteer,
):
puppeteer.token = "my-token" if Subscriber == PrefectCloudEventSubscriber else None
puppeteer.token = token
puppeteer.outgoing_events = [example_event_1, example_event_2]

filter = EventFilter(event=EventNameFilter(name=["example.event"]))
Expand All @@ -143,21 +164,22 @@ async def test_subscriber_can_connect_and_receive_one_event(
recorder.events.append(event)

assert recorder.connections == 1
assert recorder.path == "/accounts/A/workspaces/W/events/out"
assert recorder.path == socket_path
assert recorder.events == [example_event_1, example_event_2]
assert recorder.token == puppeteer.token
assert recorder.filter == filter


async def test_subscriber_specifying_negative_reconnects_gets_error(
Subscriber: Type[PrefectEventSubscriber],
events_cloud_api_url: str,
socket_path: str,
token: Optional[str],
example_event_1: Event,
example_event_2: Event,
recorder: Recorder,
puppeteer: Puppeteer,
):
puppeteer.token = "my-token" if Subscriber == PrefectCloudEventSubscriber else None
puppeteer.token = token
puppeteer.outgoing_events = [example_event_1, example_event_2]

filter = EventFilter(event=EventNameFilter(name=["example.event"]))
Expand All @@ -172,6 +194,9 @@ async def test_subscriber_specifying_negative_reconnects_gets_error(


async def test_subscriber_raises_on_invalid_auth_with_soft_denial(
CloudSubscriber: Type[PrefectCloudEventSubscriber],
socket_path: str,
token: Optional[str],
events_cloud_api_url: str,
example_event_1: Event,
example_event_2: Event,
Expand All @@ -184,7 +209,7 @@ async def test_subscriber_raises_on_invalid_auth_with_soft_denial(
filter = EventFilter(event=EventNameFilter(name=["example.event"]))

with pytest.raises(Exception, match="Unable to authenticate"):
subscriber = PrefectCloudEventSubscriber(
subscriber = CloudSubscriber(
events_cloud_api_url,
"bogus",
filter=filter,
Expand All @@ -193,12 +218,15 @@ async def test_subscriber_raises_on_invalid_auth_with_soft_denial(
await subscriber.__aenter__()

assert recorder.connections == 1
assert recorder.path == "/accounts/A/workspaces/W/events/out"
assert recorder.path == socket_path
assert recorder.token == "bogus"
assert recorder.events == []


async def test_cloud_subscriber_raises_on_invalid_auth_with_hard_denial(
CloudSubscriber: Type[PrefectCloudEventSubscriber],
socket_path: str,
token: Optional[str],
events_cloud_api_url: str,
example_event_1: Event,
example_event_2: Event,
Expand All @@ -212,7 +240,7 @@ async def test_cloud_subscriber_raises_on_invalid_auth_with_hard_denial(
filter = EventFilter(event=EventNameFilter(name=["example.event"]))

with pytest.raises(Exception, match="Unable to authenticate"):
subscriber = PrefectCloudEventSubscriber(
subscriber = CloudSubscriber(
events_cloud_api_url,
"bogus",
filter=filter,
Expand All @@ -221,19 +249,21 @@ async def test_cloud_subscriber_raises_on_invalid_auth_with_hard_denial(
await subscriber.__aenter__()

assert recorder.connections == 1
assert recorder.path == "/accounts/A/workspaces/W/events/out"
assert recorder.path == socket_path
assert recorder.token == "bogus"
assert recorder.events == []


async def test_subscriber_reconnects_on_hard_disconnects(
Subscriber: Type[PrefectEventSubscriber],
socket_path: str,
token: Optional[str],
example_event_1: Event,
example_event_2: Event,
recorder: Recorder,
puppeteer: Puppeteer,
):
puppeteer.token = "my-token" if Subscriber == PrefectCloudEventSubscriber else None
puppeteer.token = token
puppeteer.outgoing_events = [example_event_1, example_event_2]
puppeteer.hard_disconnect_after = example_event_1.id

Expand All @@ -252,12 +282,14 @@ async def test_subscriber_reconnects_on_hard_disconnects(

async def test_subscriber_gives_up_after_so_many_attempts(
Subscriber: Type[PrefectEventSubscriber],
socket_path: str,
token: Optional[str],
example_event_1: Event,
example_event_2: Event,
recorder: Recorder,
puppeteer: Puppeteer,
):
puppeteer.token = "my-token" if Subscriber == PrefectCloudEventSubscriber else None
puppeteer.token = token
puppeteer.outgoing_events = [example_event_1, example_event_2]
puppeteer.hard_disconnect_after = example_event_1.id

Expand All @@ -277,19 +309,19 @@ async def test_subscriber_gives_up_after_so_many_attempts(

async def test_subscriber_skips_duplicate_events(
Subscriber: Type[PrefectEventSubscriber],
socket_path: str,
token: Optional[str],
example_event_1: Event,
example_event_2: Event,
recorder: Recorder,
puppeteer: Puppeteer,
):
puppeteer.token = "my-token" if Subscriber == PrefectCloudEventSubscriber else None
puppeteer.token = token
puppeteer.outgoing_events = [example_event_1, example_event_1, example_event_2]

filter = EventFilter(event=EventNameFilter(name=["example.event"]))

async with Subscriber(
filter=filter,
) as subscriber:
async with Subscriber(filter=filter) as subscriber:
async for event in subscriber:
recorder.events.append(event)

Expand Down

0 comments on commit f68d6d4

Please sign in to comment.