Skip to content

Commit

Permalink
Experimental zlib-stream support part 2: Gateway ETF
Browse files Browse the repository at this point in the history
  • Loading branch information
abalabahaha committed Oct 16, 2017
1 parent 94c0767 commit 90790ea
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 24 deletions.
2 changes: 1 addition & 1 deletion lib/Client.js
Expand Up @@ -179,7 +179,7 @@ class Client extends EventEmitter {
}
this.gatewayURL = data.url + "?v=" + Constants.GATEWAY_VERSION + "&encoding=" + (Erlpack ? "etf" : "json");

if(this.options.compress && !Erlpack) {
if(this.options.compress) {
this.gatewayURL += "&compress=zlib-stream";
}

Expand Down
35 changes: 12 additions & 23 deletions lib/gateway/Shard.js
Expand Up @@ -1150,7 +1150,7 @@ class Shard extends EventEmitter {
this.client.joinVoiceChannel(channel.id, false);
}
} else { // Phantom voice states from connected users in deleted channels (╯°□°)╯︵ ┻━┻
this.emit("warn", "Phantom voice state received but channel not found | Guild: " + guild.id + " | Channel: " + voiceState.channel_id);
this.client.emit("warn", "Phantom voice state received but channel not found | Guild: " + guild.id + " | Channel: " + voiceState.channel_id);
}
}
}
Expand Down Expand Up @@ -1492,7 +1492,7 @@ class Shard extends EventEmitter {

initializeWS() {
this.status = "connecting";
if(this.client.options.compress && !Erlpack) {
if(this.client.options.compress) {
this._zlib = Zlib.createUnzip({
flush: Zlib.constants.Z_SYNC_FLUSH,
chunkSize: 128 * 1024
Expand Down Expand Up @@ -1525,23 +1525,19 @@ class Shard extends EventEmitter {
} else if(Array.isArray(data)) { // Fragmented messages
data = Buffer.concat(data); // Copyfull concat is slow, but no alternative
}
if(Erlpack) {
return this.onWSMessage(Erlpack.unpack(data));
} else if(this.client.options.compress) {
if(this.client.options.compress) {
if(this._zlibFlushing) {
this._zlibIncomingChunks.push(data);
// console.log("Stored data");
} else if(this._zlib) {
this._zlib.write(data);
// console.log("Wrote data");

if(data.length >= 4 && data.readUInt32BE(data.length - 4) === 0xFFFF) {
this._zlibFlushing = 1;
this._zlibFlushing = true;
this._zlib.flush(Zlib.constants.Z_SYNC_FLUSH, this._onZlibFlush);
this._zlibFlushCalls = (this._zlibFlushCalls || 0) + 1;
// console.log("Flushing...");
}
}
} else if(Erlpack) {
return this.onWSMessage(Erlpack.unpack(data));
} else {
try {
return this.onWSMessage(JSON.parse(data.toString()));
Expand Down Expand Up @@ -1609,41 +1605,34 @@ class Shard extends EventEmitter {
}

_onZlibFlush() {
// this._zlibFlushCallbacks = (this._zlibFlushCallbacks || 0) + 1;
// console.log("Flushed. ", this._zlibFlushing, this._zlibFlushCalls, this._zlibFlushCallbacks);
this._zlibFlushing = false;
if(this._zlibChunks.length === 0) {
return;
}

var buffer = this._zlibChunks[0];
if(this._zlibChunks.length > 1) {
// console.log("Chunk begends, ", this._zlibChunks.map(c=>{
// let s=c.toString();
// return (s[0]||" ")+(s[s.length-1]||" ");
// }).join(","))
buffer = Buffer.concat(this._zlibChunks);
}
this._zlibChunks.length = 0;

while(this._zlibIncomingChunks.length > 0) {
var data = this._zlibIncomingChunks.shift();
this._zlib.write(data);
// console.log("Unstored data.");
if(data.length >= 4 && data.readUInt32BE(data.length - 4) === 0xFFFF) {
this._zlibFlushing = 2;
this._zlibFlushing = true;
this._zlib.flush(Zlib.constants.Z_SYNC_FLUSH, this._onZlibFlush);
// this._zlibFlushCalls = (this._zlibFlushCalls || 0) + 1;
// console.log("Flushing store...");
break;
}
}

try {
this.onWSMessage(JSON.parse(buffer.toString()));
if(Erlpack) {
this.onWSMessage(Erlpack.unpack(buffer));
} else {
this.onWSMessage(JSON.parse(buffer.toString()));
}
} catch(err) {
require("fs").writeFileSync("./wtf.json", buffer.toString());
throw err;
this.emit("error", err, this.id);
}
}
Expand Down

0 comments on commit 90790ea

Please sign in to comment.