From 49c065489ecb8bf5674f7391c24c7026eb06a2af Mon Sep 17 00:00:00 2001 From: abalabahaha Date: Fri, 13 Oct 2017 23:36:47 -0700 Subject: [PATCH] Experimental zlib-stream support part 1: Gateway JSON The built-in zlib module is pretty bad at this. inflateSync() doesn't allow you to pass a custom context. The context-based stream from createInflate() is subject to race conditions, since another packet from the gateway WebSocket could enter the chunk queue before the flush() callback is called, causing the JSON parser to attempt to parse multiple JSON packets in one string. C'mon Node. Even PHP has a better API for this[1]. Things to work on: - See if we can cut down on CPU usage - Avoid an excessively large packet buffer in memory if the CPU can't keep up - zlib-stream for ETF support (is this even beneficial?) - console.log()s are everywhere [1]: http://php.net/manual/en/function.inflate-add.php --- lib/Client.js | 10 +- lib/gateway/Shard.js | 261 ++++++++++++++++++++++++++----------------- 2 files changed, 166 insertions(+), 105 deletions(-) diff --git a/lib/Client.js b/lib/Client.js index 3be533dcb..3f1c676b3 100644 --- a/lib/Client.js +++ b/lib/Client.js @@ -118,11 +118,7 @@ class Client extends EventEmitter { this.options.lastShardID = this.options.maxShards - 1; } if(typeof window !== "undefined") { - try { - require("pako"); - } catch(err) { - this.options.compress = false; // zlib does not like Blobs, Pako is not here - } + this.options.compress = false; // zlib does not like Blobs, Pako is not here } if(!~Constants.ImageFormats.indexOf(this.options.defaultImageFormat.toLowerCase())) { this.options.defaultImageFormat = "jpg"; @@ -183,6 +179,10 @@ class Client extends EventEmitter { } this.gatewayURL = data.url + "?v=" + Constants.GATEWAY_VERSION + "&encoding=" + (Erlpack ? "etf" : "json"); + if(this.options.compress && !Erlpack) { + this.gatewayURL += "&compress=zlib-stream"; + } + if (this.options.maxShards === "auto") { if (!data.shards) { return Promise.reject(new Error("Failed to autoshard due to lack of data from Discord.")); diff --git a/lib/gateway/Shard.js b/lib/gateway/Shard.js index aa7cbeecd..36ab9285f 100644 --- a/lib/gateway/Shard.js +++ b/lib/gateway/Shard.js @@ -47,6 +47,9 @@ class Shard extends EventEmitter { this.id = id; this.client = client; + this.onWSMessage = this.onWSMessage.bind(this); + this._onZlibFlush = this._onZlibFlush.bind(this); + this.hardReset(); } @@ -83,6 +86,10 @@ class Shard extends EventEmitter { } this.ws.onclose = undefined; + if(this._zlib) { + this._zlib.close(); + this._zlib = null; + } try { if(options.reconnect && this.sessionID) { this.ws.terminate(); @@ -138,6 +145,9 @@ class Shard extends EventEmitter { this.lastHeartbeatReceived = null; this.lastHeartbeatSent = null; this.status = "disconnected"; + this._zlibFlushing = false; + this._zlibChunks = null; + this._zlibIncomingChunks = null; } hardReset() { @@ -1481,9 +1491,18 @@ class Shard extends EventEmitter { } initializeWS() { - this._rPackets = 0; - this._rStartTime = Date.now(); this.status = "connecting"; + if(this.client.options.compress && !Erlpack) { + this._zlib = Zlib.createUnzip({ + flush: Zlib.constants.Z_SYNC_FLUSH, + chunkSize: 128 * 1024 + }); + this._zlibChunks = []; + this._zlibIncomingChunks = []; + this._zlib.on("data", (data) => { + this._zlibChunks.push(data); + }); + } this.ws = new WebSocket(this.client.gatewayURL, this.client.options.ws); this.ws.onopen = () => { if(!this.client.token) { @@ -1499,93 +1518,35 @@ class Shard extends EventEmitter { this.lastHeartbeatAck = true; }; this.ws.onmessage = (m) => { - this._rPackets++; try { - var packet = this.parse(m); - - if(this.client.listeners("rawWS").length > 0) { - /** - * Fired when the shard receives a websocket packet - * @event Client#rawWS - * @prop {Object} packet The packet - * @prop {Number} id The ID of the shard - */ - this.client.emit("rawWS", packet, this.id); - } - - if(packet.s) { - if(packet.s > this.seq + 1 && this.ws && this.status !== "resuming") { - /** - * Fired to warn of something weird but non-breaking happening - * @event Client#warn - * @prop {String} message The warning message - * @prop {Number} id The ID of the shard - */ - this.client.emit("warn", "Non-consecutive sequence, requesting resume", this.id); - this.seq = packet.s; - this.resume(); - } - this.seq = packet.s; - } - - switch(packet.op) { - case OPCodes.EVENT: { - if(!this.client.options.disableEvents[packet.t]) { - this.wsEvent(packet); - } - break; - } - case OPCodes.HEARTBEAT: { - this.heartbeat(); - break; - } - case OPCodes.INVALID_SESSION: { - this.seq = 0; - this.sessionID = null; - this.client.emit("warn", "Invalid session, reidentifying!", this.id); - this.identify(); - break; - } - case OPCodes.RECONNECT: { - this.disconnect({ - reconnect: "auto" - }); - break; - } - case OPCodes.HELLO: { - if(packet.d.heartbeat_interval > 0) { - if(this.heartbeatInterval) { - clearInterval(this.heartbeatInterval); - } - this.heartbeatInterval = setInterval(() => this.heartbeat(true), packet.d.heartbeat_interval); - } - - this.discordServerTrace = packet.d._trace; - this.connecting = false; + var data = m.data; + if(data instanceof ArrayBuffer) { + data = new Buffer(data); + } else if(Array.isArray(data)) { // Fragmented messages + data = Buffer.concat(data); // Copyfull concat is slow, but no alternative + } + if(Erlpack) { + return this.onWSMessage(Erlpack.unpack(data)); + } else if(this.client.options.compress) { + if(this._zlibFlushing) { + this._zlibIncomingChunks.push(data); + // console.log("Stored data"); + } else if(this._zlib) { + this._zlib.write(data); + // console.log("Wrote data"); - if(this.sessionID) { - this.resume(); - } else { - this.identify(); + if(data.length >= 4 && data.readUInt32BE(data.length - 4) === 0xFFFF) { + this._zlibFlushing = 1; + this._zlib.flush(Zlib.constants.Z_SYNC_FLUSH, this._onZlibFlush); + this._zlibFlushCalls = (this._zlibFlushCalls || 0) + 1; + // console.log("Flushing..."); } - this.heartbeat(); - /** - * Fired when a shard receives an OP:10/HELLO packet - * @event Client#hello - * @prop {String[]} trace The Discord server trace of the gateway and session servers - * @prop {Number} id The ID of the shard - */ - this.client.emit("hello", packet.d._trace, this.id); - break; /* eslint-enable no-unreachable */ } - case OPCodes.HEARTBEAT_ACK: { - this.lastHeartbeatAck = true; - this.lastHeartbeatReceived = new Date().getTime(); - break; - } - default: { - this.client.emit("unknown", packet, this.id); - break; + } else { + try { + return this.onWSMessage(JSON.parse(data.toString())); + } catch(err) { + throw new Error(err.message + "\n\n" + data); } } } catch(err) { @@ -1647,23 +1608,123 @@ class Shard extends EventEmitter { }, this.client.options.connectionTimeout); } - parse(message) { - var data = message.data; - if(data instanceof ArrayBuffer) { - data = new Buffer(data); - } else if(Array.isArray(data)) { // Fragmented messages - data = Buffer.concat(data); // Copyfull concat is slow, but no alternative + _onZlibFlush() { + // this._zlibFlushCallbacks = (this._zlibFlushCallbacks || 0) + 1; + // console.log("Flushed. ", this._zlibFlushing, this._zlibFlushCalls, this._zlibFlushCallbacks); + this._zlibFlushing = false; + if(this._zlibChunks.length === 0) { + return; } - if(Erlpack) { - return Erlpack.unpack(data); - } else { - if(this.client.options.compress && typeof data !== "string") { - data = Zlib.inflateSync(data); + + var buffer = this._zlibChunks[0]; + if(this._zlibChunks.length > 1) { + // console.log("Chunk begends, ", this._zlibChunks.map(c=>{ + // let s=c.toString(); + // return (s[0]||" ")+(s[s.length-1]||" "); + // }).join(",")) + buffer = Buffer.concat(this._zlibChunks); + } + this._zlibChunks.length = 0; + + while(this._zlibIncomingChunks.length > 0) { + var data = this._zlibIncomingChunks.shift(); + this._zlib.write(data); + // console.log("Unstored data."); + if(data.length >= 4 && data.readUInt32BE(data.length - 4) === 0xFFFF) { + this._zlibFlushing = 2; + this._zlib.flush(Zlib.constants.Z_SYNC_FLUSH, this._onZlibFlush); + // this._zlibFlushCalls = (this._zlibFlushCalls || 0) + 1; + // console.log("Flushing store..."); + break; } - try { - return JSON.parse(data); - } catch(err) { - throw new Error(err.message + "\n\n" + data); + } + + try { + this.onWSMessage(JSON.parse(buffer.toString())); + } catch(err) { + require("fs").writeFileSync("./wtf.json", buffer.toString()); + throw err; + this.client.emit("error", err, this.id); + } + } + + onWSMessage(packet) { + if(this.client.listeners("rawWS").length > 0) { + /** + * Fired when the shard receives a websocket packet + * @event Client#rawWS + * @prop {Object} packet The packet + * @prop {Number} id The ID of the shard + */ + this.client.emit("rawWS", packet, this.id); + } + + if(packet.s) { + if(packet.s > this.seq + 1 && this.ws) { + /** + * Fired to warn of something weird but non-breaking happening + * @event Client#warn + * @prop {String} message The warning message + * @prop {Number} id The ID of the shard + */ + this.client.emit("warn", "Non-consecutive sequence, requesting resume", this.id); + this.seq = packet.s; + this.resume(); + } + this.seq = packet.s; + } + + switch(packet.op) { + case OPCodes.EVENT: { + if(!this.client.options.disableEvents[packet.t]) { + this.wsEvent(packet); + } + break; + } + case OPCodes.HEARTBEAT: { + this.heartbeat(); + break; + } + case OPCodes.INVALID_SESSION: { + this.seq = 0; + this.sessionID = null; + this.client.emit("warn", "Invalid session, reidentifying!", this.id); + this.identify(); + break; + } + case OPCodes.RECONNECT: { + this.disconnect({ + reconnect: "auto" + }); + break; + } + case OPCodes.HELLO: { + if(packet.d.heartbeat_interval > 0) { + if(this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + } + this.heartbeatInterval = setInterval(() => this.heartbeat(true), packet.d.heartbeat_interval); + } + + this.discordServerTrace = packet.d._trace; + this.connecting = false; + + if(this.sessionID) { + this.resume(); + } else { + this.identify(); + } + this.heartbeat(); + break; /* eslint-enable no-unreachable */ + } + case OPCodes.HEARTBEAT_ACK: { + this.lastHeartbeatAck = true; + this.lastHeartbeatReceived = new Date().getTime(); + break; + } + default: { + this.client.emit("unknown", packet, this.id); + break; } } }