Permalink
Browse files

websocket, feat: WebSocketMessage.readFrom support compress message.

  • Loading branch information...
xicilion committed Oct 20, 2017
1 parent 4fe56e6 commit 2e17b82fad6b2845e594c025ea051970356771f1
@@ -15,8 +15,9 @@ namespace fibjs {
class WebSocketMessage : public WebSocketMessage_base {
public:
WebSocketMessage(int32_t type, bool masked, int32_t maxSize, bool bRep = false)
WebSocketMessage(int32_t type, bool masked, bool compress, int32_t maxSize, bool bRep = false)
: m_masked(masked)
, m_compress(compress)
, m_maxSize(maxSize)
, m_error(0)
, m_bRep(bRep)
@@ -56,6 +57,8 @@ class WebSocketMessage : public WebSocketMessage_base {
// WebSocketMessage_base
virtual result_t get_masked(bool& retVal);
virtual result_t set_masked(bool newVal);
virtual result_t get_compress(bool& retVal);
virtual result_t set_compress(bool newVal);
virtual result_t get_maxSize(int32_t& retVal);
virtual result_t set_maxSize(int32_t newVal);
@@ -65,6 +68,7 @@ class WebSocketMessage : public WebSocketMessage_base {
public:
obj_ptr<Stream_base> m_stm;
bool m_masked;
bool m_compress;
int32_t m_maxSize;
int32_t m_error;
@@ -25,9 +25,11 @@ class WebSocketMessage_base : public Message_base {
public:
// WebSocketMessage_base
static result_t _new(int32_t type, bool masked, int32_t maxSize, obj_ptr<WebSocketMessage_base>& retVal, v8::Local<v8::Object> This = v8::Local<v8::Object>());
static result_t _new(int32_t type, bool masked, bool compress, int32_t maxSize, obj_ptr<WebSocketMessage_base>& retVal, v8::Local<v8::Object> This = v8::Local<v8::Object>());
virtual result_t get_masked(bool& retVal) = 0;
virtual result_t set_masked(bool newVal) = 0;
virtual result_t get_compress(bool& retVal) = 0;
virtual result_t set_compress(bool newVal) = 0;
virtual result_t get_maxSize(int32_t& retVal) = 0;
virtual result_t set_maxSize(int32_t newVal) = 0;
@@ -39,6 +41,8 @@ class WebSocketMessage_base : public Message_base {
static void s__new(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_get_masked(v8::Local<v8::String> property, const v8::PropertyCallbackInfo<v8::Value>& args);
static void s_set_masked(v8::Local<v8::String> property, v8::Local<v8::Value> value, const v8::PropertyCallbackInfo<void>& args);
static void s_get_compress(v8::Local<v8::String> property, const v8::PropertyCallbackInfo<v8::Value>& args);
static void s_set_compress(v8::Local<v8::String> property, v8::Local<v8::Value> value, const v8::PropertyCallbackInfo<void>& args);
static void s_get_maxSize(v8::Local<v8::String> property, const v8::PropertyCallbackInfo<v8::Value>& args);
static void s_set_maxSize(v8::Local<v8::String> property, v8::Local<v8::Value> value, const v8::PropertyCallbackInfo<void>& args);
};
@@ -51,6 +55,7 @@ inline ClassInfo& WebSocketMessage_base::class_info()
{
static ClassData::ClassProperty s_property[] = {
{ "masked", s_get_masked, s_set_masked, false },
{ "compress", s_get_compress, s_set_compress, false },
{ "maxSize", s_get_maxSize, s_set_maxSize, false }
};
@@ -78,13 +83,14 @@ void WebSocketMessage_base::__new(const T& args)
METHOD_NAME("new WebSocketMessage()");
CONSTRUCT_ENTER();
METHOD_OVER(3, 0);
METHOD_OVER(4, 0);
OPT_ARG(int32_t, 0, ws_base::_BINARY);
OPT_ARG(bool, 1, true);
OPT_ARG(int32_t, 2, 67108864);
OPT_ARG(bool, 2, false);
OPT_ARG(int32_t, 3, 67108864);
hr = _new(v0, v1, v2, vr, args.This());
hr = _new(v0, v1, v2, v3, vr, args.This());
CONSTRUCT_RETURN();
}
@@ -114,6 +120,31 @@ inline void WebSocketMessage_base::s_set_masked(v8::Local<v8::String> property,
PROPERTY_SET_LEAVE();
}
inline void WebSocketMessage_base::s_get_compress(v8::Local<v8::String> property, const v8::PropertyCallbackInfo<v8::Value>& args)
{
bool vr;
METHOD_NAME("WebSocketMessage.compress");
METHOD_INSTANCE(WebSocketMessage_base);
PROPERTY_ENTER();
hr = pInst->get_compress(vr);
METHOD_RETURN();
}
inline void WebSocketMessage_base::s_set_compress(v8::Local<v8::String> property, v8::Local<v8::Value> value, const v8::PropertyCallbackInfo<void>& args)
{
METHOD_NAME("WebSocketMessage.compress");
METHOD_INSTANCE(WebSocketMessage_base);
PROPERTY_ENTER();
PROPERTY_VAL(bool);
hr = pInst->set_compress(v0);
PROPERTY_SET_LEAVE();
}
inline void WebSocketMessage_base::s_get_maxSize(v8::Local<v8::String> property, const v8::PropertyCallbackInfo<v8::Value>& args)
{
int32_t vr;
@@ -26,7 +26,7 @@ class asyncSend : public AsyncState {
, m_end(false)
{
m_data = new Buffer(data);
m_msg = new WebSocketMessage(type, m_this->m_masked, 0);
m_msg = new WebSocketMessage(type, m_this->m_masked, false, 0);
set(fill);
}
@@ -37,7 +37,7 @@ class asyncSend : public AsyncState {
, m_end(false)
{
m_data = new Buffer(data);
m_msg = new WebSocketMessage(type, m_this->m_masked, 0);
m_msg = new WebSocketMessage(type, m_this->m_masked, false, 0);
set(fill);
}
@@ -47,7 +47,7 @@ class asyncSend : public AsyncState {
, m_this(pThis)
, m_end(end)
{
m_msg = new WebSocketMessage(type, m_this->m_masked, 0);
m_msg = new WebSocketMessage(type, m_this->m_masked, false, 0);
if (body)
m_msg->set_body(body);
@@ -264,7 +264,7 @@ void WebSocket::startRecv()
{
asyncRead* pThis = (asyncRead*)pState;
pThis->m_msg = new WebSocketMessage(ws_base::_TEXT, false, pThis->m_this->m_maxSize);
pThis->m_msg = new WebSocketMessage(ws_base::_TEXT, false, false, pThis->m_this->m_maxSize);
pThis->set(event);
return pThis->m_msg->readFrom(pThis->m_this->m_stream, pThis);
@@ -7,17 +7,18 @@
#include "object.h"
#include "ifs/io.h"
#include "ifs/zlib.h"
#include "WebSocketMessage.h"
#include "Buffer.h"
#include "MemoryStream.h"
namespace fibjs {
result_t WebSocketMessage_base::_new(int32_t type, bool masked, int32_t maxSize,
result_t WebSocketMessage_base::_new(int32_t type, bool masked, bool compress, int32_t maxSize,
obj_ptr<WebSocketMessage_base>& retVal,
v8::Local<v8::Object> This)
{
retVal = new WebSocketMessage(type, masked, maxSize);
retVal = new WebSocketMessage(type, masked, compress, maxSize);
return 0;
}
@@ -316,6 +317,7 @@ result_t WebSocketMessage::readFrom(Stream_base* stm, AsyncEvent* ac)
, m_mask(0)
{
m_pThis->get_body(m_body);
m_stm1 = m_body;
set(head);
}
@@ -344,7 +346,10 @@ result_t WebSocketMessage::readFrom(Stream_base* stm, AsyncEvent* ac)
pThis->m_buffer.Release();
ch = strBuffer[0];
if (ch & 0x70) {
if (ch & 0x40) {
pThis->m_stm1 = new MemoryStream();
pThis->m_pThis->m_compress = true;
} else if (ch & 0x70) {
pThis->m_pThis->m_error = 1007;
return CHECK_ERROR(Runtime::setError("WebSocketMessage: non-zero RSV values."));
}
@@ -383,7 +388,7 @@ result_t WebSocketMessage::readFrom(Stream_base* stm, AsyncEvent* ac)
return pThis->m_stm->read(sz, pThis->m_buffer, pThis);
}
pThis->set(body);
pThis->set(copy);
return 0;
}
@@ -413,11 +418,11 @@ result_t WebSocketMessage::readFrom(Stream_base* stm, AsyncEvent* ac)
if (pThis->m_masked)
memcpy(&pThis->m_mask, &strBuffer[pos], 4);
pThis->set(body);
pThis->set(copy);
return 0;
}
static int32_t body(AsyncState* pState, int32_t n)
static int32_t copy(AsyncState* pState, int32_t n)
{
asyncReadFrom* pThis = (asyncReadFrom*)pState;
@@ -426,11 +431,11 @@ result_t WebSocketMessage::readFrom(Stream_base* stm, AsyncEvent* ac)
return CHECK_ERROR(Runtime::setError("WebSocketMessage: Message Too Big."));
}
pThis->set(body_end);
return copy(pThis->m_stm, pThis->m_body, pThis->m_size, pThis->m_mask, pThis);
pThis->set(copy_end);
return WebSocketMessage::copy(pThis->m_stm, pThis->m_stm1, pThis->m_size, pThis->m_mask, pThis);
}
static int32_t body_end(AsyncState* pState, int32_t n)
static int32_t copy_end(AsyncState* pState, int32_t n)
{
asyncReadFrom* pThis = (asyncReadFrom*)pState;
@@ -442,13 +447,34 @@ result_t WebSocketMessage::readFrom(Stream_base* stm, AsyncEvent* ac)
return 0;
}
if (pThis->m_pThis->m_compress)
pThis->set(inflate);
else
pThis->set(body_end);
return 0;
}
static int32_t inflate(AsyncState* pState, int32_t n)
{
asyncReadFrom* pThis = (asyncReadFrom*)pState;
pThis->set(body_end);
pThis->m_stm1->rewind();
return zlib_base::inflateRawTo(pThis->m_stm1, pThis->m_body, pThis);
}
static int32_t body_end(AsyncState* pState, int32_t n)
{
asyncReadFrom* pThis = (asyncReadFrom*)pState;
pThis->m_body->rewind();
return pThis->done();
}
public:
WebSocketMessage* m_pThis;
obj_ptr<Stream_base> m_stm;
obj_ptr<SeekableStream_base> m_stm1;
obj_ptr<SeekableStream_base> m_body;
obj_ptr<Buffer_base> m_buffer;
bool m_fin;
@@ -520,6 +546,18 @@ result_t WebSocketMessage::set_masked(bool newVal)
return 0;
}
result_t WebSocketMessage::get_compress(bool& retVal)
{
retVal = m_compress;
return 0;
}
result_t WebSocketMessage::set_compress(bool newVal)
{
m_compress = newVal;
return 0;
}
result_t WebSocketMessage::get_maxSize(int32_t& retVal)
{
retVal = m_maxSize;
@@ -12,13 +12,17 @@ interface WebSocketMessage : Message
/*! @brief 包处理消息对象构造函数
@param type websocket 消息类型,缺省为 websocket.BINARY
@param masked websocket 消息掩码,缺省为 true
@param compress 标记消息是否压缩,缺省为 false
@param maxSize 最大包尺寸,以 MB 为单位,缺省为 67108864(64M)
*/
WebSocketMessage(Integer type = ws.BINARY, Boolean masked = true, Integer maxSize = 67108864);
WebSocketMessage(Integer type = ws.BINARY, Boolean masked = true, Boolean compress = false, Integer maxSize = 67108864);
/*! @brief 查询和读取 websocket 掩码标记,缺省为 true */
Boolean masked;
/*! @brief 查询和读取 websocket 压缩状态,缺省为 false */
Boolean compress;
/*! @brief 查询和设置最大包尺寸,以字节为单位,缺省为 67108864(64M) */
Integer maxSize;
};
Oops, something went wrong.

0 comments on commit 2e17b82

Please sign in to comment.