Permalink
Browse files

PT38951557 - sipXrls does not shutdown when signaled with SIGTERM or …

…SIGINT

There were several changes:
StateQueueNotification.h
  - reorganized the stop sequence;
StateQueueClient.h
  - zmqSocket was created but was not used nor closed when type was publisher. This was leading to zmqContext::term to be blocked forever thus leading to rls shutdown being blocked forever.
  The zmqSocket should be created only when type is not publisher.
  - There was a corruption in the destructor of BlockingTcpClient because it was called twice for the same object. This was happening because these objects where created as regular pointers
but where used in a vector as regular pointers and in a queue as shared pointers. That is why the destructor was called twice. The _clientPointers vector was changed to hold shared
pointers instead of regular pointers.
  • Loading branch information...
1 parent f1093d5 commit 15e724a82e9b5ca55013a858573e3eda0d9516c0 @dtacalau dtacalau committed Nov 7, 2012
Showing with 41 additions and 32 deletions.
  1. +39 −30 sipXsqa/include/sqa/StateQueueClient.h
  2. +2 −2 sipXsqa/include/sqa/StateQueueNotification.h
@@ -29,7 +29,6 @@
#include <boost/lexical_cast.hpp>
#include "ServiceOptions.h"
-
#define SQA_LINGER_TIME_MILLIS 5000
#define SQA_TERMINATE_STRING "__TERMINATE__"
#define SQA_CONN_MAX_READ_BUFF_SIZE 65536
@@ -72,7 +71,11 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
~BlockingTcpClient()
{
- delete _pSocket;
+ if (_pSocket)
+ {
+ delete _pSocket;
+ _pSocket = 0;
+ }
}
void setReadTimeout(boost::asio::ip::tcp::socket& socket, int milliseconds)
@@ -355,13 +358,13 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
bool _terminate;
boost::thread _keepAliveThread;
zmq::context_t* _zmqContext;
- zmq::socket_t _zmqSocket;
+ zmq::socket_t* _zmqSocket;
boost::thread* _pEventThread;
std::string _zmqEventId;
std::string _applicationId;
typedef BlockingQueue<std::string> EventQueue;
EventQueue _eventQueue;
- std::vector<BlockingTcpClient*> _clientPointers;
+ std::vector<BlockingTcpClient::Ptr> _clientPointers;
int _expires;
int _subscriptionExpires;
int _backoffCount;
@@ -389,7 +392,7 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
_terminate(false),
_keepAliveThread(boost::bind(&StateQueueClient::keepAliveLoop, this)),
_zmqContext(new zmq::context_t(1)),
- _zmqSocket(*_zmqContext,ZMQ_SUB),
+ _zmqSocket(0),
_pEventThread(0),
_applicationId(applicationId),
_eventQueue(1000),
@@ -399,8 +402,13 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
_refreshSignin(false),
_currentSigninTick(-1)
{
- int linger = SQA_LINGER_TIME_MILLIS; // milliseconds
- _zmqSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(int));
+ if (_type != Publisher)
+ {
+ _zmqSocket = new zmq::socket_t(*_zmqContext,ZMQ_SUB);
+ int linger = SQA_LINGER_TIME_MILLIS; // milliseconds
+ _zmqSocket->setsockopt(ZMQ_LINGER, &linger, sizeof(int));
+ }
+
for (std::size_t i = 0; i < _poolSize; i++)
{
BlockingTcpClient* pClient = new BlockingTcpClient(_ioService, readTimeout, writeTimeout, i == 0 ? SQA_KEY_ALPHA : SQA_KEY_DEFAULT );
@@ -409,8 +417,8 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
if (_localAddress.empty())
_localAddress = pClient->getLocalAddress();
- _clientPointers.push_back(pClient);
BlockingTcpClient::Ptr client(pClient);
+ _clientPointers.push_back(client);
_clientPool.enqueue(client);
}
@@ -449,7 +457,7 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
_terminate(false),
_keepAliveThread(boost::bind(&StateQueueClient::keepAliveLoop, this)),
_zmqContext(new zmq::context_t(1)),
- _zmqSocket(*_zmqContext,ZMQ_SUB),
+ _zmqSocket(0),
_pEventThread(0),
_applicationId(applicationId),
_eventQueue(1000),
@@ -459,8 +467,12 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
_refreshSignin(false),
_currentSigninTick(-1)
{
- int linger = SQA_LINGER_TIME_MILLIS; // milliseconds
- _zmqSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(int));
+ if (_type != Publisher)
+ {
+ _zmqSocket = new zmq::socket_t(*_zmqContext,ZMQ_SUB);
+ int linger = SQA_LINGER_TIME_MILLIS; // milliseconds
+ _zmqSocket->setsockopt(ZMQ_LINGER, &linger, sizeof(int));
+ }
for (std::size_t i = 0; i < _poolSize; i++)
{
@@ -472,8 +484,9 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
_serviceAddress = pClient->_serviceAddress;
_servicePort = pClient->_servicePort;
- _clientPointers.push_back(pClient);
+
BlockingTcpClient::Ptr client(pClient);
+ _clientPointers.push_back(client);
_clientPool.enqueue(client);
}
@@ -520,6 +533,12 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
delete _zmqContext;
_zmqContext = 0;
+ if (_zmqSocket)
+ {
+ delete _zmqSocket;
+ _zmqSocket = 0;
+ }
+
OS_LOG_INFO(FAC_NET, "StateQueueClient::terminate() waiting for event thread to exit.");
if (_pEventThread)
{
@@ -528,27 +547,17 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
_pEventThread = 0;
}
- //
- // Delete the client pointers
- //
- for (std::vector<BlockingTcpClient*>::const_iterator iter = _clientPointers.begin();
- iter != _clientPointers.end(); iter++)
- {
- BlockingTcpClient* pClient = *iter;
- delete pClient;
- }
-
OS_LOG_INFO(FAC_NET, "StateQueueClient::terminate() Ok");
}
void setExpires(int expires) { _expires = expires; }
bool isConnected()
{
- for (std::vector<BlockingTcpClient*>::const_iterator iter = _clientPointers.begin();
+ for (std::vector<BlockingTcpClient::Ptr>::const_iterator iter = _clientPointers.begin();
iter != _clientPointers.end(); iter++)
{
- BlockingTcpClient* pClient = *iter;
+ BlockingTcpClient::Ptr pClient = *iter;
if (pClient->isConnected())
return true;
}
@@ -596,11 +605,12 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
bool subscribe(const std::string& eventId, const std::string& sqaAddress)
{
assert(_type != Publisher);
+
OS_LOG_INFO(FAC_NET, "StateQueueClient::subscribe eventId=" << eventId << " address=" << sqaAddress);
try
{
- _zmqSocket.connect(sqaAddress.c_str());
- _zmqSocket.setsockopt(ZMQ_SUBSCRIBE, eventId.c_str(), eventId.size());
+ _zmqSocket->connect(sqaAddress.c_str());
+ _zmqSocket->setsockopt(ZMQ_SUBSCRIBE, eventId.c_str(), eventId.size());
}catch(std::exception e)
{
@@ -780,7 +790,7 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
do_pop(firstHit, 0, SQA_TERMINATE_STRING, SQA_TERMINATE_STRING);
}
- _zmqSocket.close();
+ _zmqSocket->close();
OS_LOG_INFO(FAC_NET, "StateQueueClient::eventLoop TERMINATED.");
}
@@ -886,7 +896,6 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
{
assert(_type != Publisher);
-
try
{
if (!zmq_receive(_zmqSocket, id))
@@ -955,10 +964,10 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
return (rc);
}
- static bool zmq_receive (zmq::socket_t & socket, std::string& value)
+ static bool zmq_receive (zmq::socket_t *socket, std::string& value)
{
zmq::message_t message;
- socket.recv(&message);
+ socket->recv(&message);
if (!message.size())
return false;
value = std::string(static_cast<char*>(message.data()), message.size());
@@ -155,10 +155,10 @@ inline void StateQueueNotification::stop()
NotifyData exitSignal;
exitSignal.key = _exitString;
enqueue(exitSignal);
+ _isRunning = false;
+ _queueThread->join();
delete _pPublisher;
_pPublisher = 0;
- _queueThread->join();
- _isRunning = false;
}
}

0 comments on commit 15e724a

Please sign in to comment.