Skip to content

release(eda): merge Postgres EDA adapter from develop into main#15

Merged
ancongui merged 15 commits into
mainfrom
develop
May 18, 2026
Merged

release(eda): merge Postgres EDA adapter from develop into main#15
ancongui merged 15 commits into
mainfrom
develop

Conversation

@ancongui
Copy link
Copy Markdown
Contributor

Summary

Promotes the PostgreSQL EDA adapter (and the related resilience cache fix) from `develop` into `main`. PR #14 was reviewed, CI is green, and develop now contains:

  • `POSTGRES` as a first-class EDA transport (publisher + consumer) backed by a transactional outbox table and `LISTEN`/`NOTIFY`.
  • Auto-schema provisioning (table, indexes, NOTIFY function/trigger), polling fallback, retry/dead-letter handling, and dynamic channel subscription via `@EventListener`.
  • Spring Boot auto-configurations gated by `firefly.eda.publishers.postgres.default.` / `firefly.eda.consumer.postgres.default.` properties (no leakage into `spring.r2dbc.*`).
  • Bug fix: `EventPublisherFactory` now normalises the connection ID up front so the publisher cache and resilience4j entry name stay in lockstep, eliminating the flaky `ResilienceIntegrationTest` failures that surfaced once the parent POM became resolvable in CI.
  • Docs (README, `docs/CONFIGURATION.md`, `docs/PUBLISHER_TYPES.md`) updated with full property reference and behavior notes.

Test plan

casc84ab and others added 15 commits April 27, 2026 10:51
…ules

Marks the EDA-provided ObjectMapper as @primary so that consumer applications
with multiple ObjectMapper beans (e.g. cache, orchestration persistence) can
resolve unqualified @Autowired ObjectMapper without a NoUniqueBeanDefinition
exception.

Switches from manual JavaTimeModule registration to findAndRegisterModules()
so Jdk8Module and other classpath-discoverable Jackson modules are picked up
automatically. The change is additive-safe: existing payloads continue to
(de)serialize identically, only Optional<>, jsr310 and parameter-names
support is added.
…outbox

Adds POSTGRES as a first-class EDA transport alongside KAFKA and RABBITMQ.
Publishing performs a single INSERT into a configurable outbox table; an
AFTER INSERT trigger calls pg_notify(channel, id) so consumers LISTENing on
the same channel pick up the event id and fetch the payload. The outbox
table also tracks attempts, processed/failed timestamps, and a status field
that transitions to DEAD_LETTER after configurable retries.

The consumer holds one long-lived R2DBC connection for LISTEN and uses a
pool for outbox queries; a periodic poll catches rows missed by NOTIFY
(consumer offline at insert time, payload truncation, etc.). Channel names
are derived deterministically from destinations to fit PostgreSQL's 63-byte
identifier limit. POSTGRES is added to the AUTO publisher selection chain
between RABBITMQ and APPLICATION_EVENT, and Spring Boot's default
R2dbcAutoConfiguration is excluded in tests since the EDA module manages
its own connection factories under firefly.eda.* properties.

Includes Testcontainers integration tests covering publish, consumer
dispatch over NOTIFY, ack semantics, deterministic channel mapping, and
health reporting. README, CONFIGURATION.md, and PUBLISHER_TYPES.md updated
with the new transport's properties and behavior.
PostgresIntegrationTest holds a live LISTEN connection and a polling
subscription for the duration of the test. When the Spring TestContext
cache shares its context across other @SpringBootTest classes, that
lingering background work was pushing ResilienceIntegrationTest's context
out of the cache on CI (visible as a 56ms run of 12 tests where the
resilience4j registry was empty), causing five flaky failures.

Marking the test class with @DirtiesContext(AFTER_CLASS) evicts its
context as soon as it finishes so it can no longer crowd or interfere
with the rest of the integration suite.
… cache

EventPublisherFactory.publisherCache uses computeIfAbsent, so the first
caller to populate a slot wins. When Spring's TestContext cache reused a
context where the resilience factory was momentarily unavailable (e.g.,
bean initialisation order across cached contexts on CI), the cache stored
the bare publisher. Subsequent calls -- including assertions in
ResilienceIntegrationTest -- returned the unwrapped instance, so no
retry/circuit breaker entries were ever registered in the resilience4j
registries, and the tests failed with "Expecting actual not to be null".

The factory now validates that the cached publisher matches the current
resilience configuration (i.e., is a ResilientEventPublisher when the
resilience factory is available) before returning it, and recreates the
publisher otherwise. DestinationAwarePublisher delegates are unwrapped
for the check so dynamic-destination publishers still benefit. Resilience
state is preserved by name in the resilience4j registry, so recreation is
safe.
… changes

Spring's TestContext cache occasionally hands EventPublisherFactory a
publisher that was wrapped around a different context's resilience4j
registries. The wrapping looks correct (instanceof ResilientEventPublisher)
but the autowired CircuitBreakerRegistry/RetryRegistry in the active
context never see the registrations, so assertions in
ResilienceIntegrationTest could not locate the named instances and failed
with "Expecting actual not to be null".

Track the last-seen ResilientEventPublisherFactory instance and clear the
publisher cache whenever the resolved factory identity changes. This keeps
the cache (so callers still get a stable publisher instance per type +
connection) but guarantees the resilience entries land in the registries
the test is actually inspecting. Equality-based tests that rely on
caching (e.g., ComprehensiveDynamicTopicTest) continue to pass.
The publisher cache key normalised null -> defaultConnectionId ("default"),
but the resilience4j entry name kept the raw connectionId. As a result,
getPublisher(type, null) and getPublisher(type, "default") collided in the
cache yet produced different circuit-breaker / retry names. Whichever caller
populated the cache first won, leaving subsequent assertions hunting for a
named instance that did not exist (or, worse, finding the wrong one and
seeing zero metrics because publish flowed through the other instance).

This was the cause of the intermittent ResilienceIntegrationTest failures
on CI: an earlier test in the shared Spring context called the factory with
an explicit "default" connectionId, caching a publisher whose circuit
breaker was named "eda-publisher-application_event_default". When
ResilienceIntegrationTest later asked for the publisher with null and
queried for "eda-publisher-application_event_null", it found nothing.

The factory now resolves the connectionId once, up front, and uses the
resolved value for both the cache key and the resilience4j entry name.
ResilienceIntegrationTest is updated to read the suffix from the cache key
directly instead of assuming the asymmetric "default <-> null" mapping.
feat(eda): add PostgreSQL LISTEN/NOTIFY transport with transactional outbox
@ancongui ancongui merged commit cb593b2 into main May 18, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants