feat(eda): Kafka/Postgres/Redis adapters + W3C tracing + stock health indicators#12
Merged
Merged
Conversation
… indicators Pyfly's EDA module previously shipped only an in-memory ``EventPublisher``. Downstream services were rolling their own queue layers (e.g. flydesk-idp's bespoke ``JobQueue``) because the abstraction was sound but had no production-grade adapter. This change closes the gap and also promotes a few cross-cutting pieces that downstream services kept re-implementing. EDA adapters (``pyfly.eda.adapters``) * ``KafkaEventBus`` -- wraps aiokafka, JSON-encoded ``EventEnvelope`` on a fixed list of topics, ``fnmatch`` event_type filtering inside the consumer. * ``PostgresEventBus`` -- durable outbox in ``pyfly_eda_outbox`` + per-group cursor in ``pyfly_eda_offsets``, ``pg_notify`` for low-latency wake-ups, periodic poll fallback. Listener attaches unconditionally at ``start()``; ``subscribe()`` pokes the consume loop so handlers registered after pyfly's auto-startup still pick up pending events (at-least-once semantics with the cursor advanced after dispatch). * ``RedisStreamsEventBus`` -- consumer-group XREADGROUP, BUSYGROUP- tolerant ``xgroup_create``, JSON envelope under a single ``envelope`` field, ``XACK`` after dispatch. * ``EdaAutoConfiguration`` registered under ``pyfly.auto_configuration.eda`` -- selects the adapter from ``pyfly.eda.provider`` (memory | kafka | redis | postgres | auto). W3C trace context (``pyfly.observability.correlation`` + ``pyfly.web.adapters.starlette.filters.correlation_filter``) * New ``CorrelationFilter`` is added to the default Starlette / FastAPI filter chain. Parses ``X-Correlation-Id``, ``X-Request-Id``, ``X-Tenant-Id``, ``traceparent`` and ``tracestate``; echoes them on the response. * ``contextvars``-backed accessors (``get_correlation_id``, …) live in ``pyfly.observability.correlation`` and propagate automatically through ``await`` chains and ``asyncio.gather`` fan-outs. * ``current_correlation_context()`` returns the active surface as a header dict, ready to be merged into outbound HTTP calls. Stock health indicators * ``pyfly.data.relational.health.SqlAlchemyHealthIndicator`` -- pings the engine with ``SELECT 1`` and surfaces dialect on the details payload. * ``pyfly.eda.health.EventPublisherHealthIndicator`` -- broker-agnostic probe over the ``EventPublisher`` port (auto-registered by ``EdaHealthAutoConfiguration`` when the actuator subsystem is on). * ``create_app()`` (both Starlette and FastAPI adapters) now exposes a ``app.state.pyfly_install_health_indicators`` callable. Downstream lifespans should invoke it after ``ApplicationContext.start()`` so ``@bean`` ``HealthIndicator`` implementations are picked up by the aggregator (the previous eager scan ran before beans had been instantiated). Tests * 28 new tests across EDA adapters, ``EdaAutoConfiguration`` and the W3C correlation filter (Starlette ``TestClient`` round-trips). * Updated ``tests/config/test_auto.py`` to reflect the two new auto-configurations (``EdaAutoConfiguration`` + ``EdaHealthAutoConfiguration``). * Full pyfly suite stays green (excluding the pre-existing mongomock failures unrelated to this change). End-to-end verified against flydesk-idp on Docker Compose with ``FLYDESK_IDP_EDA_ADAPTER=postgres`` + Anthropic ``claude-sonnet-4-6``: publish-to-dispatch latency is ~80 ms and the multi-stage pipeline (extract + judge + visual + rules) succeeds end-to-end with the cursor advancing in ``pyfly_eda_offsets``.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Adds production-grade EDA adapters (Kafka, Postgres outbox+LISTEN/NOTIFY, Redis Streams) with auto-configuration, promotes W3C trace-context handling (traceparent/tracestate/X-Correlation-Id/X-Request-Id/X-Tenant-Id) into the default web filter chain, and ships stock SQLAlchemy + EventPublisher health indicators with a lifespan rescan hook. 391 unit tests green; verified end-to-end against a downstream service using the Postgres adapter.