Multi-hosted endpoint lifecycle, keyed DI isolation, and slot-scoped logging#7633
Merged
Conversation
8ccd077 to
6087365
Compare
danielmarbach
commented
Mar 2, 2026
| var instancePump = CreateReceiver(consecutiveFailuresConfiguration, instanceSpecificPump); | ||
| var instanceProcessingLogSlot = CreateReceiverProcessingLogSlot(endpointLogSlot, InstanceSpecificReceiverId); | ||
| var instancePump = CreateReceiver(consecutiveFailuresConfiguration, instanceSpecificPump, instanceProcessingLogSlot); | ||
| var instancePipelineExecutor = new MainPipelineExecutor(builder, pipelineCache, messageOperations, configuration.PipelineCompletedSubscribers, receivePipeline, activityFactory, pipelineMetrics, envelopeUnwrapper); |
Contributor
Author
There was a problem hiding this comment.
This is strictly speaking not necessary but felt cleaner.
Contributor
Author
|
The gap I can think of currently is that if someone uses the extension method outside a generic host with their custom service collection and provider (for example, WPF), then they have no way to start the endpoint. This could be mitigated by providing a small abstraction like this /// <summary>
/// Represents a lifecycle abstraction for endpoints that use an externally managed container.
/// </summary>
public interface IExternallyManagedEndpointLifecycle : IAsyncDisposable
{
/// <summary>
/// Creates and initializes the endpoint using the provided service provider.
/// </summary>
/// <param name="builder">The <see cref="IServiceProvider"/> instance used to resolve dependencies.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> to observe.</param>
Task Create(IServiceProvider builder, CancellationToken cancellationToken = default);
/// <summary>
/// Starts the endpoint.
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> to observe.</param>
Task Start(CancellationToken cancellationToken = default);
/// <summary>
/// Stops the endpoint.
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> to observe.</param>
Task Stop(CancellationToken cancellationToken = default);
}
sealed class ExternallyManagedEndpointLifecycle(Func<IServiceProvider, IEndpointLifecycle> endpointLifecycleFactory) : IExternallyManagedEndpointLifecycle
{
public async Task Create(IServiceProvider builder, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(builder);
if (endpointLifecycle is null)
{
await createSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
endpointLifecycle ??= endpointLifecycleFactory(builder);
}
finally
{
createSemaphore.Release();
}
}
await endpointLifecycle.Create(cancellationToken).ConfigureAwait(false);
}
public async Task Start(CancellationToken cancellationToken = default)
{
if (endpointLifecycle is null)
{
throw new InvalidOperationException("The endpoint must be created before it can be started.");
}
_ = await endpointLifecycle.CreateAndStart(cancellationToken).ConfigureAwait(false);
}
public async Task Stop(CancellationToken cancellationToken = default)
{
if (endpointLifecycle is null)
{
return;
}
await endpointLifecycle.Stop(cancellationToken).ConfigureAwait(false);
}
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref isDisposed, 1) == 1)
{
return;
}
if (endpointLifecycle is not null)
{
await endpointLifecycle.DisposeAsync().ConfigureAwait(false);
}
createSemaphore.Dispose();
}
readonly SemaphoreSlim createSemaphore = new(1, 1);
volatile IEndpointLifecycle? endpointLifecycle;
int isDisposed;
}that is consistently registered in the service collection, like services.AddSingleton<IExternallyManagedEndpointLifecycle>(_ => new ExternallyManagedEndpointLifecycle(provider => new BaseEndpointLifecycle(externallyManagedContainerHost, provider)));or during multi hosting services.AddKeyedSingleton<IExternallyManagedEndpointLifecycle>(endpointIdentifier, (_, _) => new ExternallyManagedEndpointLifecycle(provider => new EndpointLifecycle(externallyManagedContainerHost, provider, endpointIdentifier, keyedServices))); |
Contributor
Author
|
@bording and I discussed the above and concluded it is better to not introduce confusing abstractions given that people can pull in the generic host as a library and call the service collection extensions available in this PR. |
DavidBoike
reviewed
Mar 4, 2026
…removing `HostAwareMessageSession`.
…ing` integration.
…te endpoint contextual logging.
This reverts commit de220a0.
…tralize logic and simplify usage across multiple components
…nsistency in endpoint lifecycle management
… and improve separation of concerns
…ispose handling - Introduced unit tests for `EndpointHostedService` to validate stop and dispose behavior. - Integrated `Stop` method into `IEndpointLifecycle` and updated implementations to support proper endpoint shutdown. - Improved concurrency safety in `RunningEndpointInstance.Stop`.
…ency and state handling
…roper disposed endpoint handling
…o prevent duplicates and ensure proper source filtering
…safety after await
…og handling during endpoint shutdown
…ollectionAdapter` validation during acceptance tests
…isses explicit flush during factory registration
…onAdapter` validation during endpoint creation
… hardware, consolidating volatile reads/writes and introducing `CachedSlot` for atomic context+logger updates.
6087365 to
277b311
Compare
…eters, keyed services support, and advanced scenarios
DavidBoike
approved these changes
Mar 5, 2026
This was referenced May 21, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This PR consolidates and hardens the hosting work needed to run multiple NServiceBus endpoints in the same .NET host while preserving endpoint isolation for dependency resolution, message sessions, and logging context.
The branch replaces ad-hoc startup orchestration with an explicit endpoint lifecycle model (
Create->Start->Stop->Dispose), introduces keyed endpoint registration viaAddNServiceBusEndpoint, and reworks logging to use endpoint-scoped slots that safely bridge intoMicrosoft.Extensions.Loggingscopes.In addition to feature work, the PR includes concurrency and correctness fixes for startup/shutdown races, logging cache visibility on weakly-ordered hardware, and endpoint stop semantics.
Problem Statement
When multiple endpoints are hosted in one process, the runtime needs to guarantee:
IMessageSessionbinding per endpoint instancePrevious structure made these concerns harder to reason about because creation/start/stop responsibilities were spread across multiple code paths and logging context attachment was not fully integrated with keyed multi-host scenarios.
Architecture
1) Endpoint lifecycle model
Lifecycle handling is now centralized behind
IEndpointLifecycleand used byEndpointHostedService.Create(...)prepares aStartableEndpointStart(...)starts it and returnsIEndpointInstanceStop(...)gracefully shuts down if startedDisposeAsync(...)is idempotent and guarantees cleanupCore flow:
EndpointStartupRunner.Create()performs one-time creation (semaphore + double-check)EndpointPreparation.Prepare()resolves logging factory for the endpoint slot, runs installers, runs setupStartableEndpoint.Start()is guarded by a semaphore and cached instanceRunningEndpointInstance.Stop()handles concurrent stop calls and slot unregistrationThis removes duplication between internal/external container paths and makes lifecycle state transitions explicit.
2) Multi-host endpoint registration
ServiceCollectionExtensions.AddNServiceBusEndpoint(...)is introduced as the primary registration surface for host-based scenarios.It validates and enforces:
Registration behavior:
This gives a consistent host integration model while preventing common misconfigurations early.
3) Keyed service isolation layer
The keyed hosting path now uses:
KeyedServiceCollectionAdapterKeyedServiceProviderAdapterKeyedServiceScopeFactoryKeyedServiceKeyNotable characteristics:
baseKey, optionalserviceKey) to isolate per-endpoint servicesIEnumerable<T>behavior aligned with keyed resolution including an explicitAnykey pathThe result is predictable service resolution for each hosted endpoint, including nested factories and scoped dependencies.
4) Slot-based logging and MEL scope bridge
Logging was refactored around endpoint slots to preserve endpoint identity across runtime operations.
LogManagernow tracks slot contexts and per-slot factoriesUnregisterSlot)MicrosoftLoggerFactoryAdapterimplements slot scope bridging so MEL receives structured scope valuesReceiveComponentnow wraps message receivers (LogWrappedMessageReceiver) so message and error callbacks execute within the correct slot scope, including satellite and instance-specific receiver contexts.Concurrency and Correctness Improvements
RunningEndpointInstance.Stop()now handlesStoppingstate correctly and avoids duplicate stop pipelinesMessageSessioninitialization gating remains async-safe and cancellation-token linking is centralizedLogManagercache publication/read ordering was tightened for correctness on weakly-ordered architectures (e.g., ARM)Public API Impact
Additions:
ServiceCollectionExtensions.AddNServiceBusEndpoint(this IServiceCollection, EndpointConfiguration, object? endpointIdentifier = null)KeyedServiceKey(public) withAnyandAnyKey(...)Behavioral impact:
IHostedLifecycleServiceNo behavior interface changes were introduced for pipeline/message handling contracts.
Tests and Validation
The PR includes focused tests across hosting, logging, lifecycle, and DI boundaries, including:
IMessageSessionresolution for multi-hosted endpointsEndpointHostedServicestart/stop/dispose behaviorKeyedServiceProviderAdapterresolution semanticsLogWrappedMessageReceiverslot scoping in receive callbacksAcceptance coverage and API approvals were updated accordingly.
Migration Notes
AddNServiceBusEndpoint(...).endpointIdentifierper endpointThese constraints are enforced to keep endpoint isolation deterministic and prevent cross-endpoint bleed-through.
Design Decision: Endpoint Identifier and Keyed Services
During the review of this PR, we revisited the original design decisions around the
endpointIdentifierused for keyed service registration. After re-evaluating the tradeoffs and validating the assumptions against real-world scenarios, we have decided to retain the current design without changes.Decision
No changes are required. The
endpointIdentifierremains of typeobject?, and its usage and semantics stay as originally designed.Context
This PR introduces a hosting model that enables multiple NServiceBus endpoints to run within the same .NET host while preserving strict isolation of dependency resolution, message sessions, and logging.
Keyed services are fundamental to achieving this isolation when multiple endpoints coexist in a single process.
Rationale
1. Alignment with Microsoft Dependency Injection
The design intentionally mirrors the capabilities of
Microsoft.Extensions.DependencyInjection, which supports keyed services using anobjectas the key. Maintaining this alignment ensures:Restricting the identifier to
stringwould diverge from established DI patterns and unnecessarily limit extensibility.2. Flexibility for Advanced Scenarios
Allowing an
objectas the key enables sophisticated use cases such as multi-tenancy and dynamic multi-hosting. For example, tenant context objects can be used directly as keys to resolve tenant-specific dependencies.This capability integrates naturally with
[FromKeyedServices], including inherited key resolution, ensuring the correct services are resolved automatically at runtime.3. Pit of Success for Typical Users
Although
objectis supported, the recommended and most common choice remains the endpoint name, which is astring. This provides a simple and predictable default while preserving extensibility for advanced users.This approach balances usability with flexibility without imposing unnecessary constraints.
4. Keyed Services as an Advanced Concept
Keyed services are intentionally introduced only when hosting multiple endpoints. This follows a progressive complexity model similar to multi-tenancy, where identifiers become necessary as architectural complexity increases.
This design prevents exposing advanced concepts to users who do not need them.
5. Migration and Backward Compatibility
We revisited whether the endpoint name should always serve as the default identifier. While convenient, enforcing this universally would introduce drawbacks.
Specifically, it would require keyed services even in single-endpoint hosting scenarios, leading to:
Migration from
NServiceBus.Extensions.Hostingwould become harder due to the need to adopt keyed dependency resolution.Keyed services would be exposed to users who do not benefit from them.
Applications that inject
IMessageSessionorITransactionalSessionin generic hosting scenarios would break and require updates to use keyed resolutions.To avoid these issues, keyed services are introduced only when multiple endpoints are hosted, preserving backward compatibility and ensuring a smooth migration path.
Considered Alternatives
Restricting the Identifier to
stringAdvantages:
Disadvantages:
After careful consideration, these drawbacks were determined to outweigh the benefits.
Conclusion
The existing design provides the best balance between usability, flexibility, and platform alignment.
object?string)