diff --git a/lib/Client.js b/lib/Client.js index 3be533dcb..3f1c676b3 100644 --- a/lib/Client.js +++ b/lib/Client.js @@ -118,11 +118,7 @@ class Client extends EventEmitter { this.options.lastShardID = this.options.maxShards - 1; } if(typeof window !== "undefined") { - try { - require("pako"); - } catch(err) { - this.options.compress = false; // zlib does not like Blobs, Pako is not here - } + this.options.compress = false; // zlib does not like Blobs, Pako is not here } if(!~Constants.ImageFormats.indexOf(this.options.defaultImageFormat.toLowerCase())) { this.options.defaultImageFormat = "jpg"; @@ -183,6 +179,10 @@ class Client extends EventEmitter { } this.gatewayURL = data.url + "?v=" + Constants.GATEWAY_VERSION + "&encoding=" + (Erlpack ? "etf" : "json"); + if(this.options.compress && !Erlpack) { + this.gatewayURL += "&compress=zlib-stream"; + } + if (this.options.maxShards === "auto") { if (!data.shards) { return Promise.reject(new Error("Failed to autoshard due to lack of data from Discord.")); diff --git a/lib/gateway/Shard.js b/lib/gateway/Shard.js index aa7cbeecd..36ab9285f 100644 --- a/lib/gateway/Shard.js +++ b/lib/gateway/Shard.js @@ -47,6 +47,9 @@ class Shard extends EventEmitter { this.id = id; this.client = client; + this.onWSMessage = this.onWSMessage.bind(this); + this._onZlibFlush = this._onZlibFlush.bind(this); + this.hardReset(); } @@ -83,6 +86,10 @@ class Shard extends EventEmitter { } this.ws.onclose = undefined; + if(this._zlib) { + this._zlib.close(); + this._zlib = null; + } try { if(options.reconnect && this.sessionID) { this.ws.terminate(); @@ -138,6 +145,9 @@ class Shard extends EventEmitter { this.lastHeartbeatReceived = null; this.lastHeartbeatSent = null; this.status = "disconnected"; + this._zlibFlushing = false; + this._zlibChunks = null; + this._zlibIncomingChunks = null; } hardReset() { @@ -1481,9 +1491,18 @@ class Shard extends EventEmitter { } initializeWS() { - this._rPackets = 0; - this._rStartTime = Date.now(); this.status = "connecting"; + if(this.client.options.compress && !Erlpack) { + this._zlib = Zlib.createUnzip({ + flush: Zlib.constants.Z_SYNC_FLUSH, + chunkSize: 128 * 1024 + }); + this._zlibChunks = []; + this._zlibIncomingChunks = []; + this._zlib.on("data", (data) => { + this._zlibChunks.push(data); + }); + } this.ws = new WebSocket(this.client.gatewayURL, this.client.options.ws); this.ws.onopen = () => { if(!this.client.token) { @@ -1499,93 +1518,35 @@ class Shard extends EventEmitter { this.lastHeartbeatAck = true; }; this.ws.onmessage = (m) => { - this._rPackets++; try { - var packet = this.parse(m); - - if(this.client.listeners("rawWS").length > 0) { - /** - * Fired when the shard receives a websocket packet - * @event Client#rawWS - * @prop {Object} packet The packet - * @prop {Number} id The ID of the shard - */ - this.client.emit("rawWS", packet, this.id); - } - - if(packet.s) { - if(packet.s > this.seq + 1 && this.ws && this.status !== "resuming") { - /** - * Fired to warn of something weird but non-breaking happening - * @event Client#warn - * @prop {String} message The warning message - * @prop {Number} id The ID of the shard - */ - this.client.emit("warn", "Non-consecutive sequence, requesting resume", this.id); - this.seq = packet.s; - this.resume(); - } - this.seq = packet.s; - } - - switch(packet.op) { - case OPCodes.EVENT: { - if(!this.client.options.disableEvents[packet.t]) { - this.wsEvent(packet); - } - break; - } - case OPCodes.HEARTBEAT: { - this.heartbeat(); - break; - } - case OPCodes.INVALID_SESSION: { - this.seq = 0; - this.sessionID = null; - this.client.emit("warn", "Invalid session, reidentifying!", this.id); - this.identify(); - break; - } - case OPCodes.RECONNECT: { - this.disconnect({ - reconnect: "auto" - }); - break; - } - case OPCodes.HELLO: { - if(packet.d.heartbeat_interval > 0) { - if(this.heartbeatInterval) { - clearInterval(this.heartbeatInterval); - } - this.heartbeatInterval = setInterval(() => this.heartbeat(true), packet.d.heartbeat_interval); - } - - this.discordServerTrace = packet.d._trace; - this.connecting = false; + var data = m.data; + if(data instanceof ArrayBuffer) { + data = new Buffer(data); + } else if(Array.isArray(data)) { // Fragmented messages + data = Buffer.concat(data); // Copyfull concat is slow, but no alternative + } + if(Erlpack) { + return this.onWSMessage(Erlpack.unpack(data)); + } else if(this.client.options.compress) { + if(this._zlibFlushing) { + this._zlibIncomingChunks.push(data); + // console.log("Stored data"); + } else if(this._zlib) { + this._zlib.write(data); + // console.log("Wrote data"); - if(this.sessionID) { - this.resume(); - } else { - this.identify(); + if(data.length >= 4 && data.readUInt32BE(data.length - 4) === 0xFFFF) { + this._zlibFlushing = 1; + this._zlib.flush(Zlib.constants.Z_SYNC_FLUSH, this._onZlibFlush); + this._zlibFlushCalls = (this._zlibFlushCalls || 0) + 1; + // console.log("Flushing..."); } - this.heartbeat(); - /** - * Fired when a shard receives an OP:10/HELLO packet - * @event Client#hello - * @prop {String[]} trace The Discord server trace of the gateway and session servers - * @prop {Number} id The ID of the shard - */ - this.client.emit("hello", packet.d._trace, this.id); - break; /* eslint-enable no-unreachable */ } - case OPCodes.HEARTBEAT_ACK: { - this.lastHeartbeatAck = true; - this.lastHeartbeatReceived = new Date().getTime(); - break; - } - default: { - this.client.emit("unknown", packet, this.id); - break; + } else { + try { + return this.onWSMessage(JSON.parse(data.toString())); + } catch(err) { + throw new Error(err.message + "\n\n" + data); } } } catch(err) { @@ -1647,23 +1608,123 @@ class Shard extends EventEmitter { }, this.client.options.connectionTimeout); } - parse(message) { - var data = message.data; - if(data instanceof ArrayBuffer) { - data = new Buffer(data); - } else if(Array.isArray(data)) { // Fragmented messages - data = Buffer.concat(data); // Copyfull concat is slow, but no alternative + _onZlibFlush() { + // this._zlibFlushCallbacks = (this._zlibFlushCallbacks || 0) + 1; + // console.log("Flushed. ", this._zlibFlushing, this._zlibFlushCalls, this._zlibFlushCallbacks); + this._zlibFlushing = false; + if(this._zlibChunks.length === 0) { + return; } - if(Erlpack) { - return Erlpack.unpack(data); - } else { - if(this.client.options.compress && typeof data !== "string") { - data = Zlib.inflateSync(data); + + var buffer = this._zlibChunks[0]; + if(this._zlibChunks.length > 1) { + // console.log("Chunk begends, ", this._zlibChunks.map(c=>{ + // let s=c.toString(); + // return (s[0]||" ")+(s[s.length-1]||" "); + // }).join(",")) + buffer = Buffer.concat(this._zlibChunks); + } + this._zlibChunks.length = 0; + + while(this._zlibIncomingChunks.length > 0) { + var data = this._zlibIncomingChunks.shift(); + this._zlib.write(data); + // console.log("Unstored data."); + if(data.length >= 4 && data.readUInt32BE(data.length - 4) === 0xFFFF) { + this._zlibFlushing = 2; + this._zlib.flush(Zlib.constants.Z_SYNC_FLUSH, this._onZlibFlush); + // this._zlibFlushCalls = (this._zlibFlushCalls || 0) + 1; + // console.log("Flushing store..."); + break; } - try { - return JSON.parse(data); - } catch(err) { - throw new Error(err.message + "\n\n" + data); + } + + try { + this.onWSMessage(JSON.parse(buffer.toString())); + } catch(err) { + require("fs").writeFileSync("./wtf.json", buffer.toString()); + throw err; + this.client.emit("error", err, this.id); + } + } + + onWSMessage(packet) { + if(this.client.listeners("rawWS").length > 0) { + /** + * Fired when the shard receives a websocket packet + * @event Client#rawWS + * @prop {Object} packet The packet + * @prop {Number} id The ID of the shard + */ + this.client.emit("rawWS", packet, this.id); + } + + if(packet.s) { + if(packet.s > this.seq + 1 && this.ws) { + /** + * Fired to warn of something weird but non-breaking happening + * @event Client#warn + * @prop {String} message The warning message + * @prop {Number} id The ID of the shard + */ + this.client.emit("warn", "Non-consecutive sequence, requesting resume", this.id); + this.seq = packet.s; + this.resume(); + } + this.seq = packet.s; + } + + switch(packet.op) { + case OPCodes.EVENT: { + if(!this.client.options.disableEvents[packet.t]) { + this.wsEvent(packet); + } + break; + } + case OPCodes.HEARTBEAT: { + this.heartbeat(); + break; + } + case OPCodes.INVALID_SESSION: { + this.seq = 0; + this.sessionID = null; + this.client.emit("warn", "Invalid session, reidentifying!", this.id); + this.identify(); + break; + } + case OPCodes.RECONNECT: { + this.disconnect({ + reconnect: "auto" + }); + break; + } + case OPCodes.HELLO: { + if(packet.d.heartbeat_interval > 0) { + if(this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + } + this.heartbeatInterval = setInterval(() => this.heartbeat(true), packet.d.heartbeat_interval); + } + + this.discordServerTrace = packet.d._trace; + this.connecting = false; + + if(this.sessionID) { + this.resume(); + } else { + this.identify(); + } + this.heartbeat(); + break; /* eslint-enable no-unreachable */ + } + case OPCodes.HEARTBEAT_ACK: { + this.lastHeartbeatAck = true; + this.lastHeartbeatReceived = new Date().getTime(); + break; + } + default: { + this.client.emit("unknown", packet, this.id); + break; } } }