Skip to content

Commit

Permalink
code small perf improvement
Browse files Browse the repository at this point in the history
adding __test public method to permit testing
  • Loading branch information
rusher committed May 1, 2018
1 parent 7825c54 commit c54f430
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 127 deletions.
4 changes: 0 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ matrix:
- node_js: "node"
env: DB=mysql:8.0

cache:
directories:
- node_modules

notifications:
email: false

Expand Down
4 changes: 2 additions & 2 deletions src/cmd/change-user.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ class ChangeUser extends Handshake {
writeParam(out, attrNames[k], encoding);
writeParam(out, connectAttributes[attrNames[k]], encoding);
}

//write end size
out.buf[initPos] = out.pos - initPos - 2;
out.buf[initPos + 1] = (out.pos - initPos - 2) >> 8;
out.writeInt16AtPos(initPos);
}

out.flushBuffer(true);
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/handshake/client-handshake-response.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ module.exports.send = function send(cmd, out, opts, pluginName, info) {
writeParam(out, attrNames[k], encoding);
writeParam(out, connectAttributes[attrNames[k]], encoding);
}

//write end size
out.buf[initPos] = out.pos - initPos - 2;
out.buf[initPos + 1] = (out.pos - initPos - 2) >> 8;
out.writeInt16AtPos(initPos);
}

out.flushBuffer(true);
Expand Down
22 changes: 12 additions & 10 deletions src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -347,18 +347,21 @@ function Connection(options) {
//*****************************************************************
// internal public testing methods
//*****************************************************************
this._test_collation = () => {
function TestMethods(){}
TestMethods.prototype.getCollation = () => {
return opts.collation;
};

this._test_socket = () => {
TestMethods.prototype.getSocket = () => {
return _socket;
};

this._test_info = () => {
TestMethods.prototype.getInfo = () => {
return info;
};

this.__tests = new TestMethods();

//*****************************************************************
// internal methods
//*****************************************************************
Expand Down Expand Up @@ -386,9 +389,9 @@ function Connection(options) {
});

_events.on("collation_changed", () => {
const stream = _out.stream;
const stream = _out.getStream();
_out = new PacketOutputStream(opts, info);
_out.setStreamer(stream);
_out.setStream(stream);
});
};

Expand Down Expand Up @@ -427,8 +430,7 @@ function Connection(options) {
_socket.setTimeout(opts.connectTimeout, _connectTimeoutReached.bind(this));
}

const packetInputStream = _in;
_socket.on("data", chunk => packetInputStream.onData(chunk));
_socket.on("data", _in.onData.bind(_in));
_socket.on("error", _socketError);
_socket.on("end", _socketError);
_socket.on("timeout", _socketError);
Expand All @@ -440,7 +442,7 @@ function Connection(options) {

_socket.writeBuf = _socket.write;
_socket.flush = () => {};
_out.setStreamer(_socket);
_out.setStream(_socket);
};

/**
Expand All @@ -450,7 +452,7 @@ function Connection(options) {
*/
const _succeedAuthentication = () => {
if (opts.compress) {
_out.setStreamer(new CompressionOutputStream(_socket, opts, info));
_out.setStream(new CompressionOutputStream(_socket, opts, info));
_in = new CompressionInputStream(_in, _receiveQueue, opts, info);
_socket.removeAllListeners("data");
_socket.on("data", _in.onData.bind(_in));
Expand Down Expand Up @@ -499,7 +501,7 @@ function Connection(options) {
_socket.removeAllListeners("data");
_socket = secureSocket;

_out.setStreamer(secureSocket);
_out.setStream(secureSocket);
} catch (err) {
_socketError(err);
}
Expand Down
59 changes: 27 additions & 32 deletions src/io/compression-input-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,38 +68,33 @@ class CompressionInputStream {
this.headerLen = 0;
}

/**
* Read 4 bytes header.
*
* @param chunk chunk
* @param chunkLen chunk length
* @returns packet length if header is completely received
* @private
*/
readHeader(chunk, chunkLen) {
if (this.remainingLen) return this.remainingLen;
while (chunkLen - this.pos > 0) {
this.header[this.headerLen++] = chunk[this.pos++];
if (this.headerLen === 7) {
this.compressPacketLen = this.header[0] | (this.header[1] << 8) | (this.header[2] << 16);
this.packetLen = this.header[4] | (this.header[5] << 8) | (this.header[6] << 16);
if (this.packetLen === 0) this.packetLen = this.compressPacketLen;
return this.compressPacketLen;
}
}
return null;
}

onData(chunk) {
this.pos = 0;
let pos = 0;
let length;
const chunkLen = chunk.length;

do {
if ((length = this.readHeader(chunk, chunkLen))) {
if (chunkLen - this.pos >= length) {
const buf = chunk.slice(this.pos, this.pos + length);
this.pos += length;

if (this.remainingLen) {
length = this.remainingLen;
} else {
length = null;
while (chunkLen - pos > 0) {
this.header[this.headerLen++] = chunk[pos++];
if (this.headerLen === 7) {
this.compressPacketLen = this.header[0] | (this.header[1] << 8) | (this.header[2] << 16);
this.packetLen = this.header[4] | (this.header[5] << 8) | (this.header[6] << 16);
if (this.packetLen === 0) this.packetLen = this.compressPacketLen;
length = this.compressPacketLen;
break;
}
}
}

if (length) {
if (chunkLen - pos >= length) {
const buf = chunk.slice(pos, pos + length);
pos += length;
if (this.parts) {
this.parts.push(buf);
this.partsTotalLen += length;
Expand All @@ -122,19 +117,19 @@ class CompressionInputStream {
}
this.resetHeader();
} else {
const buf = chunk.slice(this.pos, chunkLen);
const buf = chunk.slice(pos, chunkLen);
if (!this.parts) {
this.parts = [buf];
this.partsTotalLen = chunkLen - this.pos;
this.partsTotalLen = chunkLen - pos;
} else {
this.parts.push(buf);
this.partsTotalLen += chunkLen - this.pos;
this.partsTotalLen += chunkLen - pos;
}
this.remainingLen = length - (chunkLen - this.pos);
this.remainingLen = length - (chunkLen - pos);
return;
}
}
} while (this.pos < chunkLen);
} while (pos < chunkLen);
}
}

Expand Down
55 changes: 25 additions & 30 deletions src/io/packet-input-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,26 +63,6 @@ class PacketInputStream {
this.headerLen = 0;
}

/**
* Read 4 bytes header.
*
* @param chunk chunk
* @param chunkLen chunk length
* @returns packet length if header is completely received
* @private
*/
readHeader(chunk, chunkLen) {
if (this.remainingLen) return this.remainingLen;
while (chunkLen - this.pos > 0) {
this.header[this.headerLen++] = chunk[this.pos++];
if (this.headerLen === 4) {
this.packetLen = this.header[0] | (this.header[1] << 8) | (this.header[2] << 16);
return this.packetLen;
}
}
return null;
}

currentCmd() {
let cmd;
while ((cmd = this.receiveQueue.peek())) {
Expand All @@ -93,15 +73,30 @@ class PacketInputStream {
}

onData(chunk) {
this.pos = 0;
let pos = 0;
let length;
const chunkLen = chunk.length;

do {
if ((length = this.readHeader(chunk, chunkLen))) {
if (chunkLen - this.pos >= length) {
const buf = chunk.slice(this.pos, this.pos + length);
this.pos += length;
//read header
if (this.remainingLen) {
length = this.remainingLen;
} else {
length = null;
while (chunkLen - pos > 0) {
this.header[this.headerLen++] = chunk[pos++];
if (this.headerLen === 4) {
this.packetLen = this.header[0] | (this.header[1] << 8) | (this.header[2] << 16);
length = this.packetLen;
break;
}
}
}

if (length) {
if (chunkLen - pos >= length) {
const buf = chunk.slice(pos, pos + length);
pos += length;
if (this.parts) {
this.parts.push(buf);
this.partsTotalLen += length;
Expand All @@ -126,19 +121,19 @@ class PacketInputStream {
}
this.resetHeader();
} else {
const buf = chunk.slice(this.pos, chunkLen);
const buf = chunk.slice(pos, chunkLen);
if (!this.parts) {
this.parts = [buf];
this.partsTotalLen = chunkLen - this.pos;
this.partsTotalLen = chunkLen - pos;
} else {
this.parts.push(buf);
this.partsTotalLen += chunkLen - this.pos;
this.partsTotalLen += chunkLen - pos;
}
this.remainingLen = length - (chunkLen - this.pos);
this.remainingLen = length - (chunkLen - pos);
return;
}
}
} while (this.pos < chunkLen);
} while (pos < chunkLen);
}
}

Expand Down
53 changes: 22 additions & 31 deletions src/io/packet-output-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ class PacketOutputStream {
}
}

setStreamer(stream) {
setStream(stream) {
this.stream = stream;
}

getStream() {
return this.stream;
}

growBuffer(len) {
let newCapacity;
if (len + this.pos < MEDIUM_BUFFER_SIZE) {
Expand Down Expand Up @@ -85,6 +89,11 @@ class PacketOutputStream {
this.pos += 2;
}

writeInt16AtPos(initPos) {
this.buf[initPos] = this.pos - initPos - 2;
this.buf[initPos + 1] = (this.pos - initPos - 2) >> 8;
}

writeInt24(value) {
if (this.pos + 3 >= this.buf.length) {
let b = Buffer.allocUnsafe(3);
Expand Down Expand Up @@ -193,11 +202,6 @@ class PacketOutputStream {
this._writeDatePart(year, mon, day, hour, min, sec, ms);
}

writeLengthCodedBuffer(arr) {
this.writeLengthCoded(arr.length);
this.writeBuffer(arr, 0, arr.length);
}

writeBuffer(arr, off, len) {
if (len > this.buf.length - this.pos) {
if (this.buf.length !== MAX_BUFFER_SIZE) {
Expand Down Expand Up @@ -478,35 +482,11 @@ class PacketOutputStream {
this.buf = Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
}
} else {
this.buf = this.allocateBuffer(remainingLen + 4);
this.buf = allocateBuffer(remainingLen + 4);
this.pos = 4;
}
}

allocateBuffer(len) {
if (len < SMALL_BUFFER_SIZE) {
return Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
} else if (len < MEDIUM_BUFFER_SIZE) {
return Buffer.allocUnsafe(MEDIUM_BUFFER_SIZE);
} else if (len < LARGE_BUFFER_SIZE) {
return Buffer.allocUnsafe(LARGE_BUFFER_SIZE);
}
return Buffer.allocUnsafe(MAX_BUFFER_SIZE);
}

growBuffer(len) {
let newCapacity;
if (len + this.pos < MEDIUM_BUFFER_SIZE) {
newCapacity = MEDIUM_BUFFER_SIZE;
} else if (len + this.pos < LARGE_BUFFER_SIZE) {
newCapacity = LARGE_BUFFER_SIZE;
} else newCapacity = MAX_BUFFER_SIZE;

let newBuf = Buffer.allocUnsafe(newCapacity);
this.buf.copy(newBuf, 0, 0, this.pos);
this.buf = newBuf;
}

writeEmptyPacket() {
const emptyBuf = Buffer.from([0x00, 0x00, 0x00, this.cmd.sequenceNo]);
this.cmd.incrementSequenceNo(1);
Expand All @@ -527,4 +507,15 @@ class PacketOutputStream {
}
}

function allocateBuffer(len) {
if (len < SMALL_BUFFER_SIZE) {
return Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
} else if (len < MEDIUM_BUFFER_SIZE) {
return Buffer.allocUnsafe(MEDIUM_BUFFER_SIZE);
} else if (len < LARGE_BUFFER_SIZE) {
return Buffer.allocUnsafe(LARGE_BUFFER_SIZE);
}
return Buffer.allocUnsafe(MAX_BUFFER_SIZE);
}

module.exports = PacketOutputStream;
5 changes: 2 additions & 3 deletions test/integration/test-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,10 @@ describe("connection", () => {
"'autocommit, character_set_client, character_set_connection, character_set_results, time_zone'"
);
}

assert.equal(conn._test_collation(), Collations.fromName("UTF8MB4_UNICODE_CI"));
assert.equal(conn.__tests.getCollation(), Collations.fromName("UTF8MB4_UNICODE_CI"));
conn.query("SET time_zone = '+00:00', character_set_client = cp850", (err, rows) => {
if (err) done(err);
assert.equal(conn._test_collation(), Collations.fromName("CP850_GENERAL_CI"));
assert.equal(conn.__tests.getCollation(), Collations.fromName("CP850_GENERAL_CI"));
conn.end(() => done());
});
});
Expand Down
2 changes: 1 addition & 1 deletion test/integration/test-ssl.js
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,6 @@ describe("ssl", function() {
function checkProtocol(conn, protocol) {
const ver = process.version.substring(1).split(".");
if (ver[0] > 5 || (ver[0] === 5 && ver[1] === 7)) {
assert.equal(conn._test_socket().getProtocol(), protocol);
assert.equal(conn.__tests.getSocket().getProtocol(), protocol);
}
}
Loading

0 comments on commit c54f430

Please sign in to comment.