Skip to content

Commit e760b3b

Browse files
jcstorms1watson
andauthored
fix: Azure Service Bus batching memory leak (#6917)
* initial * fix issues with esm * clean up * Update packages/datadog-plugin-azure-service-bus/src/producer.js Co-authored-by: Thomas Watson <thomas.watson@datadoghq.com> * Update packages/datadog-instrumentations/src/azure-service-bus.js Co-authored-by: Thomas Watson <thomas.watson@datadoghq.com> * Update packages/datadog-plugin-azure-service-bus/src/producer.js Co-authored-by: Thomas Watson <thomas.watson@datadoghq.com> --------- Co-authored-by: Thomas Watson <thomas.watson@datadoghq.com>
1 parent 16081de commit e760b3b

File tree

2 files changed

+57
-41
lines changed

2 files changed

+57
-41
lines changed

packages/datadog-instrumentations/src/azure-service-bus.js

Lines changed: 43 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -8,57 +8,64 @@ const shimmer = require('../../datadog-shimmer')
88
const dc = require('dc-polyfill')
99

1010
const producerCh = dc.tracingChannel('apm:azure-service-bus:send')
11+
const isItDefault = new WeakSet()
1112

12-
addHook({ name: '@azure/service-bus', versions: ['>=7.9.2'], patchDefault: false }, (obj) => {
13+
addHook({ name: '@azure/service-bus', versions: ['>=7.9.2'] }, (obj) => {
1314
const ServiceBusClient = obj.ServiceBusClient
14-
let didItShim = false
1515
shimmer.wrap(ServiceBusClient.prototype, 'createSender',
1616
createSender => function (queueOrTopicName) {
1717
const sender = createSender.apply(this, arguments)
18-
if (didItShim) return sender
1918
const senderPrototype = sender.constructor.prototype
2019
const senderSenderPrototype = sender._sender.constructor.prototype
21-
shimmer.wrap(senderPrototype, 'scheduleMessages', scheduleMessages =>
22-
function (msg, scheduledEnqueueTimeUtc) {
23-
const functionName = scheduleMessages.name
20+
21+
if (!isItDefault.has(senderPrototype)) {
22+
isItDefault.add(senderPrototype)
23+
24+
shimmer.wrap(senderPrototype, 'scheduleMessages', scheduleMessages =>
25+
function (msg, scheduledEnqueueTimeUtc) {
26+
const functionName = scheduleMessages.name
27+
const config = this._context.config
28+
const entityPath = this._entityPath
29+
return producerCh.tracePromise(
30+
scheduleMessages,
31+
{ config, entityPath, functionName, msg, scheduledEnqueueTimeUtc },
32+
this, ...arguments
33+
)
34+
})
35+
36+
shimmer.wrap(senderPrototype, 'createMessageBatch', createMessageBatch => async function () {
37+
const batch = await createMessageBatch.apply(this, arguments)
38+
shimmer.wrap(batch, 'tryAddMessage', tryAddMessage => function (msg) {
39+
const functionName = tryAddMessage.name
40+
const config = this._context.config
41+
return producerCh.tracePromise(
42+
tryAddMessage, { config, functionName, batch, msg }, this, ...arguments)
43+
})
44+
return batch
45+
})
46+
}
47+
48+
if (!isItDefault.has(senderSenderPrototype)) {
49+
isItDefault.add(senderSenderPrototype)
50+
51+
shimmer.wrap(senderSenderPrototype, 'send', send => function (msg) {
52+
const functionName = send.name
2453
const config = this._context.config
25-
const entityPath = this._entityPath
54+
const entityPath = this.entityPath
2655
return producerCh.tracePromise(
27-
scheduleMessages,
28-
{ config, entityPath, functionName, msg, scheduledEnqueueTimeUtc },
29-
this, ...arguments
56+
send, { config, entityPath, functionName, msg }, this, ...arguments
3057
)
3158
})
3259

33-
shimmer.wrap(senderPrototype, 'createMessageBatch', createMessageBatch => async function () {
34-
const batch = await createMessageBatch.apply(this, arguments)
35-
shimmer.wrap(batch.constructor.prototype, 'tryAddMessage', tryAddMessage => function (msg) {
36-
const functionName = tryAddMessage.name
60+
shimmer.wrap(senderSenderPrototype, 'sendBatch', sendBatch => function (msg) {
61+
const functionName = sendBatch.name
3762
const config = this._context.config
63+
const entityPath = this.entityPath
3864
return producerCh.tracePromise(
39-
tryAddMessage, { config, functionName, batch, msg }, this, ...arguments)
65+
sendBatch, { config, entityPath, functionName, msg }, this, ...arguments
66+
)
4067
})
41-
return batch
42-
})
43-
44-
shimmer.wrap(senderSenderPrototype, 'send', send => function (msg) {
45-
const functionName = send.name
46-
const config = this._context.config
47-
const entityPath = this.entityPath
48-
return producerCh.tracePromise(
49-
send, { config, entityPath, functionName, msg }, this, ...arguments
50-
)
51-
})
52-
53-
shimmer.wrap(senderSenderPrototype, 'sendBatch', sendBatch => function (msg) {
54-
const functionName = sendBatch.name
55-
const config = this._context.config
56-
const entityPath = this.entityPath
57-
return producerCh.tracePromise(
58-
sendBatch, { config, entityPath, functionName, msg }, this, ...arguments
59-
)
60-
})
61-
didItShim = true
68+
}
6269
return sender
6370
})
6471
return obj

packages/datadog-plugin-azure-service-bus/src/producer.js

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
const { getEnvironmentVariable } = require('../../dd-trace/src/config-helper')
44
const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
5+
const spanContexts = new WeakMap()
56

67
class AzureServiceBusProducerPlugin extends ProducerPlugin {
78
static get id () { return 'azure-service-bus' }
@@ -36,7 +37,12 @@ class AzureServiceBusProducerPlugin extends ProducerPlugin {
3637
}
3738

3839
if (batchLinksAreEnabled()) {
39-
ctx.batch._spanContexts.push(span.context())
40+
const spanContext = spanContexts.get(ctx.batch)
41+
if (spanContext) {
42+
spanContext.push(span.context())
43+
} else {
44+
spanContexts.set(ctx.batch, [span.context()])
45+
}
4046
injectTraceContext(this.tracer, span, ctx.msg)
4147
}
4248
}
@@ -47,9 +53,12 @@ class AzureServiceBusProducerPlugin extends ProducerPlugin {
4753
if (isBatch) {
4854
span.setTag('messaging.batch.message_count', messages.count)
4955
if (batchLinksAreEnabled()) {
50-
messages._spanContexts.forEach(spanContext => {
51-
span.addLink(spanContext)
52-
})
56+
const contexts = spanContexts.get(messages)
57+
if (contexts) {
58+
for (const spanContext of contexts) {
59+
span.addLink(spanContext)
60+
}
61+
}
5362
}
5463
} else if (Array.isArray(messages)) {
5564
span.setTag('messaging.batch.message_count', messages.length)
@@ -64,7 +73,7 @@ class AzureServiceBusProducerPlugin extends ProducerPlugin {
6473
}
6574

6675
asyncEnd (ctx) {
67-
super.finish()
76+
super.finish(ctx)
6877
}
6978
}
7079

0 commit comments

Comments
 (0)