Skip to content

Commit

Permalink
Experimental zlib-stream support part 1: Gateway JSON
Browse files Browse the repository at this point in the history
The built-in zlib module is pretty bad at this. inflateSync() doesn't allow you to pass a custom context. The context-based stream from createInflate() is subject to race conditions, since another packet from the gateway WebSocket could enter the chunk queue before the flush() callback is called, causing the JSON parser to attempt to parse multiple JSON packets in one string. C'mon Node. Even PHP has a better API for this[1].

Things to work on:
- See if we can cut down on CPU usage
- Avoid an excessively large packet buffer in memory if the CPU can't keep up
- zlib-stream for ETF support (is this even beneficial?)
- console.log()s are everywhere

[1]: http://php.net/manual/en/function.inflate-add.php
  • Loading branch information
abalabahaha committed Oct 14, 2017
1 parent 1d8271a commit 49c0654
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 105 deletions.
10 changes: 5 additions & 5 deletions lib/Client.js
Expand Up @@ -118,11 +118,7 @@ class Client extends EventEmitter {
this.options.lastShardID = this.options.maxShards - 1;
}
if(typeof window !== "undefined") {
try {
require("pako");
} catch(err) {
this.options.compress = false; // zlib does not like Blobs, Pako is not here
}
this.options.compress = false; // zlib does not like Blobs, Pako is not here
}
if(!~Constants.ImageFormats.indexOf(this.options.defaultImageFormat.toLowerCase())) {
this.options.defaultImageFormat = "jpg";
Expand Down Expand Up @@ -183,6 +179,10 @@ class Client extends EventEmitter {
}
this.gatewayURL = data.url + "?v=" + Constants.GATEWAY_VERSION + "&encoding=" + (Erlpack ? "etf" : "json");

if(this.options.compress && !Erlpack) {
this.gatewayURL += "&compress=zlib-stream";
}

if (this.options.maxShards === "auto") {
if (!data.shards) {
return Promise.reject(new Error("Failed to autoshard due to lack of data from Discord."));
Expand Down
261 changes: 161 additions & 100 deletions lib/gateway/Shard.js
Expand Up @@ -47,6 +47,9 @@ class Shard extends EventEmitter {
this.id = id;
this.client = client;

this.onWSMessage = this.onWSMessage.bind(this);
this._onZlibFlush = this._onZlibFlush.bind(this);

this.hardReset();
}

Expand Down Expand Up @@ -83,6 +86,10 @@ class Shard extends EventEmitter {
}

this.ws.onclose = undefined;
if(this._zlib) {
this._zlib.close();
this._zlib = null;
}
try {
if(options.reconnect && this.sessionID) {
this.ws.terminate();
Expand Down Expand Up @@ -138,6 +145,9 @@ class Shard extends EventEmitter {
this.lastHeartbeatReceived = null;
this.lastHeartbeatSent = null;
this.status = "disconnected";
this._zlibFlushing = false;
this._zlibChunks = null;
this._zlibIncomingChunks = null;
}

hardReset() {
Expand Down Expand Up @@ -1481,9 +1491,18 @@ class Shard extends EventEmitter {
}

initializeWS() {
this._rPackets = 0;
this._rStartTime = Date.now();
this.status = "connecting";
if(this.client.options.compress && !Erlpack) {
this._zlib = Zlib.createUnzip({
flush: Zlib.constants.Z_SYNC_FLUSH,
chunkSize: 128 * 1024
});
this._zlibChunks = [];
this._zlibIncomingChunks = [];
this._zlib.on("data", (data) => {
this._zlibChunks.push(data);
});
}
this.ws = new WebSocket(this.client.gatewayURL, this.client.options.ws);
this.ws.onopen = () => {
if(!this.client.token) {
Expand All @@ -1499,93 +1518,35 @@ class Shard extends EventEmitter {
this.lastHeartbeatAck = true;
};
this.ws.onmessage = (m) => {
this._rPackets++;
try {
var packet = this.parse(m);

if(this.client.listeners("rawWS").length > 0) {
/**
* Fired when the shard receives a websocket packet
* @event Client#rawWS
* @prop {Object} packet The packet
* @prop {Number} id The ID of the shard
*/
this.client.emit("rawWS", packet, this.id);
}

if(packet.s) {
if(packet.s > this.seq + 1 && this.ws && this.status !== "resuming") {
/**
* Fired to warn of something weird but non-breaking happening
* @event Client#warn
* @prop {String} message The warning message
* @prop {Number} id The ID of the shard
*/
this.client.emit("warn", "Non-consecutive sequence, requesting resume", this.id);
this.seq = packet.s;
this.resume();
}
this.seq = packet.s;
}

switch(packet.op) {
case OPCodes.EVENT: {
if(!this.client.options.disableEvents[packet.t]) {
this.wsEvent(packet);
}
break;
}
case OPCodes.HEARTBEAT: {
this.heartbeat();
break;
}
case OPCodes.INVALID_SESSION: {
this.seq = 0;
this.sessionID = null;
this.client.emit("warn", "Invalid session, reidentifying!", this.id);
this.identify();
break;
}
case OPCodes.RECONNECT: {
this.disconnect({
reconnect: "auto"
});
break;
}
case OPCodes.HELLO: {
if(packet.d.heartbeat_interval > 0) {
if(this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
}
this.heartbeatInterval = setInterval(() => this.heartbeat(true), packet.d.heartbeat_interval);
}

this.discordServerTrace = packet.d._trace;
this.connecting = false;
var data = m.data;
if(data instanceof ArrayBuffer) {
data = new Buffer(data);
} else if(Array.isArray(data)) { // Fragmented messages
data = Buffer.concat(data); // Copyfull concat is slow, but no alternative
}
if(Erlpack) {
return this.onWSMessage(Erlpack.unpack(data));
} else if(this.client.options.compress) {
if(this._zlibFlushing) {
this._zlibIncomingChunks.push(data);
// console.log("Stored data");
} else if(this._zlib) {
this._zlib.write(data);
// console.log("Wrote data");

if(this.sessionID) {
this.resume();
} else {
this.identify();
if(data.length >= 4 && data.readUInt32BE(data.length - 4) === 0xFFFF) {
this._zlibFlushing = 1;
this._zlib.flush(Zlib.constants.Z_SYNC_FLUSH, this._onZlibFlush);
this._zlibFlushCalls = (this._zlibFlushCalls || 0) + 1;
// console.log("Flushing...");
}
this.heartbeat();
/**
* Fired when a shard receives an OP:10/HELLO packet
* @event Client#hello
* @prop {String[]} trace The Discord server trace of the gateway and session servers
* @prop {Number} id The ID of the shard
*/
this.client.emit("hello", packet.d._trace, this.id);
break; /* eslint-enable no-unreachable */
}
case OPCodes.HEARTBEAT_ACK: {
this.lastHeartbeatAck = true;
this.lastHeartbeatReceived = new Date().getTime();
break;
}
default: {
this.client.emit("unknown", packet, this.id);
break;
} else {
try {
return this.onWSMessage(JSON.parse(data.toString()));
} catch(err) {
throw new Error(err.message + "\n\n" + data);
}
}
} catch(err) {
Expand Down Expand Up @@ -1647,23 +1608,123 @@ class Shard extends EventEmitter {
}, this.client.options.connectionTimeout);
}

parse(message) {
var data = message.data;
if(data instanceof ArrayBuffer) {
data = new Buffer(data);
} else if(Array.isArray(data)) { // Fragmented messages
data = Buffer.concat(data); // Copyfull concat is slow, but no alternative
_onZlibFlush() {
// this._zlibFlushCallbacks = (this._zlibFlushCallbacks || 0) + 1;
// console.log("Flushed. ", this._zlibFlushing, this._zlibFlushCalls, this._zlibFlushCallbacks);
this._zlibFlushing = false;
if(this._zlibChunks.length === 0) {
return;
}
if(Erlpack) {
return Erlpack.unpack(data);
} else {
if(this.client.options.compress && typeof data !== "string") {
data = Zlib.inflateSync(data);

var buffer = this._zlibChunks[0];
if(this._zlibChunks.length > 1) {
// console.log("Chunk begends, ", this._zlibChunks.map(c=>{
// let s=c.toString();
// return (s[0]||" ")+(s[s.length-1]||" ");
// }).join(","))
buffer = Buffer.concat(this._zlibChunks);
}
this._zlibChunks.length = 0;

while(this._zlibIncomingChunks.length > 0) {
var data = this._zlibIncomingChunks.shift();
this._zlib.write(data);
// console.log("Unstored data.");
if(data.length >= 4 && data.readUInt32BE(data.length - 4) === 0xFFFF) {
this._zlibFlushing = 2;
this._zlib.flush(Zlib.constants.Z_SYNC_FLUSH, this._onZlibFlush);
// this._zlibFlushCalls = (this._zlibFlushCalls || 0) + 1;
// console.log("Flushing store...");
break;
}
try {
return JSON.parse(data);
} catch(err) {
throw new Error(err.message + "\n\n" + data);
}

try {
this.onWSMessage(JSON.parse(buffer.toString()));
} catch(err) {
require("fs").writeFileSync("./wtf.json", buffer.toString());
throw err;
this.client.emit("error", err, this.id);
}
}

onWSMessage(packet) {
if(this.client.listeners("rawWS").length > 0) {
/**
* Fired when the shard receives a websocket packet
* @event Client#rawWS
* @prop {Object} packet The packet
* @prop {Number} id The ID of the shard
*/
this.client.emit("rawWS", packet, this.id);
}

if(packet.s) {
if(packet.s > this.seq + 1 && this.ws) {
/**
* Fired to warn of something weird but non-breaking happening
* @event Client#warn
* @prop {String} message The warning message
* @prop {Number} id The ID of the shard
*/
this.client.emit("warn", "Non-consecutive sequence, requesting resume", this.id);
this.seq = packet.s;
this.resume();
}
this.seq = packet.s;
}

switch(packet.op) {
case OPCodes.EVENT: {
if(!this.client.options.disableEvents[packet.t]) {
this.wsEvent(packet);
}
break;
}
case OPCodes.HEARTBEAT: {
this.heartbeat();
break;
}
case OPCodes.INVALID_SESSION: {
this.seq = 0;
this.sessionID = null;
this.client.emit("warn", "Invalid session, reidentifying!", this.id);
this.identify();
break;
}
case OPCodes.RECONNECT: {
this.disconnect({
reconnect: "auto"
});
break;
}
case OPCodes.HELLO: {
if(packet.d.heartbeat_interval > 0) {
if(this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
}
this.heartbeatInterval = setInterval(() => this.heartbeat(true), packet.d.heartbeat_interval);
}

this.discordServerTrace = packet.d._trace;
this.connecting = false;

if(this.sessionID) {
this.resume();
} else {
this.identify();
}
this.heartbeat();
break; /* eslint-enable no-unreachable */
}
case OPCodes.HEARTBEAT_ACK: {
this.lastHeartbeatAck = true;
this.lastHeartbeatReceived = new Date().getTime();
break;
}
default: {
this.client.emit("unknown", packet, this.id);
break;
}
}
}
Expand Down

0 comments on commit 49c0654

Please sign in to comment.