Permalink
Browse files

websocket, feat: support compress in WebSocketMessage.sendTo.

  • Loading branch information...
xicilion committed Oct 21, 2017
1 parent 2e17b82 commit e2662049c5f0e57efcc4a95336b02866cc493fb3
Showing with 48 additions and 25 deletions.
  1. +33 −12 fibjs/src/websocket/WebSocketMessage.cpp
  2. +15 −13 test/ws_test.js
@@ -203,12 +203,25 @@ result_t WebSocketMessage::sendTo(Stream_base* stm, AsyncEvent* ac)
, m_mask(0)
{
m_pThis->get_body(m_body);
m_body->rewind();
m_body->size(m_size);
m_ms = new MemoryStream();
set(head);
if (m_pThis->m_compress) {
m_zip = new MemoryStream();
set(deflate);
} else {
m_zip = m_body;
set(head);
}
}
static int32_t deflate(AsyncState* pState, int32_t n)
{
asyncSendTo* pThis = (asyncSendTo*)pState;
pThis->set(head);
pThis->m_body->rewind();
return zlib_base::deflateRawTo(pThis->m_body, pThis->m_zip, zlib_base::_DEFAULT_COMPRESSION, pThis);
}
static int32_t head(AsyncState* pState, int32_t n)
@@ -220,9 +233,15 @@ result_t WebSocketMessage::sendTo(Stream_base* stm, AsyncEvent* ac)
int32_t type;
pThis->m_pThis->get_type(type);
buf[0] = 0x80 | (type & 0x0f);
if (pThis->m_pThis->m_compress)
buf[0] = 0xc0 | (type & 0x0f);
else
buf[0] = 0x80 | (type & 0x0f);
int64_t size;
pThis->m_zip->size(size);
pThis->m_size = size;
int64_t size = pThis->m_size;
if (size < 126) {
buf[1] = (uint8_t)size;
pos = 2;
@@ -270,7 +289,8 @@ result_t WebSocketMessage::sendTo(Stream_base* stm, AsyncEvent* ac)
asyncSendTo* pThis = (asyncSendTo*)pState;
pThis->set(sendToStream);
return copy(pThis->m_body, pThis->m_ms, pThis->m_size, pThis->m_mask, pThis);
pThis->m_zip->rewind();
return copy(pThis->m_zip, pThis->m_ms, pThis->m_size, pThis->m_mask, pThis);
}
static int32_t sendToStream(AsyncState* pState, int32_t n)
@@ -284,6 +304,7 @@ result_t WebSocketMessage::sendTo(Stream_base* stm, AsyncEvent* ac)
}
public:
obj_ptr<SeekableStream_base> m_zip;
obj_ptr<MemoryStream_base> m_ms;
WebSocketMessage* m_pThis;
obj_ptr<Stream_base> m_stm;
@@ -317,7 +338,7 @@ result_t WebSocketMessage::readFrom(Stream_base* stm, AsyncEvent* ac)
, m_mask(0)
{
m_pThis->get_body(m_body);
m_stm1 = m_body;
m_zip = m_body;
set(head);
}
@@ -347,7 +368,7 @@ result_t WebSocketMessage::readFrom(Stream_base* stm, AsyncEvent* ac)
ch = strBuffer[0];
if (ch & 0x40) {
pThis->m_stm1 = new MemoryStream();
pThis->m_zip = new MemoryStream();
pThis->m_pThis->m_compress = true;
} else if (ch & 0x70) {
pThis->m_pThis->m_error = 1007;
@@ -432,7 +453,7 @@ result_t WebSocketMessage::readFrom(Stream_base* stm, AsyncEvent* ac)
}
pThis->set(copy_end);
return WebSocketMessage::copy(pThis->m_stm, pThis->m_stm1, pThis->m_size, pThis->m_mask, pThis);
return WebSocketMessage::copy(pThis->m_stm, pThis->m_zip, pThis->m_size, pThis->m_mask, pThis);
}
static int32_t copy_end(AsyncState* pState, int32_t n)
@@ -459,8 +480,8 @@ result_t WebSocketMessage::readFrom(Stream_base* stm, AsyncEvent* ac)
asyncReadFrom* pThis = (asyncReadFrom*)pState;
pThis->set(body_end);
pThis->m_stm1->rewind();
return zlib_base::inflateRawTo(pThis->m_stm1, pThis->m_body, pThis);
pThis->m_zip->rewind();
return zlib_base::inflateRawTo(pThis->m_zip, pThis->m_body, pThis);
}
static int32_t body_end(AsyncState* pState, int32_t n)
@@ -474,7 +495,7 @@ result_t WebSocketMessage::readFrom(Stream_base* stm, AsyncEvent* ac)
public:
WebSocketMessage* m_pThis;
obj_ptr<Stream_base> m_stm;
obj_ptr<SeekableStream_base> m_stm1;
obj_ptr<SeekableStream_base> m_zip;
obj_ptr<SeekableStream_base> m_body;
obj_ptr<Buffer_base> m_buffer;
bool m_fin;
View
@@ -170,10 +170,11 @@ describe('ws', () => {
});
it("sendTo", () => {
function test_msg(n, masked) {
function test_msg(n, masked, compress) {
var msg = new ws.Message();
msg.type = ws.TEXT;
msg.masked = masked;
msg.compress = compress;
var buf = new Buffer(n);
for (var i = 0; i < n; i++) {
@@ -192,18 +193,19 @@ describe('ws', () => {
assert.equal(msg.body.readAll().toString(), buf.toString());
}
test_msg(10);
test_msg(10, true);
test_msg(100);
test_msg(100, true);
test_msg(125);
test_msg(125, true);
test_msg(126);
test_msg(126, true);
test_msg(65535);
test_msg(65535, true);
test_msg(65536);
test_msg(65536, true);
function test_msg_1(n) {
test_msg(n, false, false);
test_msg(n, true, false);
test_msg(n, false, true);
test_msg(n, true, true);
}
test_msg_1(10);
test_msg_1(100);
test_msg_1(125);
test_msg_1(126);
test_msg_1(65535);
test_msg_1(65536);
});
});

0 comments on commit e266204

Please sign in to comment.