11
11
#include " ../webserver/Base64.h"
12
12
#include " ../main/WebServer.h"
13
13
#include " ../webserver/cWebem.h"
14
+ #include " ../main/localtime_r.h"
14
15
#define __STDC_FORMAT_MACROS
15
16
#include < inttypes.h>
16
17
17
- CInfluxPush::CInfluxPush ()
18
+ CInfluxPush::CInfluxPush ():
19
+ m_InfluxPort(8086 ),
20
+ m_bInfluxDebugActive(false ),
21
+ m_stoprequested(false )
18
22
{
19
23
m_bLinkActive = false ;
20
24
}
21
25
22
26
void CInfluxPush::Start ()
23
27
{
24
- UpdateActive ();
28
+ UpdateSettings ();
25
29
m_sConnection = m_mainworker.sOnDeviceReceived .connect (boost::bind (&CInfluxPush::OnDeviceReceived, this , _1, _2, _3, _4));
30
+ StartThread ();
26
31
}
27
32
28
33
void CInfluxPush::Stop ()
29
34
{
30
35
if (m_sConnection.connected ())
31
36
m_sConnection.disconnect ();
37
+ StopThread ();
32
38
}
33
39
34
- void CInfluxPush::UpdateActive ()
40
+ void CInfluxPush::UpdateSettings ()
35
41
{
36
42
int fActive ;
37
43
m_sql.GetPreferencesVar (" InfluxActive" , fActive );
38
44
m_bLinkActive = (fActive == 1 );
45
+ m_InfluxPort = 8086 ;
46
+ m_sql.GetPreferencesVar (" InfluxIP" , m_InfluxIP);
47
+ m_sql.GetPreferencesVar (" InfluxPort" , m_InfluxPort);
48
+ m_sql.GetPreferencesVar (" InfluxDatabase" , m_InfluxDatabase);
49
+ int InfluxDebugActiveInt;
50
+ m_bInfluxDebugActive = false ;
51
+ m_sql.GetPreferencesVar (" InfluxDebug" , InfluxDebugActiveInt);
52
+ if (InfluxDebugActiveInt == 1 ) {
53
+ m_bInfluxDebugActive = true ;
54
+ }
55
+ m_szURL = " " ;
56
+ if (
57
+ (m_InfluxIP == " " ) ||
58
+ (m_InfluxPort == 0 ) ||
59
+ (m_InfluxDatabase == " " )
60
+ )
61
+ return ;
62
+ std::stringstream sURL ;
63
+ if (m_InfluxIP.find (" ://" ) == std::string::npos)
64
+ sURL << " http://" ;
65
+ sURL << m_InfluxIP << " :" << m_InfluxPort << " /write?db=" << m_InfluxDatabase;
66
+ m_szURL = sURL .str ();
39
67
}
40
68
41
69
void CInfluxPush::OnDeviceReceived (const int m_HwdID, const uint64_t DeviceRowIdx, const std::string &DeviceName, const unsigned char *pRXCommand)
@@ -49,34 +77,14 @@ void CInfluxPush::OnDeviceReceived(const int m_HwdID, const uint64_t DeviceRowId
49
77
50
78
void CInfluxPush::DoInfluxPush ()
51
79
{
52
- std::string InfluxIP = " " ;
53
- int InfluxPort = 8086 ;
54
- std::string InfluxDatabase = " " ;
55
- m_sql.GetPreferencesVar (" InfluxIP" , InfluxIP);
56
- m_sql.GetPreferencesVar (" InfluxPort" , InfluxPort);
57
- m_sql.GetPreferencesVar (" InfluxDatabase" , InfluxDatabase);
58
-
59
-
60
- int InfluxDebugActiveInt;
61
- bool InfluxDebugActive = false ;
62
- m_sql.GetPreferencesVar (" InfluxDebug" , InfluxDebugActiveInt);
63
- if (InfluxDebugActiveInt == 1 ) {
64
- InfluxDebugActive = true ;
65
- }
66
-
67
- if (
68
- (InfluxIP == " " ) ||
69
- (InfluxPort == 0 ) ||
70
- (InfluxDatabase == " " )
71
- )
72
- return ;
73
80
std::vector<std::vector<std::string> > result;
74
81
result = m_sql.safe_query (
75
82
" SELECT A.DeviceID, A.DelimitedValue, B.ID, B.Type, B.SubType, B.nValue, B.sValue, A.TargetType, A.TargetVariable, A.TargetDeviceID, A.TargetProperty, A.IncludeUnit, B.Name, B.SwitchType FROM PushLink as A, DeviceStatus as B "
76
83
" WHERE (A.PushType==1 AND A.DeviceID == '%" PRIu64 " ' AND A.Enabled==1 AND A.DeviceID==B.ID)" ,
77
84
m_DeviceRowIdx);
78
85
if (result.size ()>0 )
79
86
{
87
+ time_t atime = mytime (NULL );
80
88
std::string sendValue;
81
89
std::vector<std::vector<std::string> >::const_iterator itt;
82
90
for (itt=result.begin (); itt!=result.end (); ++itt)
@@ -106,36 +114,98 @@ void CInfluxPush::DoInfluxPush()
106
114
}
107
115
}
108
116
if (sendValue !=" " ) {
109
- std::stringstream szKey;
117
+ std::string szKey;
110
118
std::string vType = DropdownOptionsValue (m_DeviceRowIdx, delpos);
111
119
stdreplace (vType, " " , " -" );
112
120
stdreplace (name, " " , " -" );
113
- szKey << vType << " ,idx=" << sd[0 ] << " ,name=" << name << " value= " << sendValue ;
121
+ szKey = vType + " ,idx=" + sd[0 ] + " ,name=" + name;
114
122
115
- std::string szPostdata = szKey.str ();
116
- std::vector<std::string> ExtraHeaders;
123
+ _tPushItem pItem;
124
+ pItem.skey = szKey;
125
+ pItem.stimestamp = atime;
126
+ pItem.svalue = sendValue;
117
127
118
- std::stringstream sURL ;
119
- if (InfluxIP.find (" ://" ) == std::string::npos)
120
- sURL << " http://" ;
121
- sURL << InfluxIP << " :" << InfluxPort << " /write?db=" << InfluxDatabase;
128
+ if (targetType == 0 )
129
+ {
130
+ // Only send on change
131
+ std::map<std::string, _tPushItem>::iterator itt = m_PushedItems.find (szKey);
132
+ if (itt != m_PushedItems.end ())
133
+ {
134
+ if (sendValue == itt->second .svalue )
135
+ continue ;
136
+ }
137
+ m_PushedItems[szKey] = pItem;
138
+ }
122
139
140
+ boost::lock_guard<boost::mutex> l (m_background_task_mutex);
141
+ if (m_background_task_queue.size () < 50 )
142
+ m_background_task_queue.push_back (pItem);
143
+ }
144
+ }
145
+ }
146
+ }
123
147
124
- if (InfluxDebugActive) {
125
- _log.Log (LOG_NORM, " InfluxLink: value %s" , szPostdata.c_str ());
126
- }
148
+ bool CInfluxPush::StartThread ()
149
+ {
150
+ StopThread ();
151
+ m_stoprequested = false ;
152
+ m_background_task_thread = boost::shared_ptr<boost::thread>(new boost::thread (boost::bind (&CInfluxPush::Do_Work, this )));
153
+ return (m_background_task_thread != NULL );
154
+ }
127
155
128
- std::string sResult ;
129
- if (!HTTPClient::POST (sURL .str (), szPostdata, ExtraHeaders, sResult ))
130
- {
131
- _log.Log (LOG_ERROR, " InfluxLink: Error sending data to InfluxDB server! (check address/port/database)" );
132
- return ;
133
- }
156
+ void CInfluxPush::StopThread ()
157
+ {
158
+ if (m_background_task_thread)
159
+ {
160
+ m_stoprequested = true ;
161
+ m_background_task_thread->join ();
162
+ }
163
+ }
164
+
165
+
166
+ void CInfluxPush::Do_Work ()
167
+ {
168
+ std::vector<_tPushItem> _items2do;
169
+
170
+ while (!m_stoprequested)
171
+ {
172
+ sleep_milliseconds (500 );
173
+
174
+ { // additional scope for lock (accessing size should be within lock too)
175
+ boost::lock_guard<boost::mutex> l (m_background_task_mutex);
176
+ if (m_background_task_queue.empty ())
177
+ continue ;
178
+ _items2do = m_background_task_queue;
179
+ m_background_task_queue.clear ();
180
+ }
181
+
182
+ if (_items2do.size () < 1 ) {
183
+ continue ;
184
+ }
185
+ if (m_szURL.empty ())
186
+ continue ;
187
+
188
+ std::vector<_tPushItem>::iterator itt = _items2do.begin ();
189
+ while (itt != _items2do.end ())
190
+ {
191
+ std::string szPostdata = itt->skey + " value=" + itt->svalue ;
192
+ std::vector<std::string> ExtraHeaders;
193
+
194
+ if (m_bInfluxDebugActive) {
195
+ _log.Log (LOG_NORM, " InfluxLink: value %s" , szPostdata.c_str ());
196
+ }
197
+
198
+ std::string sResult ;
199
+ if (!HTTPClient::POST (m_szURL, szPostdata, ExtraHeaders, sResult ))
200
+ {
201
+ _log.Log (LOG_ERROR, " InfluxLink: Error sending data to InfluxDB server! (check address/port/database)" );
134
202
}
203
+ ++itt;
135
204
}
136
205
}
137
206
}
138
207
208
+
139
209
// Webserver helpers
140
210
namespace http {
141
211
namespace server {
@@ -167,7 +237,7 @@ namespace http {
167
237
m_sql.UpdatePreferencesVar (" InfluxPort" , atoi (port.c_str ()));
168
238
m_sql.UpdatePreferencesVar (" InfluxDatabase" , database.c_str ());
169
239
m_sql.UpdatePreferencesVar (" InfluxDebug" , idebugenabled);
170
- m_mainworker.m_influxpush .UpdateActive ();
240
+ m_mainworker.m_influxpush .UpdateSettings ();
171
241
root[" status" ] = " OK" ;
172
242
root[" title" ] = " SaveInfluxLinkConfig" ;
173
243
}
@@ -253,14 +323,16 @@ namespace http {
253
323
std::string deviceid = request::findValue (&req, " deviceid" );
254
324
int deviceidi = atoi (deviceid.c_str ());
255
325
std::string valuetosend = request::findValue (&req, " valuetosend" );
326
+ std::string targettype = request::findValue (&req, " targettype" );
327
+ int targettypei = atoi (targettype.c_str ());
256
328
std::string linkactive = request::findValue (&req, " linkactive" );
257
329
if (idx == " 0" ) {
258
330
m_sql.safe_query (
259
331
" INSERT INTO PushLink (PushType,DeviceID,DelimitedValue,TargetType,TargetVariable,TargetDeviceID,TargetProperty,IncludeUnit,Enabled) VALUES (%d,'%d',%d,%d,'%q',%d,'%q',%d,%d)" ,
260
332
1 ,
261
333
deviceidi,
262
334
atoi (valuetosend.c_str ()),
263
- 0 ,
335
+ targettypei ,
264
336
" -" ,
265
337
0 ,
266
338
" -" ,
@@ -270,9 +342,10 @@ namespace http {
270
342
}
271
343
else {
272
344
m_sql.safe_query (
273
- " UPDATE PushLink SET DeviceID='%d' , DelimitedValue=%d, Enabled='%d' WHERE (ID == '%q')" ,
345
+ " UPDATE PushLink SET DeviceID=%d , DelimitedValue=%d, TargetType=%d, Enabled=%d WHERE (ID == '%q')" ,
274
346
deviceidi,
275
347
atoi (valuetosend.c_str ()),
348
+ targettypei,
276
349
atoi (linkactive.c_str ()),
277
350
idx.c_str ()
278
351
);
0 commit comments