Permalink
Browse files

websocket, feat: Send multiple websocket packets together.

  • Loading branch information...
xicilion committed Oct 24, 2017
1 parent d69b32c commit dbff1aa286d42261408620c6438c66f925c786f6
@@ -261,10 +261,9 @@ class AsyncState : public AsyncEvent {
apost(0);
}
void lock(exlib::Locker& l)
result_t lock(exlib::Locker& l)
{
if (l.lock(this))
apost(0);
return l.lock(this) ? 0 : CALL_E_PENDDING;
}
void unlock(exlib::Locker& l)
@@ -73,7 +73,10 @@ class WebSocket : public WebSocket_base {
public:
obj_ptr<Stream_base> m_stream;
AsyncEvent* m_ac;
exlib::Locker m_sendLocker;
obj_ptr<SeekableStream_base> m_buffer;
exlib::Locker m_lockBuffer;
exlib::Locker m_lockSend;
exlib::string m_url;
exlib::string m_protocol;
@@ -14,6 +14,7 @@
#include "Map.h"
#include <mbedtls/mbedtls/sha1.h>
#include "encoding.h"
#include "MemoryStream.h"
#include <stdlib.h>
namespace fibjs {
@@ -23,59 +24,118 @@ class asyncSend : public AsyncState {
asyncSend(WebSocket* pThis, exlib::string data, int32_t type = ws_base::_TEXT)
: AsyncState(NULL)
, m_this(pThis)
, m_end(false)
, m_type(type)
{
obj_ptr<Buffer_base> _data = new Buffer(data);
m_msg = new WebSocketMessage(type, m_this->m_masked, m_this->m_compress, 0);
m_msg->cc_write(_data);
set(send);
lock(m_this->m_sendLocker);
start();
}
asyncSend(WebSocket* pThis, Buffer_base* data, int32_t type = ws_base::_BINARY)
: AsyncState(NULL)
, m_this(pThis)
, m_end(false)
, m_type(type)
{
m_msg = new WebSocketMessage(type, m_this->m_masked, m_this->m_compress, 0);
m_msg->cc_write(data);
set(send);
lock(m_this->m_sendLocker);
start();
}
asyncSend(WebSocket* pThis, SeekableStream_base* body, int32_t type, bool end = false)
asyncSend(WebSocket* pThis, int32_t code, exlib::string reason)
: AsyncState(NULL)
, m_this(pThis)
, m_end(end)
, m_type(ws_base::_CLOSE)
{
exlib::string buf = " " + reason;
buf[0] = (code >> 8) & 255;
buf[1] = code & 255;
obj_ptr<Buffer_base> data = new Buffer(buf);
m_msg = new WebSocketMessage(ws_base::_CLOSE, m_this->m_masked, m_this->m_compress, 0);
m_msg->cc_write(data);
start();
}
asyncSend(WebSocket* pThis, SeekableStream_base* body, int32_t type)
: AsyncState(NULL)
, m_this(pThis)
, m_type(type)
{
m_msg = new WebSocketMessage(type, m_this->m_masked, m_this->m_compress, 0);
if (body)
m_msg->set_body(body);
set(send);
lock(m_this->m_sendLocker);
start();
}
~asyncSend()
{
unlock(m_this->m_sendLocker);
unlock(m_this->m_lockSend);
}
void start()
{
set(encode);
if (m_this->m_lockBuffer.lock(this))
apost(0);
}
static int32_t encode(AsyncState* pState, int32_t n)
{
asyncSend* pThis = (asyncSend*)pState;
if (!pThis->m_this->m_buffer)
pThis->m_this->m_buffer = new MemoryStream();
pThis->set(encode_ok);
return pThis->m_msg->sendTo(pThis->m_this->m_buffer, pThis);
}
static int32_t encode_ok(AsyncState* pState, int32_t n)
{
asyncSend* pThis = (asyncSend*)pState;
pThis->unlock(pThis->m_this->m_lockBuffer);
pThis->set(lock_buffer);
return pThis->lock(pThis->m_this->m_lockSend);
}
static int32_t lock_buffer(AsyncState* pState, int32_t n)
{
asyncSend* pThis = (asyncSend*)pState;
pThis->set(send);
return pThis->lock(pThis->m_this->m_lockBuffer);
}
static int32_t send(AsyncState* pState, int32_t n)
{
asyncSend* pThis = (asyncSend*)pState;
static int32_t cnt;
pThis->m_buffer = pThis->m_this->m_buffer;
pThis->m_this->m_buffer.Release();
pThis->unlock(pThis->m_this->m_lockBuffer);
pThis->set(ok);
return pThis->m_msg->sendTo(pThis->m_this->m_stream, pThis);
if (!pThis->m_buffer)
return 0;
pThis->m_buffer->rewind();
return pThis->m_buffer->copyTo(pThis->m_this->m_stream, -1, pThis->m_size, pThis);
}
static int32_t ok(AsyncState* pState, int32_t n)
{
asyncSend* pThis = (asyncSend*)pState;
if (pThis->m_end) {
if (pThis->m_type == ws_base::_CLOSE) {
obj_ptr<SeekableStream_base> body;
pThis->m_msg->get_body(body);
@@ -93,7 +153,9 @@ class asyncSend : public AsyncState {
private:
obj_ptr<WebSocketMessage> m_msg;
obj_ptr<WebSocket> m_this;
bool m_end;
obj_ptr<SeekableStream_base> m_buffer;
int32_t m_type;
int64_t m_size;
};
result_t WebSocket_base::_new(exlib::string url, exlib::string protocol, exlib::string origin,
@@ -303,9 +365,8 @@ void WebSocket::startRecv()
obj_ptr<SeekableStream_base> body;
pThis->m_msg->get_body(body);
if (pThis->m_this->m_readyState.CompareAndSwap(ws_base::_OPEN, ws_base::_CLOSING)
== ws_base::_OPEN)
new asyncSend(pThis->m_this, body, ws_base::_CLOSE, true);
if (pThis->m_this->m_readyState.CompareAndSwap(ws_base::_OPEN, ws_base::_CLOSING) == ws_base::_OPEN)
new asyncSend(pThis->m_this, body, ws_base::_CLOSE);
else
pThis->m_this->endConnect(body);
@@ -428,12 +489,7 @@ result_t WebSocket::close(int32_t code, exlib::string reason)
if (code != 1000 && (code < 3000 || code > 4999))
return CHECK_ERROR(Runtime::setError("websocket: The code must be either 1000, or between 3000 and 4999."));
exlib::string buf = " " + reason;
buf[0] = (code >> 8) & 255;
buf[1] = code & 255;
new asyncSend(this, buf, ws_base::_CLOSE);
new asyncSend(this, code, reason);
return 0;
}
@@ -204,8 +204,6 @@ result_t WebSocketMessage::sendTo(Stream_base* stm, AsyncEvent* ac)
{
m_pThis->get_body(m_body);
m_ms = new MemoryStream();
if (m_pThis->m_compress) {
m_zip = new MemoryStream();
set(deflate);
@@ -281,31 +279,20 @@ result_t WebSocketMessage::sendTo(Stream_base* stm, AsyncEvent* ac)
pThis->m_buffer = new Buffer((const char*)buf, pos);
pThis->set(sendData);
return pThis->m_ms->write(pThis->m_buffer, pThis);
return pThis->m_stm->write(pThis->m_buffer, pThis);
}
static int32_t sendData(AsyncState* pState, int32_t n)
{
asyncSendTo* pThis = (asyncSendTo*)pState;
pThis->set(sendToStream);
pThis->done();
pThis->m_zip->rewind();
return copy(pThis->m_zip, pThis->m_ms, pThis->m_size, pThis->m_mask, pThis);
}
static int32_t sendToStream(AsyncState* pState, int32_t n)
{
asyncSendTo* pThis = (asyncSendTo*)pState;
pThis->m_ms->rewind();
pThis->set(NULL);
return io_base::copyStream(pThis->m_ms, pThis->m_stm, -1, pThis->m_size, pThis);
return copy(pThis->m_zip, pThis->m_stm, pThis->m_size, pThis->m_mask, pThis);
}
public:
obj_ptr<SeekableStream_base> m_zip;
obj_ptr<MemoryStream_base> m_ms;
WebSocketMessage* m_pThis;
obj_ptr<Stream_base> m_stm;
obj_ptr<SeekableStream_base> m_body;

0 comments on commit dbff1aa

Please sign in to comment.