Published events are lost when published in a separate thread #5121

Closed
ngallegos opened this issue Feb 28, 2018 · 5 comments

@ngallegos

I've been working on getting long-running messages working with NServiceBus on an Azure transport. Based on this document, I thought I could get away with firing off the long process in a separate thread, marking the event handler task as complete, and then listening for custom OperationStarted or OperationComplete events. I noticed that in most cases the OperationComplete event is not received by my handlers. In fact, the only time it is received is when I publish it immediately after the OperationStarted event is published. Any actual processing in between somehow prevents the completion event from being received. Here is my code:

Abstract class used for long running messages

public abstract class LongRunningOperationHandler<TMessage> : IHandleMessages<TMessage> where TMessage : class
{
    protected ILog _logger => LogManager.GetLogger<LongRunningOperationHandler<TMessage>>();

    public Task Handle(TMessage message, IMessageHandlerContext context)
    {
        var opStarted = new OperationStarted
        {
            OperationID = Guid.NewGuid(),
            OperationType = typeof(TMessage).FullName
        };
        var errors = new List<string>();
        // Fire off the long running task in a separate thread
        Task.Run(() =>
            {
                try
                {
                    _logger.Info($"Operation Started: {JsonConvert.SerializeObject(opStarted)}");
                    context.Publish(opStarted);
                    ProcessMessage(message, context);
                }
                catch (Exception ex)
                {
                    errors.Add(ex.Message);
                }
                finally
                {
                    var opComplete = new OperationComplete
                    {
                        OperationType = typeof(TMessage).FullName,
                        OperationID = opStarted.OperationID,
                        Errors = errors
                    };

                    context.Publish(opComplete);

                    _logger.Info($"Operation Complete: {JsonConvert.SerializeObject(opComplete)}");
                }
            });

        return Task.CompletedTask;
    }

    protected abstract void ProcessMessage(TMessage message, IMessageHandlerContext context);
}

Test Implementation

public class TestLongRunningOpHandler : LongRunningOperationHandler<TestCommand>
{
    protected override void ProcessMessage(TestCommand message, IMessageHandlerContext context)
    {
        // If I remove this, or lessen it to something like 200 milliseconds, the 
        // OperationComplete event gets handled
        Thread.Sleep(1000);
    }
}

Operation Events

public sealed class OperationComplete : IEvent
{
    public Guid OperationID { get; set; }
    public string OperationType { get; set; }
    public bool Success => !Errors?.Any() ?? true;
    public List<string> Errors { get; set; } = new List<string>();
    public DateTimeOffset CompletedOn { get; set; } = DateTimeOffset.UtcNow;
}

public sealed class OperationStarted : IEvent
{
    public Guid OperationID { get; set; }
    public string OperationType { get; set; }
    public DateTimeOffset StartedOn { get; set; } = DateTimeOffset.UtcNow;
}

Handlers

public class OperationHandler : IHandleMessages<OperationStarted>,
    IHandleMessages<OperationComplete>
{
    static ILog logger = LogManager.GetLogger<OperationHandler>();

    public Task Handle(OperationStarted message, IMessageHandlerContext context)
    {
        return PrintJsonMessage(message);
    }

    public Task Handle(OperationComplete message, IMessageHandlerContext context)
    {
        // This is not hit if ProcessMessage takes too long
        return PrintJsonMessage(message);
    }

    private Task PrintJsonMessage<T>(T message) where T : class
    {
        var msgObj = new
        {
            Message = typeof(T).Name,
            Data = message
        };
        logger.Info(JsonConvert.SerializeObject(msgObj, Formatting.Indented));
        return Task.CompletedTask;
    }

}

I'm certain that the context.Publish() calls are being hit because the _logger.Info() calls are printing messages to my test console. I've also verified they are hit with breakpoints. In my testing, anything that runs longer than 500 milliseconds prevents the handling of the OperationComplete event.

If anyone can offer suggestions as to why the OperationComplete event is not hitting the handler when any significant amount of time has passed in the ProcessMessage implementation, I'd be extremely grateful to hear them. Thanks!

@bording
Member

bording commented Feb 28, 2018

@ngallegos What you're trying to do isn't going to work. When using the IMessageHandlerContext instance to send/publish messages from inside a handler, all of those messages have to be sent before the handler method returns.

You are calling Task.Run without awaiting the task, so your handler can complete execution before the code inside the Task.Run completes. This introduces a race condition where only the Publish calls that finish before your handler returns will actually be sent.

The sample you've linked to shows an approach that can work for handling long-running operations.
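
To illustrate the point about awaiting, here is a minimal sketch of a handler in which both Publish calls are awaited, so they finish before Handle returns. It reuses the TestCommand, OperationStarted and OperationComplete types from the original post; the class name AwaitedPublishHandler and the Task.Delay stand-in for real work are assumptions made for illustration only. It does not address the long-running case: the work still runs inside the handler, so it has to finish within the message lock duration.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical handler name; the message types are the ones defined in the original post.
public class AwaitedPublishHandler : IHandleMessages<TestCommand>
{
    public async Task Handle(TestCommand message, IMessageHandlerContext context)
    {
        var operationId = Guid.NewGuid();

        // Awaited, so this publish has completed before the handler moves on.
        await context.Publish(new OperationStarted
        {
            OperationID = operationId,
            OperationType = typeof(TestCommand).FullName
        });

        // Stand-in for the actual work (assumption); awaited rather than run
        // in a detached Task.Run, so the handler does not return early.
        await Task.Delay(TimeSpan.FromSeconds(1));

        // Also awaited, so it is issued while the handler context is still in use.
        await context.Publish(new OperationComplete
        {
            OperationID = operationId,
            OperationType = typeof(TestCommand).FullName
        });
    }
}
```

Compared with the original code, the only structural change is that nothing touches the context after Handle has returned, which removes the race.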

@ngallegos
Author

Ah okay. Thanks @bording! The problem with waiting for the task to complete is that it brings me back to the issue of a long running job inside a message handler. If the job runs for too long, the Azure queue will release the message and it will be processed again. I was hoping to avoid the exact solution in the sample because it looks as though I'd need to implement a separate queuing/processing system to handle these jobs. I've been using NServiceBus to migrate away from our existing queuing/processing system and was hoping that long-running jobs could migrate too. Sounds like I may be out of luck there.

I did see something about message lock renewal, but the document suggests it should be avoided. Do you know why that is?

@SeanFeldman
Contributor

SeanFeldman commented Mar 1, 2018

AutoRenewTimeout is not a guaranteed operation. It's initiated by the client and as such can fail.

@yvesgoeleven
Contributor

yvesgoeleven commented Mar 1, 2018

@ngallegos you should use the timeout feature on a watchdog saga to check the completion of the spun off task. That is guaranteed to be invoked. That's how the sample does it as well.
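
A rough sketch of what such a watchdog saga could look like is shown below. It is not the code from the linked sample. It assumes the OperationStarted and OperationComplete events from the original post are published reliably (for example, awaited inside a handler, or sent through the endpoint's message session by whatever component runs the job). The names OperationWatchdogSaga, OperationWatchdogData and OperationTimedOut, as well as the five-minute interval, are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

// Hypothetical saga state, correlated on the operation ID.
public class OperationWatchdogData : ContainSagaData
{
    public Guid OperationID { get; set; }
}

// Hypothetical timeout message; it carries no data.
public class OperationTimedOut
{
}

public class OperationWatchdogSaga : Saga<OperationWatchdogData>,
    IAmStartedByMessages<OperationStarted>,
    IHandleMessages<OperationComplete>,
    IHandleTimeouts<OperationTimedOut>
{
    static ILog logger = LogManager.GetLogger<OperationWatchdogSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OperationWatchdogData> mapper)
    {
        mapper.ConfigureMapping<OperationStarted>(m => m.OperationID).ToSaga(s => s.OperationID);
        mapper.ConfigureMapping<OperationComplete>(m => m.OperationID).ToSaga(s => s.OperationID);
    }

    public Task Handle(OperationStarted message, IMessageHandlerContext context)
    {
        Data.OperationID = message.OperationID;
        // Ask to be woken up later to check on the operation (interval is an assumption).
        return RequestTimeout<OperationTimedOut>(context, TimeSpan.FromMinutes(5));
    }

    public Task Handle(OperationComplete message, IMessageHandlerContext context)
    {
        // The operation finished, so the watchdog is no longer needed.
        MarkAsComplete();
        return Task.CompletedTask;
    }

    public Task Timeout(OperationTimedOut state, IMessageHandlerContext context)
    {
        // Reached only if the saga is still open, i.e. no OperationComplete has arrived yet.
        logger.Warn($"Operation {Data.OperationID} has not completed yet.");
        // Keep watching; a real implementation might give up or raise an alert instead.
        return RequestTimeout<OperationTimedOut>(context, TimeSpan.FromMinutes(5));
    }
}
```

One caveat with this sketch: only OperationStarted starts the saga, so an OperationComplete that arrives first would find no saga instance. Letting both events start the saga is one way to handle that ordering.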

@ngallegos
Author

Great, thank you @SeanFeldman and @yvesgoeleven! I think the saga timeout will work nicely. I must have missed that in the sample. Thanks again for your time and responses.
