Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
306d6cd
feat: add producer-side batch message handling with span linking
nina9753 Nov 6, 2025
083e5aa
feat: add ack context map and producer improvements for batching
nina9753 Nov 14, 2025
7c8cda6
fix: resolve linting errors in google-cloud-pubsub.js
nina9753 Nov 18, 2025
10eb97c
fix: remove trailing whitespace in client.js
nina9753 Nov 18, 2025
7172f88
Merge branch 'nina.rei/SVLS-7168/gcp-push-pubsub-plugin' into nina.re…
nina9753 Nov 19, 2025
1394f6c
fix comments
nina9753 Nov 19, 2025
9a40f0b
Merge branch 'master' into nina.rei/SVLS-7168/gcp-push-pubsub-plugin
nina9753 Nov 19, 2025
a67bc15
Merge branch 'nina.rei/SVLS-7168/gcp-push-pubsub-plugin' into nina.re…
nina9753 Nov 19, 2025
9322651
feat: add span linking from delivery span to pubsub.request
nina9753 Nov 6, 2025
44ea80c
feat: check for pubsub.delivery span in AsyncLocalStorage before extr…
nina9753 Nov 6, 2025
50cd907
feat: add span linking and batch metadata to pull-based consumer
nina9753 Nov 6, 2025
5cdaa94
feat: add comprehensive span linking for consumer and push subscriptions
nina9753 Nov 14, 2025
c2edec8
Remove comments
nina9753 Nov 18, 2025
0371fac
run npm lint
nina9753 Nov 19, 2025
065ad3e
fix test
nina9753 Nov 19, 2025
7505ad0
fix test
nina9753 Nov 19, 2025
6fdb1c8
Merge branch 'nina.rei/SVLS-7168/gcp-pubsub-batch-plugin' into nina.r…
nina9753 Nov 20, 2025
cc98bec
fix test
nina9753 Nov 20, 2025
6697655
fix index.js test
nina9753 Nov 20, 2025
3dd7fec
fix index.js test
nina9753 Nov 20, 2025
03d725d
fix context error
nina9753 Nov 20, 2025
5002a29
fix lint
nina9753 Nov 20, 2025
b1c71d4
fix lint
nina9753 Nov 20, 2025
9fc3bd8
fix lint
nina9753 Nov 20, 2025
81e643a
fix index.js test
nina9753 Nov 20, 2025
615d2c8
fix plugin
nina9753 Nov 20, 2025
db9e235
add ctx
nina9753 Nov 21, 2025
4d44ec0
test the index
nina9753 Nov 21, 2025
488bfc2
update consumer subscription bindstart
nina9753 Nov 21, 2025
a440473
update addhook subscription
nina9753 Nov 21, 2025
4609bcd
add debug logs
nina9753 Nov 21, 2025
0ba390e
Fix consumer span context loss with WeakMap and add comprehensive deb…
nina9753 Nov 21, 2025
8b41a6b
Fix double finish
nina9753 Nov 21, 2025
de1d319
Fix consumer span type: explicitly set _type='worker' after startSpan
nina9753 Nov 21, 2025
6f6820a
Fix: use span.setTag('span.type', 'worker') instead of span._type
nina9753 Nov 21, 2025
1f3b25d
Debug: Add type in startSpan options and via setTag, improve logging
nina9753 Nov 21, 2025
47cb8f6
Fix: Use runStores for _dispense and publish for remove with improved…
nina9753 Nov 21, 2025
fb73e20
add debug logs
nina9753 Nov 21, 2025
5912702
Fix: Guarantee test harness loads instrumentation before requiring @g…
nina9753 Nov 21, 2025
a8338c3
update subscription
nina9753 Nov 21, 2025
f2fa333
update subscription
nina9753 Nov 24, 2025
6e10d50
test fix for index.spec.js timeout
nina9753 Nov 28, 2025
c9fc172
fix test logic
nina9753 Nov 29, 2025
82085be
fix linter
nina9753 Nov 29, 2025
19962dd
fix linter
nina9753 Nov 29, 2025
a1fd1eb
remove comments
nina9753 Nov 30, 2025
c9ad616
feat: add producer-side batch message handling with span linking
nina9753 Nov 6, 2025
25afa6b
feat: add ack context map and producer improvements for batching
nina9753 Nov 14, 2025
83796c6
fix: resolve linting errors in google-cloud-pubsub.js
nina9753 Nov 18, 2025
2513640
fix: remove trailing whitespace in client.js
nina9753 Nov 18, 2025
ed3cc6d
fix comments
nina9753 Nov 19, 2025
725d47d
Merge branch 'nina.rei/SVLS-7168/gcp-pubsub-batch-plugin' into nina.r…
nina9753 Dec 1, 2025
00a08ce
Merge nina.rei/SVLS-7168/gcp-pubsub-batch-plugin into nina.rei/SVLS-7…
nina9753 Dec 9, 2025
87829c4
update from review
nina9753 Dec 9, 2025
3021780
addional cleanup
nina9753 Dec 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 65 additions & 25 deletions packages/datadog-instrumentations/src/google-cloud-pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,25 @@ const receiveStartCh = channel('apm:google-cloud-pubsub:receive:start')
const receiveFinishCh = channel('apm:google-cloud-pubsub:receive:finish')
const receiveErrorCh = channel('apm:google-cloud-pubsub:receive:error')

// Bounded map to prevent memory leaks from acks that never complete
const ackContextMap = new Map()
const ACK_CONTEXT_MAX_SIZE = 10_000
const ACK_CONTEXT_TTL_MS = 600_000 // 10 minutes - matches Cloud Run streaming pull default deadline

// Cleanup old entries periodically
const ackContextCleanupInterval = setInterval(() => {
const now = Date.now()
for (const [ackId, entry] of ackContextMap.entries()) {
if (now - entry.timestamp > ACK_CONTEXT_TTL_MS) {
ackContextMap.delete(ackId)
}
}
}, 60_000) // Run cleanup every 60 seconds

// Allow process to exit cleanly
if (ackContextCleanupInterval.unref) {
ackContextCleanupInterval.unref()
}

const publisherMethods = [
'createTopic',
Expand Down Expand Up @@ -88,18 +106,16 @@ function wrapMethod (method) {
if (isAckOperation && request && request.ackIds && request.ackIds.length > 0) {
// Try to find a stored context for any of these ack IDs
for (const ackId of request.ackIds) {
const storedContext = ackContextMap.get(ackId)
if (storedContext) {
restoredStore = storedContext
const entry = ackContextMap.get(ackId)
if (entry) {
restoredStore = entry.context
break
}
}

if (api === 'acknowledge') {
request.ackIds.forEach(ackId => {
if (ackContextMap.has(ackId)) {
ackContextMap.delete(ackId)
}
ackContextMap.delete(ackId)
})
}
}
Expand Down Expand Up @@ -214,7 +230,19 @@ addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'], file: 'build/src/su
const storeWithSpanContext = { ...currentStore, span: activeSpan }

if (this.ackId) {
ackContextMap.set(this.ackId, storeWithSpanContext)
// Enforce max size to prevent unbounded growth
if (ackContextMap.size >= ACK_CONTEXT_MAX_SIZE) {
// Remove oldest entry (first entry in Map iteration order)
const firstKey = ackContextMap.keys().next().value
if (firstKey !== undefined) {
ackContextMap.delete(firstKey)
}
}

ackContextMap.set(this.ackId, {
context: storeWithSpanContext,
timestamp: Date.now()
})
}
}

Expand All @@ -227,24 +255,36 @@ addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'], file: 'build/src/su

addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'], file: 'build/src/lease-manager.js' }, (obj) => {
const LeaseManager = obj.LeaseManager
const ctx = {}
if (!LeaseManager) {
return obj
}

const messageContexts = new WeakMap()

shimmer.wrap(LeaseManager.prototype, '_dispense', dispense => function (message) {
if (receiveStartCh.hasSubscribers) {
ctx.message = message
return receiveStartCh.runStores(ctx, dispense, this, ...arguments)
}
return dispense.apply(this, arguments)
const ctx = { message }
messageContexts.set(message, ctx)

return receiveStartCh.runStores(ctx, dispense, this, ...arguments)
})

shimmer.wrap(LeaseManager.prototype, 'remove', remove => function (message) {
const ctx = messageContexts.get(message) || { message }
messageContexts.delete(message)

return receiveFinishCh.runStores(ctx, remove, this, ...arguments)
})

shimmer.wrap(LeaseManager.prototype, 'clear', clear => function () {
for (const message of this._messages) {
ctx.message = message
receiveFinishCh.publish(ctx)
// Finish spans for all messages still in the lease before clearing
if (this._messages) {
for (const message of this._messages.values()) {
const ctx = messageContexts.get(message)
if (ctx) {
receiveFinishCh.publish(ctx)
messageContexts.delete(message)
}
}
}
return clear.apply(this, arguments)
})
Expand Down Expand Up @@ -275,19 +315,19 @@ function injectTraceContext (attributes, pubsub, topicName) {
addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'] }, (obj) => {
if (!obj.Topic?.prototype) return obj

// Wrap Topic.publishMessage (modern API)
if (obj.Topic.prototype.publishMessage) {
shimmer.wrap(obj.Topic.prototype, 'publishMessage', publishMessage => function (data) {
if (data && typeof data === 'object') {
if (!data.attributes) data.attributes = {}
injectTraceContext(data.attributes, this.pubsub, this.name)
if (typeof obj.Topic.prototype.publishMessage === 'function') {
shimmer.wrap(obj.Topic.prototype, 'publishMessage', publishMessage => {
return function (data, attributesOrCallback, callback) {
if (data && typeof data === 'object') {
if (!data.attributes) data.attributes = {}
injectTraceContext(data.attributes, this.pubsub, this.name)
}
return publishMessage.apply(this, arguments)
}
return publishMessage.apply(this, arguments)
})
}

// Wrap Topic.publish (legacy API)
if (obj.Topic.prototype.publish) {
if (typeof obj.Topic.prototype.publish === 'function') {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you find that sometimes obj.Topic.prototype.publish was a truthy value but wasn't a function?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I have not found evidence that obj.Topic.prototype.publish is ever a truthy non-function value in practice.
The typeof obj.Topic.prototype.publish === 'function' check is following the pattern that's used throughout the dd-trace-js codebase for instrumentation you can see other examples in the same file that i did not add myself

shimmer.wrap(obj.Topic.prototype, 'publish', publish => function (buffer, attributesOrCallback, callback) {
if (typeof attributesOrCallback === 'function' || !attributesOrCallback) {
arguments[1] = {}
Expand Down
156 changes: 144 additions & 12 deletions packages/datadog-plugin-google-cloud-pubsub/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,161 @@

const { getMessageSize } = require('../../dd-trace/src/datastreams')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const SpanContext = require('../../dd-trace/src/opentracing/span_context')
const id = require('../../dd-trace/src/id')

class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
static id = 'google-cloud-pubsub'
static operation = 'receive'

_reconstructPubSubRequestContext (attrs) {
const traceIdLower = attrs['_dd.pubsub_request.trace_id']
const spanId = attrs['_dd.pubsub_request.span_id']
const traceIdUpper = attrs['_dd.p.tid']

if (!traceIdLower || !spanId) return null

try {
const traceId128 = traceIdUpper ? traceIdUpper + traceIdLower : traceIdLower.padStart(32, '0')
const traceId = id(traceId128, 16)
const parentId = id(spanId, 16)

const tags = {}
if (traceIdUpper) tags['_dd.p.tid'] = traceIdUpper

return new SpanContext({
traceId,
spanId: parentId,
tags
})
} catch {
return null
}
}

bindStart (ctx) {
const { message } = ctx
const subscription = message._subscriber._subscription
const topic = subscription.metadata && subscription.metadata.topic
const childOf = this.tracer.extract('text_map', message.attributes) || null
const topic = (subscription.metadata && subscription.metadata.topic) ||
(message.attributes && message.attributes['pubsub.topic']) ||
(message.attributes && message.attributes['gcloud.project_id']
? `projects/${message.attributes['gcloud.project_id']}/topics/unknown`
: null)

const batchRequestTraceId = message.attributes?.['_dd.pubsub_request.trace_id']
const batchRequestSpanId = message.attributes?.['_dd.pubsub_request.span_id']
const batchSize = message.attributes?.['_dd.batch.size']
const batchIndex = message.attributes?.['_dd.batch.index']

let childOf = this.tracer.extract('text_map', message.attributes) || null

const isFirstMessage = batchIndex === '0' || batchIndex === 0
if (isFirstMessage && batchRequestSpanId) {
const pubsubRequestContext = this._reconstructPubSubRequestContext(message.attributes)
if (pubsubRequestContext) {
childOf = pubsubRequestContext
}
}

const topicName = topic ? topic.split('/').pop() : subscription.name.split('/').pop()
const baseService = this.tracer._service || 'unknown'
const serviceName = this.config.service || `${baseService}-pubsub`
const meta = {
'gcloud.project_id': subscription.pubsub.projectId,
'pubsub.topic': topic,
'span.kind': 'consumer',
'pubsub.delivery_method': 'pull',
'pubsub.span_type': 'message_processing',
'messaging.operation': 'receive',
'_dd.base_service': this.tracer._service,
'_dd.serviceoverride.type': 'custom'
}

if (batchRequestTraceId) {
meta['pubsub.batch.request_trace_id'] = batchRequestTraceId
}
if (batchRequestSpanId) {
meta['pubsub.batch.request_span_id'] = batchRequestSpanId
// Also add span link metadata
meta['_dd.pubsub_request.trace_id'] = batchRequestTraceId
meta['_dd.pubsub_request.span_id'] = batchRequestSpanId
if (batchRequestTraceId && batchRequestSpanId) {
// Use JSON format like producer for proper span link parsing
meta['_dd.span_links'] = JSON.stringify([{
trace_id: batchRequestTraceId,
span_id: batchRequestSpanId,
flags: 0
}])
}
}

const metrics = {
'pubsub.ack': 0
}

if (batchSize) {
metrics['pubsub.batch.message_count'] = Number.parseInt(batchSize, 10)
metrics['pubsub.batch.size'] = Number.parseInt(batchSize, 10)
}
if (batchIndex !== undefined) {
metrics['pubsub.batch.message_index'] = Number.parseInt(batchIndex, 10)
metrics['pubsub.batch.index'] = Number.parseInt(batchIndex, 10)
}

// Add batch description
if (batchSize && batchIndex !== undefined) {
const index = Number.parseInt(batchIndex, 10)
const size = Number.parseInt(batchSize, 10)
meta['pubsub.batch.description'] = `Message ${index + 1} of ${size}`
}

const span = this.startSpan({
childOf,
resource: topic,
resource: `Message from ${topicName}`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like in the previous PR this one also looks like a breaking change.

Copy link
Author

@nina9753 nina9753 Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There shouldn't be any users for this currently, but I can always revert to what is in prod. I wanted to make the spans more descriptive for the blog post and product release.

type: 'worker',
meta: {
'gcloud.project_id': subscription.pubsub.projectId,
'pubsub.topic': topic
},
metrics: {
'pubsub.ack': 0
}
service: serviceName,
meta,
metrics
}, ctx)

if (message.id) {
span.setTag('pubsub.message_id', message.id)
}
if (message.publishTime) {
span.setTag('pubsub.publish_time', message.publishTime.toISOString())
}

if (message.attributes) {
const publishStartTime = message.attributes['x-dd-publish-start-time']
if (publishStartTime) {
const deliveryDuration = Date.now() - Number.parseInt(publishStartTime, 10)
span.setTag('pubsub.delivery_duration_ms', deliveryDuration)
}

const pubsubRequestTraceId = message.attributes['_dd.pubsub_request.trace_id']
const pubsubRequestSpanId = message.attributes['_dd.pubsub_request.span_id']
const batchSize = message.attributes['_dd.batch.size']
const batchIndex = message.attributes['_dd.batch.index']

if (pubsubRequestTraceId && pubsubRequestSpanId) {
span.setTag('_dd.pubsub_request.trace_id', pubsubRequestTraceId)
span.setTag('_dd.pubsub_request.span_id', pubsubRequestSpanId)
// Use JSON format like producer for proper span link parsing
span.setTag('_dd.span_links', JSON.stringify([{
trace_id: pubsubRequestTraceId,
span_id: pubsubRequestSpanId,
flags: 0
}]))
}

if (batchSize) {
span.setTag('pubsub.batch.size', Number.parseInt(batchSize, 10))
}
if (batchIndex) {
span.setTag('pubsub.batch.index', Number.parseInt(batchIndex, 10))
}
}

if (this.config.dsmEnabled && message?.attributes) {
const payloadSize = getMessageSize(message)
this.tracer.decodeDataStreamsContext(message.attributes)
Expand All @@ -38,14 +169,15 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {

bindFinish (ctx) {
const { message } = ctx
const span = ctx.currentStore.span
const span = ctx.currentStore?.span

if (!span) return ctx.parentStore

if (message?._handled) {
span.setTag('pubsub.ack', 1)
}

super.finish()

return ctx.parentStore
}
}
Expand Down
Loading
Loading