fix: align EventEmitter with server db.Event struct#63
Conversation
- Rename fields to match server JSON tags: type→eventType, data→payload, correlationId→traceId - Add severity field (default 'info') and always-present traceId - Move taskId into payload.task_id - Send events individually (server expects flat db.Event, not batch wrapper) - Re-queue only failed events instead of entire batch on partial failure
|
✅ Documentation validation passed!
|
There was a problem hiding this comment.
Pull request overview
This PR updates EventEmitter’s outbound event JSON schema to match the registry server’s db.Event struct, aiming to fix event ingestion failures caused by mismatched field names and request shape.
Changes:
- Renames/reshapes emitted event fields (e.g.,
type→eventType,data→payload,correlationId→traceId) and ensurestraceId/severityare always present. - Moves
taskIdintopayload.task_idrather than keeping it as a top-level field. - Changes
flush()to POST events individually and re-queue only the events that failed to send.
| "payload": data or {}, | ||
| } | ||
|
|
||
| if task_id: | ||
| event["taskId"] = task_id | ||
| if correlation_id: | ||
| event["correlationId"] = correlation_id | ||
| event["payload"]["task_id"] = task_id |
There was a problem hiding this comment.
Good catch — fixed in 2d905c3. Now uses dict(data) if data else {} to shallow-copy the caller's dict before mutating.
| *, | ||
| task_id: Optional[str] = None, | ||
| correlation_id: Optional[str] = None, | ||
| flush: bool = False, | ||
| ) -> bool: | ||
| """ | ||
| Emit an event to the registry. | ||
|
|
||
| Args: | ||
| event_type: Type of event (e.g., "task_started", "tool_call") | ||
| data: Event-specific data | ||
| task_id: Optional task ID for correlation | ||
| correlation_id: Optional correlation ID for tracing | ||
| flush: Whether to flush immediately (default: batch) | ||
|
|
||
| Returns: | ||
| True if event was queued/sent successfully | ||
|
|
||
| Example: | ||
| emitter.emit("task_started", { | ||
| "task_id": "abc123", | ||
| "input": "Research AI trends", | ||
| }) | ||
| """ | ||
| if not self.enabled: | ||
| return False | ||
|
|
||
| # Build a flat event matching the registry's db.Event struct. | ||
| # See capiscio-server internal/db/models.go for JSON field tags. | ||
| event = { | ||
| "id": str(uuid.uuid4()), | ||
| "type": event_type, | ||
| "agentId": self.agent_id, | ||
| "traceId": correlation_id or str(uuid.uuid4()), | ||
| "eventType": event_type, |
There was a problem hiding this comment.
Agree on the rename, but it's a breaking change to the public API. Will do as a follow-up with a deprecation path (trace_id as primary, correlation_id accepted with a deprecation warning).
| # The registry decodes the POST body as a flat db.Event struct. | ||
| # Send each event individually. | ||
| failed: list = [] | ||
| for event in events_to_send: | ||
| try: | ||
| response = self._client.post( | ||
| f"{self.server_url}/v1/events", | ||
| headers=headers, | ||
| json=event, | ||
| ) |
There was a problem hiding this comment.
Already fixed in acc5375 — tests updated to assert individual POSTs and new field names.
|
✅ SDK server contract tests passed (test_server_integration.py). Cross-product scenarios are validated in capiscio-e2e-tests. |
|
✅ Documentation validation passed!
|
|
✅ All checks passed! Ready for review. |
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
|
✅ SDK server contract tests passed (test_server_integration.py). Cross-product scenarios are validated in capiscio-e2e-tests. |
|
✅ Documentation validation passed!
|
|
✅ All checks passed! Ready for review. |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
tests/unit/test_events.py:220
- This PR introduces partial-failure behavior (only failed events are re-queued), but the unit tests only cover the single-event failure case. Add a test with multiple events where one request succeeds and one fails, asserting that only the failed event remains in
_batch(and thatpostwas called for both). This will prevent regressions in the new partial re-queue logic.
def test_flush_requeues_on_failure(self):
"""Test that flush requeues events on server error."""
emitter = EventEmitter(
server_url="https://registry.capisc.io",
api_key="sk_test",
agent_id="test-agent",
)
events = [{"id": "1", "eventType": "event1", "payload": {}}]
emitter._batch = events.copy()
mock_response = MagicMock()
mock_response.status_code = 500
mock_response.text = "Internal Server Error"
with patch.object(emitter._client, "post", return_value=mock_response):
result = emitter.flush()
assert result is False
assert len(emitter._batch) == 1 # Events requeued
| # The registry decodes the POST body as a flat db.Event struct. | ||
| # Send each event individually. | ||
| failed: list = [] | ||
| for event in events_to_send: | ||
| try: | ||
| response = self._client.post( | ||
| f"{self.server_url}/v1/events", | ||
| headers=headers, | ||
| json=event, | ||
| ) |
There was a problem hiding this comment.
Valid concern. In practice, the registry responds in <100ms so 10 sequential POSTs complete in ~1s. The 10s timeout is inherited from the httpx client default and only applies on failure. That said, adding a per-event timeout (e.g., 2s) and/or concurrent sends via a thread pool would be good hardening. Will track for a follow-up — not blocking for this fix since the previous batch approach had the same 10s single-request timeout and would also block on a slow/unreachable server.
|
✅ SDK server contract tests passed (test_server_integration.py). Cross-product scenarios are validated in capiscio-e2e-tests. |
Summary
Aligns the SDK's
EventEmitterpayload with the server'sdb.Eventstruct (internal/db/models.go), fixing event ingestion failures.Changes
type→eventType,data→payload,correlationId→traceIdseverity(default"info"),traceIdalways populated (falls back to new UUID)taskIdintopayload.task_idinstead of top-leveldb.Event, not a batch wrapper — now sends one POST per eventContext
Discovered during demo-two testing in a2a-demos. Events were being sent with field names that didn't match the server's expected struct, causing silent ingestion failures.