Skip to content

Commit

Permalink
websocket, refactor: refactor asyncSend lock system.
Browse files Browse the repository at this point in the history
  • Loading branch information
xicilion committed Oct 19, 2019
1 parent 9ef4533 commit 30bd406
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 34 deletions.
10 changes: 0 additions & 10 deletions fibjs/include/AsyncCall.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,16 +287,6 @@ class AsyncState : public AsyncEvent {
apost(0);
}

result_t lock(exlib::Locker& l)
{
return l.lock(this) ? 0 : CALL_E_PENDDING;
}

void unlock(exlib::Locker& l)
{
l.unlock(this);
}

private:
AsyncEvent* m_ac;
bool m_bAsyncState;
Expand Down
50 changes: 26 additions & 24 deletions fibjs/src/websocket/WebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class asyncSend : public AsyncState {
m_msg = new WebSocketMessage(type, m_this->m_masked, m_this->m_compress, 0);
m_msg->cc_write(_data);

start();
next(start);
}

asyncSend(WebSocket* pThis, Buffer_base* data, int32_t type = ws_base::_BINARY)
Expand All @@ -48,7 +48,7 @@ class asyncSend : public AsyncState {
m_msg = new WebSocketMessage(type, m_this->m_masked, m_this->m_compress, 0);
m_msg->cc_write(data);

start();
next(start);
}

asyncSend(WebSocket* pThis, int32_t code, exlib::string reason)
Expand All @@ -66,7 +66,7 @@ class asyncSend : public AsyncState {
m_msg = new WebSocketMessage(ws_base::_CLOSE, m_this->m_masked, m_this->m_compress, 0);
m_msg->cc_write(data);

start();
next(start);
}

asyncSend(WebSocket* pThis, SeekableStream_base* body, int32_t type)
Expand All @@ -78,27 +78,26 @@ class asyncSend : public AsyncState {
if (body)
m_msg->set_body(body);

start();
next(start);
}

~asyncSend()
{
unlock(m_this->m_lockSend);
m_this->m_lockSend.unlock(this);
}

void start()
static int32_t start(AsyncState* pState, int32_t n)
{
next(lock_buffer_for_encode);
if (m_this->m_lockEncode.lock(this))
apost(0);
asyncSend* pThis = (asyncSend*)pState;

return pThis->lock(pThis->m_this->m_lockEncode, pThis->next(lock_buffer_for_encode));
}

static int32_t lock_buffer_for_encode(AsyncState* pState, int32_t n)
{
asyncSend* pThis = (asyncSend*)pState;

pThis->next(encode);
return pThis->lock(pThis->m_this->m_lockBuffer);
return pThis->lock(pThis->m_this->m_lockBuffer, pThis->next(encode));
}

static int32_t encode(AsyncState* pState, int32_t n)
Expand All @@ -114,19 +113,17 @@ class asyncSend : public AsyncState {
static int32_t encode_ok(AsyncState* pState, int32_t n)
{
asyncSend* pThis = (asyncSend*)pState;
pThis->unlock(pThis->m_this->m_lockBuffer);
pThis->unlock(pThis->m_this->m_lockEncode);
pThis->m_this->m_lockBuffer.unlock(pThis);
pThis->m_this->m_lockEncode.unlock(pThis);

pThis->next(lock_buffer_for_send);
return pThis->lock(pThis->m_this->m_lockSend);
return pThis->lock(pThis->m_this->m_lockSend, pThis->next(lock_buffer_for_send));
}

static int32_t lock_buffer_for_send(AsyncState* pState, int32_t n)
{
asyncSend* pThis = (asyncSend*)pState;

pThis->next(send);
return pThis->lock(pThis->m_this->m_lockBuffer);
return pThis->lock(pThis->m_this->m_lockBuffer, pThis->next(send));
}

static int32_t send(AsyncState* pState, int32_t n)
Expand All @@ -135,7 +132,7 @@ class asyncSend : public AsyncState {

pThis->m_buffer = pThis->m_this->m_buffer;
pThis->m_this->m_buffer.Release();
pThis->unlock(pThis->m_this->m_lockBuffer);
pThis->m_this->m_lockBuffer.unlock(pThis);

if (!pThis->m_buffer)
return pThis->next(ok);
Expand All @@ -150,8 +147,8 @@ class asyncSend : public AsyncState {

if (pThis->m_type == ws_base::_CLOSE) {
obj_ptr<SeekableStream_base> body;
pThis->m_msg->get_body(body);

pThis->m_msg->get_body(body);
pThis->m_this->endConnect(body);
}
return pThis->next();
Expand All @@ -163,6 +160,11 @@ class asyncSend : public AsyncState {
return v;
}

result_t lock(exlib::Locker& l, AsyncState* pThis)
{
return l.lock(pThis) ? 0 : CALL_E_PENDDING;
}

private:
obj_ptr<WebSocketMessage> m_msg;
obj_ptr<WebSocket> m_this;
Expand Down Expand Up @@ -410,15 +412,15 @@ void WebSocket::startRecv(Isolate* isolate)
case ws_base::_PING: {
obj_ptr<SeekableStream_base> body;
pThis->m_msg->get_body(body);
new asyncSend(pThis->m_this, body, ws_base::_PONG);
(new asyncSend(pThis->m_this, body, ws_base::_PONG))->post(0);
break;
}
case ws_base::_CLOSE: {
obj_ptr<SeekableStream_base> body;
pThis->m_msg->get_body(body);

if (pThis->m_this->m_closeState.CompareAndSwap(ws_base::_OPEN, ws_base::_CLOSING) == ws_base::_OPEN)
new asyncSend(pThis->m_this, body, ws_base::_CLOSE);
(new asyncSend(pThis->m_this, body, ws_base::_CLOSE))->post(0);
else
pThis->m_this->endConnect(body);

Expand Down Expand Up @@ -549,7 +551,7 @@ result_t WebSocket::close(int32_t code, exlib::string reason)
if (m_readyState.CompareAndSwap(ws_base::_OPEN, ws_base::_CLOSING) != ws_base::_OPEN)
return 0;

new asyncSend(this, code, reason);
(new asyncSend(this, code, reason))->post(0);
return 0;
}

Expand All @@ -564,7 +566,7 @@ result_t WebSocket::send(exlib::string data)
return CHECK_ERROR(Runtime::setError("websocket: WebSocket is in CLOSED state."));
}

new asyncSend(this, data);
(new asyncSend(this, data))->post(0);
return 0;
}

Expand All @@ -579,7 +581,7 @@ result_t WebSocket::send(Buffer_base* data)
return CHECK_ERROR(Runtime::setError("websocket: WebSocket is in CLOSED state."));
}

new asyncSend(this, data);
(new asyncSend(this, data))->post(0);
return 0;
}

Expand Down

0 comments on commit 30bd406

Please sign in to comment.