Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow outbox to be enabled for send only endpoints via an explicit opt-in setting #6750

Merged
merged 9 commits into from
Jun 8, 2023
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace NServiceBus.AcceptanceTests.Outbox
{
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using NUnit.Framework;

public class When_outbox_enabled_for_send_only : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_fail_prerequisites_check()
{
string startupDiagnostics = null;
var context = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>(e => e.CustomConfig(c => c.CustomDiagnosticsWriter((d, __) => { startupDiagnostics = d; return Task.CompletedTask; })))
.Done(c => c.EndpointsStarted)
.Run();

StringAssert.Contains("Outbox is only relevant for endpoints receiving messages.", startupDiagnostics);
}

public class Context : ScenarioContext
{
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint()
{
EndpointSetup<DefaultServer>(c =>
{
c.EnableOutbox();
c.SendOnly();
}).EnableStartupDiagnostics();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
namespace NServiceBus.AcceptanceTests.Outbox
{
using System;
using AcceptanceTesting;
using EndpointTemplates;
using NUnit.Framework;

public class When_outbox_enabled_with_transaction_mode_above_receive_only : NServiceBusAcceptanceTest
{
[Test]
public void Should_fail_to_start()
{
var exception = Assert.ThrowsAsync<Exception>(async () => await Scenario.Define<Context>()
.WithEndpoint<Endpoint>()
.Done(_ => false)
.Run());

StringAssert.Contains($"Outbox requires transport to be running in `{nameof(TransportTransactionMode.ReceiveOnly)}` mode", exception.Message);
}

public class Context : ScenarioContext
{
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint()
{
EndpointSetup<DefaultServer>(c =>
{
c.EnableOutbox();
c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive;
});
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace NServiceBus.AcceptanceTests.Outbox
{
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using NUnit.Framework;

public class When_outbox_enabled_with_transactions_off : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_fail_prerequisites_check()
{
string startupDiagnostics = null;
var context = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>(e => e.CustomConfig(c => c.CustomDiagnosticsWriter((d, __) => { startupDiagnostics = d; return Task.CompletedTask; })))
.Done(c => c.EndpointsStarted)
.Run();

StringAssert.Contains("Outbox isn't needed since the receive transactions have been turned off", startupDiagnostics);
}

public class Context : ScenarioContext
{
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint()
{
EndpointSetup<DefaultServer>(c =>
{
c.EnableOutbox();
c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.None;
}).EnableStartupDiagnostics();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
namespace NServiceBus.AcceptanceTests.Outbox
{
using System;
using AcceptanceTesting;
using EndpointTemplates;
using NServiceBus.Persistence;
using NUnit.Framework;

public class When_outbox_enabled_without_persister_supporting_it : NServiceBusAcceptanceTest
{
[Test]
public void Should_fail_to_start()
{
var exception = Assert.ThrowsAsync<Exception>(async () => await Scenario.Define<Context>()
.WithEndpoint<Endpoint>()
.Done(_ => false)
.Run());

StringAssert.Contains("The selected persistence doesn't have support for outbox storage", exception.Message);
}

public class Context : ScenarioContext
{
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint()
{
EndpointSetup<ServerWithNoDefaultPersistenceDefinitions>(c =>
{
c.EnableOutbox();
c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
c.UsePersistence<FakeNoOutboxSupportPersistence>();
});
}
}

class FakeNoOutboxSupportPersistence : PersistenceDefinition
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,47 @@ namespace NServiceBus.AcceptanceTests.Reliability.SynchronizedStorage
using Extensibility;
using Features;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus.Configuration.AdvancedExtensibility;
using NServiceBus.Outbox;
using NUnit.Framework;
using Persistence;

public class When_opening_storage_session_outside_pipeline : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_provide_adapted_session_with_same_scope()
[Test, Combinatorial]
public async Task Should_provide_adapted_session_with_same_scope([Values(true, false)] bool useOutbox, [Values(true, false)] bool sendOnly)
{
var context = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>()
.WithEndpoint<Endpoint>(e => e.CustomConfig(c =>
{
if (sendOnly)
{
c.SendOnly();
c.GetSettings().Set("Outbox.AllowUseWithoutReceiving", true);
}
if (useOutbox)
{
c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
c.EnableOutbox();
}
}))
.Done(c => c.Done)
.Run();

Assert.True(context.SessionNotNullAfterOpening, "The adapted session was null after opening the session.");
Assert.True(context.StorageSessionEqual, "The scoped storage session should be equal.");

if (useOutbox)
{
Assert.True(context.OutboxNotNullAfterOpening, "The scoped storage session should be equal.");
}
}

public class Context : ScenarioContext
{
public bool StorageSessionEqual { get; set; }
public bool SessionNotNullAfterOpening { get; set; }
public bool OutboxNotNullAfterOpening { get; set; }
public bool Done { get; set; }
}

Expand Down Expand Up @@ -75,6 +95,8 @@ protected override async Task OnStart(IMessageSession session, CancellationToken
scenarioContext.StorageSessionEqual =
completableSynchronizedStorageSession == synchronizedStorage;

scenarioContext.OutboxNotNullAfterOpening = scope.ServiceProvider.GetService<IOutboxStorage>() != null;

await completableSynchronizedStorageSession.CompleteAsync(cancellationToken);
}

Expand All @@ -89,4 +111,4 @@ protected override async Task OnStart(IMessageSession session, CancellationToken
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net472;net6.0;net7.0</TargetFrameworks>
Expand Down
22 changes: 17 additions & 5 deletions src/NServiceBus.Core/Reliability/Outbox/Outbox.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
namespace NServiceBus.Features
{
using ConsistencyGuarantees;
using System;
using ConsistencyGuarantees;
using NServiceBus.Settings;
using Transport;

/// <summary>
Expand All @@ -14,19 +15,24 @@ internal Outbox()
Defaults(s =>
{
s.SetDefault(TimeToKeepDeduplicationEntries, TimeSpan.FromDays(5));

s.EnableFeatureByDefault<SynchronizedStorage>();
});
Prerequisite(context => !context.Settings.GetOrDefault<bool>("Endpoint.SendOnly"),

Prerequisite(context => ReceivingEnabled(context.Settings) || AllowUseWithoutReceiving(context.Settings),
"Outbox is only relevant for endpoints receiving messages.");

Prerequisite(c => !c.Settings.GetOrDefault<bool>("Endpoint.SendOnly")
&& c.Settings.GetRequiredTransactionModeForReceives() != TransportTransactionMode.None,
Prerequisite(context => !ReceivingEnabled(context.Settings) || TransactionsEnabled(context.Settings),
"Outbox isn't needed since the receive transactions have been turned off");

DependsOn<SynchronizedStorage>();
}

bool ReceivingEnabled(IReadOnlySettings settings) => !settings.GetOrDefault<bool>("Endpoint.SendOnly");

bool TransactionsEnabled(IReadOnlySettings settings) => settings.GetRequiredTransactionModeForReceives() != TransportTransactionMode.None;

bool AllowUseWithoutReceiving(IReadOnlySettings settings) => settings.GetOrDefault<bool>("Outbox.AllowUseWithoutReceiving");

/// <summary>
/// See <see cref="Feature.Setup" />.
/// </summary>
Expand All @@ -37,6 +43,11 @@ protected internal override void Setup(FeatureConfigurationContext context)
throw new Exception("The selected persistence doesn't have support for outbox storage. Select another persistence or disable the outbox feature using endpointConfiguration.DisableFeature<Outbox>()");
}

if (!ReceivingEnabled(context.Settings))
{
return;
}

// ForceBatchDispatchToBeIsolatedBehavior set the dispatch consistency to isolated which instructs
// the transport to not enlist the outgoing operation in the incoming message transaction. Unfortunately
// this is not enough. We cannot allow the transport to operate in SendsWithAtomicReceive because a transport
Expand All @@ -54,6 +65,7 @@ protected internal override void Setup(FeatureConfigurationContext context)
//note: in the future we should change the persister api to give us a "outbox factory" so that we can register it in DI here instead of relying on the persister to do it
context.Pipeline.Register("ForceBatchDispatchToBeIsolated", new ForceBatchDispatchToBeIsolatedBehavior(), "Makes sure that we dispatch straight to the transport so that we can safely set the outbox record to dispatched once the dispatch pipeline returns.");
}

internal const string TimeToKeepDeduplicationEntries = "Outbox.TimeToKeepDeduplicationEntries";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
</ItemGroup>

<ItemGroup>
<Compile Include="../NServiceBus.AcceptanceTests/**/*.cs" Exclude="../NServiceBus.AcceptanceTests/obj/**/*.*" />
<Compile Include="../NServiceBus.AcceptanceTests/**/*.cs" Exclude="../NServiceBus.AcceptanceTests/obj/**/*.*;../NServiceBus.AcceptanceTests/Core/**/*.cs" />
andreasohlund marked this conversation as resolved.
Show resolved Hide resolved
<Compile Remove="../NServiceBus.AcceptanceTests/Core/TestSuiteConstraints.cs" />
</ItemGroup>

Expand Down