Skip to content

Commit

Permalink
websocket, feat: hold the process while websocket is not disconnected.
Browse files Browse the repository at this point in the history
  • Loading branch information
xicilion committed Nov 28, 2017
1 parent c73b24c commit 38bdf89
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 38 deletions.
4 changes: 2 additions & 2 deletions fibjs/include/WebSocket.h
Expand Up @@ -48,7 +48,7 @@ class WebSocket : public WebSocket_base {
// object_base
virtual result_t onEventChange(v8::Local<v8::Function> func, exlib::string ev, exlib::string type)
{
startRecv();
startRecv(holder());
return 0;
}

Expand Down Expand Up @@ -76,7 +76,7 @@ class WebSocket : public WebSocket_base {
EVENT_FUNC(error);

public:
void startRecv();
void startRecv(Isolate* isolate);
void endConnect(int32_t code, exlib::string reason);
void endConnect(SeekableStream_base* body);
void enableCompress();
Expand Down
28 changes: 21 additions & 7 deletions fibjs/src/websocket/WebSocket.cpp
Expand Up @@ -177,14 +177,20 @@ result_t WebSocket_base::_new(exlib::string url, exlib::string protocol, exlib::
{
class asyncConnect : public AsyncState {
public:
asyncConnect(WebSocket* pThis)
asyncConnect(WebSocket* pThis, Isolate* isolate)
: AsyncState(NULL)
, m_this(pThis)
{
m_isolate = Isolate::current();
m_isolate = isolate;
m_isolate->Ref();
set(handshake);
}

~asyncConnect()
{
m_isolate->Unref();
}

virtual Isolate* isolate()
{
assert(m_isolate);
Expand Down Expand Up @@ -303,7 +309,7 @@ result_t WebSocket_base::_new(exlib::string url, exlib::string protocol, exlib::
pThis->m_this->m_readyState = ws_base::_OPEN;
pThis->m_this->_emit("open", NULL, 0);

pThis->m_this->startRecv();
pThis->m_this->startRecv(pThis->m_isolate);

return pThis->done(0);
}
Expand All @@ -325,24 +331,31 @@ result_t WebSocket_base::_new(exlib::string url, exlib::string protocol, exlib::
obj_ptr<WebSocket> sock = new WebSocket(url, protocol, origin);
sock->wrap(This);

(new asyncConnect(sock))->apost(0);
(new asyncConnect(sock, sock->holder()))->apost(0);

retVal = sock;

return 0;
}

void WebSocket::startRecv()
void WebSocket::startRecv(Isolate* isolate)
{
class asyncRead : public AsyncState {
public:
asyncRead(WebSocket* pThis)
asyncRead(WebSocket* pThis, Isolate* isolate)
: AsyncState(NULL)
, m_this(pThis)
{
m_isolate = isolate;
m_isolate->Ref();
set(recv);
}

~asyncRead()
{
m_isolate->Unref();
}

static int32_t recv(AsyncState* pState, int32_t n)
{
asyncRead* pThis = (asyncRead*)pState;
Expand Down Expand Up @@ -407,12 +420,13 @@ void WebSocket::startRecv()
}

private:
Isolate* m_isolate;
obj_ptr<WebSocket> m_this;
obj_ptr<WebSocketMessage> m_msg;
};

if (m_stream && m_readState.xchg(ws_base::_OPEN) != ws_base::_OPEN)
(new asyncRead(this))->apost(0);
(new asyncRead(this, isolate))->apost(0);
}

void WebSocket::endConnect(int32_t code, exlib::string reason)
Expand Down
8 changes: 8 additions & 0 deletions test/process/exec18.js
@@ -0,0 +1,8 @@
var ws = require('ws');

process.exitCode = 18;

var conn = new ws.Socket("ws://999.99.999.999/not_exists");
conn.onerror = e => {
console.log(1800);
}
14 changes: 14 additions & 0 deletions test/process/exec19.js
@@ -0,0 +1,14 @@
var ws = require('ws');

process.exitCode = 19;

var conn = new ws.Socket("ws://127.0.0.1:8899/ws");

conn.onopen = () => {
conn.send('hello');
}
conn.onmessage = function (msg) {
console.log(1900);
conn.close();
httpd.stop();
};
92 changes: 63 additions & 29 deletions test/process_test.js
Expand Up @@ -5,6 +5,8 @@ var process = require('process');
var coroutine = require("coroutine");
var path = require('path');
var json = require('json');
var ws = require('ws');
var http = require('http');

var cmd;
var s;
Expand All @@ -14,6 +16,14 @@ describe('process', () => {
cmd = process.execPath;
});

var ss = [];

after(() => {
ss.forEach((s) => {
s.close();
});
});

it("hrtime", () => {
var start = process.hrtime();
assert.isArray(start);
Expand Down Expand Up @@ -71,40 +81,64 @@ describe('process', () => {
});
});

it("multi fiber", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec7.js')]);
assert.equal(p.readLine(), "100");
assert.equal(p.wait(), 7);
});
describe('process holding', () => {
it("multi fiber", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec7.js')]);
assert.equal(p.readLine(), "100");
assert.equal(p.wait(), 7);
});

it("pendding async", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec8.js')]);
assert.equal(p.readLine(), "200");
assert.equal(p.wait(), 8);
});
it("pendding callback", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec8.js')]);
assert.equal(p.readLine(), "200");
assert.equal(p.wait(), 8);
});

it("setTimeout", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec9.js')]);
assert.equal(p.readLine(), "300");
assert.equal(p.wait(), 9);
});
it("setTimeout", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec9.js')]);
assert.equal(p.readLine(), "300");
assert.equal(p.wait(), 9);
});

it("setInterval", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec10.js')]);
assert.equal(p.readLine(), "400");
assert.equal(p.wait(), 10);
});
it("setInterval", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec10.js')]);
assert.equal(p.readLine(), "400");
assert.equal(p.wait(), 10);
});

it("setImmediate", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec11.js')]);
assert.equal(p.readLine(), "500");
assert.equal(p.wait(), 11);
});
it("setImmediate", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec11.js')]);
assert.equal(p.readLine(), "500");
assert.equal(p.wait(), 11);
});

it("bugfix: multi fiber async", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec12.js')]);
assert.equal(p.readLine(), "600");
assert.equal(p.wait(), 12);
it("websocket connect", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec18.js')]);
assert.equal(p.readLine(), "1800");
assert.equal(p.wait(), 18);
});

it("websocket disconnect", () => {
var httpd = new http.Server(8899, {
"/ws": ws.upgrade((s) => {
s.onmessage = function (msg) {
s.send(msg);
};
})
});
ss.push(httpd.socket);
httpd.run(() => {});

var p = process.open(cmd, [path.join(__dirname, 'process', 'exec19.js')]);
assert.equal(p.readLine(), "1900");
assert.equal(p.wait(), 19);
});

it("bugfix: multi fiber async", () => {
var p = process.open(cmd, [path.join(__dirname, 'process', 'exec12.js')]);
assert.equal(p.readLine(), "600");
assert.equal(p.wait(), 12);
});
});

it("start", () => {
Expand Down
3 changes: 3 additions & 0 deletions test/ws_test.js
Expand Up @@ -337,6 +337,7 @@ describe('ws', () => {

assert.equal(rep.statusCode, 101);
assert.equal(rep.firstHeader("Sec-WebSocket-Extensions"), "permessage-deflate");
rep.socket.close();

var rep = http.get("http://127.0.0.1:" + (8813 + base_port) + "/ws", {
headers: {
Expand All @@ -350,6 +351,7 @@ describe('ws', () => {

assert.equal(rep.statusCode, 101);
assert.equal(rep.firstHeader("Sec-WebSocket-Extensions"), "permessage-deflate");
rep.socket.close();
});
});

Expand Down Expand Up @@ -690,6 +692,7 @@ describe('ws', () => {

ev.wait();
console.timeEnd('ws');
s.close();
});

describe('onerror', () => {
Expand Down

0 comments on commit 38bdf89

Please sign in to comment.