Skip to content
This repository
Browse code

PT38951557 - sipXrls does not shutdown when signaled with SIGTERM or …

…SIGINT

There were several changes:
StateQueueNotification.h
  - reorganized the stop sequence;
StateQueueClient.h
  - zmqSocket was created but was not used nor closed when type was publisher. This was leading to zmqContext::term to be blocked forever thus leading to rls shutdown being blocked forever.
  The zmqSocket should be created only when type is not publisher.
  - There was a corruption in the destructor of BlockingTcpClient because it was called twice for the same object. This was happening because these objects where created as regular pointers
but where used in a vector as regular pointers and in a queue as shared pointers. That is why the destructor was called twice. The _clientPointers vector was changed to hold shared
pointers instead of regular pointers.
  • Loading branch information...
commit 15e724a82e9b5ca55013a858573e3eda0d9516c0 1 parent f1093d5
dtacalau authored November 07, 2012
69  sipXsqa/include/sqa/StateQueueClient.h
@@ -29,7 +29,6 @@
29 29
 #include <boost/lexical_cast.hpp>
30 30
 #include "ServiceOptions.h"
31 31
 
32  
-
33 32
 #define SQA_LINGER_TIME_MILLIS 5000
34 33
 #define SQA_TERMINATE_STRING "__TERMINATE__"
35 34
 #define SQA_CONN_MAX_READ_BUFF_SIZE 65536
@@ -72,7 +71,11 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
72 71
 
73 72
     ~BlockingTcpClient()
74 73
     {
75  
-      delete _pSocket;
  74
+        if (_pSocket)
  75
+        {
  76
+            delete _pSocket;
  77
+            _pSocket = 0;
  78
+        }
76 79
     }
77 80
 
78 81
     void setReadTimeout(boost::asio::ip::tcp::socket& socket, int milliseconds)
@@ -355,13 +358,13 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
355 358
   bool _terminate;
356 359
   boost::thread _keepAliveThread;
357 360
   zmq::context_t* _zmqContext;
358  
-  zmq::socket_t _zmqSocket;
  361
+  zmq::socket_t* _zmqSocket;
359 362
   boost::thread* _pEventThread;
360 363
   std::string _zmqEventId;
361 364
   std::string _applicationId;
362 365
   typedef BlockingQueue<std::string> EventQueue;
363 366
   EventQueue _eventQueue;
364  
-  std::vector<BlockingTcpClient*> _clientPointers;
  367
+  std::vector<BlockingTcpClient::Ptr> _clientPointers;
365 368
   int _expires;
366 369
   int _subscriptionExpires;
367 370
   int _backoffCount;
@@ -389,7 +392,7 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
389 392
     _terminate(false),
390 393
     _keepAliveThread(boost::bind(&StateQueueClient::keepAliveLoop, this)),
391 394
     _zmqContext(new zmq::context_t(1)),
392  
-    _zmqSocket(*_zmqContext,ZMQ_SUB),
  395
+    _zmqSocket(0),
393 396
     _pEventThread(0),
394 397
     _applicationId(applicationId),
395 398
     _eventQueue(1000),
@@ -399,8 +402,13 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
399 402
     _refreshSignin(false),
400 403
     _currentSigninTick(-1)
401 404
   {
402  
-      int linger = SQA_LINGER_TIME_MILLIS; // milliseconds
403  
-      _zmqSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(int));
  405
+      if (_type != Publisher)
  406
+      {
  407
+        _zmqSocket = new zmq::socket_t(*_zmqContext,ZMQ_SUB);
  408
+		int linger = SQA_LINGER_TIME_MILLIS; // milliseconds
  409
+		_zmqSocket->setsockopt(ZMQ_LINGER, &linger, sizeof(int));
  410
+      }
  411
+
404 412
       for (std::size_t i = 0; i < _poolSize; i++)
405 413
       {
406 414
         BlockingTcpClient* pClient = new BlockingTcpClient(_ioService, readTimeout, writeTimeout, i == 0 ? SQA_KEY_ALPHA : SQA_KEY_DEFAULT );
@@ -409,8 +417,8 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
409 417
         if (_localAddress.empty())
410 418
           _localAddress = pClient->getLocalAddress();
411 419
 
412  
-        _clientPointers.push_back(pClient);
413 420
         BlockingTcpClient::Ptr client(pClient);
  421
+        _clientPointers.push_back(client);
414 422
         _clientPool.enqueue(client);
415 423
       }
416 424
 
@@ -449,7 +457,7 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
449 457
     _terminate(false),
450 458
     _keepAliveThread(boost::bind(&StateQueueClient::keepAliveLoop, this)),
451 459
     _zmqContext(new zmq::context_t(1)),
452  
-    _zmqSocket(*_zmqContext,ZMQ_SUB),
  460
+    _zmqSocket(0),
453 461
     _pEventThread(0),
454 462
     _applicationId(applicationId),
455 463
     _eventQueue(1000),
@@ -459,8 +467,12 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
459 467
     _refreshSignin(false),
460 468
     _currentSigninTick(-1)
461 469
   {
462  
-      int linger = SQA_LINGER_TIME_MILLIS; // milliseconds
463  
-      _zmqSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(int));
  470
+      if (_type != Publisher)
  471
+      {
  472
+        _zmqSocket = new zmq::socket_t(*_zmqContext,ZMQ_SUB);
  473
+        int linger = SQA_LINGER_TIME_MILLIS; // milliseconds
  474
+        _zmqSocket->setsockopt(ZMQ_LINGER, &linger, sizeof(int));
  475
+      }
464 476
 
465 477
       for (std::size_t i = 0; i < _poolSize; i++)
466 478
       {
@@ -472,8 +484,9 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
472 484
         
473 485
         _serviceAddress = pClient->_serviceAddress;
474 486
         _servicePort = pClient->_servicePort;
475  
-        _clientPointers.push_back(pClient);
  487
+
476 488
         BlockingTcpClient::Ptr client(pClient);
  489
+        _clientPointers.push_back(client);
477 490
         _clientPool.enqueue(client);
478 491
       }
479 492
 
@@ -520,6 +533,12 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
520 533
     delete _zmqContext;
521 534
     _zmqContext = 0;
522 535
 
  536
+    if (_zmqSocket)
  537
+    {
  538
+      delete _zmqSocket;
  539
+      _zmqSocket = 0;
  540
+    }
  541
+
523 542
     OS_LOG_INFO(FAC_NET, "StateQueueClient::terminate() waiting for event thread to exit.");
524 543
     if (_pEventThread)
525 544
     {
@@ -528,16 +547,6 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
528 547
       _pEventThread = 0;
529 548
     }
530 549
 
531  
-    //
532  
-    // Delete the client pointers
533  
-    //
534  
-    for (std::vector<BlockingTcpClient*>::const_iterator iter = _clientPointers.begin();
535  
-            iter != _clientPointers.end(); iter++)
536  
-    {
537  
-      BlockingTcpClient* pClient = *iter;
538  
-      delete pClient;
539  
-    }
540  
-
541 550
     OS_LOG_INFO(FAC_NET, "StateQueueClient::terminate() Ok");
542 551
   }
543 552
 
@@ -545,10 +554,10 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
545 554
 
546 555
   bool isConnected()
547 556
   {
548  
-    for (std::vector<BlockingTcpClient*>::const_iterator iter = _clientPointers.begin();
  557
+    for (std::vector<BlockingTcpClient::Ptr>::const_iterator iter = _clientPointers.begin();
549 558
             iter != _clientPointers.end(); iter++)
550 559
     {
551  
-      BlockingTcpClient* pClient = *iter;
  560
+      BlockingTcpClient::Ptr pClient = *iter;
552 561
       if (pClient->isConnected())
553 562
         return true;
554 563
     }
@@ -596,11 +605,12 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
596 605
   bool subscribe(const std::string& eventId, const std::string& sqaAddress)
597 606
   {
598 607
     assert(_type != Publisher);
  608
+
599 609
     OS_LOG_INFO(FAC_NET, "StateQueueClient::subscribe eventId=" << eventId << " address=" << sqaAddress);
600 610
     try
601 611
     {
602  
-      _zmqSocket.connect(sqaAddress.c_str());
603  
-      _zmqSocket.setsockopt(ZMQ_SUBSCRIBE, eventId.c_str(), eventId.size());
  612
+      _zmqSocket->connect(sqaAddress.c_str());
  613
+      _zmqSocket->setsockopt(ZMQ_SUBSCRIBE, eventId.c_str(), eventId.size());
604 614
 
605 615
     }catch(std::exception e)
606 616
     {
@@ -780,7 +790,7 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
780 790
       do_pop(firstHit, 0, SQA_TERMINATE_STRING, SQA_TERMINATE_STRING);
781 791
     }
782 792
 
783  
-    _zmqSocket.close();
  793
+    _zmqSocket->close();
784 794
 
785 795
     OS_LOG_INFO(FAC_NET, "StateQueueClient::eventLoop TERMINATED.");
786 796
   }
@@ -886,7 +896,6 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
886 896
   {
887 897
     assert(_type != Publisher);
888 898
 
889  
-
890 899
     try
891 900
     {
892 901
       if (!zmq_receive(_zmqSocket, id))
@@ -955,10 +964,10 @@ class StateQueueClient : public boost::enable_shared_from_this<StateQueueClient>
955 964
     return (rc);
956 965
   }
957 966
 
958  
-  static bool zmq_receive (zmq::socket_t & socket, std::string& value)
  967
+  static bool zmq_receive (zmq::socket_t *socket, std::string& value)
959 968
   {
960 969
       zmq::message_t message;
961  
-      socket.recv(&message);
  970
+      socket->recv(&message);
962 971
       if (!message.size())
963 972
         return false;
964 973
       value = std::string(static_cast<char*>(message.data()), message.size());
4  sipXsqa/include/sqa/StateQueueNotification.h
@@ -155,10 +155,10 @@ inline void StateQueueNotification::stop()
155 155
     NotifyData exitSignal;
156 156
     exitSignal.key = _exitString;
157 157
     enqueue(exitSignal);
  158
+    _isRunning = false;
  159
+    _queueThread->join();
158 160
     delete _pPublisher;
159 161
     _pPublisher = 0;
160  
-    _queueThread->join();
161  
-    _isRunning = false;
162 162
   }
163 163
 }
164 164
 

0 notes on commit 15e724a

Please sign in to comment.
Something went wrong with that request. Please try again.