From 7aa814a8a13f69bddec58a70ac9d390ed8612e37 Mon Sep 17 00:00:00 2001 From: rochdev Date: Mon, 9 Nov 2020 16:17:34 -0500 Subject: [PATCH] fix mongodb integration not instrumenting correctly --- .../datadog-plugin-mongodb-core/src/index.js | 175 +------------- .../datadog-plugin-mongodb-core/src/legacy.js | 59 +++++ .../src/unified.js | 70 ++++++ .../datadog-plugin-mongodb-core/src/util.js | 118 ++++++++++ .../test/{index.spec.js => core.spec.js} | 93 ++++---- .../test/mongodb.spec.js | 219 ++++++++++++++++++ 6 files changed, 512 insertions(+), 222 deletions(-) create mode 100644 packages/datadog-plugin-mongodb-core/src/legacy.js create mode 100644 packages/datadog-plugin-mongodb-core/src/unified.js create mode 100644 packages/datadog-plugin-mongodb-core/src/util.js rename packages/datadog-plugin-mongodb-core/test/{index.spec.js => core.spec.js} (84%) create mode 100644 packages/datadog-plugin-mongodb-core/test/mongodb.spec.js diff --git a/packages/datadog-plugin-mongodb-core/src/index.js b/packages/datadog-plugin-mongodb-core/src/index.js index f72b27f2f4..98f37d6fa4 100644 --- a/packages/datadog-plugin-mongodb-core/src/index.js +++ b/packages/datadog-plugin-mongodb-core/src/index.js @@ -1,175 +1,6 @@ 'use strict' -const analyticsSampler = require('../../dd-trace/src/analytics_sampler') +const unified = require('./unified') +const legacy = require('./legacy') -function createWrapOperation (tracer, config, operationName) { - return function wrapOperation (operation) { - return function operationWithTrace (ns, ops) { - const index = arguments.length - 1 - const callback = arguments[index] - - if (typeof callback !== 'function') return operation.apply(this, arguments) - - const scope = tracer.scope() - const childOf = scope.active() - const span = tracer.startSpan('mongodb.query', { childOf }) - - addTags(span, tracer, config, ns, ops, this, operationName) - - analyticsSampler.sample(span, config.analytics) - - arguments[index] = wrapCallback(tracer, span, callback) - - return scope.bind(operation, span).apply(this, arguments) - } - } -} - -function createWrapNext (tracer, config) { - return function wrapNext (next) { - return function nextWithTrace (cb) { - const scope = tracer.scope() - const childOf = scope.active() - const span = tracer.startSpan('mongodb.query', { childOf }) - - addTags(span, tracer, config, this.ns, this.cmd, this.topology) - - if (this.cursorState) { - span.addTags({ - 'mongodb.cursor.index': this.cursorState.cursorIndex - }) - } - - scope.bind(next, span).call(this, wrapCallback(tracer, span, cb, this)) - } - } -} - -function addTags (span, tracer, config, ns, cmd, topology, operationName) { - const query = getQuery(cmd) - const resource = getResource(ns, cmd, query, operationName) - - span.addTags({ - 'service.name': config.service || `${tracer._service}-mongodb`, - 'resource.name': resource, - 'span.type': 'mongodb', - 'db.name': ns - }) - - if (query) { - span.setTag('mongodb.query', query) - } - - addHost(span, topology) -} - -function addHost (span, topology) { - const options = topology && topology.s && topology.s.options - - if (options && options.host && options.port) { - span.addTags({ - 'out.host': topology.s.options.host, - 'out.port': topology.s.options.port - }) - } -} - -function wrapCallback (tracer, span, done, cursor) { - return tracer.scope().bind((err, res) => { - if (err) { - span.addTags({ - 'error.type': err.name, - 'error.msg': err.message, - 'error.stack': err.stack - }) - } - - if (cursor) { - addHost(span, cursor.server) - } - - span.finish() - - if (done) { - done(err, res) - } - }) -} - -function getQuery (cmd) { - return cmd.query && JSON.stringify(sanitize(cmd.query)) -} - -function getResource (ns, cmd, query, operationName) { - if (!operationName) { - operationName = Object.keys(cmd)[0] - } - - const parts = [operationName, ns] - - if (query) { - parts.push(query) - } - - return parts.join(' ') -} - -function sanitize (input) { - const output = {} - - if (!isObject(input) || Buffer.isBuffer(input) || isBSON(input)) return '?' - - for (const key in input) { - if (typeof input[key] === 'function') continue - - output[key] = sanitize(input[key]) - } - - return output -} - -function isObject (val) { - return typeof val === 'object' && val !== null && !(val instanceof Array) -} - -function isBSON (val) { - return val && val._bsontype -} - -function patch (core, tracer, config) { - this.wrap(core.Server.prototype, 'command', createWrapOperation(tracer, config)) - this.wrap(core.Server.prototype, 'insert', createWrapOperation(tracer, config, 'insert')) - this.wrap(core.Server.prototype, 'update', createWrapOperation(tracer, config, 'update')) - this.wrap(core.Server.prototype, 'remove', createWrapOperation(tracer, config, 'remove')) - - if (core.Cursor.prototype.next) { - this.wrap(core.Cursor.prototype, 'next', createWrapNext(tracer, config)) - } else if (core.Cursor.prototype._next) { - this.wrap(core.Cursor.prototype, '_next', createWrapNext(tracer, config)) - } -} - -function unpatch (core) { - this.unwrap(core.Server.prototype, 'command') - this.unwrap(core.Server.prototype, 'insert') - this.unwrap(core.Server.prototype, 'update') - this.unwrap(core.Server.prototype, 'remove') - this.unwrap(core.Cursor.prototype, 'next') - this.unwrap(core.Cursor.prototype, '_next') -} - -module.exports = [ - { - name: 'mongodb', - versions: ['>=3.3'], - file: 'lib/core/index.js', - patch, - unpatch - }, - { - name: 'mongodb-core', - versions: ['>=2'], - patch, - unpatch - } -] +module.exports = [].concat(unified, legacy) diff --git a/packages/datadog-plugin-mongodb-core/src/legacy.js b/packages/datadog-plugin-mongodb-core/src/legacy.js new file mode 100644 index 0000000000..9679cc93d9 --- /dev/null +++ b/packages/datadog-plugin-mongodb-core/src/legacy.js @@ -0,0 +1,59 @@ +'use strict' + +const { instrument } = require('./util') + +function createWrapCommand (tracer, config, name) { + return function wrapCommand (command) { + return function commandWithTrace (ns, ops) { + return instrument(command, this, arguments, this, ns, ops, tracer, config, { name }) + } + } +} + +function createWrapQuery (tracer, config) { + return function wrapQuery (query) { + return function queryWithTrace () { + const pool = this.server.s.pool + const ns = this.ns + const ops = this.cmd + + return instrument(query, this, arguments, pool, ns, ops, tracer, config) + } + } +} + +function createWrapCursor (tracer, config, name) { + return function wrapCursor (cursor) { + return function cursorWithTrace () { + const pool = this.server.s.pool + const ns = this.ns + + return instrument(cursor, this, arguments, pool, ns, {}, tracer, config, { name }) + } + } +} + +module.exports = [ + { + name: 'mongodb-core', + versions: ['2 - 3.1.9'], + patch ({ Cursor, Server }, tracer, config) { + this.wrap(Server.prototype, 'command', createWrapCommand(tracer, config)) + this.wrap(Server.prototype, 'insert', createWrapCommand(tracer, config, 'insert')) + this.wrap(Server.prototype, 'update', createWrapCommand(tracer, config, 'update')) + this.wrap(Server.prototype, 'remove', createWrapCommand(tracer, config, 'remove')) + this.wrap(Cursor.prototype, '_getmore', createWrapCursor(tracer, config, 'getMore')) + this.wrap(Cursor.prototype, '_find', createWrapQuery(tracer, config)) + this.wrap(Cursor.prototype, 'kill', createWrapCursor(tracer, config, 'killCursors')) + }, + unpatch ({ Cursor, Server }) { + this.unwrap(Server.prototype, 'command') + this.unwrap(Server.prototype, 'insert') + this.unwrap(Server.prototype, 'update') + this.unwrap(Server.prototype, 'remove') + this.unwrap(Cursor.prototype, '_getmore') + this.unwrap(Cursor.prototype, '_find') + this.unwrap(Cursor.prototype, 'kill') + } + } +] diff --git a/packages/datadog-plugin-mongodb-core/src/unified.js b/packages/datadog-plugin-mongodb-core/src/unified.js new file mode 100644 index 0000000000..0f56a024cc --- /dev/null +++ b/packages/datadog-plugin-mongodb-core/src/unified.js @@ -0,0 +1,70 @@ +'use strict' + +const { instrument } = require('./util') + +function createWrapCommand (tracer, config, name) { + return function wrapCommand (command) { + return function commandWithTrace (server, ns, ops) { + return instrument(command, this, arguments, server, ns, ops, tracer, config, { name }) + } + } +} + +function patch (wp, tracer, config) { + this.wrap(wp, 'command', createWrapCommand(tracer, config)) + this.wrap(wp, 'insert', createWrapCommand(tracer, config, 'insert')) + this.wrap(wp, 'update', createWrapCommand(tracer, config, 'update')) + this.wrap(wp, 'remove', createWrapCommand(tracer, config, 'remove')) + this.wrap(wp, 'query', createWrapCommand(tracer, config)) + this.wrap(wp, 'getMore', createWrapCommand(tracer, config, 'getMore')) + this.wrap(wp, 'killCursors', createWrapCommand(tracer, config, 'killCursors')) +} + +function unpatch (wp) { + this.unwrap(wp, 'command') + this.unwrap(wp, 'insert') + this.unwrap(wp, 'update') + this.unwrap(wp, 'remove') + this.unwrap(wp, 'query') + this.unwrap(wp, 'getMore') + this.unwrap(wp, 'killCursors') +} + +function patchClass (WireProtocol, tracer, config) { + this.wrap(WireProtocol.prototype, 'command', createWrapCommand(tracer, config)) +} + +function unpatchClass (WireProtocol) { + this.unwrap(WireProtocol.prototype, 'command') +} + +module.exports = [ + { + name: 'mongodb', + versions: ['>=3.3'], + file: 'lib/core/wireprotocol/index.js', + patch, + unpatch + }, + { + name: 'mongodb-core', + versions: ['>=3.2'], + file: 'lib/wireprotocol/index.js', + patch, + unpatch + }, + { + name: 'mongodb-core', + versions: ['~3.1.10'], + file: 'lib/wireprotocol/3_2_support.js', + patch: patchClass, + unpatch: unpatchClass + }, + { + name: 'mongodb-core', + versions: ['~3.1.10'], + file: 'lib/wireprotocol/2_6_support.js', + patch: patchClass, + unpatch: unpatchClass + } +] diff --git a/packages/datadog-plugin-mongodb-core/src/util.js b/packages/datadog-plugin-mongodb-core/src/util.js new file mode 100644 index 0000000000..7fc20b3755 --- /dev/null +++ b/packages/datadog-plugin-mongodb-core/src/util.js @@ -0,0 +1,118 @@ +'use strict' + +const analyticsSampler = require('../../dd-trace/src/analytics_sampler') + +function instrument (command, ctx, args, server, ns, ops, tracer, config, options = {}) { + const name = options.name || (ops && Object.keys(ops)[0]) + const index = args.length - 1 + const callback = args[index] + + if (typeof callback !== 'function') return command.apply(ctx, args) + + const span = startSpan(tracer, config, ns, ops, server, name) + + if (name !== 'getMore' && name !== 'killCursors') { + analyticsSampler.sample(span, config.analytics) + } + + args[index] = wrapCallback(tracer, span, callback) + + return tracer.scope().bind(command, span).apply(ctx, args) +} + +function startSpan (tracer, config, ns, ops, server, name) { + const scope = tracer.scope() + const childOf = scope.active() + const span = tracer.startSpan('mongodb.query', { childOf }) + + addTags(span, tracer, config, ns, ops, server, name) + + return span +} + +function wrapCallback (tracer, span, done) { + return tracer.scope().bind((err, res) => { + if (err) { + span.addTags({ + 'error.type': err.name, + 'error.msg': err.message, + 'error.stack': err.stack + }) + } + + span.finish() + + if (done) { + done(err, res) + } + }) +} + +function addTags (span, tracer, config, ns, cmd, topology, operationName) { + const query = getQuery(cmd) + const resource = getResource(ns, query, operationName) + + span.addTags({ + 'service.name': config.service || `${tracer._service}-mongodb`, + 'resource.name': resource, + 'span.type': 'mongodb', + 'db.name': ns + }) + + if (query) { + span.setTag('mongodb.query', query) + } + + addHost(span, topology) +} + +function addHost (span, topology) { + const options = topology && topology.s && topology.s.options + + if (options && options.host && options.port) { + span.addTags({ + 'out.host': topology.s.options.host, + 'out.port': topology.s.options.port + }) + } +} + +function getQuery (cmd) { + if (!cmd || typeof cmd !== 'object' || Array.isArray(cmd)) return + if (cmd.query) return JSON.stringify(sanitize(cmd.query)) + if (cmd.filter) return JSON.stringify(sanitize(cmd.filter)) +} + +function getResource (ns, query, operationName) { + const parts = [operationName, ns] + + if (query) { + parts.push(query) + } + + return parts.join(' ') +} + +function sanitize (input) { + const output = {} + + if (!isObject(input) || Buffer.isBuffer(input) || isBSON(input)) return '?' + + for (const key in input) { + if (typeof input[key] === 'function') continue + + output[key] = sanitize(input[key]) + } + + return output +} + +function isObject (val) { + return typeof val === 'object' && val !== null && !(val instanceof Array) +} + +function isBSON (val) { + return val && val._bsontype +} + +module.exports = { instrument } diff --git a/packages/datadog-plugin-mongodb-core/test/index.spec.js b/packages/datadog-plugin-mongodb-core/test/core.spec.js similarity index 84% rename from packages/datadog-plugin-mongodb-core/test/index.spec.js rename to packages/datadog-plugin-mongodb-core/test/core.spec.js index 569e147040..24ce1a6cac 100644 --- a/packages/datadog-plugin-mongodb-core/test/index.spec.js +++ b/packages/datadog-plugin-mongodb-core/test/core.spec.js @@ -1,25 +1,39 @@ 'use strict' +const semver = require('semver') const agent = require('../../dd-trace/test/plugins/agent') const plugin = require('../src') wrapIt() +const withTopologies = fn => { + withVersions(plugin, ['mongodb-core', 'mongodb'], (version, moduleName) => { + describe('using the server topology', () => { + fn(() => { + const { CoreServer, Server } = require(`../../../versions/${moduleName}@${version}`).get() + + return CoreServer || Server + }) + }) + + // TODO: use semver.subset when we can update semver + if (moduleName === 'mongodb-core' && !semver.intersects(version, '<3.2')) { + describe('using the unified topology', () => { + fn(() => require(`../../../versions/${moduleName}@${version}`).get().Topology) + }) + } + }) +} + describe('Plugin', () => { let server let id let tracer let collection - describe('mongodb-core', () => { - withVersions(plugin, ['mongodb', 'mongodb-core'], (version, moduleName) => { - const getServer = () => { - return moduleName === 'mongodb' - ? require(`../../../versions/${moduleName}@${version}`).get().CoreServer - : require(`../../../versions/${moduleName}@${version}`).get().Server - } - - const next = (cursor, cb) => { + describe('mongodb-core (core)', () => { + withTopologies(getServer => { + const next = (cursor, cb = () => {}) => { return cursor._next ? cursor._next(cb) : cursor.next(cb) @@ -219,55 +233,34 @@ describe('Plugin', () => { describe('cursor', () => { it('should do automatic instrumentation', done => { - agent - .use(traces => { - const span = traces[0][0] + let cursor - expect(span).to.have.property('name', 'mongodb.query') - expect(span).to.have.property('service', 'test-mongodb') - expect(span).to.have.property('type', 'mongodb') - expect(span.meta).to.have.property('db.name', `test.${collection}`) - expect(span.meta).to.have.property('out.host', 'localhost') - expect(span.metrics).to.have.property('out.port', 27017) - }) - .then(done) + Promise.all([ + agent + .use(traces => { + expect(traces[0][0].resource).to.equal(`find test.${collection} {}`) + }), + agent + .use(traces => { + expect(traces[0][0].resource).to.equal(`getMore test.${collection}`) + }), + agent + .use(traces => { + expect(traces[0][0].resource).to.equal(`killCursors test.${collection}`) + }) + ]) + .then(() => done()) .catch(done) - const cursor = server.cursor(`test.${collection}`, { - insert: `test.${collection}`, - documents: [{ a: 1 }] - }, {}) - - next(cursor) - }) - - it('should have the correct index', done => { - let cursor - - agent.use(() => { + server.insert(`test.${collection}`, [{ a: 1 }, { a: 2 }, { a: 3 }], {}, () => { cursor = server.cursor(`test.${collection}`, { find: `test.${collection}`, - query: {} + query: {}, + batchSize: 1 }, { batchSize: 1 }) - next(cursor) + next(cursor, () => next(cursor, () => cursor.kill(() => {}))) }) - - agent - .use(traces => { - expect(traces[0][0].metrics).to.have.property('mongodb.cursor.index', 0) - }) - .then(() => next(cursor)) - .catch(done) - - agent - .use(traces => { - expect(traces[0][0].metrics).to.have.property('mongodb.cursor.index', 1) - }) - .then(done) - .catch(done) - - server.insert(`test.${collection}`, [{ a: 1 }, { a: 2 }], {}, () => {}) }) it('should sanitize the query as the resource', done => { diff --git a/packages/datadog-plugin-mongodb-core/test/mongodb.spec.js b/packages/datadog-plugin-mongodb-core/test/mongodb.spec.js new file mode 100644 index 0000000000..e093a6efed --- /dev/null +++ b/packages/datadog-plugin-mongodb-core/test/mongodb.spec.js @@ -0,0 +1,219 @@ +'use strict' + +const agent = require('../../dd-trace/test/plugins/agent') +const plugin = require('../src') + +wrapIt() + +const withTopologies = fn => { + withVersions(plugin, 'mongodb', (version, moduleName) => { + describe('using the server topology', () => { + fn(async () => { + const { MongoClient } = require(`../../../versions/${moduleName}@${version}`).get() + const client = new MongoClient('mongodb://localhost:27017', { useUnifiedTopology: false }) + + await client.connect() + + return client + }) + }) + + describe('using the unified topology', () => { + fn(async () => { + const { MongoClient, Server } = require(`../../../versions/${moduleName}@${version}`).get() + const server = new Server('localhost', 27017, { reconnect: false }) + const client = new MongoClient(server, { useUnifiedTopology: true }) + + await client.connect() + + return client + }) + }) + }) +} + +describe('Plugin', () => { + let client + let id + let tracer + let collectionName + let collection + let db + + describe('mongodb-core', () => { + withTopologies(createClient => { + beforeEach(() => { + id = require('../../dd-trace/src/id') + tracer = require('../../dd-trace') + + collectionName = id().toString() + }) + + afterEach(() => { + return client.close() + }) + + describe('without configuration', () => { + before(() => { + return agent.load('mongodb-core') + }) + + after(() => { + return agent.close() + }) + + beforeEach(async () => { + client = await createClient() + db = client.db('test') + collection = db.collection(collectionName) + }) + + describe('server', () => { + it('should do automatic instrumentation', done => { + agent + .use(traces => { + const span = traces[0][0] + const resource = `insert test.${collectionName}` + + expect(span).to.have.property('name', 'mongodb.query') + expect(span).to.have.property('service', 'test-mongodb') + expect(span).to.have.property('resource', resource) + expect(span).to.have.property('type', 'mongodb') + expect(span.meta).to.have.property('db.name', `test.${collectionName}`) + expect(span.meta).to.have.property('out.host', 'localhost') + }) + .then(done) + .catch(done) + + collection.insertOne({ a: 1 }, {}, () => {}) + }) + + it('should use the correct resource name for arbitrary commands', done => { + agent + .use(traces => { + const span = traces[0][0] + const resource = `planCacheListPlans test.$cmd {}` + + expect(span).to.have.property('resource', resource) + }) + .then(done) + .catch(done) + + db.command({ + planCacheListPlans: `test.${collectionName}`, + query: {} + }, () => {}) + }) + + it('should sanitize the query', done => { + agent + .use(traces => { + const span = traces[0][0] + const query = '{"foo":"?","bar":{"baz":"?"}}' + const resource = `find test.${collectionName} ${query}` + + expect(span).to.have.property('resource', resource) + expect(span.meta).to.have.property('mongodb.query', query) + }) + .then(done) + .catch(done) + + collection.find({ + foo: 1, + bar: { + baz: [1, 2, 3] + } + }).toArray() + }) + + it('should sanitize buffers as values and not as objects', done => { + agent + .use(traces => { + const span = traces[0][0] + const resource = `find test.${collectionName} {"_id":"?"}` + + expect(span).to.have.property('resource', resource) + }) + .then(done) + .catch(done) + + collection.find({ + _id: Buffer.from('1234') + }).toArray() + }) + + it('should sanitize BSON as values and not as objects', done => { + const BSON = require(`../../../versions/bson@4.0.0`).get() + + agent + .use(traces => { + const span = traces[0][0] + const resource = `find test.${collectionName} {"_id":"?"}` + + expect(span).to.have.property('resource', resource) + }) + .then(done) + .catch(done) + + collection.find({ + _id: new BSON.ObjectID('123456781234567812345678') + }).toArray() + }) + + it('should skip functions when sanitizing', done => { + agent + .use(traces => { + const span = traces[0][0] + const resource = `find test.${collectionName} {"_id":"?"}` + + expect(span).to.have.property('resource', resource) + }) + .then(done) + .catch(done) + + collection.find({ + _id: '1234', + foo: () => {} + }).toArray() + }) + + it('should run the callback in the parent context', done => { + if (process.env.DD_CONTEXT_PROPAGATION === 'false') return done() + + collection.insertOne({ a: 1 }, {}, () => { + expect(tracer.scope().active()).to.be.null + done() + }) + }) + }) + }) + + describe('with configuration', () => { + before(() => { + return agent.load('mongodb-core', { service: 'custom' }) + }) + + after(() => { + return agent.close() + }) + + beforeEach(async () => { + client = await createClient() + db = client.db('test') + collection = db.collection(collectionName) + }) + + it('should be configured with the correct values', done => { + agent + .use(traces => { + expect(traces[0][0]).to.have.property('service', 'custom') + }) + .then(done) + .catch(done) + + collection.insertOne({ a: 1 }, () => {}) + }) + }) + }) + }) +})