Skip to content

fix: support EventStreamEmitter inside #[ProjectionFlush]#662

Merged
dgafka merged 2 commits intomainfrom
ensure-emitter-in-flush
May 4, 2026
Merged

fix: support EventStreamEmitter inside #[ProjectionFlush]#662
dgafka merged 2 commits intomainfrom
ensure-emitter-in-flush

Conversation

@dgafka
Copy link
Copy Markdown
Member

@dgafka dgafka commented May 4, 2026

Why is this change proposed?

Calling EventStreamEmitter::emit(...) (or linkTo(...)) from a #[ProjectionFlush] method threw MessageHeaderDoesNotExistsException: Header projection.name does not exists from StreamNameMapper. The flush dispatch in EcotoneProjectorExecutor skipped the header context that the event-handler dispatch (project()) sets up — projection.name, projection.live, streamBasedSourced — and never wrapped the dispatch in MessageHeadersPropagatorInterceptor::storeHeaders, so propagation to outbound gateways from inside flush was broken.

This blocks a common pattern: building per-batch state across event handlers, then in flush persisting the state and emitting a single derived event (e.g. "wallet balance changed", "ticket list updated") — exactly the use case the API was designed for.

Description of Changes

  • EcotoneProjectorExecutor::flush() now mirrors the header context of project(): sets PROJECTION_LIVE (false during rebuild), STREAM_BASED_SOURCED, and PROJECTION_NAME (still enterprise-gated via withProjectionName() — open-core flushes don't get it, matching project() behaviour). The dispatch is wrapped in messageHeadersPropagatorInterceptor->storeHeaders() so EventStreamEmitter's PropagateHeaders gateway can merge those headers into outbound emit/linkTo calls.
  • ProjectorExecutor::flush() interface gains bool $isRebuilding = false; ProjectingManager::executePartitionBatch passes $shouldReset so rebuild-time emits are correctly suppressed by the PROJECTION_LIVE filter. Existing InMemoryProjector / EventStoreChannelAdapterProjection updated for the signature.
  • New tests in packages/PdoEventSourcing/tests/Projecting/Partitioned/EmittingEventsProjectionTest.php:
    • emit-from-flush works once per batch (Partitioned + FromAggregateStream)
    • rebuild does not re-emit events from flush
    • non-enterprise bootstrap shows a clear LicensingException ("Using #[ProjectionState] in #[ProjectionFlush] methods requires Ecotone Enterprise licence.")

Usage

#[Partitioned]
#[ProjectionV2('wallet_balance')]
#[FromAggregateStream(Wallet::class)]
class WalletBalanceProjection
{
    #[EventHandler]
    public function whenWalletInitialized(
        WalletWasInitialized $event,
        #[ProjectionState] array $wallet = []
    ): array {
        return ['walletId' => $event->walletId, 'balance' => 0];
    }

    #[EventHandler]
    public function whenMoneyWasAdded(
        MoneyWasAddedToWallet $event,
        #[ProjectionState] array $wallet
    ): array {
        $wallet['balance'] += $event->amount;
        return $wallet;
    }

    #[ProjectionFlush]
    public function flush(
        #[ProjectionState] array $wallet,
        EventStreamEmitter $emitter,
    ): void {
        $this->saveWallet($wallet);
        $emitter->emit([new WalletBalanceWasChanged($wallet['walletId'], $wallet['balance'])]);
    }
}

Use cases

  1. Per-batch derived events — collapse N inbound events into a single outbound event per partition flush (e.g. one WalletBalanceWasChanged per batch instead of one per MoneyWasAdded), reducing downstream consumer pressure.
  2. Atomic persist + publish — a #[ProjectionFlush] runs inside the projection transaction; persisting projection state and emitting the derived event in the same flush keeps both within one boundary.
  3. Safe rebuilds — when a projection is rebuilt via ProjectionRegistry::prepareRebuild(), flush still runs (state is reconstructed) but emits are suppressed via the projection.live filter, so downstream consumers don't see duplicate "balance changed" events.

Flow

sequenceDiagram
    participant PM as ProjectingManager
    participant Exec as EcotoneProjectorExecutor
    participant Interceptor as HeadersPropagator
    participant Flush as #[ProjectionFlush] handler
    participant Emitter as EventStreamEmitter
    participant Store as EventStore

    PM->>Exec: flush(state, isRebuilding)
    Exec->>Interceptor: storeHeaders(projection.name, projection.live, ...)
    Interceptor->>Flush: invoke flush method
    Flush->>Emitter: emit([DerivedEvent])
    Emitter->>Interceptor: propagateHeaders()
    Interceptor-->>Emitter: + projection.name, projection.live
    alt projection.live = true
        Emitter->>Store: append to projection_<name>
    else projection.live = false (rebuild)
        Emitter--xStore: filtered out
    end
Loading

Pull Request Contribution Terms

  • I have read and agree to the contribution terms outlined in CONTRIBUTING.

dgafka added 2 commits May 4, 2026 18:03
Calling EventStreamEmitter::emit() / linkTo() from a #[ProjectionFlush]
method threw "Header projection.name does not exists" because the flush
dispatch in EcotoneProjectorExecutor skipped the header context that
project() sets up (projection.name, projection.live, streamBasedSourced)
and never wrapped the dispatch in MessageHeadersPropagatorInterceptor.

flush() now mirrors project()'s header context — projection.name stays
enterprise-gated via withProjectionName() — and wraps the dispatch in
storeHeaders() so EventStreamEmitter's PropagateHeaders gateway can
merge those headers into outbound emit/linkTo calls. ProjectorExecutor
gains an isRebuilding flag so projection.live is set to false during
rebuild, suppressing flush-time emits via the existing live-filter.
Drop the default values on $userState and $isRebuilding so callers must
explicitly pass both — the rebuild flag in particular should always come
from the surrounding flow rather than silently defaulting to live mode.
@dgafka dgafka merged commit 8f8f734 into main May 4, 2026
8 checks passed
@dgafka dgafka deleted the ensure-emitter-in-flush branch May 4, 2026 16:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant