Skip to content

feat: derive tenant header from inbound messages via #[WithTenantResolver]#664

Merged
dgafka merged 14 commits intomainfrom
feat/external-message-tenant-mapping
May 7, 2026
Merged

feat: derive tenant header from inbound messages via #[WithTenantResolver]#664
dgafka merged 14 commits intomainfrom
feat/external-message-tenant-mapping

Conversation

@dgafka
Copy link
Copy Markdown
Member

@dgafka dgafka commented May 6, 2026

Why is this change proposed?

External messages (Kafka, AMQP, Scheduled poller) often don't carry the
multi-tenant header — the tenant has to be derived from something else,
like the originating topic or a payload field. Today this fails because
the connection-switching Around interceptors fire before any user
code can populate the header, throwing "Lack of context about tenant
in Message Headers".

This change introduces a first-class, declarative #[WithTenantResolver]
attribute that runs the user's expression against the inbound message
before any tenant-aware interceptor sees it.

Description of Changes

  • New #[WithTenantResolver(expression: "...")] attribute (Apache, in
    ecotone/dbal). Place it on inbound channel adapter methods.
  • New MultiTenantHeaderResolver Before interceptor (Apache):
    evaluates the expression against payload/headers, injects the
    configured tenant header into the message. Explicit tenant headers
    always win; null expression results are a no-op; non-scalar results
    throw a meaningful InvalidArgumentException.
  • ScheduledModule now propagates method-level annotations to the
    inbound channel adapter gateway, mirroring KafkaModule/RabbitConsumerModule.
  • Validation: #[WithTenantResolver] on a non-inbound-adapter method
    (CommandHandler, EventHandler, Asynchronous, etc.) throws a
    ConfigurationException at boot with an explanation of why internal
    Message Channels don't need a resolver.

All new components are Apache-licensed. Combining #[WithTenantResolver]
with an Apache adapter like #[Scheduled] requires no Enterprise
licence; combining it with #[KafkaConsumer] or #[RabbitConsumer]
requires Enterprise only because those adapters themselves do.

Usage

use Ecotone\Dbal\Attribute\WithTenantResolver;
use Ecotone\Kafka\Attribute\KafkaConsumer;
use Ecotone\Messaging\Attribute\Parameter\Headers;

final class OrderEventConsumer
{
    #[KafkaConsumer('orders', topics: ['orders_eu', 'orders_us'])]
    #[WithTenantResolver(expression: "headers['kafka_topic']")]
    public function handle(string $payload, #[Headers] array $headers): void
    {
        // headers['tenant'] is populated from the originating Kafka topic
    }
}
use Ecotone\Dbal\Attribute\WithTenantResolver;
use Ecotone\Messaging\Attribute\Scheduled;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\Support\MessageBuilder;

final class ExternalEventPoller
{
    #[Scheduled(requestChannelName: 'externalArrived', endpointId: 'externalPoller')]
    #[WithTenantResolver(expression: "payload['tenantId']")]
    public function poll(): ?Message
    {
        $event = $this->source->next();
        return $event === null ? null : MessageBuilder::withPayload($event)->build();
    }
}

You can also reference services from the expression:

#[WithTenantResolver(expression: "reference('topicTenantMap').lookup(headers['kafka_topic'])")]

Use cases

  1. Kafka topic per tenant — One #[KafkaConsumer] subscribes to many
    per-tenant topics; the resolver picks the tenant from kafka_topic.
  2. External REST/webhook ingestion — A #[Scheduled] poller pulls
    pending events from a third-party API and tags each with its origin;
    the resolver translates origin → tenant before downstream handlers run.
  3. AMQP per-tenant queues#[AmqpConsumer] consumes from queues
    named after tenants; resolver reads the routing key.

Flow

sequenceDiagram
    participant Broker as External broker / scheduler
    participant Adapter as Inbound channel adapter gateway
    participant Resolver as MultiTenantHeaderResolver (Before)
    participant Channel as Request channel
    participant Handler as User handler
    participant Tenant as MultiTenantConnectionFactory

    Broker->>Adapter: inbound message
    Adapter->>Resolver: pointcut WithTenantResolver matches
    Resolver->>Resolver: evaluate expression on payload+headers
    Resolver-->>Adapter: tenant header injected
    Adapter->>Channel: forward message
    Channel->>Handler: dispatch
    Handler->>Tenant: getConnection() — tenant context resolved
Loading

Out of scope (for follow-up)

#[WithTenantResolver] on #[Asynchronous] handlers is rejected by the
new placement validation. Internal Message Channels already carry the
tenant context propagated from the originating bus call, so a resolver
there would have nothing meaningful to derive. If an asynchronous
handler is processing externally-arrived messages, the resolver belongs
on the inbound channel adapter that produces them.

Pull Request Contribution Terms

  • I have read and agree to the contribution terms outlined in CONTRIBUTING.

dgafka and others added 14 commits April 30, 2026 11:05
Capture the agreed design for #[WithTenantResolver(expression: ...)] that
lets handlers derive the tenant header from inbound headers (e.g. kafka_topic)
in time for multi-tenant connection switching, addressing the case where
externally-arriving async messages lack a tenant header.
…ttribute

Drop the runtime-resolution / Enterprise mechanism in favour of a method-level
attribute placed alongside #[KafkaConsumer] / #[RabbitConsumer]. Channel-adapter
modules already propagate getAllAnnotationDefinitions() to the gateway, so a
Before interceptor with pointcut WithTenantResolver::class fires only on tagged
consumer methods with zero overhead elsewhere and no core framework changes.
Step-by-step TDD plan for the WithTenantResolver attribute design:
attribute + resolver service + module wiring + behaviour tests covering
explicit-header preservation, null-result fallthrough, reference()-based
mapper, and unannotated-handler isolation.
…lver]

Adds a declarative #[WithTenantResolver(expression: "...")] attribute for
inbound channel adapter methods (Kafka/AMQP/Scheduled). The resolver runs
as a Before interceptor on the inbound gateway, evaluating the expression
against payload+headers and injecting the configured tenant header into
the message before any tenant-aware Around interceptor sees it.

ScheduledModule now propagates handler-method annotations to its channel
adapter gateway, mirroring KafkaModule/RabbitConsumerModule. Placement
validation rejects #[WithTenantResolver] on synchronous/asynchronous
handlers with an explanatory ConfigurationException.
These are local brainstorming and implementation-plan notes used during
development, not user-facing documentation. Add the directory to
.gitignore so they don't ship with PRs.
Counter-based regression guard: even though ScheduledModule now propagates
handler annotations into the channel adapter's endpoint annotations
while InboundChannelAdapterBuilder separately exposes the annotated
interface, a single registered Before interceptor still produces exactly
one match per target invocation. Pointcut matching is boolean.
Earlier version asserted on InboundChannelAdapterBuilder::getEndpointAnnotations()
directly — testing an internal storage primitive rather than user-observable
behaviour. Replaced with a flow test that bootstraps EcotoneLite, runs the
scheduled poller, and verifies a Before interceptor with a marker-attribute
pointcut actually fires. This is what users rely on; how the framework
delivers it internally is free to change.
WithTenantResolver and MultiTenantHeaderResolver are now Enterprise-licensed.
MultiTenantConnectionFactoryModule throws LicensingException at boot if any
method is annotated with WithTenantResolver but no Enterprise licence is
configured. New WithTenantResolverLicensingTest covers both branches.
Existing tests updated to pass LicenceTesting::VALID_LICENCE.
Each test now declares its services next to its assertions instead of
loading them from packages/*/tests/Fixture/MultiTenant/. The anonymous
classes are still picked up by EcotoneLite via \$instance::class, so
behavior is unchanged. Reduces file count and makes scenarios easier
to read end-to-end.
Wires the Scheduled poller -> #[Asynchronous] CommandHandler flow against
two real connections (postgres for tenant_a, mysql for tenant_b) and
asserts that messages tagged source=tenant_a end up inserted in the
postgres database while source=tenant_b inserts land in mysql. Builds
on the DbalMessagingTestCase tenant connection factories already used by
DbalBusinessMethod\MultiTenantTest.

Proves the resolver-injected tenant header drives connection routing
all the way through to the physical database, not just to header
inspection in tests.
Splits ScheduledTenantResolverDatabaseRoutingTest into async and sync
handler variants. Both bootstrap the same Scheduled poller +
WithTenantResolver wiring against the two real tenant connections;
the asynchronous variant routes through an in-memory queue and the
synchronous variant lets the CommandHandler run inline. Either way,
tenant_a and tenant_b inserts must land in the correct physical
database. Message order swapped (tenant_b first) so the test would
fail if routing accidentally relied on round-robin polling order
instead of resolver-injected headers.
@dgafka dgafka merged commit af87b64 into main May 7, 2026
8 of 11 checks passed
@dgafka dgafka deleted the feat/external-message-tenant-mapping branch May 7, 2026 17:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant