Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions eoapi_notifier/outputs/cloudevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
Supports standard CloudEvents environment variables and KNative SinkBinding.
"""

import json
import os
from typing import Any
from uuid import uuid4
Expand All @@ -29,6 +30,7 @@ class CloudEventsConfig(BasePluginConfig):
max_retries: int = 3
retry_backoff: float = 1.0
max_header_length: int = 4096
overrides: dict[str, str] = {}

@field_validator("endpoint")
@classmethod
Expand All @@ -43,6 +45,10 @@ def apply_knative_overrides(self) -> "CloudEventsConfig":
if k_sink := os.getenv("K_SINK"):
self.endpoint = k_sink

if k_ce_overrides := os.getenv("K_CE_OVERRIDES"):
overrides_data = json.loads(k_ce_overrides)
self.overrides = overrides_data.get("extensions", {})

return self

@classmethod
Expand Down Expand Up @@ -88,6 +94,26 @@ def __init__(self, config: CloudEventsConfig) -> None:
super().__init__(config)
self.config: CloudEventsConfig = config
self._client: httpx.AsyncClient | None = None
# Parse K_CE_OVERRIDES once during initialization
self._ce_extensions = self._parse_k_ce_overrides()

def _parse_k_ce_overrides(self) -> dict[str, str]:
"""Parse K_CE_OVERRIDES environment variable once during initialization."""
k_ce_overrides = os.getenv("K_CE_OVERRIDES")
if not k_ce_overrides:
return {}

try:
overrides_data = json.loads(k_ce_overrides)
extensions = overrides_data.get("extensions", {})
if isinstance(extensions, dict):
return {str(k): str(v) for k, v in extensions.items()}
return {}
except json.JSONDecodeError:
self.logger.warning(
"Invalid K_CE_OVERRIDES JSON, ignoring: %s", k_ce_overrides
)
return {}

async def start(self) -> None:
"""Start the HTTP client."""
Expand Down Expand Up @@ -209,6 +235,9 @@ def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent:
source = self.config.source
event_type_base = self.config.event_type

# Use pre-parsed KNative CE overrides
ce_extensions = self._ce_extensions

# Map operation to event type suffix
operation_map = {"INSERT": "created", "UPDATE": "updated", "DELETE": "deleted"}
operation = operation_map.get(event.operation.upper(), event.operation.lower())
Expand All @@ -233,6 +262,10 @@ def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent:
if truncated_collection:
attributes["collection"] = truncated_collection

# Apply KNative CE extension overrides
for key, value in ce_extensions.items():
attributes[key] = str(value)

# Event data payload
data = {
"id": event.id,
Expand Down
1 change: 1 addition & 0 deletions helm-chart/eoapi-notifier/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ secrets:
# KNative Support:
# The cloudevents plugin supports K_SINK variables for KNative SinkBinding:
# - K_SINK: Overrides CLOUDEVENTS_ENDPOINT (automatically set by SinkBinding)
# - K_CE_OVERRIDE: A JSON object that specifies overrides to the outbound event.
env: {}
# Examples - Standard environment variables:
# PGSTAC_HOST: postgresql-service
Expand Down
80 changes: 80 additions & 0 deletions tests/test_cloudevents_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,29 @@ def test_connection_info(self) -> None:
config = CloudEventsConfig(endpoint="https://example.com/webhook")
assert "POST https://example.com/webhook" in config.get_connection_info()

@patch.dict(
os.environ,
{"K_CE_OVERRIDES": '{"extensions": {"extra": "test", "num": 42}}'},
)
def test_k_ce_overrides_valid(self) -> None:
"""Test K_CE_OVERRIDES with valid extensions."""
config = CloudEventsConfig()
assert config.overrides == {"extra": "test", "num": 42}

@patch.dict(os.environ, {"K_CE_OVERRIDES": '{"other": "field"}'})
def test_k_ce_overrides_no_extensions(self) -> None:
"""Test K_CE_OVERRIDES without extensions field."""
config = CloudEventsConfig()
assert config.overrides == {}

@patch.dict(os.environ, {"K_CE_OVERRIDES": "invalid-json"})
def test_k_ce_overrides_invalid_json(self) -> None:
"""Test K_CE_OVERRIDES with invalid JSON."""
from pydantic import ValidationError

with pytest.raises(ValidationError):
CloudEventsConfig()


class TestCloudEventsAdapter:
"""Test CloudEvents output adapter."""
Expand Down Expand Up @@ -286,3 +309,60 @@ async def test_health_check(self, adapter: CloudEventsAdapter) -> None:
# Running with client
adapter._client = MagicMock()
assert await adapter.health_check() is True

@patch.dict(
os.environ,
{
"K_CE_OVERRIDES": (
'{"extensions": {"extra": "test-value", "priority": "high"}}'
)
},
)
def test_convert_to_cloudevent_with_overrides(
self, config: CloudEventsConfig, sample_event: NotificationEvent
) -> None:
"""Test CloudEvent conversion with K_CE_OVERRIDES."""
# Create adapter after environment variable is set
adapter = CloudEventsAdapter(config)
cloud_event = adapter._convert_to_cloudevent(sample_event)

assert isinstance(cloud_event, CloudEvent)
assert cloud_event["extra"] == "test-value"
assert cloud_event["priority"] == "high"

@patch.dict(os.environ, {"K_CE_OVERRIDES": '{"extensions": {"number": 123}}'})
def test_convert_to_cloudevent_with_number_override(
self, config: CloudEventsConfig, sample_event: NotificationEvent
) -> None:
"""Test CloudEvent conversion with number in K_CE_OVERRIDES."""
# Create adapter after environment variable is set
adapter = CloudEventsAdapter(config)
cloud_event = adapter._convert_to_cloudevent(sample_event)

assert cloud_event["number"] == "123" # Should be converted to string

@patch.dict(os.environ, {"K_CE_OVERRIDES": "invalid-json"})
def test_convert_to_cloudevent_invalid_overrides(
self, config: CloudEventsConfig, sample_event: NotificationEvent
) -> None:
"""Test CloudEvent conversion with invalid K_CE_OVERRIDES JSON."""
# Create adapter after environment variable is set
adapter = CloudEventsAdapter(config)
cloud_event = adapter._convert_to_cloudevent(sample_event)

# Should work normally without overrides
assert isinstance(cloud_event, CloudEvent)
assert cloud_event["source"] == "/eoapi/stac"

@patch.dict(os.environ, {"K_CE_OVERRIDES": '{"other": "field"}'})
def test_convert_to_cloudevent_no_extensions(
self, config: CloudEventsConfig, sample_event: NotificationEvent
) -> None:
"""Test CloudEvent conversion with K_CE_OVERRIDES but no extensions field."""
# Create adapter after environment variable is set
adapter = CloudEventsAdapter(config)
cloud_event = adapter._convert_to_cloudevent(sample_event)

# Should work normally without extensions
assert isinstance(cloud_event, CloudEvent)
assert cloud_event["source"] == "/eoapi/stac"