Skip to content
This repository was archived by the owner on Feb 4, 2022. It is now read-only.

Commit 2356ffb

Browse files
Sebastian Hallum Clarkembroadst
authored andcommitted
feat(compression): implement wire protocol compression support
NODE-1016, NODE-1027 * core work for getting compression message to travel * Inital compression message test with mock server * Renaming test * typo in runner.js * only emit fullsetup event once * Extending tests * Can decompress snappy messages * Adding zlib support * Reducing console logging verbosity * Debugging mock * Fixing mock to be able to send OP_COMPRESSED * Added mock tests for various OP_COMPRESSED reception cases * Edit no compression test to include doc insertion * Compression tests attempt inserting * Commenting out unneeded test * Added test to check for invalid compressor args * Allowing sending of outbound OP_COMPRESSED * Don't compress uncompressible messages * Removing changes to server_tests.js * Refactoring * Increasing test verbosity * (WIP) receiving compressed messages in the mock * (WIP) convering compressed mock msgs to original messages * Mock server can receive compressed messages * CRUD tests for mock server w/ compression * Mock test bug fixes * Added tests to mock to compress commands * Testing that uncompressible commands aren't compressed * Note in responses whether the message was originally compressed * Starting integration testing * Adding Wire Protocol Version detection to tests * Renaming test * Some auth tests in server_tests * Removed erroneous test statements * refactor(test-runner): don't check mongodb version if skip specified * Use actual server type in standalone SDAM events: Currently the server type inside the topologyDescriptionChanged event for a single topology is 'Standalone', which is unexpected. According to the SDAM specification a TopologyDescription has a list of ServerDescriptions, which had a type field indicating the type of server it is and does not state that the server type never changes if the topology is single. My expectation is that a single connection to a replica set primary, for example, would give me a topology type of 'Single' and a server type of 'RSPrimary', not 'Standalone' like it currently does. The serverDescriptionChanged event properly has the type set. * test(sdam): add test for emitting correct SDAM server type * Allow running tests with a particular version of mongod using the -v flag * Tidying comments * Create facility to spawn new mongod with options * Create compression test * Tidying snappy compression test implementation * Removing unneeded import * Tidying tests * Switch to using deflate/inflate instead of zip/unzip * Implementing async compression of outbound OP_COMPRESSED * Changing clashing port number * Using strict mode * Switching to use forEach * Tidying requires() * Removing version flag Use environment variable instead * Tidying up * Fixing typo * [NODE-1021] [NODE-1023] OP_COMPRESSED reception support and mock testing * core work for getting compression message to travel * Inital compression message test with mock server * Renaming test * typo in runner.js * Extending tests * Can decompress snappy messages * Adding zlib support * Reducing console logging verbosity * Debugging mock * Fixing mock to be able to send OP_COMPRESSED * Added mock tests for various OP_COMPRESSED reception cases * Edit no compression test to include doc insertion * Compression tests attempt inserting * Commenting out unneeded test * Added test to check for invalid compressor args * Add snappy to package.json * Removing use of "in" and debugging statements * Changing magic number for a constant * Remove extraneous console log * Fixing typo * Refactoring and bailing early * Fixing typo * chore(travis): only run CI for node LTS versions 4 & 6 * chore(travis): use containerized trusty builds * Changing OP_COMMAND to OP_COMPRESSED * Small refactor for reading a header * (WIP) refactoring decompression to be async * WIP Async decompression * Tweaking test to use config port * WIP Making decompression async * Validate compressorId * Async decompression now working * Updating tests (copying in from NODE-1027) * Adding strict mode So we can use "let", etc… on older versions of Node * Moving snappy to devDependencies * Simplifying code * Reducing reliance on constants * Fixing mis-named variable * Removing unneeded and unused function * Defining OP_COMMAND constant once * Improving code readability * Making snappy optional * Require_Optional Snappy * Making decompression code clearer * Making error message more informative * Removing debug message * Reordering function arguments so that "self" is first * Removing incorrect environment This environment has not yet been created in this branch * Using constants to decide compression mechanism * Removing unused code * Using !== instead of != * Using pre-release require_master * Tidying up * Tidying use of constants * Standardising use of compressorID and simplifying deciding which ID to use * Tidying up * Removing unneeded requires * Updating Travis Node version, require_optional version, and creating package_lock * Tidying Response creation * Removing MongoDB 2.4.x from Travis * Moving OPCODE numbers to wireprotocol/shared.js * core work for getting compression message to travel * Inital compression message test with mock server * Extending tests * Can decompress snappy messages * Adding zlib support * Reducing console logging verbosity * Debugging mock * Fixing mock to be able to send OP_COMPRESSED * Added mock tests for various OP_COMPRESSED reception cases * Compression tests attempt inserting * Commenting out unneeded test * Allowing sending of outbound OP_COMPRESSED * Don't compress uncompressible messages * Removing changes to server_tests.js * Refactoring * (WIP) receiving compressed messages in the mock * (WIP) convering compressed mock msgs to original messages * Mock server can receive compressed messages * CRUD tests for mock server w/ compression * Mock test bug fixes * Added tests to mock to compress commands * Starting integration testing * Adding Wire Protocol Version detection to tests * Renaming test * Some auth tests in server_tests * Removed erroneous test statements * Allow running tests with a particular version of mongod using the -v flag * Tidying comments * Create facility to spawn new mongod with options * Create compression test * Tidying snappy compression test implementation * Tidying tests * Switch to using deflate/inflate instead of zip/unzip * Implementing async compression of outbound OP_COMPRESSED * Switching to use forEach * Removing version flag Use environment variable instead * Tidying up * Post-rebase tidying * Tidying code * Melding branches together * Fixing comment * Adding test topologies * Improving how zlib compression level is set There is no need to tell zlib to use the default compression level. * Fixing compatibility with Node v4 Node v4 seems to have difficulty supporting Buffer.fill(someBuffer). * Moving parseHeader * Moving compressorIDs * Moving uncompressibleCommands * Moving compress and decompress * Moving operation construction out of promise * Removing use of promises in command serialization * Fixing typo * Refactoring hasUncompressibleCommands
1 parent 966ff6d commit 2356ffb

File tree

13 files changed

+475
-126
lines changed

13 files changed

+475
-126
lines changed

lib/connection/connection.js

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ var inherits = require('util').inherits
77
, crypto = require('crypto')
88
, f = require('util').format
99
, debugOptions = require('./utils').debugOptions
10-
, parseHeader = require('./utils').parseHeader
11-
, compressorIDs = require('./utils').compressorIDs
10+
, parseHeader = require('../wireprotocol/shared').parseHeader
11+
, decompress = require('../wireprotocol/compression').decompress
1212
, Response = require('./commands').Response
1313
, MongoError = require('../error')
1414
, Logger = require('./logger')
@@ -82,7 +82,7 @@ var Connection = function(messageHandler, options) {
8282
this.host = options.host || 'localhost';
8383
this.family = typeof options.family == 'number' ? options.family : 4;
8484
this.keepAlive = typeof options.keepAlive == 'boolean' ? options.keepAlive : true;
85-
this.keepAliveInitialDelay = typeof options.keepAliveInitialDelay == 'number'
85+
this.keepAliveInitialDelay = typeof options.keepAliveInitialDelay == 'number'
8686
? options.keepAliveInitialDelay : 300000;
8787
this.noDelay = typeof options.noDelay == 'boolean' ? options.noDelay : true;
8888
this.connectionTimeout = typeof options.connectionTimeout == 'number'
@@ -221,23 +221,6 @@ var closeHandler = function(self) {
221221
}
222222
}
223223

224-
// Decompress a message using the given compressor
225-
var decompress = function(compressorID, compressedData, callback) {
226-
if (compressorID < 0 || compressorID > compressorIDs.length) {
227-
throw new Error('Server sent message compressed using an unsupported compressor. (Received compressor ID ' + compressorID + ')');
228-
}
229-
switch (compressorID) {
230-
case compressorIDs.snappy:
231-
Snappy.uncompress(compressedData, callback);
232-
break;
233-
case compressorIDs.zlib:
234-
zlib.inflate(compressedData, callback);
235-
break;
236-
default:
237-
callback(null, compressedData);
238-
}
239-
}
240-
241224
// Handle a message once it is recieved
242225
var emitMessageHandler = function (self, message) {
243226
var msgHeader = parseHeader(message);

lib/connection/pool.js

Lines changed: 99 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,14 @@ var inherits = require('util').inherits,
88
f = require('util').format,
99
Query = require('./commands').Query,
1010
CommandResult = require('./command_result'),
11-
assign = require('../utils').assign;
11+
assign = require('../utils').assign,
12+
Snappy = require('./utils').retrieveSnappy(),
13+
zlib = require('zlib'),
14+
MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE,
15+
opcodes = require('../wireprotocol/shared').opcodes,
16+
compress = require('../wireprotocol/compression').compress,
17+
compressorIDs = require('../wireprotocol/compression').compressorIDs,
18+
uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands;
1219

1320
var MongoCR = require('../auth/mongocr')
1421
, X509 = require('../auth/x509')
@@ -918,6 +925,56 @@ Pool.prototype.destroy = function(force) {
918925
checkStatus();
919926
}
920927

928+
// Prepare the buffer that Pool.prototype.write() uses to send to the server
929+
var serializeCommands = function(self, commands, result, callback) {
930+
// Base case when there are no more commands to serialize
931+
if (commands.length === 0) return callback(null, result);
932+
933+
// Pop off the zeroth command and serialize it
934+
var thisCommand = commands.shift();
935+
var originalCommandBuffer = thisCommand.toBin();
936+
937+
// Check whether we and the server have agreed to use a compressor
938+
if (self.options.agreedCompressor && !hasUncompressibleCommands(thisCommand)) {
939+
// Transform originalCommandBuffer into OP_COMPRESSED
940+
var concatenatedOriginalCommandBuffer = Buffer.concat(originalCommandBuffer);
941+
var messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);
942+
943+
// Extract information needed for OP_COMPRESSED from the uncompressed message
944+
var originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
945+
946+
// Compress the message body
947+
compress(self, messageToBeCompressed, function(err, compressedMessage) {
948+
if (err) return callback(err, null);
949+
950+
// Create the msgHeader of OP_COMPRESSED
951+
var msgHeader = new Buffer(MESSAGE_HEADER_SIZE);
952+
msgHeader.writeInt32LE(MESSAGE_HEADER_SIZE + 9 + compressedMessage.length, 0); // messageLength
953+
msgHeader.writeInt32LE(thisCommand.requestId, 4); // requestID
954+
msgHeader.writeInt32LE(0, 8); // responseTo (zero)
955+
msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode
956+
957+
// Create the compression details of OP_COMPRESSED
958+
var compressionDetails = new Buffer(9);
959+
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
960+
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
961+
compressionDetails.writeUInt8(compressorIDs[self.options.agreedCompressor], 8); // compressorID
962+
963+
// Push the concatenation of the OP_COMPRESSED message onto results
964+
result.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));
965+
966+
// Continue recursing through the commands array
967+
serializeCommands(self, commands, result, callback);
968+
})
969+
} else {
970+
// Push the serialization of the command onto results
971+
result.push(originalCommandBuffer);
972+
973+
// Continue recursing through the commands array
974+
serializeCommands(self, commands, result, callback);
975+
}
976+
}
977+
921978
/**
922979
* Write a message to MongoDB
923980
* @method
@@ -933,6 +990,11 @@ Pool.prototype.write = function(commands, options, cb) {
933990
// Always have options
934991
options = options || {};
935992

993+
// We need to have a callback function unless the message returns no response
994+
if(!(typeof cb == 'function') && !options.noResponse) {
995+
throw new MongoError('write method must provide a callback');
996+
}
997+
936998
// Pool was destroyed error out
937999
if(this.state == DESTROYED || this.state == DESTROYING) {
9381000
// Callback with an error
@@ -968,25 +1030,6 @@ Pool.prototype.write = function(commands, options, cb) {
9681030
cb: cb, raw: false, promoteLongs: true, promoteValues: true, promoteBuffers: false, fullResult: false
9691031
};
9701032

971-
var buffer = null
972-
973-
if(Array.isArray(commands)) {
974-
buffer = [];
975-
976-
for(var i = 0; i < commands.length; i++) {
977-
buffer.push(commands[i].toBin());
978-
}
979-
980-
// Get the requestId
981-
operation.requestId = commands[commands.length - 1].requestId;
982-
} else {
983-
operation.requestId = commands.requestId;
984-
buffer = commands.toBin();
985-
}
986-
987-
// Set the buffers
988-
operation.buffer = buffer;
989-
9901033
// Set the options for the parsing
9911034
operation.promoteLongs = typeof options.promoteLongs == 'boolean' ? options.promoteLongs : true;
9921035
operation.promoteValues = typeof options.promoteValues == 'boolean' ? options.promoteValues : true;
@@ -997,7 +1040,6 @@ Pool.prototype.write = function(commands, options, cb) {
9971040
operation.command = typeof options.command == 'boolean' ? options.command : false;
9981041
operation.fullResult = typeof options.fullResult == 'boolean' ? options.fullResult : false;
9991042
operation.noResponse = typeof options.noResponse == 'boolean' ? options.noResponse : false;
1000-
// operation.requestId = options.requestId;
10011043

10021044
// Optional per operation socketTimeout
10031045
operation.socketTimeout = options.socketTimeout;
@@ -1007,25 +1049,45 @@ Pool.prototype.write = function(commands, options, cb) {
10071049
operation.socketTimeout = options.socketTimeout;
10081050
}
10091051

1010-
// We need to have a callback function unless the message returns no response
1011-
if(!(typeof cb == 'function') && !options.noResponse) {
1012-
throw new MongoError('write method must provide a callback');
1052+
// Ensure commands is an array
1053+
if (!Array.isArray(commands)) {
1054+
commands = [commands];
10131055
}
10141056

1015-
// If we have a monitoring operation schedule as the very first operation
1016-
// Otherwise add to back of queue
1017-
if(options.monitoring) {
1018-
this.queue.unshift(operation);
1019-
} else {
1020-
this.queue.push(operation);
1021-
}
1057+
// Get the requestId
1058+
operation.requestId = commands[commands.length - 1].requestId;
10221059

1023-
// Attempt to execute the operation
1024-
if(!self.executing) {
1025-
process.nextTick(function() {
1026-
_execute(self)();
1027-
});
1028-
}
1060+
// Prepare the operation buffer
1061+
serializeCommands(self, commands, [], function(err, serializedCommands) {
1062+
if (err) throw err;
1063+
1064+
// Set the operation's buffer to the serialization of the commands
1065+
operation.buffer = serializedCommands;
1066+
1067+
// If we have a monitoring operation schedule as the very first operation
1068+
// Otherwise add to back of queue
1069+
if(options.monitoring) {
1070+
self.queue.unshift(operation);
1071+
} else {
1072+
self.queue.push(operation);
1073+
}
1074+
1075+
// Attempt to execute the operation
1076+
if(!self.executing) {
1077+
process.nextTick(function() {
1078+
_execute(self)();
1079+
});
1080+
}
1081+
})
1082+
1083+
}
1084+
1085+
// Return whether a command contains an uncompressible command term
1086+
// Will return true if command contains no uncompressible command terms
1087+
var hasUncompressibleCommands = function(command) {
1088+
return uncompressibleCommands.some(function(cmd) {
1089+
return command.query.hasOwnProperty(cmd);
1090+
});
10291091
}
10301092

10311093
// Remove connection method

lib/connection/utils.js

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -97,25 +97,10 @@ var retrieveSnappy = function() {
9797
return snappy;
9898
}
9999

100-
// Parses the header of a wire protocol message
101-
var parseHeader = function(message) {
102-
return {
103-
length: message.readInt32LE(0),
104-
requestId: message.readInt32LE(4),
105-
responseTo: message.readInt32LE(8),
106-
opCode: message.readInt32LE(12)
107-
}
108-
}
109-
110-
exports.compressorIDs = {
111-
snappy: 1,
112-
zlib: 2
113-
}
114100
exports.setProperty = setProperty;
115101
exports.getProperty = getProperty;
116102
exports.getSingleProperty = getSingleProperty;
117103
exports.copy = copy;
118104
exports.debugOptions = debugOptions;
119105
exports.retrieveBSON = retrieveBSON;
120106
exports.retrieveSnappy = retrieveSnappy;
121-
exports.parseHeader = parseHeader;

lib/topologies/server.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,20 @@ var eventHandler = function(self, event) {
265265
return;
266266
}
267267

268+
// Determine whether the server is instructing us to use a compressor
269+
if (result.result && result.result.compression) {
270+
for (var i = 0; i < self.s.compression.compressors.length; i++) {
271+
if (result.result.compression.indexOf(self.s.compression.compressors[i]) > -1) {
272+
self.s.pool.options.agreedCompressor = self.s.compression.compressors[i]
273+
break;
274+
}
275+
}
276+
277+
if (self.s.compression.zlibCompressionLevel) {
278+
self.s.pool.options.zlibCompressionLevel = self.s.compression.zlibCompressionLevel;
279+
}
280+
}
281+
268282
// Ensure no error emitted after initial connect when reconnecting
269283
self.initalConnect = false;
270284
// Save the ismaster

lib/topologies/shared.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,11 @@ function createCompressionInfo(options) {
6464
// Check that all supplied compressors are valid
6565
options.compression.compressors.forEach(function(compressor) {
6666
if (compressor !== 'snappy' && compressor !== 'zlib') {
67-
throw new Error('compressors must be at least one of snappy or zlib')
67+
throw new Error('compressors must be at least one of snappy or zlib');
6868
}
6969
})
7070

71-
return options.compression.compressors
71+
return options.compression.compressors;
7272
}
7373

7474
function clone(object) {

lib/wireprotocol/compression.js

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
var Snappy = require('../connection/utils').retrieveSnappy(),
2+
zlib = require('zlib');
3+
4+
var compressorIDs = {
5+
snappy: 1,
6+
zlib: 2
7+
}
8+
9+
var uncompressibleCommands = [
10+
'ismaster',
11+
'saslStart',
12+
'saslContinue',
13+
'getnonce',
14+
'authenticate',
15+
'createUser',
16+
'updateUser',
17+
'copydbSaslStart',
18+
'copydbgetnonce',
19+
'copydb'
20+
];
21+
22+
23+
// Facilitate compressing a message using an agreed compressor
24+
var compress = function(self, dataToBeCompressed, callback) {
25+
switch (self.options.agreedCompressor) {
26+
case 'snappy':
27+
Snappy.compress(dataToBeCompressed, callback);
28+
break;
29+
case 'zlib':
30+
// Determine zlibCompressionLevel
31+
var zlibOptions = {};
32+
if (self.options.zlibCompressionLevel) {
33+
zlibOptions.level = self.options.zlibCompressionLevel
34+
}
35+
zlib.deflate(dataToBeCompressed, zlibOptions, callback);
36+
break;
37+
default:
38+
throw new Error('Attempt to compress message using unknown compressor \"' + self.options.agreedCompressor + '\".');
39+
}
40+
}
41+
42+
// Decompress a message using the given compressor
43+
var decompress = function(compressorID, compressedData, callback) {
44+
if (compressorID < 0 || compressorID > compressorIDs.length) {
45+
throw new Error('Server sent message compressed using an unsupported compressor. (Received compressor ID ' + compressorID + ')');
46+
}
47+
switch (compressorID) {
48+
case compressorIDs.snappy:
49+
Snappy.uncompress(compressedData, callback);
50+
break;
51+
case compressorIDs.zlib:
52+
zlib.inflate(compressedData, callback);
53+
break;
54+
default:
55+
callback(null, compressedData);
56+
}
57+
}
58+
59+
module.exports = {
60+
compressorIDs: compressorIDs,
61+
uncompressibleCommands: uncompressibleCommands,
62+
compress: compress,
63+
decompress: decompress
64+
}

lib/wireprotocol/shared.js

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,19 @@ var getReadPreference = function(cmd, options) {
3737
return readPreference;
3838
}
3939

40+
// Parses the header of a wire protocol message
41+
var parseHeader = function(message) {
42+
return {
43+
length: message.readInt32LE(0),
44+
requestId: message.readInt32LE(4),
45+
responseTo: message.readInt32LE(8),
46+
opCode: message.readInt32LE(12)
47+
}
48+
}
49+
4050
module.exports = {
4151
getReadPreference: getReadPreference,
4252
MESSAGE_HEADER_SIZE: MESSAGE_HEADER_SIZE,
43-
opcodes: opcodes
53+
opcodes: opcodes,
54+
parseHeader: parseHeader
4455
}

0 commit comments

Comments
 (0)