feat(engine): add RouterFactorySpi for pluggable stream factory composition#1757
Conversation
…sition Introduce a service-provider interface that allows extensions to contribute to the composition of the engine's BindingHandler stream factory. When a router is selected by name from engine Configuration, it is instantiated via ServiceLoader, supplied with a RouteableContext (config, current default streamFactory, attachComposite/detachComposite), and attached with a synthesized RouterConfig; the resulting BindingHandler is installed as the worker's stream factory. With no router configured, default behavior is preserved. Mirrors the factory + context shape of Store: RouterFactorySpi.create → Router.supply(RouteableContext) → RouterContext.attach(RouterConfig) → BindingHandler. Engine shutdown invokes RouterContext.detach(routerId). Closes #1754
| EngineRouteableContext( | ||
| Configuration config, | ||
| BindingHandler streamFactory, | ||
| EngineContext engineContext) |
There was a problem hiding this comment.
This seems cyclic, since EngineRouteableContext is contributing to the creation of EngineContext.
Perhaps we should pass in these attachComposite and detachComposite methods as lambdas instead and avoid passing EngineContext.
| Router router, | ||
| RouterConfig routerConfig, |
There was a problem hiding this comment.
This feels incorrectly externalized, as it logically belongs to the Engine, should be handled internally instead.
| final RouterFactory routerFactory = RouterFactory.instantiate(); | ||
| final String routerName = config.routerName(); | ||
| Router router = null; | ||
| RouterConfig routerConfig = null; | ||
| if (routerName != null) | ||
| { | ||
| router = routerFactory.create(routerName, config); | ||
| routerConfig = RouterConfig.builder() | ||
| .id(0L) | ||
| .name(routerName) | ||
| .build(); | ||
| } | ||
|
|
There was a problem hiding this comment.
Move to Engine, use to create EngineRouter, pass to EngineWorker.
| return ENGINE_HOST_RESOLVER.get(this)::resolve; | ||
| } | ||
|
|
||
| public String routerName() |
There was a problem hiding this comment.
Method name should align with property name, use router() here.
- rename EngineConfiguration.routerName() to router() to align with property - move RouterFactory loading from EngineBuilder into Engine - EngineRouteableContext takes Consumer<NamespaceConfig> lambdas for attachComposite/detachComposite instead of the full EngineContext (the previous shape was cyclic — EngineRouteableContext is contributing to the creation of EngineContext) - encapsulate router lifecycle in a new EngineRouter class that itself implements BindingHandler with a mutable internal delegate; start() at EngineWorker.onStart, close() at onClose - EngineWorker.streamFactory() returns the EngineRouter so binding factories cache a stable reference at supply time; this fixes the NPE seen in binding-http (HttpServerFactory cached context.streamFactory() during Binding.supply, but the field was previously initialized at the end of the EngineWorker constructor — after supply)
…eRouter - Engine resolves the Router and synthesises RouterConfig from EngineConfiguration.router(), passes both to each EngineWorker - EngineWorker constructs an EngineRouter early (before binding.supply) so the BindingHandler reference cached by bindings is the EngineRouter; this fixes the binding-http NPE - EngineConfiguration.routerName() renamed to router()
Aligns the accessor name with the property name `zilla.engine.router`.
Constructs EngineRouter early (before Binding.supply) so the BindingHandler reference cached by binding factories is the EngineRouter. Calls start() at onStart and close() at onClose to attach/detach the router context.
…nfig Per review: Engine creates EngineRouter and passes to EngineWorker. The worker provides its per-worker default stream factory and routeable context via a new attach(...) method called early in its constructor (before binding.supply).
Engine creates one EngineRouter per worker (in the worker creation loop) using the resolved Router and synthesised RouterConfig, and passes the EngineRouter to EngineWorker.
Constructor parameter changed from (Router, RouterConfig) to EngineRouter. Worker calls engineRouter.attach(defaultStreamFactory, routeable) early in the constructor (before binding.supply) so binding factories cache the EngineRouter as their stable BindingHandler reference.
| private long budgetId; | ||
| private long authorizedId; | ||
|
|
||
| private final EngineRouter engineRouter; |
There was a problem hiding this comment.
| private final EngineRouter engineRouter; | |
| private final EngineRouter router; |
| BindingHandler defaultStreamFactory = this::newStream; | ||
| RouteableContext routeable = new EngineRouteableContext(config, defaultStreamFactory, | ||
| this::attachComposite, this::detachComposite); | ||
| this.engineRouter = engineRouter; | ||
| this.engineRouter.attach(defaultStreamFactory, routeable); | ||
|
|
There was a problem hiding this comment.
| BindingHandler defaultStreamFactory = this::newStream; | |
| RouteableContext routeable = new EngineRouteableContext(config, defaultStreamFactory, | |
| this::attachComposite, this::detachComposite); | |
| this.engineRouter = engineRouter; | |
| this.engineRouter.attach(defaultStreamFactory, routeable); | |
| EngineRouteable routeable = new EngineRouteable(config, this::newStream, | |
| this::attachComposite, this::detachComposite); | |
| this.router = router.supplyContext(routeable); |
| final String routerName = config.router(); | ||
| final Router router = routerName != null | ||
| ? RouterFactory.instantiate().create(routerName, config) | ||
| : null; | ||
| final RouterConfig routerConfig = routerName != null | ||
| ? RouterConfig.builder() | ||
| .id(0L) | ||
| .name(routerName) | ||
| .build() | ||
| : null; | ||
|
|
There was a problem hiding this comment.
Let's do all this inside EngineRouter constructor.
| List<EngineWorker> workers = new ArrayList<>(workerCount); | ||
| for (int workerIndex = 0; workerIndex < workerCount; workerIndex++) | ||
| { | ||
| EngineRouter engineRouter = new EngineRouter(router, routerConfig); |
There was a problem hiding this comment.
| EngineRouter engineRouter = new EngineRouter(router, routerConfig); | |
| EngineRouter router = new EngineRouter(config); |
| @Override | ||
| public MessageConsumer newStream( | ||
| int msgTypeId, | ||
| DirectBuffer buffer, | ||
| int index, | ||
| int length, | ||
| MessageConsumer sender) | ||
| { | ||
| return delegate.newStream(msgTypeId, buffer, index, length, sender); | ||
| } |
| void attach( | ||
| BindingHandler defaultStreamFactory, | ||
| RouteableContext routeable) | ||
| { | ||
| this.defaultDelegate = defaultStreamFactory; | ||
| this.delegate = defaultStreamFactory; | ||
| this.routeable = routeable; | ||
| } |
There was a problem hiding this comment.
| void attach( | |
| BindingHandler defaultStreamFactory, | |
| RouteableContext routeable) | |
| { | |
| this.defaultDelegate = defaultStreamFactory; | |
| this.delegate = defaultStreamFactory; | |
| this.routeable = routeable; | |
| } | |
| BindingHandler attach( | |
| RouterConfig config) | |
| { | |
| return context != null ? context.attach(config) : streamFactory; | |
| } |
| { | ||
| context.detach(config.id); | ||
| context = null; | ||
| delegate = defaultDelegate; |
There was a problem hiding this comment.
| delegate = defaultDelegate; |
| } | ||
| } | ||
|
|
||
| void close() |
There was a problem hiding this comment.
| void close() | |
| void detach(RouterConfig config) |
|
|
||
| void start() | ||
| { | ||
| if (router != null && context == null) | ||
| { | ||
| context = router.supply(routeable); | ||
| delegate = context.attach(config); | ||
| } | ||
| } |
…/attach/detach EngineRouter no longer implements BindingHandler. Two constructors: engine-wide (EngineConfiguration) and per-worker (RouterContext, BindingHandler). Engine-wide exposes supplyContext(routeable) returning per-worker. Per-worker exposes attach(RouterConfig) returning the wrapped BindingHandler (or default) and detach(RouterConfig) for cleanup. Rename EngineRouteableContext to EngineRouteable. Engine constructs one EngineRouter per worker via new EngineRouter(config).
Field renamed engineRouter -> router. Worker captures routerConfig from engine-wide router via router.config(), constructs EngineRouteable, calls router.supplyContext(routeable) to get the per-worker EngineRouter, and router.attach(routerConfig) to obtain the BindingHandler used as streamFactory. Detaches in onClose.
…ntext EngineRouter implements Router as the default no-op pass-through. The new EngineRouterContext returns RouteableContext.streamFactory() from attach() and is a no-op on detach(routerId).
…se routerConfig() Engine resolves the Router (operator-configured via RouterFactory or the default EngineRouter), synthesises a RouterConfig, and exposes routerConfig(). EngineWorker now takes (Router, RouterConfig).
Component supply loops (binding/exporter/guard/vault/catalog/store/model/ metric-group) and EngineRegistry construction move from the constructor to onStart, so the constructor finishes before components start calling back into EngineContext. streamFactory is set in onStart from router.attach (routerConfig); router.detach(routerConfig.id) is called in onClose. EngineWorker now takes (Router, RouterConfig); the per-worker RouterContext is obtained via router.supply(routeable).
| private final Collection<Store> stores; | ||
| private final Collector collector; | ||
| private final Consumer<NamespaceConfig> process; | ||
| private Map<String, MetricGroup> metricGroupsByName; |
There was a problem hiding this comment.
Move newly non-final fields down after all the final fields into a block of their own for readability.
| Map<String, BindingContext> bindingsByType = new LinkedHashMap<>(); | ||
| for (Binding binding : bindings) | ||
| { | ||
| bindingsByType.put(binding.name(), binding.supply(this)); | ||
| } |
There was a problem hiding this comment.
Can we simplify each of these using bindings.stream().collect(toMap(Binding::name, b -> b.supply(this))) for brevity?
| final String routerName = config.router(); | ||
| final Router router = routerName != null | ||
| ? RouterFactory.instantiate().create(routerName, config) | ||
| : new EngineRouter(); |
There was a problem hiding this comment.
Does it make sense to register an SPI for EngineRouter, then default the engine router name to "engine" and not need the special case for null?
…"engine" EngineRouterFactorySpi exposes the default EngineRouter through ServiceLoader and the moditect module-info `provides` clause. zilla.engine.router defaults to "engine" so Engine no longer needs a null special case to instantiate the default Router. RouterFactoryTest expects both "engine" and "test" routers to be discovered.
Engine resolves Router via RouterFactory using config.router() — the default "engine" property value selects the EngineRouter SPI.
- Move metricGroupsByName + usageMetric back to the constructor as final fields (they don't escape `this`) - Group remaining non-final fields (modelsByType, registry, streamFactory) into a dedicated block at the end of the field declarations for readability - Simplify supply loops in onStart with stream().collect(toMap(...)) for bindings, exporters, guards, stores, and models (vault/catalog kept as for-loops because they fan out aliases)
Summary
Adds an SPI surface (
RouterFactorySpi) that lets engine extensions contribute to the composition ofEngineContext.streamFactory(). Mirrors the factory + context shape already used byStore:RouterFactorySpi.create(Configuration) → RouterRouter.supply(RouteableContext) → RouterContextRouterContext.attach(RouterConfig) → BindingHandler(paired withdetach(long routerId))RouteableContextis the under-engine surface available to the router during setup: it exposes the engineConfiguration, the engine's current defaultBindingHandlerstream factory (which the router may wrap), andattachComposite(...)/detachComposite(...)for injecting synthesized namespaces alongside operator-authored ones.When
zilla.engine.routerselects a router by name, the engine instantiates it viaServiceLoader, supplies aRouteableContext, attaches a synthesizedRouterConfig, and installs the returnedBindingHandleras the worker's stream factory. With no router configured, runtime behavior is unchanged.The change is engine-only:
runtime.engine.routerpackage:RouterFactorySpi,RouterFactory,Router,RouterContext,RouteableContextruntime.engine.config.RouterConfig(withid) +RouterConfigBuilderEngineConfigurationaddszilla.engine.routerproperty androuterName()accessorEngineBuilder/Engine/EngineWorkerthread the router through;EngineWorker.streamFactory()returns a field initialised tothis::newStreamand replaced by the wrapped handler when a router attaches;onCloseinvokesRouterContext.detach(routerId)module-infoexports the new package and declaresuses RouterFactorySpitest/internal/router/(TestRouterFactorySpi,TestRouter,TestRouterContext) returns the engine's default handler unchanged; wired viaMETA-INF/servicesRouterFactoryTestcovers discovery, creation, and the unrecognized-name error citingRouterFactory.names()EngineIT.shouldHandshakeWithRouterexercises the full startup path with a router attachedCloses #1754
Test plan
./mvnw -pl runtime/engine clean verify— 319 unit tests, 203 ITs pass; JaCoCo coverage check passesRouterFactoryTest(3 tests): discovery, creation, unrecognized-name errorEngineIT#shouldHandshakeWithRouter: full handshake with the test router attached, default behavior preservedEngineIT#shouldHandshakestill passes with no router configured (no-router default unchanged)EngineConfigurationTest#shouldVerifyConstantsupdated to verify the newzilla.engine.routerproperty nameGenerated by Claude Code