diff --git a/capiscio_sdk/events.py b/capiscio_sdk/events.py index 5c29616..9350eac 100644 --- a/capiscio_sdk/events.py +++ b/capiscio_sdk/events.py @@ -136,18 +136,20 @@ def emit( if not self.enabled: return False + # Build a flat event matching the registry's db.Event struct. + # See capiscio-server internal/db/models.go for JSON field tags. event = { "id": str(uuid.uuid4()), - "type": event_type, "agentId": self.agent_id, + "traceId": correlation_id or str(uuid.uuid4()), + "eventType": event_type, + "severity": "info", "timestamp": datetime.now(timezone.utc).isoformat(), - "data": data or {}, + "payload": dict(data) if data else {}, } if task_id: - event["taskId"] = task_id - if correlation_id: - event["correlationId"] = correlation_id + event["payload"]["task_id"] = task_id with self._batch_lock: self._batch.append(event) @@ -189,22 +191,30 @@ def flush(self) -> bool: "X-Capiscio-Registry-Key": self.api_key, } - # Send batch - response = self._client.post( - f"{self.server_url}/v1/events", - headers=headers, - json={"events": events_to_send}, - ) + # The registry decodes the POST body as a flat db.Event struct. + # Send each event individually. + failed: list = [] + for event in events_to_send: + try: + response = self._client.post( + f"{self.server_url}/v1/events", + headers=headers, + json=event, + ) + if response.status_code not in (200, 201, 202): + logger.warning(f"Failed to send event: {response.status_code} {response.text[:200]}") + failed.append(event) + except Exception as e: + logger.error(f"Error sending event: {e}") + failed.append(event) - if response.status_code in (200, 201, 202): - logger.debug(f"Sent {len(events_to_send)} events") - return True - else: - logger.warning(f"Failed to send events: {response.status_code} {response.text}") - # Re-queue events on failure + if failed: with self._batch_lock: - self._batch.extend(events_to_send) + self._batch.extend(failed) return False + + logger.debug(f"Sent {len(events_to_send)} events") + return True except Exception as e: logger.error(f"Error sending events: {e}") diff --git a/tests/unit/test_events.py b/tests/unit/test_events.py index 329fecf..ff969a8 100644 --- a/tests/unit/test_events.py +++ b/tests/unit/test_events.py @@ -95,10 +95,12 @@ def test_emit_adds_to_batch(self): assert len(emitter._batch) == 1 event = emitter._batch[0] - assert event["type"] == "test_event" + assert event["eventType"] == "test_event" assert event["agentId"] == "test-agent" - assert event["data"] == {"key": "value"} + assert event["payload"] == {"key": "value"} + assert event["severity"] == "info" assert "id" in event + assert "traceId" in event assert "timestamp" in event def test_emit_with_task_id(self): @@ -111,7 +113,7 @@ def test_emit_with_task_id(self): emitter.emit("test_event", {}, task_id="task-123") - assert emitter._batch[0]["taskId"] == "task-123" + assert emitter._batch[0]["payload"]["task_id"] == "task-123" def test_emit_with_correlation_id(self): """Test emit with correlation_id.""" @@ -123,7 +125,7 @@ def test_emit_with_correlation_id(self): emitter.emit("test_event", {}, correlation_id="corr-456") - assert emitter._batch[0]["correlationId"] == "corr-456" + assert emitter._batch[0]["traceId"] == "corr-456" def test_emit_flushes_when_batch_full(self): """Test that emit flushes when batch is full.""" @@ -175,8 +177,8 @@ def test_flush_sends_events(self): # Add events directly to batch emitter._batch = [ - {"id": "1", "type": "event1", "data": {}}, - {"id": "2", "type": "event2", "data": {}}, + {"id": "1", "eventType": "event1", "payload": {}}, + {"id": "2", "eventType": "event2", "payload": {}}, ] mock_response = MagicMock() @@ -187,14 +189,13 @@ def test_flush_sends_events(self): assert result is True assert len(emitter._batch) == 0 - mock_post.assert_called_once() + assert mock_post.call_count == 2 # Individual POST per event - call_kwargs = mock_post.call_args[1] - assert call_kwargs["json"]["events"] == [ - {"id": "1", "type": "event1", "data": {}}, - {"id": "2", "type": "event2", "data": {}}, - ] - assert call_kwargs["headers"]["X-Capiscio-Registry-Key"] == "sk_test" + # Verify each event sent individually + calls = mock_post.call_args_list + assert calls[0][1]["json"] == {"id": "1", "eventType": "event1", "payload": {}} + assert calls[1][1]["json"] == {"id": "2", "eventType": "event2", "payload": {}} + assert calls[0][1]["headers"]["X-Capiscio-Registry-Key"] == "sk_test" def test_flush_requeues_on_failure(self): """Test that flush requeues events on server error.""" @@ -204,7 +205,7 @@ def test_flush_requeues_on_failure(self): agent_id="test-agent", ) - events = [{"id": "1", "type": "event1", "data": {}}] + events = [{"id": "1", "eventType": "event1", "payload": {}}] emitter._batch = events.copy() mock_response = MagicMock() @@ -225,7 +226,7 @@ def test_flush_requeues_on_exception(self): agent_id="test-agent", ) - emitter._batch = [{"id": "1", "type": "event1", "data": {}}] + emitter._batch = [{"id": "1", "eventType": "event1", "payload": {}}] with patch.object(emitter._client, "post", side_effect=Exception("Network error")): result = emitter.flush() @@ -243,7 +244,7 @@ def test_flush_disabled(self): ) # Manually add event (bypassing emit's disabled check) - emitter._batch = [{"id": "1", "type": "event1", "data": {}}] + emitter._batch = [{"id": "1", "eventType": "event1", "payload": {}}] result = emitter.flush()