Skip to content

Commit 31a2890

Browse files
authored
fix(ws): wrap event remove handler for message event removal (#7052)
* fix(ws): wrap event remove handler for message event removal The remove handler has to understand what method was originally wrapped and remove the wrapped method. Otherwise it would still exist and be triggered. Fixes: #7025 * fixup! * chore: simplify code and add TODO comments The instrumentation should be improved further for error handling and preventing multiple message listeners creating individual spans.
1 parent a897a20 commit 31a2890

File tree

4 files changed

+97
-41
lines changed

4 files changed

+97
-41
lines changed

packages/datadog-instrumentations/src/find-my-way.js

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ function wrapOn (on) {
99
return function onWithTrace (method, path, opts) {
1010
const index = typeof opts === 'function' ? 2 : 3
1111
const handler = arguments[index]
12-
const wrapper = shimmer.wrapFunction(handler, handler => function (req) {
13-
routeChannel.publish({ req, route: path })
14-
15-
return handler.apply(this, arguments)
16-
})
1712

1813
if (typeof handler === 'function') {
14+
const wrapper = shimmer.wrapFunction(handler, handler => function (req) {
15+
routeChannel.publish({ req, route: path })
16+
17+
return handler.apply(this, arguments)
18+
})
1919
arguments[index] = wrapper
2020
}
2121

@@ -24,6 +24,7 @@ function wrapOn (on) {
2424
}
2525

2626
addHook({ name: 'find-my-way', versions: ['>=1'] }, Router => {
27+
// No need to wrap the off method as it would not contain the handler function.
2728
shimmer.wrap(Router.prototype, 'on', wrapOn)
2829

2930
return Router

packages/datadog-instrumentations/src/ws.js

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ const producerCh = tracingChannel('ws:send')
1212
const receiverCh = tracingChannel('ws:receive')
1313
const closeCh = tracingChannel('ws:close')
1414
const emitCh = channel('tracing:ws:server:connect:emit')
15+
// TODO: Add a error channel / handle error events properly.
16+
17+
const eventHandlerMap = new WeakMap()
1518

1619
function wrapHandleUpgrade (handleUpgrade) {
1720
return function () {
@@ -67,24 +70,37 @@ function createWrapEmit (emit) {
6770
}
6871

6972
function createWrappedHandler (handler) {
70-
return function wrappedMessageHandler (data, binary) {
73+
return shimmer.wrapFunction(handler, originalHandler => function (data, binary) {
7174
const byteLength = dataLength(data)
7275

7376
const ctx = { data, binary, socket: this._sender?._socket, byteLength }
7477

75-
return receiverCh.traceSync(handler, ctx, this, data, binary)
76-
}
78+
return receiverCh.traceSync(originalHandler, ctx, this, data, binary)
79+
})
7780
}
7881

7982
function wrapListener (originalOn) {
8083
return function (eventName, handler) {
8184
if (eventName === 'message') {
82-
return originalOn.call(this, eventName, createWrappedHandler(handler))
85+
// Prevent multiple wrapping of the same handler in case the user adds the listener multiple times
86+
const wrappedHandler = eventHandlerMap.get(handler) ?? createWrappedHandler(handler)
87+
eventHandlerMap.set(handler, wrappedHandler)
88+
return originalOn.call(this, eventName, wrappedHandler)
8389
}
8490
return originalOn.apply(this, arguments)
8591
}
8692
}
8793

94+
function removeListener (originalOff) {
95+
return function (eventName, handler) {
96+
if (eventName === 'message') {
97+
const wrappedHandler = eventHandlerMap.get(handler)
98+
return originalOff.call(this, eventName, wrappedHandler)
99+
}
100+
return originalOff.apply(this, arguments)
101+
}
102+
}
103+
88104
function wrapClose (close) {
89105
return function (code, data) {
90106
// _closeFrameReceived is set to true when receiver receives a close frame from a peer
@@ -115,22 +131,24 @@ addHook({
115131
versions: ['>=8.0.0']
116132
}, ws => {
117133
shimmer.wrap(ws.prototype, 'send', wrapSend)
118-
shimmer.wrap(ws.prototype, 'on', wrapListener)
119134
shimmer.wrap(ws.prototype, 'close', wrapClose)
135+
136+
// TODO: Do not wrap these methods. Instead, add a listener to the websocket instance when one is created.
137+
// That way it avoids producing too many spans for the same websocket instance and less user code is impacted.
138+
shimmer.wrap(ws.prototype, 'on', wrapListener)
139+
shimmer.wrap(ws.prototype, 'addListener', wrapListener)
140+
shimmer.wrap(ws.prototype, 'off', removeListener)
141+
shimmer.wrap(ws.prototype, 'removeListener', removeListener)
142+
120143
return ws
121144
})
122145

123-
function detectType (data) {
124-
if (typeof Blob !== 'undefined' && data instanceof Blob) return 'Blob'
125-
if (typeof Buffer !== 'undefined' && Buffer.isBuffer(data)) return 'Buffer'
126-
if (typeof data === 'string') return 'string'
127-
return 'Unknown'
128-
}
129-
130146
function dataLength (data) {
131-
const type = detectType(data)
132-
if (type === 'Blob') return data.size
133-
if (type === 'Buffer') return data.length
134-
if (type === 'string') return Buffer.byteLength(data)
135-
return 0
147+
if (typeof data === 'string') {
148+
return Buffer.byteLength(data)
149+
}
150+
if (data instanceof Blob) {
151+
return data.size
152+
}
153+
return data?.length ?? 0
136154
}

packages/datadog-plugin-ws/test/index.spec.js

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -65,18 +65,34 @@ describe('Plugin', () => {
6565
agent.close({ ritmReset: false, wipe: true })
6666
})
6767

68-
it('should do automatic instrumentation', () => {
68+
it('should do automatic instrumentation and remove broken handler', () => {
6969
wsServer.on('connection', (ws) => {
7070
connectionReceived = true
7171
ws.send('test message')
7272
})
7373

74-
client.on('message', (msg) => {
74+
const brokenHandler = () => {
75+
throw new Error('broken handler')
76+
}
77+
78+
client.on('message', brokenHandler)
79+
80+
client.addListener('message', (msg) => {
7581
assert.strictEqual(msg.toString(), 'test message')
7682
})
7783

78-
return agent.assertSomeTraces(traces => {
79-
assert.strictEqual(traces[0][0].name, 'web.request')
84+
client.off('message', brokenHandler)
85+
86+
return agent.assertFirstTraceSpan({
87+
name: 'websocket.send',
88+
type: 'websocket',
89+
resource: `websocket /${route}`,
90+
service: 'some',
91+
parent_id: 0n,
92+
error: 0,
93+
meta: {
94+
'span.kind': 'producer',
95+
}
8096
})
8197
})
8298

@@ -103,7 +119,7 @@ describe('Plugin', () => {
103119
client.on('error', done)
104120
})
105121

106-
it('should instrument message sending', done => {
122+
it('should instrument message sending and not double wrap the same handler', done => {
107123
wsServer.on('connection', ws => {
108124
connectionReceived = true
109125
ws.on('message', msg => {
@@ -116,32 +132,49 @@ describe('Plugin', () => {
116132
client.send('test message')
117133
})
118134

119-
client.on('message', (data) => {
135+
const brokenHandler = () => {
136+
throw new Error('broken handler')
137+
}
138+
139+
client.on('message', brokenHandler)
140+
141+
const handler = (data) => {
120142
assert.strictEqual(data.toString(), 'test message')
121143
done()
122-
})
144+
}
145+
146+
client.addListener('message', handler)
147+
client.on('message', handler)
148+
149+
const handlers = client.listeners('message')
150+
151+
assert.strictEqual(handlers[0].name, brokenHandler.name)
152+
assert.strictEqual(handlers[1], handlers[2])
153+
154+
client.removeListener('message', brokenHandler)
155+
client.removeListener('message', handler)
123156

124157
client.on('error', done)
125158
})
126159

127-
it('should instrument message receiving', done => {
160+
it('should instrument message receiving', () => {
128161
wsServer.on('connection', (ws) => {
129162
ws.on('message', (data) => {
130163
assert.strictEqual(data.toString(), 'test message from client')
131164
})
132165
})
133-
agent.assertSomeTraces(traces => {
134-
assert.strictEqual(traces[0][0].name, 'websocket.receive')
135-
assert.strictEqual(traces[0][0].resource, `websocket /${route}`)
136-
})
137-
.then(done)
138-
.catch(done)
139166

140167
client.on('open', () => {
141168
client.send('test message from client')
142169
})
143170

144-
client.on('error', done)
171+
return Promise.race([
172+
once(client, 'error'),
173+
agent.assertFirstTraceSpan({
174+
name: 'websocket.receive',
175+
resource: `websocket /${route}`
176+
})
177+
])
145178
})
146179

147180
it('should instrument connection close', () => {

packages/dd-trace/test/plugins/agent.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ const http = require('http')
55
const path = require('path')
66
const util = require('util')
77

8-
const msgpack = require('@msgpack/msgpack')
98
const bodyParser = require('body-parser')
109
const express = require('express')
10+
const msgpack = require('@msgpack/msgpack')
1111
const proxyquire = require('proxyquire')
12+
const semifies = require('semifies')
1213

1314
const { assertObjectContains } = require('../../../../integration-tests/helpers')
1415
const { storage } = require('../../../datadog-core')
@@ -338,7 +339,7 @@ function runCallbackAgainstTraces (callback, options = {}, handlers) {
338339
enumerable: true
339340
})
340341
// Hack for the information to be fully visible.
341-
error.message = util.inspect(error)
342+
error.message = util.inspect(error, { depth: null })
342343
reject(error)
343344
}
344345
}, options.timeoutMs || 1000)
@@ -583,8 +584,11 @@ module.exports = {
583584
try {
584585
assertObjectContains(traces[0][0], callbackOrExpected)
585586
} catch (error) {
586-
// eslint-disable-next-line no-console
587-
console.error('Expected span %o did not match traces:\n%o', callbackOrExpected, traces)
587+
// Enrich error with actual and expected traces for Node.js < 22.17.0
588+
if (semifies(process.version, '<22.17.0')) {
589+
error.actualTraces = util.inspect(traces, { depth: null })
590+
error.expectedTraces = util.inspect(callbackOrExpected, { depth: null })
591+
}
588592
throw error
589593
}
590594
} else {

0 commit comments

Comments
 (0)