Permalink
Browse files

websocket, feat: support compress handshake in ws.upgrade.

  • Loading branch information...
xicilion committed Oct 21, 2017
1 parent e266204 commit 76efe05aa1c56c3835422a605d3ae4ecc3640e5c
@@ -22,16 +22,18 @@ class WebSocket : public WebSocket_base {
, m_protocol(protocol)
, m_origin(origin)
, m_masked(true)
, m_compress(false)
, m_maxSize(67108864)
, m_readyState(ws_base::_CONNECTING)
{
}
WebSocket(Stream_base* stream, exlib::string protocol, AsyncEvent* ac)
WebSocket(Stream_base* stream, bool compress, exlib::string protocol, AsyncEvent* ac)
: m_stream(stream)
, m_ac(ac)
, m_protocol(protocol)
, m_masked(false)
, m_compress(compress)
, m_maxSize(67108864)
, m_readyState(ws_base::_OPEN)
{
@@ -77,6 +79,7 @@ class WebSocket : public WebSocket_base {
exlib::string m_origin;
bool m_masked;
bool m_compress;
int32_t m_maxSize;
exlib::atomic m_readyState;
@@ -26,7 +26,7 @@ class asyncSend : public AsyncState {
, m_end(false)
{
m_data = new Buffer(data);
m_msg = new WebSocketMessage(type, m_this->m_masked, false, 0);
m_msg = new WebSocketMessage(type, m_this->m_masked, m_this->m_compress, 0);
set(fill);
}
@@ -37,7 +37,7 @@ class asyncSend : public AsyncState {
, m_end(false)
{
m_data = new Buffer(data);
m_msg = new WebSocketMessage(type, m_this->m_masked, false, 0);
m_msg = new WebSocketMessage(type, m_this->m_masked, m_this->m_compress, 0);
set(fill);
}
@@ -47,7 +47,7 @@ class asyncSend : public AsyncState {
, m_this(pThis)
, m_end(end)
{
m_msg = new WebSocketMessage(type, m_this->m_masked, false, 0);
m_msg = new WebSocketMessage(type, m_this->m_masked, m_this->m_compress, 0);
if (body)
m_msg->set_body(body);
@@ -40,6 +40,7 @@ result_t WebSocketHandler::invoke(object_base* v, obj_ptr<Handler_base>& retVal,
: AsyncState(ac)
, m_pThis(pThis)
, m_httpreq(req)
, m_compress(false)
{
obj_ptr<Message_base> rep;
@@ -108,6 +109,15 @@ result_t WebSocketHandler::invoke(object_base* v, obj_ptr<Handler_base>& retVal,
pThis->m_httprep->addHeader("Upgrade", "websocket");
pThis->m_httprep->set_upgrade(true);
hr = pThis->m_httpreq->firstHeader("Sec-WebSocket-Extensions", v);
if (hr < 0)
return hr;
if (hr != CALL_RETURN_NULL && !qstricmp(v.string().c_str(), "permessage-deflate", 18)) {
pThis->m_httprep->addHeader("Sec-WebSocket-Extensions", "permessage-deflate");
pThis->m_compress = true;
}
pThis->set(accept);
return pThis->m_httprep->sendTo(pThis->m_stm, pThis);
}
@@ -119,7 +129,7 @@ result_t WebSocketHandler::invoke(object_base* v, obj_ptr<Handler_base>& retVal,
pThis->done(CALL_RETURN_NULL);
obj_ptr<WebSocketHandler> pHandler = pThis->m_pThis;
obj_ptr<WebSocket> sock = new WebSocket(pThis->m_stm, "", pThis);
obj_ptr<WebSocket> sock = new WebSocket(pThis->m_stm, pThis->m_compress, "", pThis);
Variant v = sock;
pHandler->_emit("accept", &v, 1);
@@ -134,6 +144,7 @@ result_t WebSocketHandler::invoke(object_base* v, obj_ptr<Handler_base>& retVal,
obj_ptr<HttpRequest_base> m_httpreq;
obj_ptr<HttpResponse_base> m_httprep;
obj_ptr<Stream_base> m_stm;
bool m_compress;
};
if (ac->isSync())
@@ -534,7 +534,7 @@ result_t WebSocketMessage::get_response(obj_ptr<Message_base>& retVal)
if (type == ws_base::_PING)
type = ws_base::_PONG;
m_message->m_response = new WebSocketMessage(type, false, m_maxSize, true);
m_message->m_response = new WebSocketMessage(type, false, false, m_maxSize, true);
}
return m_message->get_response(retVal);
View
@@ -210,27 +210,35 @@ describe('ws', () => {
});
describe('WebSocketHandler', () => {
function connect() {
function connect(compress) {
var headers = {
"Upgrade": "websocket",
"Connection": "Upgrade",
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==",
"Sec-WebSocket-Version": "13"
};
if (compress)
headers["Sec-WebSocket-Extensions"] = "permessage-deflate";
var rep = http.get("http://127.0.0.1:" + (8813 + base_port) + "/ws", {
headers: {
"Upgrade": "websocket",
"Connection": "Upgrade",
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==",
"Sec-WebSocket-Version": "13"
}
headers
});
assert.equal(rep.firstHeader("Sec-WebSocket-Accept"), "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=");
assert.equal(rep.firstHeader("Upgrade"), "websocket");
if (compress)
assert.equal(rep.firstHeader("Sec-WebSocket-Extensions"), "permessage-deflate");
assert.equal(rep.statusCode, 101);
assert.equal(rep.upgrade, true);
return rep.stream;
}
function test_msg(s, n, masked) {
function test_msg(s, n, compress) {
var msg = new ws.Message();
msg.type = ws.TEXT;
msg.masked = masked;
msg.compress = compress;
msg.masked = true;
var buf = new Buffer(n);
for (var i = 0; i < n; i++) {
@@ -244,6 +252,9 @@ describe('ws', () => {
var msg = new ws.Message();
msg.readFrom(s);
if (compress)
assert.isTrue(msg.compress);
assert.equal(msg.body.readAll().toString(), buf.toString());
}
@@ -265,62 +276,107 @@ describe('ws', () => {
});
describe("handshake", () => {
it("echo", () => {
var s = connect();
test_msg(s, 10, true);
test_msg(s, 100, true);
test_msg(s, 125, true);
test_msg(s, 126, true);
test_msg(s, 65535, true);
test_msg(s, 65536, true);
s.stream.close();
});
it("missing Upgrade header.", () => {
var rep = http.get("http://127.0.0.1:" + (8813 + base_port) + "/ws", {
"Connection": "Upgrade",
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==",
"Sec-WebSocket-Version": "13"
headers: {
"Connection": "Upgrade",
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==",
"Sec-WebSocket-Version": "13"
}
});
assert.equal(rep.statusCode, 500);
rep.stream.stream.close();
});
it("invalid connection header.", () => {
var rep = http.get("http://127.0.0.1:" + (8813 + base_port) + "/ws", {
"Upgrade": "websocket",
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==",
"Sec-WebSocket-Version": "13"
headers: {
"Upgrade": "websocket",
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==",
"Sec-WebSocket-Version": "13"
}
});
assert.equal(rep.statusCode, 500);
rep.stream.stream.close();
});
it("missing Sec-WebSocket-Key header.", () => {
var rep = http.get("http://127.0.0.1:" + (8813 + base_port) + "/ws", {
"Upgrade": "websocket",
"Connection": "Upgrade",
"Sec-WebSocket-Version": "13"
headers: {
"Upgrade": "websocket",
"Connection": "Upgrade",
"Sec-WebSocket-Version": "13"
}
});
assert.equal(rep.statusCode, 500);
rep.stream.stream.close();
});
it("missing Sec-WebSocket-Version header.", () => {
var rep = http.get("http://127.0.0.1:" + (8813 + base_port) + "/ws", {
"Upgrade": "websocket",
"Connection": "Upgrade",
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ=="
headers: {
"Upgrade": "websocket",
"Connection": "Upgrade",
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ=="
}
});
assert.equal(rep.statusCode, 500);
rep.stream.stream.close();
});
it("support Sec-WebSocket-Extensions header.", () => {
var rep = http.get("http://127.0.0.1:" + (8813 + base_port) + "/ws", {
headers: {
"Upgrade": "websocket",
"Connection": "Upgrade",
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==",
"Sec-WebSocket-Version": "13",
"Sec-WebSocket-Extensions": "permessage-deflate"
}
});
assert.equal(rep.statusCode, 101);
assert.equal(rep.firstHeader("Sec-WebSocket-Extensions"), "permessage-deflate");
var rep = http.get("http://127.0.0.1:" + (8813 + base_port) + "/ws", {
headers: {
"Upgrade": "websocket",
"Connection": "Upgrade",
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==",
"Sec-WebSocket-Version": "13",
"Sec-WebSocket-Extensions": "permessage-deflate; client_max_window_bits"
}
});
assert.equal(rep.statusCode, 101);
assert.equal(rep.firstHeader("Sec-WebSocket-Extensions"), "permessage-deflate");
});
});
it("echo", () => {
var s = connect();
test_msg(s, 10, false);
test_msg(s, 100, false);
test_msg(s, 125, false);
test_msg(s, 126, false);
test_msg(s, 65535, false);
test_msg(s, 65536, false);
s.stream.close();
});
it("echo compress", () => {
var s = connect(true);
test_msg(s, 10, true);
test_msg(s, 100, true);
test_msg(s, 125, true);
test_msg(s, 126, true);
test_msg(s, 65535, true);
test_msg(s, 65536, true);
s.stream.close();
});
it("ping", () => {

0 comments on commit 76efe05

Please sign in to comment.