Permalink
Browse files

websocket, feat: use locker to keep the message sequence.

  • Loading branch information...
xicilion committed Oct 24, 2017
1 parent 344f0ae commit d69b32c6ec570dc9195e18df9f3d3bec3b21f3f5
Showing with 44 additions and 19 deletions.
  1. +3 −2 fibjs/include/AsyncCall.h
  2. +1 −0 fibjs/include/WebSocket.h
  3. +16 −16 fibjs/src/websocket/WebSocket.cpp
  4. +23 −0 test/ws_test.js
  5. +1 −1 vender
@@ -261,9 +261,10 @@ class AsyncState : public AsyncEvent {
apost(0);
}
result_t lock(exlib::Locker& l)
void lock(exlib::Locker& l)
{
return l.lock(this) ? 0 : CALL_E_PENDDING;
if (l.lock(this))
apost(0);
}
void unlock(exlib::Locker& l)
@@ -73,6 +73,7 @@ class WebSocket : public WebSocket_base {
public:
obj_ptr<Stream_base> m_stream;
AsyncEvent* m_ac;
exlib::Locker m_sendLocker;
exlib::string m_url;
exlib::string m_protocol;
@@ -25,21 +25,24 @@ class asyncSend : public AsyncState {
, m_this(pThis)
, m_end(false)
{
m_data = new Buffer(data);
obj_ptr<Buffer_base> _data = new Buffer(data);
m_msg = new WebSocketMessage(type, m_this->m_masked, m_this->m_compress, 0);
m_msg->cc_write(_data);
set(fill);
set(send);
lock(m_this->m_sendLocker);
}
asyncSend(WebSocket* pThis, Buffer_base* data, int32_t type = ws_base::_BINARY)
: AsyncState(NULL)
, m_this(pThis)
, m_end(false)
{
m_data = new Buffer(data);
m_msg = new WebSocketMessage(type, m_this->m_masked, m_this->m_compress, 0);
m_msg->cc_write(data);
set(fill);
set(send);
lock(m_this->m_sendLocker);
}
asyncSend(WebSocket* pThis, SeekableStream_base* body, int32_t type, bool end = false)
@@ -52,14 +55,12 @@ class asyncSend : public AsyncState {
m_msg->set_body(body);
set(send);
lock(m_this->m_sendLocker);
}
static int32_t fill(AsyncState* pState, int32_t n)
~asyncSend()
{
asyncSend* pThis = (asyncSend*)pState;
pThis->set(send);
return pThis->m_msg->write(pThis->m_data, pThis);
unlock(m_this->m_sendLocker);
}
static int32_t send(AsyncState* pState, int32_t n)
@@ -91,7 +92,6 @@ class asyncSend : public AsyncState {
private:
obj_ptr<WebSocketMessage> m_msg;
obj_ptr<Buffer_base> m_data;
obj_ptr<WebSocket> m_this;
bool m_end;
};
@@ -250,7 +250,7 @@ result_t WebSocket_base::_new(exlib::string url, exlib::string protocol, exlib::
obj_ptr<WebSocket> sock = new WebSocket(url, protocol, origin);
sock->wrap(This);
(new asyncConnect(sock))->post(0);
(new asyncConnect(sock))->apost(0);
retVal = sock;
@@ -296,7 +296,7 @@ void WebSocket::startRecv()
case ws_base::_PING: {
obj_ptr<SeekableStream_base> body;
pThis->m_msg->get_body(body);
(new asyncSend(pThis->m_this, body, ws_base::_PONG))->post(0);
new asyncSend(pThis->m_this, body, ws_base::_PONG);
break;
}
case ws_base::_CLOSE: {
@@ -305,7 +305,7 @@ void WebSocket::startRecv()
if (pThis->m_this->m_readyState.CompareAndSwap(ws_base::_OPEN, ws_base::_CLOSING)
== ws_base::_OPEN)
(new asyncSend(pThis->m_this, body, ws_base::_CLOSE, true))->post(0);
new asyncSend(pThis->m_this, body, ws_base::_CLOSE, true);
else
pThis->m_this->endConnect(body);
@@ -433,7 +433,7 @@ result_t WebSocket::close(int32_t code, exlib::string reason)
buf[0] = (code >> 8) & 255;
buf[1] = code & 255;
(new asyncSend(this, buf, ws_base::_CLOSE))->post(0);
new asyncSend(this, buf, ws_base::_CLOSE);
return 0;
}
@@ -442,7 +442,7 @@ result_t WebSocket::send(exlib::string data)
if (m_readyState != ws_base::_OPEN)
return CHECK_ERROR(CALL_E_INVALID_CALL);
(new asyncSend(this, data))->post(0);
new asyncSend(this, data);
return 0;
}
@@ -451,7 +451,7 @@ result_t WebSocket::send(Buffer_base* data)
if (m_readyState != ws_base::_OPEN)
return CHECK_ERROR(CALL_E_INVALID_CALL);
(new asyncSend(this, data))->post(0);
new asyncSend(this, data);
return 0;
}
}
View
@@ -665,6 +665,29 @@ describe('ws', () => {
assert.equal(s.readyState, ws.CLOSED);
});
it('keep the sequence', () => {
var ev = new coroutine.Event();
var cnt = 1000;
var n = 0;
var s = new ws.Socket("ws://127.0.0.1:" + (8814 + base_port) + "/ws", "test");
s.onopen = () => {
console.time('ws');
for (var i = 0; i < cnt; i++)
s.send(i);
};
s.onmessage = (msg) => {
assert.equal(msg.data, n);
n++;
if (n == cnt)
ev.set();
};
ev.wait();
console.timeEnd('ws');
});
describe('onerror', () => {
it("open", () => {
var t = false;
2 vender
Submodule vender updated 1 files
+4 −4 zmq/zmq.vcxproj

0 comments on commit d69b32c

Please sign in to comment.