Skip to content

Commit 207e4cf

Browse files
committed
Eliminated WriteThread. Crash bug seems to be solved.
1 parent 39ffefe commit 207e4cf

File tree

2 files changed

+40
-76
lines changed

2 files changed

+40
-76
lines changed

webserver/proxyclient.cpp

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
extern std::string szAppVersion;
1212

1313
#define TIMEOUT 60
14+
#define WRITE_QUEUE_SIZE 1024
1415
#define ADDPDUSTRING(value) parameters.AddPart(value)
1516
#define ADDPDUSTRINGBINARY(value) parameters.AddPart(value, false)
1617
#define ADDPDULONG(value) parameters.AddLong(value)
@@ -32,8 +33,10 @@ namespace http {
3233
b_Connected(false),
3334
we_locked_prefs_mutex(false),
3435
timeout_(TIMEOUT),
35-
timer_(io_service, boost::posix_time::seconds(TIMEOUT))
36+
timer_(io_service, boost::posix_time::seconds(TIMEOUT)),
37+
writeQ(WRITE_QUEUE_SIZE)
3638
{
39+
writePdu = NULL;
3740
_apikey = "";
3841
_password = "";
3942
_allowed_subsystems = 0;
@@ -47,7 +50,6 @@ namespace http {
4750
doStop = true;
4851
return;
4952
}
50-
m_writeThread = new boost::thread(boost::bind(&CProxyClient::WriteThread, this));
5153
m_pWebEm = webEm;
5254
m_pDomServ = NULL;
5355
Reconnect();
@@ -132,34 +134,48 @@ namespace http {
132134

133135
void CProxyClient::handle_write(const boost::system::error_code& error, size_t bytes_transferred)
134136
{
137+
boost::mutex::scoped_lock l(writeMutex);
138+
if (bytes_transferred < writePdu->length()) {
139+
_log.Log(LOG_ERROR, "PROXY: Only write %ld of %ld bytes.", bytes_transferred, writePdu->length());
140+
}
141+
delete writePdu;
142+
writePdu = NULL;
135143
if (error) {
136144
_log.Log(LOG_ERROR, "PROXY: Write failed, code = %d, %s", error.value(), error.message().c_str());
137145
}
138-
writeCon.notify_one();
146+
ProxyPdu *pdu;
147+
if (writeQ.pop(pdu)) {
148+
SocketWrite(pdu);
149+
}
139150
}
140151

141-
void CProxyClient::WriteThread()
152+
void CProxyClient::SocketWrite(ProxyPdu *pdu)
142153
{
143-
std::vector<boost::asio::const_buffer> _writebuf;
144-
ProxyPdu *writePdu;
145-
146-
while ((writePdu = writeQ.Take()) != NULL) {
147-
boost::mutex::scoped_lock l(writeMutex);
148-
_writebuf.push_back(boost::asio::buffer(writePdu->content(), writePdu->length()));
149-
boost::asio::async_write(_socket, _writebuf, boost::bind(&CProxyClient::handle_write, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
150-
writeCon.wait(l);
151-
delete writePdu;
152-
}
153-
// end of thread
154+
// do not call directly, use MyWrite()
155+
writePdu = pdu;
156+
_writebuf.clear(); // make sure
157+
_writebuf.push_back(boost::asio::buffer(writePdu->content(), writePdu->length()));
158+
boost::asio::async_write(_socket, _writebuf, boost::bind(&CProxyClient::handle_write, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
154159
}
155160

156161
void CProxyClient::MyWrite(pdu_type type, CValueLengthPart &parameters)
157162
{
163+
boost::mutex::scoped_lock l(writeMutex);
158164
if (!b_Connected) {
159165
return;
160166
}
161-
ProxyPdu *writePdu = new ProxyPdu(type, &parameters);
162-
writeQ.Put(writePdu);
167+
ProxyPdu *pdu= new ProxyPdu(type, &parameters);
168+
if (writePdu) {
169+
// write in progress, add to queue
170+
if (!writeQ.push(pdu)) {
171+
_log.Log(LOG_ERROR, "PROXY: Too many writes at a time, dropping packet.");
172+
delete pdu;
173+
return;
174+
}
175+
}
176+
else {
177+
SocketWrite(pdu);
178+
}
163179
}
164180

165181
void CProxyClient::LoginToService()
@@ -623,8 +639,6 @@ namespace http {
623639

624640
doStop = true;
625641
// signal end of WriteThread
626-
writeQ.Put(NULL);
627-
m_writeThread->join();
628642
_socket.lowest_layer().close();
629643
}
630644

webserver/proxyclient.h

Lines changed: 7 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,9 @@
55
#include <boost/asio.hpp>
66
#include <boost/asio/ssl.hpp>
77
#include <boost/thread.hpp>
8+
#include <boost/lockfree/queue.hpp>
89
#include <list>
910
#include <boost/thread/mutex.hpp>
10-
#include <boost/thread/condition_variable.hpp>
11-
#include <boost/thread/condition.hpp>
1211
#include "proxycommon.h"
1312
#include "cWebem.h"
1413
#include "request_handler.hpp"
@@ -29,44 +28,6 @@ class DomoticzTCP;
2928
namespace http {
3029
namespace server {
3130

32-
template <typename T>
33-
class BlockingQueue
34-
{
35-
public:
36-
BlockingQueue() : queue(), mutex(), cond() {}
37-
~BlockingQueue() {}
38-
39-
public:
40-
41-
void Put(T obj)
42-
{
43-
boost::mutex::scoped_lock lock(mutex);
44-
queue.push_back(obj);
45-
cond.notify_all();
46-
}
47-
48-
T Take()
49-
{
50-
boost::mutex::scoped_lock lock(mutex);
51-
while (queue.size() == 0)
52-
{
53-
cond.wait(lock);
54-
}
55-
56-
T value = queue.front();
57-
queue.pop_front();
58-
59-
return value;
60-
}
61-
62-
private:
63-
std::list<T> queue;
64-
boost::mutex mutex;
65-
boost::condition cond;
66-
67-
};
68-
69-
7031
#define ONPDU(type) case type: Handle##type(#type, part); break;
7132
#define PDUPROTO(type) virtual void Handle##type(const char *pduname, CValueLengthPart &part);
7233
#define PDUFUNCTION(type) void CProxyClient::Handle##type(const char *pduname, CValueLengthPart &part)
@@ -96,11 +57,13 @@ namespace http {
9657
void handle_write(const boost::system::error_code& error,
9758
size_t bytes_transferred);
9859
boost::mutex writeMutex;
99-
boost::condition_variable writeCon;
10060

10161
void ReadMore();
10262

10363
void MyWrite(pdu_type type, CValueLengthPart &parameters);
64+
void SocketWrite(ProxyPdu *pdu);
65+
std::vector<boost::asio::const_buffer> _writebuf;
66+
ProxyPdu *writePdu;
10467
void LoginToService();
10568

10669
PDUPROTO(PDU_REQUEST)
@@ -113,20 +76,9 @@ namespace http {
11376
PDUPROTO(PDU_SERV_RECEIVE)
11477
PDUPROTO(PDU_SERV_SEND)
11578
PDUPROTO(PDU_SERV_ROSTERIND)
116-
void GetRequest(const std::string originatingip, boost::asio::mutable_buffers_1 _buf, http::server::reply &reply_);
117-
#if 0
118-
void HandleRequest(ProxyPdu *pdu);
119-
void HandleAssignkey(ProxyPdu *pdu);
120-
void HandleEnquire(ProxyPdu *pdu);
121-
void HandleAuthresp(ProxyPdu *pdu);
122-
void HandleServConnect(ProxyPdu *pdu);
123-
void HandleServConnectResp(ProxyPdu *pdu);
124-
void HandleServDisconnect(ProxyPdu *pdu);
125-
void HandleServReceive(ProxyPdu *pdu);
126-
void HandleServSend(ProxyPdu *pdu);
127-
void HandleServRosterInd(ProxyPdu *pdu);
128-
#endif
79+
void GetRequest(const std::string originatingip, boost::asio::mutable_buffers_1 _buf, http::server::reply &reply_);
12980
void SendServDisconnect(const std::string &token, int reason);
81+
13082
void PduHandler(ProxyPdu &pdu);
13183

13284
int _allowed_subsystems;
@@ -149,9 +101,7 @@ namespace http {
149101
int timeout_;
150102
bool b_Connected;
151103

152-
void WriteThread();
153-
boost::thread *m_writeThread;
154-
BlockingQueue<ProxyPdu *> writeQ;
104+
boost::lockfree::queue<ProxyPdu *> writeQ;
155105
};
156106

157107
class CProxyManager {

0 commit comments

Comments
 (0)