I'm using ZeroMQ (NetMQ, latest version) in a client (WinForms) / service (WCF) setup. The current implementation uses several thread loops for message handling plus a heartbeat mechanism, but it's messy and sometimes loses the connection, most likely because of a threading lock-up (which I can't reproduce in tests).
I've replaced some of the thread loops with a NetMQPoller, using NetMQQueue.ReceiveReady instead of polling loops, and added a NetMQMonitor to detect lost connections. This appears to improve stability. Resource cleanup is also better now that I use using statements and call NetMQConfig.Cleanup(false), which keeps ports from staying locked beyond TIME_WAIT (2–4 min).
However, this hasn't been "tested" in production yet, so there may still be issues.
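For reference, the NetMQQueue-plus-poller pattern described above reduces to roughly the following: any thread may enqueue work, while the handler only ever runs on the poller thread, so a socket used inside the handler is never shared between threads. This is a minimal sketch assuming the NetMQ 4.x NuGet package; `PollerQueuePump` is a hypothetical name, not a NetMQ type.

```csharp
using System;
using NetMQ;

// Hypothetical helper (not a NetMQ type). Assumes the NetMQ 4.x NuGet package.
// Producers on any thread call Enqueue; the handler runs only on the poller thread.
public sealed class PollerQueuePump<T> : IDisposable
{
    private readonly NetMQQueue<T> _queue;
    private readonly NetMQPoller _poller;

    public PollerQueuePump(Action<T> handleOnPollerThread)
    {
        if (handleOnPollerThread == null) throw new ArgumentNullException(nameof(handleOnPollerThread));

        _queue = new NetMQQueue<T>();
        // ReceiveReady is raised on the poller thread; one item is dequeued per event,
        // and the poller keeps raising it while items remain in the queue.
        _queue.ReceiveReady += (s, e) => handleOnPollerThread(_queue.Dequeue());

        _poller = new NetMQPoller { _queue };
        _poller.RunAsync(); // starts a dedicated poller thread
    }

    // Thread-safe: may be called from any thread (UI, WCF worker, timer callback).
    public void Enqueue(T item)
    {
        _queue.Enqueue(item);
    }

    public void Dispose()
    {
        if (_poller.IsRunning)
            _poller.Stop(); // blocks until the poller thread has left its loop
        _poller.Dispose();
        _queue.Dispose();
    }
}
```

In a publisher, the handler would be the code that writes each dequeued message to the `PublisherSocket`; that socket should then be added to, or at least only used from, the same poller.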
Below is the Connect method for a client-side socket:
Here is an example of the client SubscriberSocket, which uses a NetMQTimer to handle subscription requests:
```csharp
public Task<bool> EstablishConnection(string endpoint, Guid serverIdentifier, Guid sessionIdentifier,
    CancellationToken cancellationToken, TimeSpan connectionTimeout, ClientSessionInfo previousSession)
{
    _endpoint = endpoint;
    _activeServerId = serverIdentifier;
    _shutdownSignal = new TaskCompletionSource<bool>();
    var connectionTask = new TaskCompletionSource<bool>();

    Task.Run(() =>
    {
        try
        {
            _isRunning = true;
            using (_subscriberSocket = new SubscriberSocket(endpoint))
            {
                _subscriberSocket.Subscribe(TopicConstants.Heartbeat);
                _subscriberSocket.Subscribe(sessionIdentifier.ToString());

                var receivedMessage = _subscriberSocket.RecievePubMessage(connectionTimeout);
                if (receivedMessage == null)
                {
                    connectionTask.SetResult(false);
                    return;
                }

                LastReceivedMessageTimestamp = DateTime.Now;
                connectionTask.SetResult(true);

                Task.Run(() => HandleCallbacks(cancellationToken), cancellationToken);
                RestorePreviousSession(previousSession);

                using (_eventPoller = new NetMQPoller { _subscriberSocket })
                using (var monitor = new NetMQMonitor(_subscriberSocket,
                    $"inproc://monitor.client.{Guid.NewGuid()}",
                    SocketEvents.Disconnected | SocketEvents.BindFailed | SocketEvents.AcceptFailed))
                {
                    _subscriberSocket.ReceiveReady += (s, e) =>
                    {
                        var message = _subscriberSocket.RecievePubMessage(TimeSpan.Zero);
                        ProcessClientMessage(message);
                    };

                    monitor.EventReceived += (s, e) =>
                    {
                        string errorMessage = $"SubscriberSocket encountered an issue: {e.SocketEvent.ToString()}";
                        _logger.LogException(errorMessage, new Exception(errorMessage));
                        _eventPoller.Stop();
                    };
                    monitor.AttachToPoller(_eventPoller);

                    var timer = new NetMQTimer(100);
                    timer.Elapsed += (s, e) =>
                    {
                        try
                        {
                            if (_subscriberSocket != null)
                                ManageSubscriptionRequests(_subscriberSocket);
                        }
                        catch (Exception ex)
                        {
                            _logger.LogException("Subscription request handling failed", ex, fatal: false);
                        }
                    };
                    _eventPoller.Add(timer);

                    using (var registration = cancellationToken.Register(() =>
                    {
                        if (_eventPoller != null && !_eventPoller.IsDisposed)
                        {
                            _eventPoller.Stop();
                        }
                    }))
                    {
                        _eventPoller.Run();
                    }
                }
            }
        }
        catch (Exception ex)
        {
            _logger.LogException("SubscriberSocketService: Connection failure", ex);
        }
        finally
        {
            _isRunning = false;
            _shutdownSignal.SetResult(true);
            connectionTask.TrySetResult(false);
        }
    }, cancellationToken);

    return connectionTask.Task;
}
```
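`NetMQMonitor` can only report what the transport layer actually notices. A connection that dies silently (for example, a dropped network link with no TCP reset) may raise no `Disconnected` event for a long time, and a SUB socket sends almost nothing that would expose the failure. A receive-side liveness check built on the existing `LastReceivedMessageTimestamp` idea can therefore still be useful. A minimal sketch, assuming the server publishes something (data or a heartbeat) at least once per interval; `HeartbeatWatchdog` is a hypothetical helper, not part of NetMQ:

```csharp
using System;

// Hypothetical helper (not part of NetMQ): records when the last message arrived and
// reports the link as dead once no traffic has been seen for interval * missedLimit.
public sealed class HeartbeatWatchdog
{
    private readonly TimeSpan _interval;
    private readonly int _missedLimit;
    private readonly Func<DateTime> _clock;
    private readonly object _gate = new object();
    private DateTime _lastSeen;

    public HeartbeatWatchdog(TimeSpan interval, int missedLimit, Func<DateTime> clock = null)
    {
        if (interval <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(interval));
        if (missedLimit < 1) throw new ArgumentOutOfRangeException(nameof(missedLimit));
        _interval = interval;
        _missedLimit = missedLimit;
        // UTC avoids daylight-saving jumps distorting the elapsed time; injectable for tests.
        _clock = clock ?? (() => DateTime.UtcNow);
        _lastSeen = _clock();
    }

    // Call for every received message (heartbeat or data): any traffic proves liveness.
    public void Touch()
    {
        lock (_gate) _lastSeen = _clock();
    }

    // True while the silence is shorter than the allowed number of missed intervals.
    public bool IsAlive
    {
        get
        {
            lock (_gate)
                return _clock() - _lastSeen < TimeSpan.FromTicks(_interval.Ticks * _missedLimit);
        }
    }
}
```

`Touch()` would be called from the `ReceiveReady` handler, and `IsAlive` checked in the existing 100 ms `NetMQTimer.Elapsed` handler, which could stop the poller when it returns false. Both handlers run on the poller thread; the lock only matters if `IsAlive` is also read from elsewhere.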
The last example shows the server's PublisherSocket with a heartbeat implementation. Could this be replaced with NetMQMonitor?
```csharp
public override void Execute(NetMQSocket socket)
{
    _lastHeartbeatSent = DateTime.Now;
    try
    {
        _logger?.Log(LogState.Info, "Initializing event poller.");
        using (_eventPoller = new NetMQPoller())
        {
            _logger?.Log(LogState.Info, "Setting up message queue.");
            using (_messageQueue = new NetMQQueue<PublisherMessage>())
            {
                _messageQueue.ReceiveReady += (s, e) => HandleMessageQueue(socket);
                _eventPoller.Add(_messageQueue);

                var heartbeatTimer = new NetMQTimer(_config.Options.Publisher.HeartbeatInterval);
                heartbeatTimer.Elapsed += (s, e) =>
                {
                    if (DateTime.Now - _lastHeartbeatSent >= _config.Options.Publisher.HeartbeatInterval)
                    {
                        SendHeartbeat();
                    }
                };
                _eventPoller.Add(heartbeatTimer);

                try
                {
                    _logger?.Log(LogState.Info, "Starting event poller.");
                    _eventPoller.Run();
                }
                finally
                {
                    _shutdownSignal.SetResult(true);
                }
            }
        }
    }
    catch (Exception ex)
    {
        _logger?.LogException("Execution error in event loop", ex);
        throw;
    }
}
```
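One detail in the publisher timer: it fires every `HeartbeatInterval` and then requires at least `HeartbeatInterval` to have passed since `_lastHeartbeatSent`. Assuming `SendHeartbeat` (not shown) updates `_lastHeartbeatSent`, a tick that arrives a few milliseconds "early" relative to `DateTime.Now` (timer and clock resolution differ) may skip that heartbeat until the following tick, roughly doubling the gap. A hedged sketch of a policy that treats every published message as proof of life and allows a small tolerance; `IdleHeartbeatPolicy` and the tolerance value are assumptions for illustration:

```csharp
using System;

// Hypothetical helper (not part of NetMQ). Not thread-safe by design: it assumes the
// queue handler and the heartbeat timer both run on the same NetMQPoller thread.
public sealed class IdleHeartbeatPolicy
{
    private readonly TimeSpan _interval;
    private readonly TimeSpan _tolerance;
    private readonly Func<DateTime> _clock;
    private DateTime _lastSent;

    public IdleHeartbeatPolicy(TimeSpan interval, TimeSpan tolerance, Func<DateTime> clock = null)
    {
        if (interval <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(interval));
        if (tolerance < TimeSpan.Zero || tolerance >= interval) throw new ArgumentOutOfRangeException(nameof(tolerance));
        _interval = interval;
        _tolerance = tolerance;
        _clock = clock ?? (() => DateTime.UtcNow);
        _lastSent = _clock();
    }

    // Call after every send on the publisher socket, data or heartbeat alike.
    public void MarkSent()
    {
        _lastSent = _clock();
    }

    // Called from the timer tick. The tolerance absorbs jitter so a tick that fires
    // slightly early still sends the heartbeat instead of waiting a whole extra interval.
    public bool ShouldSendHeartbeat()
    {
        return _clock() - _lastSent >= _interval - _tolerance;
    }
}
```

With this shape, `HandleMessageQueue` would call `MarkSent()` after each send, and the timer's `Elapsed` handler would call `SendHeartbeat()` (followed by `MarkSent()`) only when `ShouldSendHeartbeat()` returns true, so heartbeats go out only when the publisher is otherwise idle.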
Is this the right approach? Can NetMQMonitor reliably detect lost connections, or is a manual heartbeat still needed for stability? Automatic reconnection after a disconnect is critical. I know running both sockets (client and server) under one poller would likely be better, but I don't have time for major code changes right now.
Note: this is on .NET Framework 4.8.
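On reconnection: ZeroMQ connect-side sockets such as `SubscriberSocket` reconnect on their own at the transport level and re-send their subscriptions, so application-level reconnection is mainly needed when the code tears the loop down itself, as `EstablishConnection` does when the monitor reports `Disconnected`. A minimal sketch of an outer retry loop with capped exponential backoff, written for C# 7.3 (the default language version for .NET Framework 4.8); `ReconnectBackoff` is hypothetical, and `connectAsync` stands in for a call to `EstablishConnection`:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of NetMQ): an outer reconnect loop with capped
// exponential backoff. C# 7.3 compatible.
public static class ReconnectBackoff
{
    // Delay after failed attempt number `attempt` (0-based): baseDelay * 2^attempt, capped at maxDelay.
    public static TimeSpan Delay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (attempt < 0) throw new ArgumentOutOfRangeException(nameof(attempt));
        double ms = baseDelay.TotalMilliseconds * Math.Pow(2, Math.Min(attempt, 30));
        return TimeSpan.FromMilliseconds(Math.Min(ms, maxDelay.TotalMilliseconds));
    }

    // Calls connectAsync until it returns true and returns the number of attempts used.
    // Cancellation surfaces as OperationCanceledException (from the token check or Task.Delay).
    public static async Task<int> ConnectWithRetryAsync(
        Func<CancellationToken, Task<bool>> connectAsync,
        TimeSpan baseDelay,
        TimeSpan maxDelay,
        CancellationToken cancellationToken)
    {
        if (connectAsync == null) throw new ArgumentNullException(nameof(connectAsync));

        for (int attempt = 0; ; attempt++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            if (await connectAsync(cancellationToken).ConfigureAwait(false))
                return attempt + 1;

            await Task.Delay(Delay(attempt, baseDelay, maxDelay), cancellationToken).ConfigureAwait(false);
        }
    }
}
```

After a successful connect, the caller would await the existing `_shutdownSignal` task and then call `ConnectWithRetryAsync` again, so the backoff restarts from `baseDelay` for each new outage.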