How to implement a NetMQ (ZeroMQ) solution properly? #1118

Open
BondByteBlaster opened this issue Mar 1, 2025 · 0 comments

@BondByteBlaster

I'm using ZeroMQ (NetMQ, latest version) in a client (WinForms) / service (WCF) setup. The current implementation uses several thread loops for message handling plus a heartbeat mechanism. It is messy, and it sometimes loses the connection, probably because of a threading deadlock. I can't reproduce this in tests.

I've replaced some of the thread loops with a NetMQPoller, using NetMQQueue.ReceiveReady instead of polling loops, and added a NetMQMonitor to detect lost connections. This appears to improve stability. Resource cleanup is also better with using statements and NetMQConfig.Cleanup(false), which keeps ports from staying locked beyond the normal TCP TIME_WAIT period (2–4 minutes).
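
Here is a minimal sketch of that pattern, assuming NetMQ 4.x. Any thread (for example a WCF operation) hands work to a NetMQQueue, and only the poller thread touches the socket. The NetMQMonitor is attached to the same poller, so its events are also raised on that thread. QueuedPublisher, the (topic, body) tuple payload and the console logging are illustrative assumptions, not the project's real types.

```csharp
using System;
using NetMQ;
using NetMQ.Monitoring;
using NetMQ.Sockets;

// Hypothetical wrapper illustrating the queue + poller + monitor pattern.
// Publish() may be called from any thread; only the poller thread uses _socket.
public sealed class QueuedPublisher : IDisposable
{
    private readonly NetMQQueue<(string Topic, string Body)> _queue =
        new NetMQQueue<(string Topic, string Body)>();
    private readonly PublisherSocket _socket = new PublisherSocket();
    private readonly NetMQMonitor _monitor;
    private readonly NetMQPoller _poller;

    public QueuedPublisher(string bindAddress)
    {
        // Accepted/Disconnected are the per-subscriber events a bound socket sees.
        _monitor = new NetMQMonitor(_socket, $"inproc://monitor.pub.{Guid.NewGuid()}",
            SocketEvents.Accepted | SocketEvents.Disconnected);
        _monitor.EventReceived += (s, e) => Console.WriteLine($"{e.SocketEvent} {e.Address}");
        _socket.Bind(bindAddress);

        // Raised on the poller thread; drain everything queued so far.
        _queue.ReceiveReady += (s, e) =>
        {
            while (_queue.TryDequeue(out var item, TimeSpan.Zero))
                _socket.SendMoreFrame(item.Topic).SendFrame(item.Body);
        };

        _poller = new NetMQPoller { _queue };
        _monitor.AttachToPoller(_poller); // monitor events now arrive on the poller thread too
        _poller.RunAsync();
    }

    // Thread-safe: NetMQQueue hands the item over to the poller thread.
    public void Publish(string topic, string body) => _queue.Enqueue((topic, body));

    public void Dispose()
    {
        _poller.Stop();      // blocks until the poller thread has exited
        _monitor.Dispose();
        _poller.Dispose();
        _queue.Dispose();
        _socket.Dispose();
    }
}
```

At process exit, dispose each such object first and call NetMQConfig.Cleanup(false) last. That way the shared context is never terminated while sockets are still live.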

However, this hasn't been tested in production yet, so there may still be issues.

Below is the connect method for the client-side RequestSocket:

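```csharp
public Task EstablishConnection(string endpoint, CancellationToken token)
{
    var connectionTask = new TaskCompletionSource<bool>();
    _logger.Log(LogState.Info, "Initializing request socket.");

    // The poller blocks for the lifetime of the connection, so run it on a dedicated
    // thread (LongRunning) rather than a thread-pool thread. On .NET Framework,
    // Thread.Name can only be set once, so naming a reused pool thread throws.
    Task.Factory.StartNew(() =>
    {
        try
        {
            _isShuttingDown = false;
            _shutdownSignal = new TaskCompletionSource<bool>();
            Thread.CurrentThread.Name = this.GetType().Name;

            using (_messageQueue = new NetMQQueue<IRequestMessage>())
            using (var socket = new RequestSocket(endpoint))
            using (_eventPoller = new NetMQPoller { _messageQueue })
            using (var eventMonitor = new NetMQMonitor(socket, $"inproc://monitor.client.{Guid.NewGuid()}", SocketEvents.Disconnected | SocketEvents.BindFailed | SocketEvents.AcceptFailed))
            {
                // Runs on the poller thread whenever another thread enqueues a request.
                _messageQueue.ReceiveReady += (s, e) =>
                {
                    ProcessClientRequests(socket);
                };

                // BindFailed/AcceptFailed are only raised for bound sockets; this one connects.
                // After Disconnected, ZeroMQ reconnects automatically; stopping the poller
                // here disposes the socket and gives up that automatic reconnect.
                eventMonitor.EventReceived += (s, e) =>
                {
                    string errorMessage = $"RequestSocket encountered an issue: {e.SocketEvent}";
                    _logger.LogException(errorMessage, new Exception(errorMessage));
                    _eventPoller.Stop();
                };

                // Without this the monitor is never started and EventReceived never fires.
                eventMonitor.AttachToPoller(_eventPoller);

                connectionTask.TrySetResult(true);
                _isRunning = true;

                using (var cancelRegistration = token.Register(() => _eventPoller.Stop()))
                {
                    _eventPoller.Run();
                }

                lock (_shutdownLock)
                {
                    _isShuttingDown = true;
                }
            }
        }
        catch (Exception ex)
        {
            _logger?.LogException("Client request processing failed", ex, fatal: true);
        }
        finally
        {
            _isRunning = false;
            _shutdownSignal.TrySetResult(true);
            connectionTask.TrySetResult(false); // no-op if the connection already succeeded

            _logger.Log(LogState.Info, "Request socket shutting down.");
        }
    }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);

    // NetMQConfig.Cleanup(false) belongs in application shutdown, after every socket and
    // poller has been disposed; calling it here would terminate the shared context while
    // the task above may still be using it.
    return connectionTask.Task;
}
```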

```csharp
public void ProcessClientRequests(RequestSocket socket)
{
    ProcessQueuedMessages(socket);
    LoopCompleted?.Invoke();
}

private void ProcessQueuedMessages(RequestSocket clientSocket)
{
    try
    {
        // Runs on the poller thread and returns as soon as the queue is empty, so it never spins.
        while (_messageQueue.TryDequeue(out IRequestMessage message, TimeSpan.Zero))
        {
            try
            {
                _logger?.Log(LogState.Info, $"Processing queued request: {message.Request.GetType()}, Session ID: {message.SessionId}, Timeout: {message.Timeout}");

                // Blocks the poller thread until the reply arrives or message.Timeout expires.
                // If it gives up without reading the reply, the RequestSocket refuses the next
                // send (NetMQ throws FiniteStateMachineException) until it receives that reply
                // or is recreated.
                var response = clientSocket.SendRequest(message.Request, message.SessionId, message.Timeout);
                message.Complete(response);

                LastSentMessageTimestamp = DateTime.Now;
            }
            catch (Exception ex)
            {
                _logger?.LogException("Error while sending request.", ex, fatal: true);
                message.Fail(ex);
            }
        }
    }
    catch (Exception ex)
    {
        _logger?.LogException("Queued message processing failure", ex, fatal: true);
    }
}
```

Here is the client-side SubscriberSocket, which uses a NetMQTimer to handle pending subscription requests:

```csharp
public Task<bool> EstablishConnection(string endpoint, Guid serverIdentifier, Guid sessionIdentifier, CancellationToken cancellationToken, TimeSpan connectionTimeout, ClientSessionInfo previousSession)
{
    _endpoint = endpoint;
    _activeServerId = serverIdentifier;
    _shutdownSignal = new TaskCompletionSource<bool>();
    var connectionTask = new TaskCompletionSource<bool>();

    Task.Run(() =>
    {
        try
        {
            _isRunning = true;
            using (_subscriberSocket = new SubscriberSocket(endpoint))
            {
                _subscriberSocket.Subscribe(TopicConstants.Heartbeat);
                _subscriberSocket.Subscribe(sessionIdentifier.ToString());

                // Wait for the first heartbeat or session message before reporting success.
                var receivedMessage = _subscriberSocket.RecievePubMessage(connectionTimeout);
                if (receivedMessage == null)
                {
                    connectionTask.SetResult(false);
                    return;
                }
                LastReceivedMessageTimestamp = DateTime.Now;
                connectionTask.SetResult(true);
                Task.Run(() => HandleCallbacks(cancellationToken), cancellationToken);
                RestorePreviousSession(previousSession);

                using (_eventPoller = new NetMQPoller { _subscriberSocket })
                using (var monitor = new NetMQMonitor(_subscriberSocket, $"inproc://monitor.client.{Guid.NewGuid()}", SocketEvents.Disconnected | SocketEvents.BindFailed | SocketEvents.AcceptFailed))
                {
                    // Reads one message per event; the poller raises ReceiveReady again while more are pending.
                    _subscriberSocket.ReceiveReady += (s, e) =>
                    {
                        var message = _subscriberSocket.RecievePubMessage(TimeSpan.Zero);
                        ProcessClientMessage(message);
                    };

                    // BindFailed/AcceptFailed are only raised for bound sockets; this one connects.
                    // Stopping on Disconnected disposes the socket instead of letting ZeroMQ reconnect it.
                    monitor.EventReceived += (s, e) =>
                    {
                        string errorMessage = $"SubscriberSocket encountered an issue: {e.SocketEvent}";
                        _logger.LogException(errorMessage, new Exception(errorMessage));
                        _eventPoller.Stop();
                    };

                    monitor.AttachToPoller(_eventPoller);

                    // Every 100 ms, handle pending subscription requests on the poller thread.
                    var timer = new NetMQTimer(100);
                    timer.Elapsed += (s, e) =>
                    {
                        try
                        {
                            if (_subscriberSocket != null)
                                ManageSubscriptionRequests(_subscriberSocket);
                        }
                        catch (Exception ex)
                        {
                            _logger.LogException("Subscription request handling failed", ex, fatal: false);
                        }
                    };
                    _eventPoller.Add(timer);

                    using (var registration = cancellationToken.Register(() =>
                    {
                        if (_eventPoller != null && !_eventPoller.IsDisposed)
                        {
                            _eventPoller.Stop();
                        }
                    }))
                    {
                        _eventPoller.Run();
                    }
                }
            }
        }
        catch (Exception ex)
        {
            _logger.LogException("SubscriberSocketService: Connection failure", ex);
        }
        finally
        {
            _isRunning = false;
            _shutdownSignal.TrySetResult(true);
            connectionTask.TrySetResult(false);
        }
    }, cancellationToken);

    return connectionTask.Task;
}
```

The last example shows the server's PublisherSocket with a heartbeat implementation. Could this be replaced with NetMQMonitor?
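
The following is a hedged sketch of what the heartbeat adds on the client side. A NetMQMonitor reports Disconnected only once the TCP connection is actually closed. A half-open connection (for example after a server machine crash or a network outage) can go undetected for a long time, and an application heartbeat covers exactly that case. The sketch assumes the server keeps publishing on a heartbeat topic. A NetMQTimer on the subscriber's poller checks how long it has been since any message arrived and, once the timeout passes, forces a reconnect. HeartbeatWatchdog, WatchedSubscriber, the literal "heartbeat" topic and the 250 ms check interval are all hypothetical.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

// Hypothetical helper: tracks how long it has been since anything arrived.
public sealed class HeartbeatWatchdog
{
    private readonly Stopwatch _sinceLastMessage = Stopwatch.StartNew();
    private readonly TimeSpan _timeout;

    public HeartbeatWatchdog(TimeSpan timeout) { _timeout = timeout; }

    // Call for every received message, heartbeat or data.
    public void MessageReceived() => _sinceLastMessage.Restart();

    // True once nothing has arrived for longer than the timeout.
    public bool IsExpired => _sinceLastMessage.Elapsed > _timeout;
}

// Hypothetical subscriber loop; runs on the calling thread until the token is cancelled.
public static class WatchedSubscriber
{
    public static void Run(string endpoint, TimeSpan heartbeatTimeout,
                           Action<NetMQMessage> onMessage, CancellationToken token)
    {
        var watchdog = new HeartbeatWatchdog(heartbeatTimeout);
        var check = new NetMQTimer(TimeSpan.FromMilliseconds(250));

        using (var subscriber = new SubscriberSocket())
        using (var poller = new NetMQPoller { subscriber, check })
        {
            subscriber.Subscribe("heartbeat"); // stands in for TopicConstants.Heartbeat
            subscriber.Connect(endpoint);

            subscriber.ReceiveReady += (s, e) =>
            {
                NetMQMessage message = null;
                while (subscriber.TryReceiveMultipartMessage(TimeSpan.Zero, ref message))
                {
                    watchdog.MessageReceived(); // any message proves the publisher is alive
                    onMessage(message);
                    message = null;             // the callback may keep the previous instance
                }
            };

            check.Elapsed += (s, e) =>
            {
                // Called on the poller thread, Stop() only requests the stop.
                if (token.IsCancellationRequested) { poller.Stop(); return; }
                if (!watchdog.IsExpired) return;

                // Silent for too long: the TCP link may be half-open, so drop it and
                // dial again. A SUB socket re-sends its subscriptions on reconnect.
                subscriber.Disconnect(endpoint);
                subscriber.Connect(endpoint);
                watchdog.MessageReceived(); // give the new connection a full timeout window
            };

            poller.Run();
        }
    }
}
```

The timeout would normally be a few heartbeat intervals, so that one delayed heartbeat does not trigger a reconnect.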

```csharp
public override void Execute(NetMQSocket socket)
{
    _lastHeartbeatSent = DateTime.Now;

    try
    {
        _logger?.Log(LogState.Info, "Initializing event poller.");
        using (_eventPoller = new NetMQPoller())
        {
            _logger?.Log(LogState.Info, "Setting up message queue.");
            using (_messageQueue = new NetMQQueue<PublisherMessage>())
            {
                // Messages queued by other threads are published on the poller thread.
                _messageQueue.ReceiveReady += (s, e) => HandleMessageQueue(socket);
                _eventPoller.Add(_messageQueue);

                // Skip the heartbeat if _lastHeartbeatSent is less than one interval old.
                var heartbeatTimer = new NetMQTimer(_config.Options.Publisher.HeartbeatInterval);
                heartbeatTimer.Elapsed += (s, e) =>
                {
                    if (DateTime.Now - _lastHeartbeatSent >= _config.Options.Publisher.HeartbeatInterval)
                    {
                        SendHeartbeat();
                    }
                };
                _eventPoller.Add(heartbeatTimer);

                try
                {
                    _logger?.Log(LogState.Info, "Starting event poller.");
                    _eventPoller.Run();
                }
                finally
                {
                    _shutdownSignal.TrySetResult(true);
                }
            }
        }
    }
    catch (Exception ex)
    {
        _logger?.LogException("Execution error in event loop", ex);
        throw;
    }
}
```

Is this the right approach? Can NetMQMonitor reliably detect lost connections, or is a manual heartbeat still needed for stability? Automatic reconnection after a disconnect is critical. I know that running both sockets under one poller (on the client and on the server) would probably be better, but I don't have time for major code changes right now.

Note: this is on .NET Framework 4.8.
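
On the request side, one common way to get automatic recovery is the "Lazy Pirate" pattern from the ZeroMQ Guide. The client waits for each reply with a timeout and, if none arrives, closes and recreates the RequestSocket, because a REQ socket that is still waiting for a reply refuses the next send. Below is a minimal sketch, assuming single-frame string requests; LazyPirateClient and its parameters are hypothetical.

```csharp
using System;
using NetMQ;
using NetMQ.Sockets;

// Hypothetical Lazy Pirate client: on timeout the RequestSocket is thrown away,
// because a REQ socket still waiting for a reply refuses the next send.
public sealed class LazyPirateClient : IDisposable
{
    private readonly string _endpoint;
    private readonly TimeSpan _timeout;
    private readonly int _retries;
    private RequestSocket _socket;

    public LazyPirateClient(string endpoint, TimeSpan timeout, int retries)
    {
        _endpoint = endpoint;
        _timeout = timeout;
        _retries = retries;
        _socket = CreateSocket();
    }

    private RequestSocket CreateSocket()
    {
        var socket = new RequestSocket(_endpoint); // connects by default
        socket.Options.Linger = TimeSpan.Zero;     // drop unsent requests on close
        return socket;
    }

    // Returns the reply, or null when every attempt timed out.
    public string Request(string request)
    {
        for (int attempt = 0; attempt <= _retries; attempt++)
        {
            _socket.SendFrame(request);
            if (_socket.TryReceiveFrameString(_timeout, out var reply))
                return reply;

            // No reply in time: recreate the socket, which also opens a fresh connection.
            _socket.Dispose();
            _socket = CreateSocket();
        }
        return null;
    }

    public void Dispose() => _socket.Dispose();
}
```

A retried request may reach the server twice, so the server-side handling should tolerate duplicates. Calls still block, so in the design above this would take the place of the single SendRequest call on the poller thread.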
