
Durable In/Outbox behavior seems to keep messages in SQL forever. #689

Closed
mhartwig22 opened this issue Jan 15, 2024 · 7 comments · Fixed by #846

Comments

@mhartwig22

mhartwig22 commented Jan 15, 2024

Hi,

Describe the bug
I'm currently testing Wolverine 1.13.2 together with Kafka and the SQL Server/EF Core integration.
The message transport works great and as expected so far; messages arrive in the corresponding Kafka topic and are also consumed from the topic correctly.
When I turn on the durable inbox and outbox, it seems that successfully published messages are persisted in the wolverine_outgoing_envelopes table in SQL Server and never deleted.

To Reproduce
Wolverine startup configuration:

// Integrate Wolverine with the DbContext directly
builder.Services.AddDbContextWithWolverineIntegration<SampleDbContext>(x =>
{
    x.UseSqlServer(connectionString);
});

// Configure WolverineFx for Kafka support as well as the durable inbox and outbox
builder.Host.UseWolverine(opts =>
{
    var brokerString = "broker-hosts";
    opts.UseKafka(brokerString)
        .ConfigureConsumers(o =>
        {
            o.GroupId = "ThisIsATest";
        })
        .ConfigureProducers(o =>
        {
            o.Partitioner = Partitioner.ConsistentRandom;
        })
        .ConfigureClient(o =>
        {
            o.SaslMechanism = SaslMechanism.ScramSha512;
            o.SaslUsername = "username";
            o.SaslPassword = "****";
            o.SecurityProtocol = SecurityProtocol.SaslSsl;
        });

    opts.PublishMessage<TestEvent>()
        .ToKafkaTopic("events")
        .UseDurableOutbox();

    opts.ListenToKafkaTopic("events")
        .MessageBatchSize(100)
        .MaximumParallelMessages(1, ProcessingOrder.StrictOrdered)
        .ListenerCount(1)
        .UseDurableInbox();

    opts.Discovery.IncludeAssembly(typeof(Program).Assembly);
    opts.UseEntityFrameworkCoreTransactions();
    opts.PersistMessagesWithSqlServer(connectionString);
});

Message publishing code (outside of a handler, MVC model)

public class SomeClass
{
    private readonly IDbContextOutbox<SampleDbContext> _outbox;

    public SomeClass(IDbContextOutbox<SampleDbContext> outbox)
    {
        _outbox = outbox;
    }

    public async Task OnPost()
    {
        var testEvent = TestEvent.Create();
        await _outbox.SendAsync(testEvent); // or PublishAsync - same behavior

        await _outbox.SaveChangesAndFlushMessagesAsync();
    }
}

This is what's stored in the SQL Server wolverine_outgoing_envelopes table:
[Screenshot: rows remaining in the wolverine_outgoing_envelopes table]

Equally, when receiving messages with DurableInbox enabled (and handling them successfully), they seem to remain in the wolverine_incoming_envelopes table:
[Screenshot: rows remaining in the wolverine_incoming_envelopes table]

When I then stop the application gracefully, I get the log entry below:

info: Wolverine.SqlServer.Persistence.SqlServerMessageStore[0]
      Reassigned 100 incoming messages from 39 and endpoint at kafka://topic/events to any node in the durable inbox
info: Wolverine.Transports.ListeningAgent[0]

On app startup, these messages are all picked up again, reassigned, and replayed (as duplicates). The replay (reassignment) doesn't happen every time, though, and is harder to reproduce, but my main question is why these messages remain persisted in the first place (which they always seem to do).

Expected behavior
Messages are persisted in the inbox and outbox, but deleted once they have been sent successfully to the subscriber (or successfully received and handled).
Is there a time limit after which these messages get deleted? I have now waited 12 hours and the messages are still there, and I'm concerned about replays and the database filling up in this scenario.

Environment
Local development in Visual Studio (Windows 10). Local SQL Server 15.0.2104. Single node/app.

@jeremydmiller
Member

I honestly don't know; I haven't seen that behavior before. Completed messages are kept in the inbox for a short while so they can be used in idempotency checks, but outgoing messages should be deleted as soon as they are sent.

Are you seeing any error messages? By chance, are you running under a debugger and stopping the app through the debugger, so that it isn't shutting down gracefully?

@jeremydmiller
Member

@mhartwig22 The screenshot of the inbox table above shows normal operation; see how the status says "Handled"? So that's fine. The outgoing behavior is a problem, though. So, back to my question: how are you shutting down your app? You might try running in "Solo" mode in development:

https://wolverine.netlify.app/guide/durability/leadership-and-troubleshooting.html#solo-mode
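A minimal sketch of switching Solo mode on only for local development. It reuses the `UseWolverine((context, opts) => ...)` overload and the `opts.Durability.Mode = DurabilityMode.Solo` setting that appear in a later configuration in this thread; `builder` and `connectionString` are assumed to be defined as in the other snippets here.

```csharp
builder.Host.UseWolverine((context, opts) =>
{
    opts.PersistMessagesWithSqlServer(connectionString);

    // Only in development: treat this process as the single node,
    // as described in the linked troubleshooting guide
    if (context.HostingEnvironment.IsDevelopment())
    {
        opts.Durability.Mode = DurabilityMode.Solo;
    }
});
```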

@mhartwig22
Author

Hi Jeremy,

Thanks for getting back on this.

So back to my question, how are you shutting down your app?

While running these tests under the debugger, I always try to make sure I shut down the application gracefully.

I just reran a test with the WolverineFx database recreated and a brand-new Kafka topic; basically everything was reset.
I started the app and emitted messages. I see the messages delivered to Kafka fine (and consumed back by the app as well), and they are also in the outgoing table in SQL Server. There is nothing notable in the logs. All messages seem to remain in the outgoing table.
I waited a few minutes and shut down the app gracefully; this is what appeared in the logs:

info: Microsoft.Hosting.Lifetime[0]
      Application is shutting down...
info: Wolverine.SqlServer.Persistence.SqlServerMessageStore[0]
      Reassigned 10000 incoming messages from 1 and endpoint at kafka://topic/events to any node in the durable inbox
info: Wolverine.Transports.ListeningAgent[0]
      Stopped message listener at kafka://topic/events
info: Wolverine.Transports.ListeningAgent[0]
      Stopped message listener at dbcontrol://f4f4c149-deb7-445a-817d-7900502f16f3/

...\net8.0\EventDrivenArchExample.exe (process 34932) exited with code 0.

Upon restarting the app, the messages are redelivered to both Kafka and the application.
There are several log entries like:

info: Wolverine.RDBMS.DurabilityAgent[0]
      Found recoverable outgoing messages in the outbox for kafka://topic/events
info: Wolverine.RDBMS.DurabilityAgent[0]
      Recovered 100 messages from outbox for destination kafka://topic/events while discarding 0 expired messages

This leads to all messages being replayed, and when that finishes, the messages again remain in the outgoing table.
Waiting a few minutes and then shutting down the app produces this log output:

      Application is shutting down...
info: Wolverine.SqlServer.Persistence.SqlServerMessageStore[0]
      Reassigned 3700 incoming messages from 2 and endpoint at kafka://topic/events to any node in the durable inbox
info: Wolverine.Transports.ListeningAgent[0]
      Stopped message listener at kafka://topic/events
info: Wolverine.Transports.ListeningAgent[0]
      Stopped message listener at dbcontrol://30042aa7-9d5c-4270-b2da-b39d0ca3ffc4/

This time it's about incoming messages.

Starting the app yet again results in all outgoing messages being reprocessed once more:

Wolverine.Runtime.Agents.NodeAgentController: Information: Successfully started agent wolverinedb://default/
Wolverine.Runtime.WolverineRuntime: Information: Successfully started agent wolverinedb://default/ on node 0be879b5-631a-4772-893a-dec386085e5a
Wolverine.Runtime.WolverineRuntime: Information: Releasing node ownership in the inbox/outbox from dormant node 2
...
...
...
Wolverine.RDBMS.DurabilityAgent: Information: Found recoverable outgoing messages in the outbox for kafka://topic/events
Wolverine.RDBMS.DurabilityAgent: Information: Recovered 100 messages from outbox for destination kafka://topic/events while discarding 0 expired messages

There are no errors to be seen in the log anywhere.
Like I said, it all works perfectly fine, apart from leaving the messages behind and replaying them.

I don't know if there is an issue with some housekeeping code not being executed or something similar. It's hard to tell, as I don't know the internals of WolverineFx.
We're so close to using WolverineFx in production, and I'm keen because I love the library. If there is anything I can do to assist with testing or getting to the bottom of this, let me know, Jeremy.

@XeNz

XeNz commented Apr 11, 2024

I have the same exact situation where the outbox does not get cleared, and the messages in the outbox keep getting pushed to the queue.

The configuration I am trying is very similar to OP's.

Program.cs:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddResourceSetupOnStartup();

var connectionString = builder.Configuration.GetConnectionString("SqlServer")!;

builder.Host.UseWolverine((context, opts) =>
{
    opts.PersistMessagesWithSqlServer(connectionString);
    opts.UseEntityFrameworkCoreTransactions();

    opts.ConfigureKafka("localhost:19092")
        .ConfigureClient(config =>
        {
            config.BootstrapServers = "localhost:19092";
            config.BrokerAddressFamily = BrokerAddressFamily.V4;
        });
 
    opts.PublishMessage<OrderCreated>().ToKafkaTopic(nameof(OrderCreated))
        .TelemetryEnabled(true);
         
    opts.ListenToKafkaTopic("OrderInventoryAllocated")
        .UseDurableInbox();
    
    opts.Policies.UseDurableOutboxOnAllSendingEndpoints();
    opts.Policies.AutoApplyTransactions();
    if (context.HostingEnvironment.IsDevelopment())
    {
        opts.Durability.Mode = DurabilityMode.Solo;
    }
});
builder.Host.ApplyOaktonExtensions();

builder.Services.AddDbContextWithWolverineIntegration<OrderDbContext>(x => x.UseSqlServer(connectionString));

var app = builder.Build();

await app.Services.GetRequiredService<OrderDbContext>().Database.EnsureCreatedAsync();
app.MapWolverineEndpoints();
app.UseSwagger();
app.UseSwaggerUI();

await app.RunOaktonCommands(args);

Handler:

public class CreateOrderRequestHandler
{
    [WolverinePost("/order")]
    [Transactional]
    public async Task<OrderCreated> Handle(CreateOrder command, OrderDbContext dbContext, IMessageBus messageBus)
    {
        var items = command.Items
            .Select(item => new OrderLineItem(item.ProductId, item.Amount))
            .ToList();

        var order = new Order(DateTimeOffset.UtcNow, items);
        dbContext.Add(order);

        var orderCreated = new OrderCreated(order.Id);
        await messageBus.PublishAsync(orderCreated);
        return orderCreated;
    }
}

Sending one request to my handler results in the outbox message being processed endlessly:

info: Wolverine.RDBMS.DurabilityAgent[0]
      Found recoverable outgoing messages in the outbox for kafka://topic/OrderCreated
info: Wolverine.RDBMS.DurabilityAgent[0]
      Recovered 1 messages from outbox for destination kafka://topic/OrderCreated while discarding 0 expired messages
info: Wolverine.Runtime.WolverineRuntime[0]
      Releasing node ownership in the inbox/outbox from dormant node 1
info: Wolverine.RDBMS.DurabilityAgent[0]
      Found recoverable outgoing messages in the outbox for kafka://topic/OrderCreated
info: Wolverine.RDBMS.DurabilityAgent[0]
      Recovered 1 messages from outbox for destination kafka://topic/OrderCreated while discarding 0 expired messages

Environment:

  • Local development in JetBrains Rider 2024.1.
  • Running azure-sql-edge:latest and redpanda:v23.3.11 (as drop-in for Kafka) docker containers
  • PackageReferences:
        <PackageReference Include="Microsoft.EntityFrameworkCore" Version="8.0.1" />
        <PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="8.0.1">
            <PrivateAssets>all</PrivateAssets>
            <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
        </PackageReference>
        <PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="8.0.1" />
        <PackageReference Include="Swashbuckle.AspNetCore" Version="6.5.0"/>
        <PackageReference Include="WolverineFx" Version="2.3.0"/>
        <PackageReference Include="WolverineFx.EntityFrameworkCore" Version="2.3.0"/>
        <PackageReference Include="WolverineFx.Http" Version="2.3.0"/>
        <PackageReference Include="WolverineFx.SqlServer" Version="2.3.0"/>
        <PackageReference Include="WolverineFx.Kafka" Version="2.3.0"/>

@ilya-edrets

ilya-edrets commented Apr 16, 2024

I ran into the same problem with the outbox and spent some time troubleshooting it. The cause is the KafkaSenderProtocol class: its SendBatchAsync method takes an ISenderCallback callback, but it never notifies the caller that the send succeeded, so the sent messages are never removed from the outbox.

If the service is restarted, Wolverine begins to resend all of these stuck messages.
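To make the failure mode concrete, here is a minimal, hypothetical model of the contract described above: a persisted outgoing row is deleted only when the sender reports success, and anything left over is what recovery resends. The types `IOutboxCallback`, `InMemoryOutbox`, and `SketchSenders` are invented for illustration and are not Wolverine's API; the sketch only assumes the acknowledge-then-delete behavior discussed in this thread.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the sender-to-outbox callback (not Wolverine's ISenderCallback).
public interface IOutboxCallback
{
    Task MarkSuccessfulAsync(IReadOnlyList<Guid> batch);
}

// Hypothetical in-memory stand-in for an outgoing-envelopes table.
public sealed class InMemoryOutbox : IOutboxCallback
{
    private readonly HashSet<Guid> _rows = new();

    // Persist an outgoing message before it is sent.
    public Guid Persist()
    {
        var id = Guid.NewGuid();
        _rows.Add(id);
        return id;
    }

    // Rows are deleted only when the sender reports the batch as delivered.
    public Task MarkSuccessfulAsync(IReadOnlyList<Guid> batch)
    {
        foreach (var id in batch)
        {
            _rows.Remove(id);
        }
        return Task.CompletedTask;
    }

    // What recovery would resend after a restart: every row still present.
    public IReadOnlyList<Guid> PendingForRecovery() => _rows.ToList();
}

public static class SketchSenders
{
    // Mirrors the reported bug: the batch reaches the broker,
    // but the callback is never told that the send succeeded.
    public static async Task SendWithoutAckAsync(IOutboxCallback callback, IReadOnlyList<Guid> batch)
    {
        await Task.Yield(); // placeholder for the real broker round trip
    }

    // Mirrors the fix: acknowledge the batch once the send has completed.
    public static async Task SendWithAckAsync(IOutboxCallback callback, IReadOnlyList<Guid> batch)
    {
        await Task.Yield(); // placeholder for the real broker round trip
        await callback.MarkSuccessfulAsync(batch);
    }
}
```

After `SendWithoutAckAsync`, the row is still returned by `PendingForRecovery()`, which corresponds to the repeated "Recovered N messages from outbox" log lines earlier in the thread; after `SendWithAckAsync`, the row is gone.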

Unfortunately, Wolverine doesn't provide any options to replace KafkaSenderProtocol with a custom implementation (via DI, for example). So, I tried a dirty hack to replace the method via Harmony.

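using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using HarmonyLib;
// plus the Wolverine namespaces that declare KafkaSenderProtocol, ISenderCallback and OutgoingMessageBatch

// Harmony workaround: after KafkaSenderProtocol.SendBatchAsync completes,
// report the batch as sent so it is deleted from the outbox
[HarmonyPatch(typeof(KafkaSenderProtocol), nameof(KafkaSenderProtocol.SendBatchAsync))]
#pragma warning disable
public class Patch
{
    public static void Postfix(ref Task __result, ISenderCallback callback, OutgoingMessageBatch batch)
    {
        // Replace the returned task so callers also await the acknowledgement,
        // and only mark the batch successful if the send itself did not fail
        __result = MarkSuccessfulAfterSendAsync(__result, callback, batch);
    }

    private static async Task MarkSuccessfulAfterSendAsync(Task send, ISenderCallback callback, OutgoingMessageBatch batch)
    {
        await send; // rethrows if the Kafka send faulted
        await callback.MarkSuccessfulAsync(batch);
    }
}

public static class Patcher
{
    // Runs once when this assembly is loaded, before any messages are sent
    [ModuleInitializer]
    public static void Patch()
    {
        var harmony = new Harmony("wolverine-outbox-fix");
        harmony.PatchAll(Assembly.GetExecutingAssembly());
    }
}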

I'm going to create a merge request with the fix.

@jeremydmiller
Member

Pfft, @ilya-edrets got it. I'm pushing that change in Wolverine 2.6. Running the tests right now. Feeling more than a little bit embarrassed over this one...

@Djoums

Djoums commented Apr 30, 2024

Pfft, @ilya-edrets got it. I'm pushing that change in Wolverine 2.6. Running the tests right now. Feeling more than a little bit embarrassed over this one...

Can we get this fix in the 1.20.x version too, please? I have to use .NET 6 on my project, and unfortunately I can't use the proposed Harmony fix, as it makes the application throw InvalidProgramExceptions when published to production.


5 participants