-
Notifications
You must be signed in to change notification settings - Fork 0
fix: align EventEmitter with server db.Event struct #63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -136,18 +136,20 @@ def emit( | |
| if not self.enabled: | ||
| return False | ||
|
|
||
| # Build a flat event matching the registry's db.Event struct. | ||
| # See capiscio-server internal/db/models.go for JSON field tags. | ||
| event = { | ||
| "id": str(uuid.uuid4()), | ||
| "type": event_type, | ||
| "agentId": self.agent_id, | ||
| "traceId": correlation_id or str(uuid.uuid4()), | ||
| "eventType": event_type, | ||
| "severity": "info", | ||
| "timestamp": datetime.now(timezone.utc).isoformat(), | ||
| "data": data or {}, | ||
| "payload": dict(data) if data else {}, | ||
| } | ||
|
|
||
| if task_id: | ||
| event["taskId"] = task_id | ||
| if correlation_id: | ||
| event["correlationId"] = correlation_id | ||
| event["payload"]["task_id"] = task_id | ||
|
|
||
| with self._batch_lock: | ||
| self._batch.append(event) | ||
|
|
@@ -189,22 +191,30 @@ def flush(self) -> bool: | |
| "X-Capiscio-Registry-Key": self.api_key, | ||
| } | ||
|
|
||
| # Send batch | ||
| response = self._client.post( | ||
| f"{self.server_url}/v1/events", | ||
| headers=headers, | ||
| json={"events": events_to_send}, | ||
| ) | ||
| # The registry decodes the POST body as a flat db.Event struct. | ||
| # Send each event individually. | ||
| failed: list = [] | ||
| for event in events_to_send: | ||
| try: | ||
| response = self._client.post( | ||
| f"{self.server_url}/v1/events", | ||
| headers=headers, | ||
| json=event, | ||
| ) | ||
|
Comment on lines
+194
to
+203
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Valid concern. In practice, the registry responds in <100ms so 10 sequential POSTs complete in ~1s. The 10s timeout is inherited from the httpx client default and only applies on failure. That said, adding a per-event timeout (e.g., 2s) and/or concurrent sends via a thread pool would be good hardening. Will track for a follow-up — not blocking for this fix since the previous batch approach had the same 10s single-request timeout and would also block on a slow/unreachable server. |
||
| if response.status_code not in (200, 201, 202): | ||
| logger.warning(f"Failed to send event: {response.status_code} {response.text[:200]}") | ||
| failed.append(event) | ||
| except Exception as e: | ||
| logger.error(f"Error sending event: {e}") | ||
| failed.append(event) | ||
|
|
||
| if response.status_code in (200, 201, 202): | ||
| logger.debug(f"Sent {len(events_to_send)} events") | ||
| return True | ||
| else: | ||
| logger.warning(f"Failed to send events: {response.status_code} {response.text}") | ||
| # Re-queue events on failure | ||
| if failed: | ||
| with self._batch_lock: | ||
| self._batch.extend(events_to_send) | ||
| self._batch.extend(failed) | ||
| return False | ||
|
|
||
| logger.debug(f"Sent {len(events_to_send)} events") | ||
| return True | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error sending events: {e}") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already fixed in acc5375 — tests updated to assert individual POSTs and new field names.