Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions eoapi_notifier/outputs/cloudevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class CloudEventsConfig(BasePluginConfig):
timeout: float = 30.0
max_retries: int = 3
retry_backoff: float = 1.0
max_header_length: int = 4096

@field_validator("endpoint")
@classmethod
Expand All @@ -53,6 +54,7 @@ def get_sample_config(cls) -> dict[str, Any]:
"timeout": 30.0,
"max_retries": 3,
"retry_backoff": 1.0,
"max_header_length": 4096,
}

@classmethod
Expand Down Expand Up @@ -191,6 +193,16 @@ async def send_event(self, event: NotificationEvent) -> bool:
)
return False

def _truncate_header(self, value: str | None) -> str | None:
"""Truncate header value to max_header_length if needed."""
if not value:
return value
if len(value.encode("utf-8")) <= self.config.max_header_length:
return value
# Truncate to byte limit, ensuring valid UTF-8
truncated = value.encode("utf-8")[: self.config.max_header_length]
return truncated.decode("utf-8", errors="ignore")

def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent:
"""Convert NotificationEvent to CloudEvent."""
# Use config values which now include environment overrides
Expand All @@ -211,11 +223,15 @@ def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent:

# Add subject if item_id exists
if event.item_id:
attributes["subject"] = event.item_id
truncated_subject = self._truncate_header(event.item_id)
if truncated_subject:
attributes["subject"] = truncated_subject

# Add collection attribute
if event.collection:
attributes["collection"] = event.collection
truncated_collection = self._truncate_header(event.collection)
if truncated_collection:
attributes["collection"] = truncated_collection

# Event data payload
data = {
Expand Down
1 change: 1 addition & 0 deletions examples/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ outputs:
# Optional: HTTP settings
# timeout: 30.0 # CLOUDEVENTS_TIMEOUT
# max_retries: 3 # CLOUDEVENTS_MAX_RETRIES
# max_header_length: 4096 # CLOUDEVENTS_MAX_HEADER_LENGTH

# Example with multiple sources and outputs
# sources:
Expand Down
57 changes: 57 additions & 0 deletions tests/test_cloudevents_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def test_default_configuration(self) -> None:
assert config.event_type == "org.eoapi.stac"
assert config.timeout == 30.0
assert config.max_retries == 3
assert config.max_header_length == 4096

def test_endpoint_validation_error(self) -> None:
"""Test endpoint validation."""
Expand Down Expand Up @@ -198,6 +199,62 @@ def test_convert_to_cloudevent(
assert cloud_event["subject"] == "test-item"
assert cloud_event["collection"] == "test-collection"

def test_truncate_header(self, adapter: CloudEventsAdapter) -> None:
"""Test header value truncation."""
# Short string should not be truncated
short = "short-string"
assert adapter._truncate_header(short) == short

# None should remain None
assert adapter._truncate_header(None) is None

# Long string should be truncated to max_header_length bytes
long_string = "a" * 3000
truncated = adapter._truncate_header(long_string)
assert truncated is not None
assert len(truncated.encode("utf-8")) <= adapter.config.max_header_length
assert len(truncated) <= adapter.config.max_header_length

# UTF-8 multi-byte characters should be handled correctly
unicode_string = "测试" * 1000 # Chinese characters (3 bytes each)
truncated_unicode = adapter._truncate_header(unicode_string)
assert truncated_unicode is not None
assert (
len(truncated_unicode.encode("utf-8")) <= adapter.config.max_header_length
)
# Should not break in the middle of a character
assert truncated_unicode.encode("utf-8").decode("utf-8") == truncated_unicode

def test_convert_to_cloudevent_with_long_headers(
self, config: CloudEventsConfig
) -> None:
"""Test CloudEvent conversion with long header values."""
config.max_header_length = 50 # Small limit for testing
adapter = CloudEventsAdapter(config)

# Create event with long item_id and collection
event = NotificationEvent(
source="/test/source",
type="test.type",
operation="INSERT",
collection="a-very-long-collection-name-that-exceeds-the-limit",
item_id="a-very-long-item-id-that-also-exceeds-the-configured-limit",
)

cloud_event = adapter._convert_to_cloudevent(event)

# Check that long values are truncated in headers
assert "subject" in cloud_event
assert "collection" in cloud_event
assert len(cloud_event["subject"].encode("utf-8")) <= config.max_header_length
assert (
len(cloud_event["collection"].encode("utf-8")) <= config.max_header_length
)

# Original values should still be in data payload
assert cloud_event.data["item_id"] == event.item_id
assert cloud_event.data["collection"] == event.collection

def test_operation_mapping(self, adapter: CloudEventsAdapter) -> None:
"""Test operation to event type mapping."""
test_cases = [
Expand Down