@@ -134,6 +134,7 @@ m_LastSunriseSet("")
134
134
{
135
135
m_SecCountdown=-1 ;
136
136
m_stoprequested=false ;
137
+ m_stopRxMessageThread = false ;
137
138
m_verboselevel=EVBL_None;
138
139
139
140
m_bStartHardware=false ;
@@ -160,6 +161,8 @@ m_LastSunriseSet("")
160
161
m_LastUpdateCheck = 0 ;
161
162
m_bHaveUpdate = false ;
162
163
m_iRevision = 0 ;
164
+
165
+ m_rxMessageIdx = 1 ;
163
166
}
164
167
165
168
MainWorker::~MainWorker ()
@@ -884,6 +887,13 @@ bool MainWorker::Start()
884
887
885
888
bool MainWorker::Stop ()
886
889
{
890
+ if (m_rxMessageThread) {
891
+ // Stop RxMessage thread before hardware to avoid NULL pointer exception
892
+ m_stopRxMessageThread = true ;
893
+ UnlockRxMessageQueue ();
894
+ m_rxMessageThread->join ();
895
+ m_rxMessageThread.reset ();
896
+ }
887
897
if (m_thread)
888
898
{
889
899
m_webservers.StopServers ();
@@ -945,7 +955,9 @@ bool MainWorker::StartThread()
945
955
}
946
956
947
957
m_thread = boost::shared_ptr<boost::thread>(new boost::thread (boost::bind (&MainWorker::Do_Work, this )));
948
- return (m_thread!=NULL );
958
+ m_rxMessageThread = boost::shared_ptr<boost::thread>(new boost::thread (boost::bind (&MainWorker::Do_Work_On_Rx_Messages, this )));
959
+
960
+ return (m_thread!=NULL ) && (m_rxMessageThread!=NULL );
949
961
}
950
962
951
963
#define HEX ( x ) \
@@ -1638,9 +1650,215 @@ unsigned long long MainWorker::PerformRealActionFromDomoticzClient(const unsigne
1638
1650
return -1 ;
1639
1651
}
1640
1652
1641
- void MainWorker::DecodeRXMessage (const CDomoticzHardwareBase *pHardware, const unsigned char *pRXCommand) {
1653
+ void MainWorker::DecodeRXMessage (const CDomoticzHardwareBase *pHardware, const unsigned char *pRXCommand)
1654
+ {
1655
+ boost::lock_guard<boost::mutex> l (m_decodeRXMessageMutex);
1642
1656
if ((pHardware == NULL ) || (pRXCommand == NULL ))
1643
1657
return ;
1658
+ if ((pHardware->HwdType == HTYPE_Domoticz) && (pHardware->m_HwdID == 8765 ))
1659
+ {
1660
+ // Directly process the command
1661
+ ProcessRXMessage (pHardware, pRXCommand);
1662
+ }
1663
+ else
1664
+ {
1665
+ ProcessRXMessage (pHardware, pRXCommand);
1666
+ // Submit command without waiting for the command to be processed
1667
+ // PushRxMessage(pHardware, pRXCommand);
1668
+ }
1669
+ }
1670
+
1671
+ void MainWorker::PushRxMessage (const CDomoticzHardwareBase *pHardware, const unsigned char *pRXCommand)
1672
+ {
1673
+ // Check command, submit it without waiting for it to be processed
1674
+ CheckAndPushRxMessage (pHardware, pRXCommand, false );
1675
+ }
1676
+
1677
+ void MainWorker::PushAndWaitRxMessage (const CDomoticzHardwareBase *pHardware, const unsigned char *pRXCommand)
1678
+ {
1679
+ // Check command, submit it and wait for it to be processed
1680
+ CheckAndPushRxMessage (pHardware, pRXCommand, true );
1681
+ }
1682
+
1683
+ void MainWorker::CheckAndPushRxMessage (const CDomoticzHardwareBase *pHardware, const unsigned char *pRXCommand, const bool wait)
1684
+ {
1685
+ if ((pHardware == NULL ) || (pRXCommand == NULL )) {
1686
+ _log.Log (LOG_ERROR, " RxQueue: cannot push message with undefined hardware (%s) or command (%s)" ,
1687
+ (pHardware == NULL ) ? " null" : " not null" ,
1688
+ (pRXCommand == NULL ) ? " null" : " not null" );
1689
+ return ;
1690
+ }
1691
+ if (pHardware->m_HwdID < 1 ) {
1692
+ _log.Log (LOG_ERROR, " RxQueue: cannot push message with invalid hardware id (id=%d, type=%d, name=%s)" ,
1693
+ pHardware->m_HwdID ,
1694
+ pHardware->HwdType ,
1695
+ pHardware->Name .c_str ());
1696
+ return ;
1697
+ }
1698
+
1699
+ // Build queue item
1700
+ _tRxQueueItem rxMessage;
1701
+ rxMessage.rxMessageIdx = m_rxMessageIdx++;
1702
+ rxMessage.hardwareId = pHardware->m_HwdID ;
1703
+ // defensive copy of the command
1704
+ rxMessage.vrxCommand .insert (rxMessage.vrxCommand .begin (), pRXCommand, pRXCommand + pRXCommand[0 ] + 1 );
1705
+ rxMessage.crc = 0x0 ;
1706
+ #ifdef DEBUG_RXQUEUE
1707
+ // CRC
1708
+ boost::crc_optimal<16 , 0x1021 , 0xFFFF , 0 , false , false > crc_ccitt2;
1709
+ crc_ccitt2 = std::for_each (pRXCommand, pRXCommand + pRXCommand[0 ] + 1 , crc_ccitt2);
1710
+ rxMessage.crc = crc_ccitt2 ();
1711
+ #endif
1712
+
1713
+ if (m_stopRxMessageThread) {
1714
+ // Server is stopping
1715
+ return ;
1716
+ }
1717
+
1718
+ // Trigger
1719
+ rxMessage.trigger = NULL ; // Should be initialized to NULL if trigger is no used
1720
+ if (wait) { // add trigger to wait for the message to be processed
1721
+ rxMessage.trigger = new queue_element_trigger ();
1722
+ }
1723
+
1724
+ #ifdef DEBUG_RXQUEUE
1725
+ _log.Log (LOG_STATUS, " RxQueue: push a rxMessage(%lu) (hrdwId=%d, hrdwType=%d, hrdwName=%s, type=%02X, subtype=%02X)" ,
1726
+ rxMessage.rxMessageIdx ,
1727
+ pHardware->m_HwdID ,
1728
+ pHardware->HwdType ,
1729
+ pHardware->Name .c_str (),
1730
+ pRXCommand[1 ],
1731
+ pRXCommand[2 ]);
1732
+ #endif
1733
+
1734
+ // Push item to queue
1735
+ m_rxMessageQueue.push (rxMessage);
1736
+
1737
+ if (rxMessage.trigger != NULL ) {
1738
+ #ifdef DEBUG_RXQUEUE
1739
+ _log.Log (LOG_STATUS, " RxQueue: wait for rxMessage(%lu) to be processed..." , rxMessage.rxMessageIdx );
1740
+ #endif
1741
+ bool moreThanTimeout = true ;
1742
+ while (!rxMessage.trigger ->timed_wait (boost::posix_time::milliseconds (1000 ))) {
1743
+ #ifdef DEBUG_RXQUEUE
1744
+ _log.Log (LOG_STATUS, " RxQueue: wait 1s for rxMessage(%lu) to be processed..." , rxMessage.rxMessageIdx );
1745
+ #endif
1746
+ moreThanTimeout = true ;
1747
+ if (m_stopRxMessageThread) {
1748
+ // Server is stopping
1749
+ break ;
1750
+ }
1751
+ }
1752
+ #ifdef DEBUG_RXQUEUE
1753
+ if (moreThanTimeout) {
1754
+ _log.Log (LOG_STATUS, " RxQueue: rxMessage(%lu) processed" , rxMessage.rxMessageIdx );
1755
+ }
1756
+ #endif
1757
+ delete rxMessage.trigger ;
1758
+ }
1759
+ }
1760
+
1761
+ void MainWorker::UnlockRxMessageQueue ()
1762
+ {
1763
+ #ifdef DEBUG_RXQUEUE
1764
+ _log.Log (LOG_STATUS, " RxQueue: unlock queue using dummy message" );
1765
+ #endif
1766
+ // Push dummy message to unlock queue
1767
+ _tRxQueueItem rxMessage;
1768
+ rxMessage.rxMessageIdx = m_rxMessageIdx++;
1769
+ rxMessage.hardwareId = -1 ;
1770
+ rxMessage.trigger = NULL ;
1771
+ m_rxMessageQueue.push (rxMessage);
1772
+ }
1773
+
1774
+ void MainWorker::Do_Work_On_Rx_Messages ()
1775
+ {
1776
+ _log.Log (LOG_STATUS, " RxQueue: queue worker started..." );
1777
+
1778
+ m_stopRxMessageThread = false ;
1779
+ while (true ) {
1780
+ if (m_stopRxMessageThread) {
1781
+ // Server is stopping
1782
+ break ;
1783
+ }
1784
+
1785
+ // Wait and pop next message or timeout
1786
+ _tRxQueueItem rxQItem;
1787
+ bool hasPopped = m_rxMessageQueue.timed_wait_and_pop <boost::posix_time::milliseconds>(rxQItem,
1788
+ boost::posix_time::milliseconds (5000 ));// (if no message for 2 seconds, returns anyway to check m_stopRxMessageThread)
1789
+
1790
+ if (!hasPopped) {
1791
+ // Timeout occurred : queue is empty
1792
+ #ifdef DEBUG_RXQUEUE
1793
+ // _log.Log(LOG_STATUS, "RxQueue: the queue has been empty for five seconds");
1794
+ #endif
1795
+ continue ;
1796
+ }
1797
+ if (rxQItem.hardwareId == -1 ) {
1798
+ // dummy message
1799
+ #ifdef DEBUG_RXQUEUE
1800
+ _log.Log (LOG_STATUS, " RxQueue: dummy message popped" );
1801
+ #endif
1802
+ continue ;
1803
+ }
1804
+ if (rxQItem.hardwareId < 1 ) {
1805
+ _log.Log (LOG_ERROR, " RxQueue: cannot process invalid hardware id: (%d)" , rxQItem.hardwareId );
1806
+ // cannot process message with invalid id or null message
1807
+ if (rxQItem.trigger != NULL ) rxQItem.trigger ->popped ();
1808
+ continue ;
1809
+ }
1810
+
1811
+ const CDomoticzHardwareBase *pHardware = GetHardware (rxQItem.hardwareId );
1812
+
1813
+ // Check pointers
1814
+ if (pHardware == NULL ) {
1815
+ _log.Log (LOG_ERROR, " RxQueue: cannot retrieve hardware with id: %d" , rxQItem.hardwareId );
1816
+ if (rxQItem.trigger != NULL ) rxQItem.trigger ->popped ();
1817
+ continue ;
1818
+ }
1819
+ if (rxQItem.vrxCommand .empty ()) {
1820
+ _log.Log (LOG_ERROR, " RxQueue: cannot retrieve command with id: %d" , rxQItem.hardwareId );
1821
+ if (rxQItem.trigger != NULL ) rxQItem.trigger ->popped ();
1822
+ continue ;
1823
+ }
1824
+
1825
+ const unsigned char *pRXCommand = &rxQItem.vrxCommand [0 ];
1826
+
1827
+ #ifdef DEBUG_RXQUEUE
1828
+ // CRC
1829
+ boost::uint16_t crc = rxQItem.crc ;
1830
+ boost::crc_optimal<16 , 0x1021 , 0xFFFF , 0 , false , false > crc_ccitt2;
1831
+ crc_ccitt2 = std::for_each (pRXCommand, pRXCommand+rxQItem.vrxCommand .size (), crc_ccitt2);
1832
+ if (crc != crc_ccitt2 ()) {
1833
+ _log.Log (LOG_ERROR, " RxQueue: cannot process invalid rxMessage(%lu) from hardware with id=%d (type %d)" ,
1834
+ rxQItem.rxMessageIdx ,
1835
+ rxQItem.hardwareId ,
1836
+ pHardware->HwdType );
1837
+ if (rxQItem.trigger != NULL ) rxQItem.trigger ->popped ();
1838
+ continue ;
1839
+ }
1840
+
1841
+ _log.Log (LOG_STATUS, " RxQueue: process a rxMessage(%lu) (hrdwId=%d, hrdwType=%d, hrdwName=%s, type=%02X, subtype=%02X)" ,
1842
+ rxQItem.rxMessageIdx ,
1843
+ pHardware->m_HwdID ,
1844
+ pHardware->HwdType ,
1845
+ pHardware->Name .c_str (),
1846
+ pRXCommand[1 ],
1847
+ pRXCommand[2 ]);
1848
+ #endif
1849
+
1850
+ ProcessRXMessage (pHardware, pRXCommand);
1851
+ if (rxQItem.trigger != NULL )
1852
+ {
1853
+ rxQItem.trigger ->popped ();
1854
+ }
1855
+ }
1856
+
1857
+ _log.Log (LOG_STATUS, " RxQueue: queue worker stopped..." );
1858
+ }
1859
+
1860
+ void MainWorker::ProcessRXMessage (const CDomoticzHardwareBase *pHardware, const unsigned char *pRXCommand)
1861
+ {
1644
1862
// current date/time based on current system
1645
1863
size_t Len = pRXCommand[0 ] + 1 ;
1646
1864
0 commit comments