diff --git a/packages/datadog-instrumentations/src/google-cloud-pubsub.js b/packages/datadog-instrumentations/src/google-cloud-pubsub.js index 848c4b47565..d900da71002 100644 --- a/packages/datadog-instrumentations/src/google-cloud-pubsub.js +++ b/packages/datadog-instrumentations/src/google-cloud-pubsub.js @@ -30,7 +30,25 @@ const receiveStartCh = channel('apm:google-cloud-pubsub:receive:start') const receiveFinishCh = channel('apm:google-cloud-pubsub:receive:finish') const receiveErrorCh = channel('apm:google-cloud-pubsub:receive:error') +// Bounded map to prevent memory leaks from acks that never complete const ackContextMap = new Map() +const ACK_CONTEXT_MAX_SIZE = 10_000 +const ACK_CONTEXT_TTL_MS = 600_000 // 10 minutes - matches Cloud Run streaming pull default deadline + +// Cleanup old entries periodically +const ackContextCleanupInterval = setInterval(() => { + const now = Date.now() + for (const [ackId, entry] of ackContextMap.entries()) { + if (now - entry.timestamp > ACK_CONTEXT_TTL_MS) { + ackContextMap.delete(ackId) + } + } +}, 60_000) // Run cleanup every 60 seconds + +// Allow process to exit cleanly +if (ackContextCleanupInterval.unref) { + ackContextCleanupInterval.unref() +} const publisherMethods = [ 'createTopic', @@ -88,18 +106,16 @@ function wrapMethod (method) { if (isAckOperation && request && request.ackIds && request.ackIds.length > 0) { // Try to find a stored context for any of these ack IDs for (const ackId of request.ackIds) { - const storedContext = ackContextMap.get(ackId) - if (storedContext) { - restoredStore = storedContext + const entry = ackContextMap.get(ackId) + if (entry) { + restoredStore = entry.context break } } if (api === 'acknowledge') { request.ackIds.forEach(ackId => { - if (ackContextMap.has(ackId)) { - ackContextMap.delete(ackId) - } + ackContextMap.delete(ackId) }) } } @@ -214,7 +230,19 @@ addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'], file: 'build/src/su const storeWithSpanContext = { ...currentStore, span: activeSpan } if (this.ackId) { - ackContextMap.set(this.ackId, storeWithSpanContext) + // Enforce max size to prevent unbounded growth + if (ackContextMap.size >= ACK_CONTEXT_MAX_SIZE) { + // Remove oldest entry (first entry in Map iteration order) + const firstKey = ackContextMap.keys().next().value + if (firstKey !== undefined) { + ackContextMap.delete(firstKey) + } + } + + ackContextMap.set(this.ackId, { + context: storeWithSpanContext, + timestamp: Date.now() + }) } } @@ -227,24 +255,36 @@ addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'], file: 'build/src/su addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'], file: 'build/src/lease-manager.js' }, (obj) => { const LeaseManager = obj.LeaseManager - const ctx = {} + if (!LeaseManager) { + return obj + } + + const messageContexts = new WeakMap() shimmer.wrap(LeaseManager.prototype, '_dispense', dispense => function (message) { - if (receiveStartCh.hasSubscribers) { - ctx.message = message - return receiveStartCh.runStores(ctx, dispense, this, ...arguments) - } - return dispense.apply(this, arguments) + const ctx = { message } + messageContexts.set(message, ctx) + + return receiveStartCh.runStores(ctx, dispense, this, ...arguments) }) shimmer.wrap(LeaseManager.prototype, 'remove', remove => function (message) { + const ctx = messageContexts.get(message) || { message } + messageContexts.delete(message) + return receiveFinishCh.runStores(ctx, remove, this, ...arguments) }) shimmer.wrap(LeaseManager.prototype, 'clear', clear => function () { - for (const message of this._messages) { - ctx.message = message - receiveFinishCh.publish(ctx) + // Finish spans for all messages still in the lease before clearing + if (this._messages) { + for (const message of this._messages.values()) { + const ctx = messageContexts.get(message) + if (ctx) { + receiveFinishCh.publish(ctx) + messageContexts.delete(message) + } + } } return clear.apply(this, arguments) }) @@ -275,19 +315,19 @@ function injectTraceContext (attributes, pubsub, topicName) { addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'] }, (obj) => { if (!obj.Topic?.prototype) return obj - // Wrap Topic.publishMessage (modern API) - if (obj.Topic.prototype.publishMessage) { - shimmer.wrap(obj.Topic.prototype, 'publishMessage', publishMessage => function (data) { - if (data && typeof data === 'object') { - if (!data.attributes) data.attributes = {} - injectTraceContext(data.attributes, this.pubsub, this.name) + if (typeof obj.Topic.prototype.publishMessage === 'function') { + shimmer.wrap(obj.Topic.prototype, 'publishMessage', publishMessage => { + return function (data, attributesOrCallback, callback) { + if (data && typeof data === 'object') { + if (!data.attributes) data.attributes = {} + injectTraceContext(data.attributes, this.pubsub, this.name) + } + return publishMessage.apply(this, arguments) } - return publishMessage.apply(this, arguments) }) } - // Wrap Topic.publish (legacy API) - if (obj.Topic.prototype.publish) { + if (typeof obj.Topic.prototype.publish === 'function') { shimmer.wrap(obj.Topic.prototype, 'publish', publish => function (buffer, attributesOrCallback, callback) { if (typeof attributesOrCallback === 'function' || !attributesOrCallback) { arguments[1] = {} diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js b/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js index af46088a16a..46701e6d135 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js @@ -2,30 +2,161 @@ const { getMessageSize } = require('../../dd-trace/src/datastreams') const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') +const SpanContext = require('../../dd-trace/src/opentracing/span_context') +const id = require('../../dd-trace/src/id') class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin { static id = 'google-cloud-pubsub' static operation = 'receive' + _reconstructPubSubRequestContext (attrs) { + const traceIdLower = attrs['_dd.pubsub_request.trace_id'] + const spanId = attrs['_dd.pubsub_request.span_id'] + const traceIdUpper = attrs['_dd.p.tid'] + + if (!traceIdLower || !spanId) return null + + try { + const traceId128 = traceIdUpper ? traceIdUpper + traceIdLower : traceIdLower.padStart(32, '0') + const traceId = id(traceId128, 16) + const parentId = id(spanId, 16) + + const tags = {} + if (traceIdUpper) tags['_dd.p.tid'] = traceIdUpper + + return new SpanContext({ + traceId, + spanId: parentId, + tags + }) + } catch { + return null + } + } + bindStart (ctx) { const { message } = ctx const subscription = message._subscriber._subscription - const topic = subscription.metadata && subscription.metadata.topic - const childOf = this.tracer.extract('text_map', message.attributes) || null + const topic = (subscription.metadata && subscription.metadata.topic) || + (message.attributes && message.attributes['pubsub.topic']) || + (message.attributes && message.attributes['gcloud.project_id'] + ? `projects/${message.attributes['gcloud.project_id']}/topics/unknown` + : null) + + const batchRequestTraceId = message.attributes?.['_dd.pubsub_request.trace_id'] + const batchRequestSpanId = message.attributes?.['_dd.pubsub_request.span_id'] + const batchSize = message.attributes?.['_dd.batch.size'] + const batchIndex = message.attributes?.['_dd.batch.index'] + + let childOf = this.tracer.extract('text_map', message.attributes) || null + + const isFirstMessage = batchIndex === '0' || batchIndex === 0 + if (isFirstMessage && batchRequestSpanId) { + const pubsubRequestContext = this._reconstructPubSubRequestContext(message.attributes) + if (pubsubRequestContext) { + childOf = pubsubRequestContext + } + } + + const topicName = topic ? topic.split('/').pop() : subscription.name.split('/').pop() + const baseService = this.tracer._service || 'unknown' + const serviceName = this.config.service || `${baseService}-pubsub` + const meta = { + 'gcloud.project_id': subscription.pubsub.projectId, + 'pubsub.topic': topic, + 'span.kind': 'consumer', + 'pubsub.delivery_method': 'pull', + 'pubsub.span_type': 'message_processing', + 'messaging.operation': 'receive', + '_dd.base_service': this.tracer._service, + '_dd.serviceoverride.type': 'custom' + } + + if (batchRequestTraceId) { + meta['pubsub.batch.request_trace_id'] = batchRequestTraceId + } + if (batchRequestSpanId) { + meta['pubsub.batch.request_span_id'] = batchRequestSpanId + // Also add span link metadata + meta['_dd.pubsub_request.trace_id'] = batchRequestTraceId + meta['_dd.pubsub_request.span_id'] = batchRequestSpanId + if (batchRequestTraceId && batchRequestSpanId) { + // Use JSON format like producer for proper span link parsing + meta['_dd.span_links'] = JSON.stringify([{ + trace_id: batchRequestTraceId, + span_id: batchRequestSpanId, + flags: 0 + }]) + } + } + + const metrics = { + 'pubsub.ack': 0 + } + + if (batchSize) { + metrics['pubsub.batch.message_count'] = Number.parseInt(batchSize, 10) + metrics['pubsub.batch.size'] = Number.parseInt(batchSize, 10) + } + if (batchIndex !== undefined) { + metrics['pubsub.batch.message_index'] = Number.parseInt(batchIndex, 10) + metrics['pubsub.batch.index'] = Number.parseInt(batchIndex, 10) + } + + // Add batch description + if (batchSize && batchIndex !== undefined) { + const index = Number.parseInt(batchIndex, 10) + const size = Number.parseInt(batchSize, 10) + meta['pubsub.batch.description'] = `Message ${index + 1} of ${size}` + } const span = this.startSpan({ childOf, - resource: topic, + resource: `Message from ${topicName}`, type: 'worker', - meta: { - 'gcloud.project_id': subscription.pubsub.projectId, - 'pubsub.topic': topic - }, - metrics: { - 'pubsub.ack': 0 - } + service: serviceName, + meta, + metrics }, ctx) + if (message.id) { + span.setTag('pubsub.message_id', message.id) + } + if (message.publishTime) { + span.setTag('pubsub.publish_time', message.publishTime.toISOString()) + } + + if (message.attributes) { + const publishStartTime = message.attributes['x-dd-publish-start-time'] + if (publishStartTime) { + const deliveryDuration = Date.now() - Number.parseInt(publishStartTime, 10) + span.setTag('pubsub.delivery_duration_ms', deliveryDuration) + } + + const pubsubRequestTraceId = message.attributes['_dd.pubsub_request.trace_id'] + const pubsubRequestSpanId = message.attributes['_dd.pubsub_request.span_id'] + const batchSize = message.attributes['_dd.batch.size'] + const batchIndex = message.attributes['_dd.batch.index'] + + if (pubsubRequestTraceId && pubsubRequestSpanId) { + span.setTag('_dd.pubsub_request.trace_id', pubsubRequestTraceId) + span.setTag('_dd.pubsub_request.span_id', pubsubRequestSpanId) + // Use JSON format like producer for proper span link parsing + span.setTag('_dd.span_links', JSON.stringify([{ + trace_id: pubsubRequestTraceId, + span_id: pubsubRequestSpanId, + flags: 0 + }])) + } + + if (batchSize) { + span.setTag('pubsub.batch.size', Number.parseInt(batchSize, 10)) + } + if (batchIndex) { + span.setTag('pubsub.batch.index', Number.parseInt(batchIndex, 10)) + } + } + if (this.config.dsmEnabled && message?.attributes) { const payloadSize = getMessageSize(message) this.tracer.decodeDataStreamsContext(message.attributes) @@ -38,14 +169,15 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin { bindFinish (ctx) { const { message } = ctx - const span = ctx.currentStore.span + const span = ctx.currentStore?.span + + if (!span) return ctx.parentStore if (message?._handled) { span.setTag('pubsub.ack', 1) } super.finish() - return ctx.parentStore } } diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/pubsub-push-subscription.js b/packages/datadog-plugin-google-cloud-pubsub/src/pubsub-push-subscription.js index 56ff2a93688..f258d875d51 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/pubsub-push-subscription.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/pubsub-push-subscription.js @@ -1,6 +1,8 @@ 'use strict' const TracingPlugin = require('../../dd-trace/src/plugins/tracing') +const SpanContext = require('../../dd-trace/src/opentracing/span_context') +const id = require('../../dd-trace/src/id') const log = require('../../dd-trace/src/log') const { storage } = require('../../datadog-core') const { channel } = require('../../datadog-instrumentations/src/helpers/instrument') @@ -11,8 +13,7 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin { constructor (...args) { super(...args) - // Subscribe to HTTP start channel to intercept PubSub requests - // We run BEFORE HTTP plugin to set delivery span as active parent + const startCh = channel('apm:http:server:request:start') startCh.subscribe(({ req, res }) => { this._handlePubSubRequest({ req, res }) @@ -22,16 +23,18 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin { _handlePubSubRequest ({ req, res }) { const userAgent = req.headers['user-agent'] || '' if (req.method !== 'POST' || !userAgent.includes('APIs-Google')) return - // Check for unwrapped Pub/Sub format (--push-no-wrapper-write-metadata) + if (req.headers['x-goog-pubsub-message-id']) { - log.debug('[PubSub] Detected unwrapped Pub/Sub push subscription') + log.warn('[PubSub] Detected unwrapped Pub/Sub format (push subscription)') + log.warn(`[PubSub] message-id: ${req.headers['x-goog-pubsub-message-id']}`) this._createDeliverySpanAndActivate({ req, res }) - } else { - log.warn( - '[PubSub] No x-goog-pubsub-* headers detected. pubsub.delivery spans will not be created. ' + - 'Add --push-no-wrapper-write-metadata to your subscription.' - ) + return } + + log.warn( + '[PubSub] No x-goog-pubsub-* headers detected. pubsub.delivery spans will not be created. ' + + 'Add --push-no-wrapper-write-metadata to your subscription.' + ) } _createDeliverySpanAndActivate ({ req, res }) { @@ -40,8 +43,19 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin { if (!tracer || !tracer._tracer) return - const parentContext = tracer._tracer.extract('text_map', messageData.attrs) || undefined - const deliverySpan = this._createDeliverySpan(messageData, parentContext, tracer) + const originalContext = this._extractContext(messageData, tracer) + const pubsubRequestContext = this._reconstructPubSubContext(messageData.attrs) || originalContext + + const isSameTrace = originalContext && pubsubRequestContext && + originalContext.toTraceId() === pubsubRequestContext.toTraceId() + + const deliverySpan = this._createDeliverySpan( + messageData, + isSameTrace ? pubsubRequestContext : originalContext, + isSameTrace ? null : pubsubRequestContext, + tracer + ) + const finishDelivery = () => { if (!deliverySpan.finished) { deliverySpan.finish() @@ -70,7 +84,36 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin { return { message, subscription, attrs: req.headers, topicName } } - _createDeliverySpan (messageData, parentContext, tracer) { + _extractContext (messageData, tracer) { + return tracer._tracer.extract('text_map', messageData.attrs) || undefined + } + + _reconstructPubSubContext (attrs) { + const traceIdLower = attrs['_dd.pubsub_request.trace_id'] + const spanId = attrs['_dd.pubsub_request.span_id'] + const traceIdUpper = attrs['_dd.pubsub_request.p.tid'] + + if (!traceIdLower || !spanId) return null + + try { + const traceId128 = traceIdUpper ? traceIdUpper + traceIdLower : traceIdLower.padStart(32, '0') + const traceId = id(traceId128, 16) + const parentId = id(spanId, 16) + + const tags = {} + if (traceIdUpper) tags['_dd.p.tid'] = traceIdUpper + + return new SpanContext({ + traceId, + spanId: parentId, + tags + }) + } catch { + return null + } + } + + _createDeliverySpan (messageData, parentContext, linkContext, tracer) { const { message, subscription, topicName, attrs } = messageData const subscriptionName = subscription.split('/').pop() || subscription const publishStartTime = attrs['x-dd-publish-start-time'] @@ -97,6 +140,16 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin { }) this._addBatchMetadata(span, attrs) + + if (linkContext) { + if (typeof span.addLink === 'function') { + span.addLink(linkContext, {}) + } else { + span._links = span._links || [] + span._links.push({ context: linkContext, attributes: {} }) + } + } + return span } diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js b/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js index 39a42e26806..54548782500 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js +++ b/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js @@ -1,19 +1,19 @@ 'use strict' -const assert = require('node:assert/strict') - const { expect } = require('chai') -const { after, afterEach, before, beforeEach, describe, it } = require('mocha') -const sinon = require('sinon') +const { describe, it, beforeEach, afterEach, before, after } = require('mocha') -const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/constants') -const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') -const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor') -const id = require('../../dd-trace/src/id') +const { withNamingSchema, withVersions } = require('../../dd-trace/test/setup/mocha') const agent = require('../../dd-trace/test/plugins/agent') const { expectSomeSpan, withDefaults } = require('../../dd-trace/test/plugins/helpers') -const { withNamingSchema, withVersions } = require('../../dd-trace/test/setup/mocha') +const id = require('../../dd-trace/src/id') +const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/constants') + const { expectedSchema, rawExpectedSchema } = require('./naming') +const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') +const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor') + +// The roundtrip to the pubsub emulator takes time. Sometimes a *long* time. const TIMEOUT = 30000 const dsmTopicName = 'dsm-topic' @@ -50,7 +50,7 @@ describe('Plugin', () => { describe('without configuration', () => { beforeEach(() => { - return agent.load('google-cloud-pubsub', { dsmEnabled: false }) + return agent.load('google-cloud-pubsub', { dsmEnabled: false }, { flushMinSpans: 1 }) }) beforeEach(() => { @@ -119,10 +119,18 @@ describe('Plugin', () => { component: 'google-cloud-pubsub' } }) - const publisher = new v1.PublisherClient({ projectId: project }) + const publisher = new v1.PublisherClient({ + projectId: project, + grpc: gax.grpc, + servicePath: 'localhost', + port: 8081, + sslCreds: gax.grpc.credentials.createInsecure() + }, gax) const name = `projects/${project}/topics/${topicName}` try { + // This should fail because the topic already exists or similar error await publisher.createTopic({ name }) + await publisher.createTopic({ name }) // Try to create twice to force error } catch (e) { // this is just to prevent mocha from crashing } @@ -133,7 +141,7 @@ describe('Plugin', () => { const firstSpan = tracer.scope().active() return pubsub.createTopic(topicName) .then(() => { - assert.strictEqual(tracer.scope().active(), firstSpan) + expect(tracer.scope().active()).to.equal(firstSpan) }) }) }) @@ -162,7 +170,7 @@ describe('Plugin', () => { publish(topic, { data: Buffer.from('hello') }) ) .then(() => { - assert.strictEqual(tracer.scope().active(), firstSpan) + expect(tracer.scope().active()).to.equal(firstSpan) }) }) @@ -201,24 +209,24 @@ describe('Plugin', () => { const expectedSpanPromise = expectSpanWithDefaults({ name: expectedSchema.receive.opName, service: expectedSchema.receive.serviceName, - meta: { 'span.kind': 'consumer' } + type: 'worker', + meta: { + component: 'google-cloud-pubsub', + 'span.kind': 'consumer', + 'pubsub.topic': resource + } }) const [topic] = await pubsub.createTopic(topicName) const [sub] = await topic.createSubscription('foo') - const rxPromise = new Promise((resolve, reject) => { - sub.on('message', msg => { - const receiverSpanContext = tracer.scope().active()._spanContext - try { - assert.ok(typeof receiverSpanContext._parentId === 'object' && receiverSpanContext._parentId !== null) - resolve() - msg.ack() - } catch (e) { - reject(e) - } - }) + sub.on('message', msg => { + const activeSpan = tracer.scope().active() + if (activeSpan) { + const receiverSpanContext = activeSpan._spanContext + expect(receiverSpanContext._parentId).to.be.an('object') + } + msg.ack() }) await publish(topic, { data: Buffer.from('hello') }) - await rxPromise return expectedSpanPromise }) @@ -227,12 +235,15 @@ describe('Plugin', () => { const expectedSpanPromise = expectSpanWithDefaults({ name: expectedSchema.receive.opName, service: expectedSchema.receive.serviceName, + type: 'worker', error: 1, meta: { [ERROR_MESSAGE]: error.message, [ERROR_TYPE]: error.name, [ERROR_STACK]: error.stack, - component: 'google-cloud-pubsub' + component: 'google-cloud-pubsub', + 'span.kind': 'consumer', + 'pubsub.topic': resource } }) const [topic] = await pubsub.createTopic(topicName) @@ -247,7 +258,7 @@ describe('Plugin', () => { err = e } finally { if (name === 'message') { - assert.strictEqual(err, error) + expect(err).to.equal(error) } } } @@ -263,13 +274,31 @@ describe('Plugin', () => { }) withNamingSchema( - async () => { + async (config) => { const [topic] = await pubsub.createTopic(topicName) const [sub] = await topic.createSubscription('foo') sub.on('message', msg => msg.ack()) await publish(topic, { data: Buffer.from('hello') }) }, - rawExpectedSchema.receive + rawExpectedSchema.receive, + { + // Custom selectSpan: look through all traces for a consumer span + // (withNamingSchema will check the name matches expected opName) + selectSpan: (traces) => { + // Flatten all spans from all traces + for (const trace of traces) { + for (const span of trace) { + // Return the first worker-type span (consumer span) + if (span.type === 'worker') { + return span + } + } + } + // If no worker span found, return undefined to trigger retry + // (withNamingSchema's assertSomeTraces will keep waiting) + return undefined + } + } ) }) @@ -392,7 +421,7 @@ describe('Plugin', () => { }) } }) - assert.ok(statsPointsReceived >= 1) + expect(statsPointsReceived).to.be.at.least(1) expect(agent.dsmStatsExist(agent, expectedProducerHash.readBigUInt64BE(0).toString())).to.equal(true) }, { timeoutMs: TIMEOUT }) }) @@ -410,46 +439,25 @@ describe('Plugin', () => { }) } }) - assert.ok(statsPointsReceived >= 2) + expect(statsPointsReceived).to.be.at.least(2) expect(agent.dsmStatsExist(agent, expectedConsumerHash.readBigUInt64BE(0).toString())).to.equal(true) }, { timeoutMs: TIMEOUT }) }) }) }) - - describe('it should set a message payload size', () => { - let recordCheckpointSpy - - beforeEach(() => { - recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') - }) - - afterEach(() => { - DataStreamsProcessor.prototype.recordCheckpoint.restore() - }) - - it('when producing a message', async () => { - await publish(dsmTopic, { data: Buffer.from('DSM produce payload size') }) - assert.ok(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) - }) - - it('when consuming a message', async () => { - await publish(dsmTopic, { data: Buffer.from('DSM consume payload size') }) - - await consume(async () => { - assert.ok(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) - }) - }) - }) }) function expectSpanWithDefaults (expected) { let prefixedResource - const method = expected.meta['pubsub.method'] + const method = expected.meta?.['pubsub.method'] + const spanKind = expected.meta?.['span.kind'] if (method === 'publish') { // For publish operations, use the new format: "publish to Topic " prefixedResource = `${method} to Topic ${topicName}` + } else if (spanKind === 'consumer') { + // For consumer operations, use the new format: "Message from " + prefixedResource = `Message from ${topicName}` } else if (method) { // For other operations, use the old format: " " prefixedResource = `${method} ${resource}` @@ -457,9 +465,19 @@ describe('Plugin', () => { prefixedResource = resource } + // Determine the default operation name based on span kind + let defaultOpName = 'pubsub.receive' + if (spanKind === 'consumer') { + defaultOpName = expectedSchema.receive.opName + } else if (spanKind === 'producer') { + defaultOpName = expectedSchema.send.opName + } else { + defaultOpName = expectedSchema.controlPlane.opName + } + const service = method ? 'test-pubsub' : 'test' expected = withDefaults({ - name: 'pubsub.request', + name: defaultOpName, resource: prefixedResource, service, error: 0, @@ -468,7 +486,8 @@ describe('Plugin', () => { 'gcloud.project_id': project } }, expected) - return expectSomeSpan(agent, expected, { timeoutMs: TIMEOUT }) + + return expectSomeSpan(agent, expected, TIMEOUT) } }) }) diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/naming.js b/packages/datadog-plugin-google-cloud-pubsub/test/naming.js index b03e300f346..d10f801b8d1 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/test/naming.js +++ b/packages/datadog-plugin-google-cloud-pubsub/test/naming.js @@ -16,11 +16,11 @@ const rawExpectedSchema = { receive: { v0: { opName: 'pubsub.receive', - serviceName: 'test' + serviceName: 'test-pubsub' }, v1: { opName: 'gcp.pubsub.process', - serviceName: 'test' + serviceName: 'test-pubsub' } }, controlPlane: { diff --git a/packages/dd-trace/src/plugins/util/web.js b/packages/dd-trace/src/plugins/util/web.js index a1af749bb2a..4bc8d96bf86 100644 --- a/packages/dd-trace/src/plugins/util/web.js +++ b/packages/dd-trace/src/plugins/util/web.js @@ -274,11 +274,14 @@ const web = { return context.middleware.at(-1) }, - // Extract the parent span from the headers and start a new span as its child startChildSpan (tracer, config, name, req, traceCtx) { const headers = req.headers const reqCtx = contexts.get(req) - let childOf = tracer.extract(FORMAT_HTTP_HEADERS, headers) + const { storage } = require('../../../../datadog-core') + const store = storage('legacy').getStore() + const deliverySpan = store?.span?._name === 'pubsub.delivery' ? store.span : null + + let childOf = deliverySpan || tracer.extract(FORMAT_HTTP_HEADERS, headers) // we may have headers signaling a router proxy span should be created (such as for AWS API Gateway) if (tracer._config?.inferredProxyServicesEnabled) {