When self hosting messages are lost after IStartableBus.Shutdown() followed with IStartableBus.Start() #1885

Closed
chrisbednarski opened this issue Jan 6, 2014 · 9 comments
@chrisbednarski
Contributor

When self-hosting, all messages are lost after restarting the bus (i.e. calling Shutdown followed by Start on IStartableBus).

Messages are removed from the queue but no handlers are called.

Workaround

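// Required namespaces for this helper:
// using System;
// using System.Reflection;
// using NServiceBus;
// using NServiceBus.Unicast;

private static void SubscribeToEvent(IStartableBus bus, string eventName, string methodName)
{
    // Look up the named event on the concrete transport type.
    var unicastBus = (UnicastBus)bus;
    var transportType = unicastBus.Transport.GetType();
    var eventInfo = transportType.GetEvent(eventName);

    // Bind the bus's non-public handler method to a delegate of the event's type.
    var handler = unicastBus.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance);
    var d = Delegate.CreateDelegate(eventInfo.EventHandlerType, bus, handler);

    // Remove first so the handler is never subscribed twice, then add it back.
    object[] methodArgs = { d };
    var removeHandler = eventInfo.GetRemoveMethod();
    removeHandler.Invoke(unicastBus.Transport, methodArgs);

    var addHandler = eventInfo.GetAddMethod();
    addHandler.Invoke(unicastBus.Transport, methodArgs);
}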

StartableBus.Start(() =>
    {
        SubscribeToEvent(StartableBus, "StartedMessageProcessing", "TransportStartedMessageProcessing");
        SubscribeToEvent(StartableBus, "TransportMessageReceived", "TransportMessageReceived");
        SubscribeToEvent(StartableBus, "FinishedMessageProcessing", "TransportFinishedMessageProcessing");
        SubscribeToEvent(StartableBus, "FailedMessageProcessing", "TransportFailedMessageProcessing");
    });

Reproduced

Reproduced on the latest MSMQ gateway sample by modifying SiteB.Program:

namespace SiteB
{
    using System;
    using NServiceBus;

    class Program
    {
        static void Main()
        {
            var bus = Configure.With()
                .DefaultBuilder()
                .UseTransport<Msmq>()
                .UnicastBus()
                .FileShareDataBus(".\\databus")
                .UseInMemoryTimeoutPersister()
                .RunGateway() //this line configures the gateway

                    // This tells NServiceBus to use memory to persist & deduplicate messages arriving from NServiceBus v3.X.
                    // If omitted, RavenDB will be used by default. Required for backwards compatibility
                    .UseInMemoryGatewayPersister()

                    // This tells NServiceBus to use memory to deduplicate message ids arriving from NServiceBus v4.X.
                    // If omitted, RavenDB will be used by default
                    .UseInMemoryGatewayDeduplication()

                // Uncomment lines below to use NHibernate persister & deduplication for gateway messages
                // (create a new database called gateway in \SQLEXPRESS - see App.config for connection strings and other settings)
                //    .UseNHibernateGatewayPersister()
                //    .UseNHibernateGatewayDeduplication()

                .CreateBus();

            bus.Start();
            bus.Shutdown(); //  <= added this and line below
            bus.Start();

            Console.WriteLine("Waiting for price updates from the headquarter - press any key to exit");

            Console.ReadLine();
        }
    }
}

Haven't tested on any other transports but I think this will be the case across all transports.

PipelineFactory.InvokeReceivePhysicalMessagePipeline() is never called after the restart.

@chrisbednarski
Contributor Author

Browsing through UnicastBus.cs, InnerShutdown unsubscribes from TransportMessageReceived, but the event is only subscribed to when the transport is first assigned, not during the Start process.
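
The subscription pattern described above can be reproduced without NServiceBus. The following is a minimal sketch, not the actual UnicastBus code: FakeTransport, FakeBus, RestartDemo and the resubscribeOnStart flag are hypothetical names made up for illustration. It assumes only what the comment states, namely that the handler is attached when the transport is assigned and detached on shutdown.

```csharp
using System;

// Hypothetical stand-in for a transport that raises an event per received message.
class FakeTransport
{
    public event Action<string> MessageReceived;

    public void Deliver(string message)
    {
        var handler = MessageReceived;
        if (handler != null) handler(message); // no subscribers: the message is silently dropped
    }
}

// Hypothetical stand-in for the bus; not the real UnicastBus.
class FakeBus
{
    private readonly FakeTransport transport;
    private readonly bool resubscribeOnStart;

    public int HandledCount { get; private set; }

    public FakeBus(FakeTransport transport, bool resubscribeOnStart)
    {
        this.transport = transport;
        this.resubscribeOnStart = resubscribeOnStart;
        // Subscription happens once, when the transport is assigned.
        transport.MessageReceived += OnMessage;
    }

    public void Start()
    {
        if (!resubscribeOnStart) return;
        // Remove before adding so repeated Start calls never double-subscribe.
        transport.MessageReceived -= OnMessage;
        transport.MessageReceived += OnMessage;
    }

    public void Shutdown()
    {
        transport.MessageReceived -= OnMessage;
    }

    private void OnMessage(string message)
    {
        HandledCount++;
    }
}

static class RestartDemo
{
    // Runs Start, Shutdown, Start, then delivers one message and reports how many were handled.
    public static int HandledAfterRestart(bool resubscribeOnStart)
    {
        var transport = new FakeTransport();
        var bus = new FakeBus(transport, resubscribeOnStart);
        bus.Start();
        bus.Shutdown();
        bus.Start();
        transport.Deliver("price update");
        return bus.HandledCount;
    }

    static void Main()
    {
        Console.WriteLine(HandledAfterRestart(false)); // prints 0: the handler is gone after the restart
        Console.WriteLine(HandledAfterRestart(true));  // prints 1: Start re-attaches the handler
    }
}
```

Removing the handler before adding it in Start is the same remove-then-add order used by the reflection workaround in the issue description.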

@andreasohlund
Member

Restarting endpoints has never really been a supported scenario, so we're contemplating going back to a dispose instead of shutdown. That said, this is obviously something that we need to fix!

What's your use case for starting/stopping/starting the endpoint?

@chrisbednarski
Contributor Author

Three words: legacy system integration. Having the bus turned off allows us to better manage risk.

We need to reliably turn on/off the bus in a host process while running certain tasks that require exclusive access to a legacy database (eg. to operate on raw database files). These tasks normally run as an end-of-day process, but may also be invoked on demand. It's a critical requirement while we're transitioning to a sane database.

@chrisbednarski
Contributor Author

Any idea which release this will go into?

@ghost ghost assigned johnsimons Jan 7, 2014
@chrisbednarski
Contributor Author

Thanks @johnsimons

@johnsimons
Member

Hi @chrisbednarski, are you able to test my fix, and report back?

@chrisbednarski
Contributor Author

@johnsimons, my hacky workaround and your fix result in pretty much the same behavior. Messages are no longer lost. This is great. However, I've found other major problems today!

If a saga message handler requests any timeouts, the message being handled is rolled back and always ends up in the error queue. (NB: I'm pretty sure this will also happen when a regular message handler requests timeouts.)

TimeoutPushed.BeginInvoke throws System.ArgumentException with the message "The delegate must have only one target." inside DefaultTimeoutManager.PushTimeout().

This is because TimeoutPersisterReceiver.Stop does not unsubscribe from the TimeoutManager.TimeoutPushed event.

Stack trace below:

NServiceBus.Core.dll!NServiceBus.Timeout.Core.DefaultTimeoutManager.PushTimeout(NServiceBus.Timeout.Core.TimeoutData timeout) Line 42 + 0x8a bytes  C#  
NServiceBus.Core.dll!NServiceBus.Timeout.Hosting.Windows.TimeoutMessageProcessor.HandleInternal(NServiceBus.TransportMessage message) Line 121  C#  
NServiceBus.Core.dll!NServiceBus.Timeout.Hosting.Windows.TimeoutMessageProcessor.Handle(NServiceBus.TransportMessage message) Line 52   C#
NServiceBus.Core.dll!NServiceBus.Satellites.SatelliteLauncher.HandleMessageReceived(object sender, NServiceBus.Unicast.Transport.TransportMessageReceivedEventArgs e, NServiceBus.Satellites.ISatellite satellite) Line 92 + 0x20 bytes C#
NServiceBus.Core.dll!NServiceBus.Satellites.SatelliteLauncher.StartSatellite.AnonymousMethod__7(object o, NServiceBus.Unicast.Transport.TransportMessageReceivedEventArgs e) Line 107 + 0x37 bytes  C#  
NServiceBus.Core.dll!NServiceBus.Unicast.Transport.TransportReceiver.OnTransportMessageReceived(NServiceBus.TransportMessage msg) Line 453 + 0x33 bytes C#  
NServiceBus.Core.dll!NServiceBus.Unicast.Transport.TransportReceiver.ProcessMessage(NServiceBus.TransportMessage message) Line 351 + 0xc bytes  C#  
NServiceBus.Core.dll!NServiceBus.Unicast.Transport.TransportReceiver.TryProcess(NServiceBus.TransportMessage message) Line 260  C#  
NServiceBus.Core.dll!NServiceBus.Transports.Msmq.MsmqDequeueStrategy.ProcessMessage(NServiceBus.TransportMessage message) Line 288 + 0x11 bytes C#  
NServiceBus.Core.dll!NServiceBus.Transports.Msmq.MsmqDequeueStrategy.Action() Line 239 + 0xc bytes  C#
 mscorlib.dll!System.Threading.Tasks.Task.InnerInvoke() + 0x49 bytes    
 mscorlib.dll!System.Threading.Tasks.Task.Execute() + 0x2e bytes    
// lines removed for brevity
 System.Threading.ContextCallback callback, object state) + 0x41 bytes  
 mscorlib.dll!System.Threading.ThreadHelper.ThreadStart(object obj) + 0x4e bytes    
    [Native to Managed Transition]  
 kernel32.dll!@BaseThreadInitThunk@12()  + 0x12 bytes   
 ntdll.dll!___RtlUserThreadStart@8()  + 0x27 bytes  
 ntdll.dll!__RtlUserThreadStart@8()  + 0x1b bytes   
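
The double subscription behind this exception can be sketched without NServiceBus. The code below is an illustration under stated assumptions, not the real TimeoutPersisterReceiver: FakeTimeoutManager, FakeReceiver, TimeoutDemo and the unsubscribeOnStop flag are hypothetical names. It assumes the receiver attaches its handler in Start, as implied above, and shows that a restart leaves two targets on the event unless Stop detaches the handler. On .NET Framework, calling BeginInvoke on a delegate with more than one target is rejected, which matches the "The delegate must have only one target." message.

```csharp
using System;

// Hypothetical stand-in for a timeout manager exposing a "pushed" event.
class FakeTimeoutManager
{
    public event Action<string> TimeoutPushed;

    // Number of handlers currently attached to the event.
    public int SubscriberCount
    {
        get { return TimeoutPushed == null ? 0 : TimeoutPushed.GetInvocationList().Length; }
    }
}

// Hypothetical stand-in for the receiver; not the real TimeoutPersisterReceiver.
class FakeReceiver
{
    private readonly FakeTimeoutManager manager;
    private readonly bool unsubscribeOnStop;

    public FakeReceiver(FakeTimeoutManager manager, bool unsubscribeOnStop)
    {
        this.manager = manager;
        this.unsubscribeOnStop = unsubscribeOnStop;
    }

    public void Start()
    {
        manager.TimeoutPushed += OnTimeoutPushed;
    }

    public void Stop()
    {
        // Without this removal, every restart adds one more target to the event.
        if (unsubscribeOnStop) manager.TimeoutPushed -= OnTimeoutPushed;
    }

    private void OnTimeoutPushed(string timeoutId)
    {
    }
}

static class TimeoutDemo
{
    // Runs Start, Stop, Start and reports how many handlers end up attached.
    public static int SubscribersAfterRestart(bool unsubscribeOnStop)
    {
        var manager = new FakeTimeoutManager();
        var receiver = new FakeReceiver(manager, unsubscribeOnStop);
        receiver.Start();
        receiver.Stop();
        receiver.Start();
        return manager.SubscriberCount;
    }

    static void Main()
    {
        Console.WriteLine(SubscribersAfterRestart(false)); // prints 2: the same handler is attached twice
        Console.WriteLine(SubscribersAfterRestart(true));  // prints 1: Stop detached the first subscription
    }
}
```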

@johnsimons
Member

I have just fixed #1889 but I couldn't tell you if there will be other ones!

@chrisbednarski
Contributor Author

Understood, @johnsimons. Thanks.

@SimonCropp SimonCropp added the Bug label Feb 11, 2014