Permalink
Browse files

websocket, feat: hold the process while websocket is not disconnected.

  • Loading branch information...
xicilion committed Nov 28, 2017
1 parent c73b24c commit 38bdf89608b1a0edcdcf5c955b26bfed6497fa61
Showing with 111 additions and 38 deletions.
  1. +2 −2 fibjs/include/WebSocket.h
  2. +21 −7 fibjs/src/websocket/WebSocket.cpp
  3. +8 −0 test/process/exec18.js
  4. +14 −0 test/process/exec19.js
  5. +63 −29 test/process_test.js
  6. +3 −0 test/ws_test.js
@@ -48,7 +48,7 @@ class WebSocket : public WebSocket_base {
// object_base
virtual result_t onEventChange(v8::Local<v8::Function> func, exlib::string ev, exlib::string type)
{
startRecv();
startRecv(holder());
return 0;
}
@@ -76,7 +76,7 @@ class WebSocket : public WebSocket_base {
EVENT_FUNC(error);
public:
void startRecv();
void startRecv(Isolate* isolate);
void endConnect(int32_t code, exlib::string reason);
void endConnect(SeekableStream_base* body);
void enableCompress();
@@ -177,14 +177,20 @@ result_t WebSocket_base::_new(exlib::string url, exlib::string protocol, exlib::
{
class asyncConnect : public AsyncState {
public:
asyncConnect(WebSocket* pThis)
asyncConnect(WebSocket* pThis, Isolate* isolate)
: AsyncState(NULL)
, m_this(pThis)
{
m_isolate = Isolate::current();
m_isolate = isolate;
m_isolate->Ref();
set(handshake);
}
~asyncConnect()
{
m_isolate->Unref();
}
virtual Isolate* isolate()
{
assert(m_isolate);
@@ -303,7 +309,7 @@ result_t WebSocket_base::_new(exlib::string url, exlib::string protocol, exlib::
pThis->m_this->m_readyState = ws_base::_OPEN;
pThis->m_this->_emit("open", NULL, 0);
pThis->m_this->startRecv();
pThis->m_this->startRecv(pThis->m_isolate);
return pThis->done(0);
}
@@ -325,24 +331,31 @@ result_t WebSocket_base::_new(exlib::string url, exlib::string protocol, exlib::
obj_ptr<WebSocket> sock = new WebSocket(url, protocol, origin);
sock->wrap(This);
(new asyncConnect(sock))->apost(0);
(new asyncConnect(sock, sock->holder()))->apost(0);
retVal = sock;
return 0;
}
void WebSocket::startRecv()
void WebSocket::startRecv(Isolate* isolate)
{
class asyncRead : public AsyncState {
public:
asyncRead(WebSocket* pThis)
asyncRead(WebSocket* pThis, Isolate* isolate)
: AsyncState(NULL)
, m_this(pThis)
{
m_isolate = isolate;
m_isolate->Ref();
set(recv);
}
~asyncRead()
{
m_isolate->Unref();
}
static int32_t recv(AsyncState* pState, int32_t n)
{
asyncRead* pThis = (asyncRead*)pState;
@@ -407,12 +420,13 @@ void WebSocket::startRecv()
}
private:
Isolate* m_isolate;
obj_ptr<WebSocket> m_this;
obj_ptr<WebSocketMessage> m_msg;
};
if (m_stream && m_readState.xchg(ws_base::_OPEN) != ws_base::_OPEN)
(new asyncRead(this))->apost(0);
(new asyncRead(this, isolate))->apost(0);
}
void WebSocket::endConnect(int32_t code, exlib::string reason)
View
@@ -0,0 +1,8 @@
var ws = require('ws');
process.exitCode = 18;
var conn = new ws.Socket("ws://999.99.999.999/not_exists");
conn.onerror = e => {
console.log(1800);
}
View
@@ -0,0 +1,14 @@
var ws = require('ws');
process.exitCode = 19;
var conn = new ws.Socket("ws://127.0.0.1:8899/ws");
conn.onopen = () => {
conn.send('hello');
}
conn.onmessage = function (msg) {
console.log(1900);
conn.close();
httpd.stop();
};
View
@@ -5,6 +5,8 @@ var process = require('process');
var coroutine = require("coroutine");
var path = require('path');
var json = require('json');
var ws = require('ws');
var http = require('http');
var cmd;
var s;
@@ -14,6 +16,14 @@ describe('process', () => {
cmd = process.execPath;
});
var ss = [];
after(() => {
ss.forEach((s) => {
s.close();
});
});
it("hrtime", () => {
var start = process.hrtime();
assert.isArray(start);
@@ -71,40 +81,64 @@ describe('process', () => {
});
});
it("multi fiber", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec7.js')]);
assert.equal(p.readLine(), "100");
assert.equal(p.wait(), 7);
});
describe('process holding', () => {
it("multi fiber", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec7.js')]);
assert.equal(p.readLine(), "100");
assert.equal(p.wait(), 7);
});
it("pendding async", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec8.js')]);
assert.equal(p.readLine(), "200");
assert.equal(p.wait(), 8);
});
it("pendding callback", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec8.js')]);
assert.equal(p.readLine(), "200");
assert.equal(p.wait(), 8);
});
it("setTimeout", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec9.js')]);
assert.equal(p.readLine(), "300");
assert.equal(p.wait(), 9);
});
it("setTimeout", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec9.js')]);
assert.equal(p.readLine(), "300");
assert.equal(p.wait(), 9);
});
it("setInterval", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec10.js')]);
assert.equal(p.readLine(), "400");
assert.equal(p.wait(), 10);
});
it("setInterval", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec10.js')]);
assert.equal(p.readLine(), "400");
assert.equal(p.wait(), 10);
});
it("setImmediate", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec11.js')]);
assert.equal(p.readLine(), "500");
assert.equal(p.wait(), 11);
});
it("setImmediate", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec11.js')]);
assert.equal(p.readLine(), "500");
assert.equal(p.wait(), 11);
});
it("bugfix: multi fiber async", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec12.js')]);
assert.equal(p.readLine(), "600");
assert.equal(p.wait(), 12);
it("websocket connect", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec18.js')]);
assert.equal(p.readLine(), "1800");
assert.equal(p.wait(), 18);
});
it("websocket disconnect", () => {
var httpd = new http.Server(8899, {
"/ws": ws.upgrade((s) => {
s.onmessage = function (msg) {
s.send(msg);
};
})
});
ss.push(httpd.socket);
httpd.run(() => {});
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec19.js')]);
assert.equal(p.readLine(), "1900");
assert.equal(p.wait(), 19);
});
it("bugfix: multi fiber async", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec12.js')]);
assert.equal(p.readLine(), "600");
assert.equal(p.wait(), 12);
});
});
it("start", () => {
View
@@ -337,6 +337,7 @@ describe('ws', () => {
assert.equal(rep.statusCode, 101);
assert.equal(rep.firstHeader("Sec-WebSocket-Extensions"), "permessage-deflate");
rep.socket.close();
var rep = http.get("http://127.0.0.1:" + (8813 + base_port) + "/ws", {
headers: {
@@ -350,6 +351,7 @@ describe('ws', () => {
assert.equal(rep.statusCode, 101);
assert.equal(rep.firstHeader("Sec-WebSocket-Extensions"), "permessage-deflate");
rep.socket.close();
});
});
@@ -690,6 +692,7 @@ describe('ws', () => {
ev.wait();
console.timeEnd('ws');
s.close();
});
describe('onerror', () => {

0 comments on commit 38bdf89

Please sign in to comment.