diff --git a/.oxlintrc.base.json b/.oxlintrc.base.json index 82f9f4ac5d50..3026ebb81447 100644 --- a/.oxlintrc.base.json +++ b/.oxlintrc.base.json @@ -147,7 +147,8 @@ "**/integrations/fs/vendored/**/*.ts", "**/integrations/tracing/knex/vendored/**/*.ts", "**/integrations/tracing/mongo/vendored/**/*.ts", - "**/integrations/tracing/connect/vendored/**/*.ts" + "**/integrations/tracing/connect/vendored/**/*.ts", + "**/integrations/tracing/amqplib/vendored/**/*.ts" ], "rules": { "typescript/no-explicit-any": "off" diff --git a/dev-packages/node-integration-tests/suites/tracing/amqplib-v1/docker-compose.yml b/dev-packages/node-integration-tests/suites/tracing/amqplib-v1/docker-compose.yml new file mode 100644 index 000000000000..94b878fa4b62 --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/amqplib-v1/docker-compose.yml @@ -0,0 +1,22 @@ +version: '3' + +services: + rabbitmq: + image: rabbitmq:management + container_name: rabbitmq-v1 + environment: + - RABBITMQ_DEFAULT_USER=sentry + - RABBITMQ_DEFAULT_PASS=sentry + ports: + - '5673:5672' + - '15673:15672' + healthcheck: + test: ['CMD-SHELL', 'rabbitmq-diagnostics -q ping'] + interval: 2s + timeout: 10s + retries: 30 + start_period: 15s + +networks: + default: + driver: bridge diff --git a/dev-packages/node-integration-tests/suites/tracing/amqplib-v1/instrument.mjs b/dev-packages/node-integration-tests/suites/tracing/amqplib-v1/instrument.mjs new file mode 100644 index 000000000000..46a27dd03b74 --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/amqplib-v1/instrument.mjs @@ -0,0 +1,9 @@ +import * as Sentry from '@sentry/node'; +import { loggingTransport } from '@sentry-internal/node-integration-tests'; + +Sentry.init({ + dsn: 'https://public@dsn.ingest.sentry.io/1337', + release: '1.0', + tracesSampleRate: 1.0, + transport: loggingTransport, +}); diff --git a/dev-packages/node-integration-tests/suites/tracing/amqplib-v1/package.json b/dev-packages/node-integration-tests/suites/tracing/amqplib-v1/package.json new file mode 100644 index 000000000000..ed4b3f1c72bc --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/amqplib-v1/package.json @@ -0,0 +1,7 @@ +{ + "name": "sentry-amqplib-v1-test", + "version": "1.0.0", + "dependencies": { + "amqplib": "^1.0.0" + } +} diff --git a/dev-packages/node-integration-tests/suites/tracing/amqplib-v1/scenario.mjs b/dev-packages/node-integration-tests/suites/tracing/amqplib-v1/scenario.mjs new file mode 100644 index 000000000000..35b76d382d94 --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/amqplib-v1/scenario.mjs @@ -0,0 +1,72 @@ +import * as Sentry from '@sentry/node'; +import amqp from 'amqplib'; + +const queueName = 'queue1'; +const amqpUsername = 'sentry'; +const amqpPassword = 'sentry'; + +const AMQP_URL = `amqp://${amqpUsername}:${amqpPassword}@localhost:5673/`; +const ACKNOWLEDGEMENT = { noAck: false }; + +const QUEUE_OPTIONS = { + durable: true, // Make the queue durable + exclusive: false, // Not exclusive + autoDelete: false, // Don't auto-delete the queue + arguments: { + 'x-message-ttl': 30000, // Message TTL of 30 seconds + 'x-max-length': 1000, // Maximum queue length of 1000 messages + }, +}; + +(async () => { + const { connection, channel } = await connectToRabbitMQ(); + await createQueue(queueName, channel); + + const consumeMessagePromise = consumeMessageFromQueue(queueName, channel); + + await Sentry.startSpan({ name: 'root span' }, async () => { + sendMessageToQueue(queueName, channel, JSON.stringify({ foo: 'bar01' })); + }); + + await consumeMessagePromise; + + await channel.close(); + await connection.close(); +})(); + +async function connectToRabbitMQ() { + const connection = await amqp.connect(AMQP_URL); + const channel = await connection.createChannel(); + return { connection, channel }; +} + +async function createQueue(queueName, channel) { + await channel.assertQueue(queueName, QUEUE_OPTIONS); +} + +function sendMessageToQueue(queueName, channel, message) { + channel.sendToQueue(queueName, Buffer.from(message)); +} + +async function consumer(queueName, channel) { + return new Promise((resolve, reject) => { + channel + .consume( + queueName, + message => { + if (message) { + channel.ack(message); + resolve(); + } else { + reject(new Error('No message received')); + } + }, + ACKNOWLEDGEMENT, + ) + .catch(reject); + }); +} + +async function consumeMessageFromQueue(queueName, channel) { + await consumer(queueName, channel); +} diff --git a/dev-packages/node-integration-tests/suites/tracing/amqplib-v1/test.ts b/dev-packages/node-integration-tests/suites/tracing/amqplib-v1/test.ts new file mode 100644 index 000000000000..d70b44832596 --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/amqplib-v1/test.ts @@ -0,0 +1,55 @@ +import type { TransactionEvent } from '@sentry/core'; +import { afterAll, describe, expect } from 'vitest'; +import { cleanupChildProcesses, createEsmAndCjsTests } from '../../../utils/runner'; + +const EXPECTED_MESSAGE_SPAN_PRODUCER = expect.objectContaining({ + op: 'message', + data: expect.objectContaining({ + 'messaging.system': 'rabbitmq', + 'otel.kind': 'PRODUCER', + 'sentry.op': 'message', + 'sentry.origin': 'auto.amqplib.otel.publisher', + }), + status: 'ok', +}); + +const EXPECTED_MESSAGE_SPAN_CONSUMER = expect.objectContaining({ + op: 'message', + data: expect.objectContaining({ + 'messaging.system': 'rabbitmq', + 'otel.kind': 'CONSUMER', + 'sentry.op': 'message', + 'sentry.origin': 'auto.amqplib.otel.consumer', + }), + status: 'ok', +}); + +describe('amqplib v1 auto-instrumentation', () => { + afterAll(async () => { + cleanupChildProcesses(); + }); + + createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument.mjs', (createTestRunner, test) => { + test('should be able to send and receive messages with amqplib v1', { timeout: 60_000 }, async () => { + await createTestRunner() + .withDockerCompose({ + workingDirectory: [__dirname], + }) + .expect({ + transaction: (transaction: TransactionEvent) => { + expect(transaction.transaction).toEqual('root span'); + expect(transaction.spans?.length).toEqual(1); + expect(transaction.spans![0]).toMatchObject(EXPECTED_MESSAGE_SPAN_PRODUCER); + }, + }) + .expect({ + transaction: (transaction: TransactionEvent) => { + expect(transaction.transaction).toEqual('queue1 process'); + expect(transaction.contexts?.trace).toMatchObject(EXPECTED_MESSAGE_SPAN_CONSUMER); + }, + }) + .start() + .completed(); + }); + }); +}); diff --git a/packages/node/package.json b/packages/node/package.json index e8a57e00a78d..0bd6ded5f53a 100644 --- a/packages/node/package.json +++ b/packages/node/package.json @@ -68,7 +68,6 @@ "@opentelemetry/api": "^1.9.1", "@opentelemetry/core": "^2.6.1", "@opentelemetry/instrumentation": "^0.214.0", - "@opentelemetry/instrumentation-amqplib": "0.61.0", "@opentelemetry/instrumentation-graphql": "0.62.0", "@opentelemetry/instrumentation-hapi": "0.60.0", "@opentelemetry/instrumentation-http": "0.214.0", diff --git a/packages/node/src/integrations/tracing/amqplib.ts b/packages/node/src/integrations/tracing/amqplib/index.ts similarity index 90% rename from packages/node/src/integrations/tracing/amqplib.ts rename to packages/node/src/integrations/tracing/amqplib/index.ts index 7b4ac259add5..a9a1446ce4ee 100644 --- a/packages/node/src/integrations/tracing/amqplib.ts +++ b/packages/node/src/integrations/tracing/amqplib/index.ts @@ -1,5 +1,6 @@ import type { Span } from '@opentelemetry/api'; -import { AmqplibInstrumentation, type AmqplibInstrumentationConfig } from '@opentelemetry/instrumentation-amqplib'; +import { AmqplibInstrumentation } from './vendored/amqplib'; +import type { AmqplibInstrumentationConfig } from './vendored/types'; import type { IntegrationFn } from '@sentry/core'; import { defineIntegration } from '@sentry/core'; import { addOriginToSpan, generateInstrumentOnce } from '@sentry/node-core'; diff --git a/packages/node/src/integrations/tracing/amqplib/vendored/amqplib-types.ts b/packages/node/src/integrations/tracing/amqplib/vendored/amqplib-types.ts new file mode 100644 index 000000000000..71a03a7e998d --- /dev/null +++ b/packages/node/src/integrations/tracing/amqplib/vendored/amqplib-types.ts @@ -0,0 +1,47 @@ +/* + * Simplified types inlined from @types/amqplib (DefinitelyTyped). + * Only includes members accessed by this instrumentation. + * Other amqplib types (Message, ConsumeMessage, Options.Publish, etc.) are already + * vendored in types.ts by the upstream OTel instrumentation. + */ + +export interface Connection { + connection: { serverProperties: { product?: string; [key: string]: any } }; + [key: string]: any; +} + +export interface Channel { + connection: Connection; + [key: string]: any; +} + +export interface ConfirmChannel extends Channel {} + +export namespace Options { + export interface Connect { + protocol?: string; + hostname?: string; + port?: number; + username?: string; + vhost?: string; + } + export interface Consume { + consumerTag?: string; + noLocal?: boolean; + noAck?: boolean; + exclusive?: boolean; + priority?: number; + arguments?: any; + } + export interface Publish { + headers?: any; + [key: string]: any; + } +} + +export namespace Replies { + export interface Empty {} + export interface Consume { + consumerTag: string; + } +} diff --git a/packages/node/src/integrations/tracing/amqplib/vendored/amqplib.ts b/packages/node/src/integrations/tracing/amqplib/vendored/amqplib.ts new file mode 100644 index 000000000000..655e7492e5bf --- /dev/null +++ b/packages/node/src/integrations/tracing/amqplib/vendored/amqplib.ts @@ -0,0 +1,632 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NOTICE from the Sentry authors: + * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-amqplib + * - Upstream version: @opentelemetry/instrumentation-amqplib@0.65.0 + */ +/* eslint-disable */ + +import { + context, + diag, + propagation, + trace, + Span, + SpanKind, + SpanStatusCode, + ROOT_CONTEXT, + Link, + Context, +} from '@opentelemetry/api'; +import { hrTime, hrTimeDuration, hrTimeToMilliseconds } from '@opentelemetry/core'; +import { + InstrumentationBase, + InstrumentationNodeModuleDefinition, + InstrumentationNodeModuleFile, + isWrapped, + safeExecuteInTheMiddle, + SemconvStability, + semconvStabilityFromStr, +} from '@opentelemetry/instrumentation'; +import { ATTR_MESSAGING_OPERATION } from './semconv'; +import { + ATTR_MESSAGING_DESTINATION, + ATTR_MESSAGING_DESTINATION_KIND, + ATTR_MESSAGING_RABBITMQ_ROUTING_KEY, + MESSAGING_DESTINATION_KIND_VALUE_TOPIC, + MESSAGING_OPERATION_VALUE_PROCESS, + OLD_ATTR_MESSAGING_MESSAGE_ID, + ATTR_MESSAGING_CONVERSATION_ID, +} from './semconv-obsolete'; +import type { Connection, Options, Replies } from './amqplib-types'; +import { AmqplibInstrumentationConfig, DEFAULT_CONFIG, EndOperation, type ConsumeMessage, type Message } from './types'; +import { + CHANNEL_CONSUME_TIMEOUT_TIMER, + CHANNEL_SPANS_NOT_ENDED, + CONNECTION_ATTRIBUTES, + getConnectionAttributesFromServer, + getConnectionAttributesFromUrl, + InstrumentationConnection, + InstrumentationConsumeChannel, + InstrumentationConsumeMessage, + InstrumentationMessage, + InstrumentationPublishChannel, + isConfirmChannelTracing, + markConfirmChannelTracing, + MESSAGE_STORED_SPAN, + normalizeExchange, + unmarkConfirmChannelTracing, +} from './utils'; + +import { SDK_VERSION } from '@sentry/core'; + +const PACKAGE_NAME = '@sentry/instrumentation-amqplib'; +const supportedVersions = ['>=0.5.5 <2']; + +export class AmqplibInstrumentation extends InstrumentationBase { + private _netSemconvStability!: SemconvStability; + + constructor(config: AmqplibInstrumentationConfig = {}) { + super(PACKAGE_NAME, SDK_VERSION, { ...DEFAULT_CONFIG, ...config }); + this._setSemconvStabilityFromEnv(); + } + + // Used for testing. + private _setSemconvStabilityFromEnv() { + this._netSemconvStability = semconvStabilityFromStr('http', process.env.OTEL_SEMCONV_STABILITY_OPT_IN); + } + + override setConfig(config: AmqplibInstrumentationConfig = {}) { + super.setConfig({ ...DEFAULT_CONFIG, ...config }); + } + + protected init() { + const channelModelModuleFile = new InstrumentationNodeModuleFile( + 'amqplib/lib/channel_model.js', + supportedVersions, + this.patchChannelModel.bind(this), + this.unpatchChannelModel.bind(this), + ); + + const callbackModelModuleFile = new InstrumentationNodeModuleFile( + 'amqplib/lib/callback_model.js', + supportedVersions, + this.patchChannelModel.bind(this), + this.unpatchChannelModel.bind(this), + ); + + const connectModuleFile = new InstrumentationNodeModuleFile( + 'amqplib/lib/connect.js', + supportedVersions, + this.patchConnect.bind(this), + this.unpatchConnect.bind(this), + ); + + const module = new InstrumentationNodeModuleDefinition('amqplib', supportedVersions, undefined, undefined, [ + channelModelModuleFile, + connectModuleFile, + callbackModelModuleFile, + ]); + return module; + } + + private patchConnect(moduleExports: any) { + moduleExports = this.unpatchConnect(moduleExports); + if (!isWrapped(moduleExports.connect)) { + this._wrap(moduleExports, 'connect', this.getConnectPatch.bind(this)); + } + return moduleExports; + } + + private unpatchConnect(moduleExports: any) { + if (isWrapped(moduleExports.connect)) { + this._unwrap(moduleExports, 'connect'); + } + return moduleExports; + } + + private patchChannelModel(moduleExports: any, moduleVersion: string | undefined) { + if (!isWrapped(moduleExports.Channel.prototype.publish)) { + this._wrap(moduleExports.Channel.prototype, 'publish', this.getPublishPatch.bind(this, moduleVersion)); + } + if (!isWrapped(moduleExports.Channel.prototype.consume)) { + this._wrap(moduleExports.Channel.prototype, 'consume', this.getConsumePatch.bind(this, moduleVersion)); + } + if (!isWrapped(moduleExports.Channel.prototype.ack)) { + this._wrap(moduleExports.Channel.prototype, 'ack', this.getAckPatch.bind(this, false, EndOperation.Ack)); + } + if (!isWrapped(moduleExports.Channel.prototype.nack)) { + this._wrap(moduleExports.Channel.prototype, 'nack', this.getAckPatch.bind(this, true, EndOperation.Nack)); + } + if (!isWrapped(moduleExports.Channel.prototype.reject)) { + this._wrap(moduleExports.Channel.prototype, 'reject', this.getAckPatch.bind(this, true, EndOperation.Reject)); + } + if (!isWrapped(moduleExports.Channel.prototype.ackAll)) { + this._wrap(moduleExports.Channel.prototype, 'ackAll', this.getAckAllPatch.bind(this, false, EndOperation.AckAll)); + } + if (!isWrapped(moduleExports.Channel.prototype.nackAll)) { + this._wrap( + moduleExports.Channel.prototype, + 'nackAll', + this.getAckAllPatch.bind(this, true, EndOperation.NackAll), + ); + } + if (!isWrapped(moduleExports.Channel.prototype.emit)) { + this._wrap(moduleExports.Channel.prototype, 'emit', this.getChannelEmitPatch.bind(this)); + } + if (!isWrapped(moduleExports.ConfirmChannel.prototype.publish)) { + this._wrap( + moduleExports.ConfirmChannel.prototype, + 'publish', + this.getConfirmedPublishPatch.bind(this, moduleVersion), + ); + } + return moduleExports; + } + + private unpatchChannelModel(moduleExports: any) { + if (isWrapped(moduleExports.Channel.prototype.publish)) { + this._unwrap(moduleExports.Channel.prototype, 'publish'); + } + if (isWrapped(moduleExports.Channel.prototype.consume)) { + this._unwrap(moduleExports.Channel.prototype, 'consume'); + } + if (isWrapped(moduleExports.Channel.prototype.ack)) { + this._unwrap(moduleExports.Channel.prototype, 'ack'); + } + if (isWrapped(moduleExports.Channel.prototype.nack)) { + this._unwrap(moduleExports.Channel.prototype, 'nack'); + } + if (isWrapped(moduleExports.Channel.prototype.reject)) { + this._unwrap(moduleExports.Channel.prototype, 'reject'); + } + if (isWrapped(moduleExports.Channel.prototype.ackAll)) { + this._unwrap(moduleExports.Channel.prototype, 'ackAll'); + } + if (isWrapped(moduleExports.Channel.prototype.nackAll)) { + this._unwrap(moduleExports.Channel.prototype, 'nackAll'); + } + if (isWrapped(moduleExports.Channel.prototype.emit)) { + this._unwrap(moduleExports.Channel.prototype, 'emit'); + } + if (isWrapped(moduleExports.ConfirmChannel.prototype.publish)) { + this._unwrap(moduleExports.ConfirmChannel.prototype, 'publish'); + } + return moduleExports; + } + + private getConnectPatch( + original: ( + url: string | Options.Connect, + socketOptions: any, + openCallback: (err: any, connection: Connection) => void, + ) => Connection, + ) { + const self = this; + return function patchedConnect( + this: unknown, + url: string | Options.Connect, + socketOptions: any, + openCallback: Function, + ) { + return original.call( + this, + url, + socketOptions, + function (this: unknown, err: any, conn: InstrumentationConnection) { + if (err == null) { + const urlAttributes = getConnectionAttributesFromUrl(url, self._netSemconvStability); + const serverAttributes = getConnectionAttributesFromServer(conn); + conn[CONNECTION_ATTRIBUTES] = { + ...urlAttributes, + ...serverAttributes, + }; + } + openCallback.apply(this, arguments); + }, + ); + }; + } + + private getChannelEmitPatch(original: Function) { + const self = this; + return function emit(this: InstrumentationConsumeChannel, eventName: string) { + if (eventName === 'close') { + self.endAllSpansOnChannel(this, true, EndOperation.ChannelClosed, undefined); + const activeTimer = this[CHANNEL_CONSUME_TIMEOUT_TIMER]; + if (activeTimer) { + clearInterval(activeTimer); + } + this[CHANNEL_CONSUME_TIMEOUT_TIMER] = undefined; + } else if (eventName === 'error') { + self.endAllSpansOnChannel(this, true, EndOperation.ChannelError, undefined); + } + return original.apply(this, arguments); + }; + } + + private getAckAllPatch(isRejected: boolean, endOperation: EndOperation, original: Function) { + const self = this; + return function ackAll(this: InstrumentationConsumeChannel, requeueOrEmpty?: boolean): void { + self.endAllSpansOnChannel(this, isRejected, endOperation, requeueOrEmpty); + return original.apply(this, arguments); + }; + } + + private getAckPatch(isRejected: boolean, endOperation: EndOperation, original: Function) { + const self = this; + return function ack( + this: InstrumentationConsumeChannel, + message: Message, + allUpToOrRequeue?: boolean, + requeue?: boolean, + ): void { + const channel = this; + // we use this patch in reject function as well, but it has different signature + const requeueResolved = endOperation === EndOperation.Reject ? allUpToOrRequeue : requeue; + + const spansNotEnded: { msg: Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; + const msgIndex = spansNotEnded.findIndex(msgDetails => msgDetails.msg === message); + if (msgIndex < 0) { + // should not happen in happy flow + // but possible if user is calling the api function ack twice with same message + self.endConsumerSpan(message, isRejected, endOperation, requeueResolved); + } else if (endOperation !== EndOperation.Reject && allUpToOrRequeue) { + for (let i = 0; i <= msgIndex; i++) { + self.endConsumerSpan(spansNotEnded[i]!.msg, isRejected, endOperation, requeueResolved); + } + spansNotEnded.splice(0, msgIndex + 1); + } else { + self.endConsumerSpan(message, isRejected, endOperation, requeueResolved); + spansNotEnded.splice(msgIndex, 1); + } + return original.apply(this, arguments); + }; + } + + private getConsumePatch(moduleVersion: string | undefined, original: Function) { + const self = this; + return function consume( + this: InstrumentationConsumeChannel, + queue: string, + onMessage: (msg: ConsumeMessage | null) => void, + options?: Options.Consume, + ): Promise { + const channel = this; + if (!Object.prototype.hasOwnProperty.call(channel, CHANNEL_SPANS_NOT_ENDED)) { + const { consumeTimeoutMs } = self.getConfig(); + if (consumeTimeoutMs) { + const timer = setInterval(() => { + self.checkConsumeTimeoutOnChannel(channel); + }, consumeTimeoutMs); + timer.unref(); + channel[CHANNEL_CONSUME_TIMEOUT_TIMER] = timer; + } + channel[CHANNEL_SPANS_NOT_ENDED] = []; + } + + const patchedOnMessage = function (this: unknown, msg: InstrumentationConsumeMessage | null) { + // msg is expected to be null for signaling consumer cancel notification + // https://www.rabbitmq.com/consumer-cancel.html + // in this case, we do not start a span, as this is not a real message. + if (!msg) { + return onMessage.call(this, msg); + } + + const headers = msg.properties.headers ?? {}; + let parentContext: Context | undefined = propagation.extract(ROOT_CONTEXT, headers); + const exchange = msg.fields?.exchange; + let links: Link[] | undefined; + if (self._config.useLinksForConsume) { + const parentSpanContext = parentContext ? trace.getSpan(parentContext)?.spanContext() : undefined; + parentContext = undefined; + if (parentSpanContext) { + links = [ + { + context: parentSpanContext, + }, + ]; + } + } + const span = self.tracer.startSpan( + `${queue} process`, + { + kind: SpanKind.CONSUMER, + attributes: { + ...channel?.connection?.[CONNECTION_ATTRIBUTES], + [ATTR_MESSAGING_DESTINATION]: exchange, + [ATTR_MESSAGING_DESTINATION_KIND]: MESSAGING_DESTINATION_KIND_VALUE_TOPIC, + [ATTR_MESSAGING_RABBITMQ_ROUTING_KEY]: msg.fields?.routingKey, + [ATTR_MESSAGING_OPERATION]: MESSAGING_OPERATION_VALUE_PROCESS, + [OLD_ATTR_MESSAGING_MESSAGE_ID]: msg?.properties.messageId, + [ATTR_MESSAGING_CONVERSATION_ID]: msg?.properties.correlationId, + }, + links, + }, + parentContext, + ); + + const { consumeHook } = self.getConfig(); + if (consumeHook) { + safeExecuteInTheMiddle( + () => consumeHook(span, { moduleVersion, msg }), + e => { + if (e) { + diag.error('amqplib instrumentation: consumerHook error', e); + } + }, + true, + ); + } + + if (!options?.noAck) { + // store the message on the channel so we can close the span on ackAll etc + channel[CHANNEL_SPANS_NOT_ENDED]!.push({ + msg, + timeOfConsume: hrTime(), + }); + + // store the span on the message, so we can end it when user call 'ack' on it + msg[MESSAGE_STORED_SPAN] = span; + } + const setContext: Context = parentContext ? parentContext : ROOT_CONTEXT; + context.with(trace.setSpan(setContext, span), () => { + onMessage.call(this, msg); + }); + + if (options?.noAck) { + self.callConsumeEndHook(span, msg, false, EndOperation.AutoAck); + span.end(); + } + }; + arguments[1] = patchedOnMessage; + return original.apply(this, arguments); + }; + } + + private getConfirmedPublishPatch(moduleVersion: string | undefined, original: Function) { + const self = this; + return function confirmedPublish( + this: InstrumentationConsumeChannel, + exchange: string, + routingKey: string, + content: Buffer, + options?: Options.Publish, + callback?: (err: any, ok: Replies.Empty) => void, + ): boolean { + const channel = this; + const { span, modifiedOptions } = self.createPublishSpan(self, exchange, routingKey, channel, options); + + const { publishHook } = self.getConfig(); + if (publishHook) { + safeExecuteInTheMiddle( + () => + publishHook(span, { + moduleVersion, + exchange, + routingKey, + content, + options: modifiedOptions, + isConfirmChannel: true, + }), + e => { + if (e) { + diag.error('amqplib instrumentation: publishHook error', e); + } + }, + true, + ); + } + + const patchedOnConfirm = function (this: unknown, err: any, ok: Replies.Empty) { + try { + callback?.call(this, err, ok); + } finally { + const { publishConfirmHook } = self.getConfig(); + if (publishConfirmHook) { + safeExecuteInTheMiddle( + () => + publishConfirmHook(span, { + moduleVersion, + exchange, + routingKey, + content, + options, + isConfirmChannel: true, + confirmError: err, + }), + e => { + if (e) { + diag.error('amqplib instrumentation: publishConfirmHook error', e); + } + }, + true, + ); + } + + if (err) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: "message confirmation has been nack'ed", + }); + } + span.end(); + } + }; + + // calling confirm channel publish function is storing the message in queue and registering the callback for broker confirm. + // span ends in the patched callback. + const markedContext = markConfirmChannelTracing(context.active()); + const argumentsCopy = [...arguments]; + argumentsCopy[3] = modifiedOptions; + argumentsCopy[4] = context.bind( + unmarkConfirmChannelTracing(trace.setSpan(markedContext, span)), + patchedOnConfirm, + ); + return context.with(markedContext, original.bind(this, ...argumentsCopy)); + }; + } + + private getPublishPatch(moduleVersion: string | undefined, original: Function) { + const self = this; + return function publish( + this: InstrumentationPublishChannel, + exchange: string, + routingKey: string, + content: Buffer, + options?: Options.Publish, + ): boolean { + if (isConfirmChannelTracing(context.active())) { + // work already done + return original.apply(this, arguments); + } else { + const channel = this; + const { span, modifiedOptions } = self.createPublishSpan(self, exchange, routingKey, channel, options); + + const { publishHook } = self.getConfig(); + if (publishHook) { + safeExecuteInTheMiddle( + () => + publishHook(span, { + moduleVersion, + exchange, + routingKey, + content, + options: modifiedOptions, + isConfirmChannel: false, + }), + e => { + if (e) { + diag.error('amqplib instrumentation: publishHook error', e); + } + }, + true, + ); + } + + // calling normal channel publish function is only storing the message in queue. + // it does not send it and waits for an ack, so the span duration is expected to be very short. + const argumentsCopy = [...arguments]; + argumentsCopy[3] = modifiedOptions; + const originalRes = original.apply(this, argumentsCopy as any); + span.end(); + return originalRes; + } + }; + } + + private createPublishSpan( + self: this, + exchange: string, + routingKey: string, + channel: InstrumentationPublishChannel, + options?: Options.Publish, + ) { + const normalizedExchange = normalizeExchange(exchange); + + const span = self.tracer.startSpan(`publish ${normalizedExchange}`, { + kind: SpanKind.PRODUCER, + attributes: { + ...channel.connection[CONNECTION_ATTRIBUTES], + [ATTR_MESSAGING_DESTINATION]: exchange, + [ATTR_MESSAGING_DESTINATION_KIND]: MESSAGING_DESTINATION_KIND_VALUE_TOPIC, + + [ATTR_MESSAGING_RABBITMQ_ROUTING_KEY]: routingKey, + [OLD_ATTR_MESSAGING_MESSAGE_ID]: options?.messageId, + [ATTR_MESSAGING_CONVERSATION_ID]: options?.correlationId, + }, + }); + const modifiedOptions = options ?? {}; + modifiedOptions.headers = modifiedOptions.headers ?? {}; + + propagation.inject(trace.setSpan(context.active(), span), modifiedOptions.headers); + + return { span, modifiedOptions }; + } + + private endConsumerSpan( + message: InstrumentationMessage, + isRejected: boolean | null, + operation: EndOperation, + requeue: boolean | undefined, + ) { + const storedSpan: Span | undefined = message[MESSAGE_STORED_SPAN]; + if (!storedSpan) return; + if (isRejected !== false) { + storedSpan.setStatus({ + code: SpanStatusCode.ERROR, + message: + operation !== EndOperation.ChannelClosed && operation !== EndOperation.ChannelError + ? `${operation} called on message${ + requeue === true ? ' with requeue' : requeue === false ? ' without requeue' : '' + }` + : operation, + }); + } + this.callConsumeEndHook(storedSpan, message, isRejected, operation); + storedSpan.end(); + message[MESSAGE_STORED_SPAN] = undefined; + } + + private endAllSpansOnChannel( + channel: InstrumentationConsumeChannel, + isRejected: boolean, + operation: EndOperation, + requeue: boolean | undefined, + ) { + const spansNotEnded: { msg: Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; + spansNotEnded.forEach(msgDetails => { + this.endConsumerSpan(msgDetails.msg, isRejected, operation, requeue); + }); + channel[CHANNEL_SPANS_NOT_ENDED] = []; + } + + private callConsumeEndHook( + span: Span, + msg: InstrumentationMessage, + rejected: boolean | null, + endOperation: EndOperation, + ) { + const { consumeEndHook } = this.getConfig(); + if (!consumeEndHook) return; + + safeExecuteInTheMiddle( + () => consumeEndHook(span, { msg, rejected, endOperation }), + e => { + if (e) { + diag.error('amqplib instrumentation: consumerEndHook error', e); + } + }, + true, + ); + } + + private checkConsumeTimeoutOnChannel(channel: InstrumentationConsumeChannel) { + const currentTime = hrTime(); + const spansNotEnded = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; + let i: number; + const { consumeTimeoutMs } = this.getConfig(); + for (i = 0; i < spansNotEnded.length; i++) { + const currMessage = spansNotEnded[i]!; + const timeFromConsume = hrTimeDuration(currMessage.timeOfConsume, currentTime); + if (hrTimeToMilliseconds(timeFromConsume) < consumeTimeoutMs!) { + break; + } + this.endConsumerSpan(currMessage.msg, null, EndOperation.InstrumentationTimeout, true); + } + spansNotEnded.splice(0, i); + } +} diff --git a/packages/node/src/integrations/tracing/amqplib/vendored/semconv-obsolete.ts b/packages/node/src/integrations/tracing/amqplib/vendored/semconv-obsolete.ts new file mode 100644 index 000000000000..3d6bb5f15d2d --- /dev/null +++ b/packages/node/src/integrations/tracing/amqplib/vendored/semconv-obsolete.ts @@ -0,0 +1,101 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NOTICE from the Sentry authors: + * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-amqplib + * - Upstream version: @opentelemetry/instrumentation-amqplib@0.65.0 + */ +/* eslint-disable */ + +/* + * This file contains constants for values that where replaced/removed from + * Semantic Conventions long enough ago that they do not have `ATTR_*` + * constants in the `@opentelemetry/semantic-conventions` package. Eventually + * it is expected that this instrumention will be updated to emit telemetry + * using modern Semantic Conventions, dropping the need for the constants in + * this file. + */ + +/** + * The message destination name. This might be equal to the span name but is required nevertheless. + * + * @deprecated Use ATTR_MESSAGING_DESTINATION_NAME in [incubating entry-point]({@link https://github.com/open-telemetry/opentelemetry-js/blob/main/semantic-conventions/README.md#unstable-semconv}). + */ +export const ATTR_MESSAGING_DESTINATION = 'messaging.destination' as const; + +/** + * The kind of message destination. + * + * @deprecated Removed in semconv v1.20.0. + */ +export const ATTR_MESSAGING_DESTINATION_KIND = 'messaging.destination_kind' as const; + +/** + * RabbitMQ message routing key. + * + * @deprecated Use ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY in [incubating entry-point]({@link https://github.com/open-telemetry/opentelemetry-js/blob/main/semantic-conventions/README.md#unstable-semconv}). + */ +export const ATTR_MESSAGING_RABBITMQ_ROUTING_KEY = 'messaging.rabbitmq.routing_key' as const; + +/** + * A string identifying the kind of message consumption as defined in the [Operation names](#operation-names) section above. If the operation is "send", this attribute MUST NOT be set, since the operation can be inferred from the span kind in that case. + * + * @deprecated Use MESSAGING_OPERATION_TYPE_VALUE_PROCESS in [incubating entry-point]({@link https://github.com/open-telemetry/opentelemetry-js/blob/main/semantic-conventions/README.md#unstable-semconv}). + */ +export const MESSAGING_OPERATION_VALUE_PROCESS = 'process' as const; + +/** + * The name of the transport protocol. + * + * @deprecated Use ATTR_NETWORK_PROTOCOL_NAME. + */ +export const ATTR_MESSAGING_PROTOCOL = 'messaging.protocol' as const; + +/** + * The version of the transport protocol. + * + * @deprecated Use ATTR_NETWORK_PROTOCOL_VERSION. + */ +export const ATTR_MESSAGING_PROTOCOL_VERSION = 'messaging.protocol_version' as const; + +/** + * Connection string. + * + * @deprecated Removed in semconv v1.17.0. + */ +export const ATTR_MESSAGING_URL = 'messaging.url' as const; + +/** + * The kind of message destination. + * + * @deprecated Removed in semconv v1.20.0. + */ +export const MESSAGING_DESTINATION_KIND_VALUE_TOPIC = 'topic' as const; + +/** + * A value used by the messaging system as an identifier for the message, represented as a string. + * + * @deprecated Use ATTR_MESSAGING_MESSAGE_ID in [incubating entry-point]({@link https://github.com/open-telemetry/opentelemetry-js/blob/main/semantic-conventions/README.md#unstable-semconv}). + * + * Note: changing to `ATTR_MESSAGING_MESSAGE_ID` means a change in value from `messaging.message_id` to `messaging.message.id`. + */ +export const OLD_ATTR_MESSAGING_MESSAGE_ID = 'messaging.message_id' as const; + +/** + * The [conversation ID](#conversations) identifying the conversation to which the message belongs, represented as a string. Sometimes called "Correlation ID". + * + * @deprecated Use ATTR_MESSAGING_MESSAGE_CONVERSATION_ID in [incubating entry-point]({@link https://github.com/open-telemetry/opentelemetry-js/blob/main/semantic-conventions/README.md#unstable-semconv}). + */ +export const ATTR_MESSAGING_CONVERSATION_ID = 'messaging.conversation_id' as const; diff --git a/packages/node/src/integrations/tracing/amqplib/vendored/semconv.ts b/packages/node/src/integrations/tracing/amqplib/vendored/semconv.ts new file mode 100644 index 000000000000..cf9a5d214911 --- /dev/null +++ b/packages/node/src/integrations/tracing/amqplib/vendored/semconv.ts @@ -0,0 +1,70 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NOTICE from the Sentry authors: + * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-amqplib + * - Upstream version: @opentelemetry/instrumentation-amqplib@0.65.0 + */ +/* eslint-disable */ + +/* + * This file contains a copy of unstable semantic convention definitions + * used by this package. + * @see https://github.com/open-telemetry/opentelemetry-js/tree/main/semantic-conventions#unstable-semconv + */ + +/** + * Deprecated, use `messaging.operation.type` instead. + * + * @example publish + * @example create + * @example process + * + * @experimental This attribute is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. + * + * @deprecated Replaced by `messaging.operation.type`. + */ +export const ATTR_MESSAGING_OPERATION = 'messaging.operation' as const; + +/** + * The messaging system as identified by the client instrumentation. + * + * @note The actual messaging system may differ from the one known by the client. For example, when using Kafka client libraries to communicate with Azure Event Hubs, the `messaging.system` is set to `kafka` based on the instrumentation's best knowledge. + * + * @experimental This attribute is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. + */ +export const ATTR_MESSAGING_SYSTEM = 'messaging.system' as const; + +/** + * Deprecated, use `server.address` on client spans and `client.address` on server spans. + * + * @example example.com + * + * @experimental This attribute is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. + * + * @deprecated Replaced by `server.address` on client spans and `client.address` on server spans. + */ +export const ATTR_NET_PEER_NAME = 'net.peer.name' as const; + +/** + * Deprecated, use `server.port` on client spans and `client.port` on server spans. + * + * @example 8080 + * + * @experimental This attribute is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. + * + * @deprecated Replaced by `server.port` on client spans and `client.port` on server spans. + */ +export const ATTR_NET_PEER_PORT = 'net.peer.port' as const; diff --git a/packages/node/src/integrations/tracing/amqplib/vendored/types.ts b/packages/node/src/integrations/tracing/amqplib/vendored/types.ts new file mode 100644 index 000000000000..e8cbf8b38dbf --- /dev/null +++ b/packages/node/src/integrations/tracing/amqplib/vendored/types.ts @@ -0,0 +1,184 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NOTICE from the Sentry authors: + * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-amqplib + * - Upstream version: @opentelemetry/instrumentation-amqplib@0.65.0 + */ +/* eslint-disable */ + +import { Span } from '@opentelemetry/api'; +import { InstrumentationConfig } from '@opentelemetry/instrumentation'; + +export interface PublishInfo { + moduleVersion: string | undefined; + exchange: string; + routingKey: string; + content: Buffer; + options?: AmqplibPublishOptions; + isConfirmChannel?: boolean; +} + +export interface PublishConfirmedInfo extends PublishInfo { + confirmError?: any; +} + +export interface ConsumeInfo { + moduleVersion: string | undefined; + msg: ConsumeMessage; +} + +export interface ConsumeEndInfo { + msg: ConsumeMessage; + rejected: boolean | null; + endOperation: EndOperation; +} + +export interface AmqplibPublishCustomAttributeFunction { + (span: Span, publishInfo: PublishInfo): void; +} + +export interface AmqplibPublishConfirmCustomAttributeFunction { + (span: Span, publishConfirmedInto: PublishConfirmedInfo): void; +} + +export interface AmqplibConsumeCustomAttributeFunction { + (span: Span, consumeInfo: ConsumeInfo): void; +} + +export interface AmqplibConsumeEndCustomAttributeFunction { + (span: Span, consumeEndInfo: ConsumeEndInfo): void; +} + +export enum EndOperation { + AutoAck = 'auto ack', + Ack = 'ack', + AckAll = 'ackAll', + Reject = 'reject', + Nack = 'nack', + NackAll = 'nackAll', + ChannelClosed = 'channel closed', + ChannelError = 'channel error', + InstrumentationTimeout = 'instrumentation timeout', +} + +export interface AmqplibInstrumentationConfig extends InstrumentationConfig { + /** hook for adding custom attributes before publish message is sent */ + publishHook?: AmqplibPublishCustomAttributeFunction; + + /** hook for adding custom attributes after publish message is confirmed by the broker */ + publishConfirmHook?: AmqplibPublishConfirmCustomAttributeFunction; + + /** hook for adding custom attributes before consumer message is processed */ + consumeHook?: AmqplibConsumeCustomAttributeFunction; + + /** hook for adding custom attributes after consumer message is acked to server */ + consumeEndHook?: AmqplibConsumeEndCustomAttributeFunction; + + /** + * When user is setting up consume callback, it is user's responsibility to call + * ack/nack etc on the msg to resolve it in the server. + * If user is not calling the ack, the message will stay in the queue until + * channel is closed, or until server timeout expires (if configured). + * While we wait for the ack, a reference to the message is stored in plugin, which + * will never be garbage collected. + * To prevent memory leak, plugin has it's own configuration of timeout, which + * will close the span if user did not call ack after this timeout. + * If timeout is not big enough, span might be closed with 'InstrumentationTimeout', + * and then received valid ack from the user later which will not be instrumented. + * + * Default is 1 minute + */ + consumeTimeoutMs?: number; + + /** option to use a span link for the consume message instead of continuing a trace */ + useLinksForConsume?: boolean; +} + +export const DEFAULT_CONFIG: AmqplibInstrumentationConfig = { + consumeTimeoutMs: 1000 * 60, // 1 minute + useLinksForConsume: false, +}; + +// The following types are vendored from `@types/amqplib@0.10.1` - commit SHA: 4205e03127692a40b4871709a7134fe4e2ed5510 + +// Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/properties.d.ts#L108 +// This exists in `@types/amqplib` as `Options.Publish`. We're renaming things +// here to avoid importing the whole Options namespace. +export interface AmqplibPublishOptions { + expiration?: string | number; + userId?: string; + CC?: string | string[]; + + mandatory?: boolean; + persistent?: boolean; + deliveryMode?: boolean | number; + BCC?: string | string[]; + + contentType?: string; + contentEncoding?: string; + headers?: any; + priority?: number; + correlationId?: string; + replyTo?: string; + messageId?: string; + timestamp?: number; + type?: string; + appId?: string; +} + +// Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/properties.d.ts#L142 +export interface Message { + content: Buffer; + fields: MessageFields; + properties: MessageProperties; +} + +export interface ConsumeMessage extends Message { + fields: ConsumeMessageFields; +} + +export interface CommonMessageFields { + deliveryTag: number; + redelivered: boolean; + exchange: string; + routingKey: string; +} + +export interface MessageFields extends CommonMessageFields { + messageCount?: number; + consumerTag?: string; +} + +export interface ConsumeMessageFields extends CommonMessageFields { + deliveryTag: number; +} + +export interface MessageProperties { + contentType: any | undefined; + contentEncoding: any | undefined; + headers: any; + deliveryMode: any | undefined; + priority: any | undefined; + correlationId: any | undefined; + replyTo: any | undefined; + expiration: any | undefined; + messageId: any | undefined; + timestamp: any | undefined; + type: any | undefined; + userId: any | undefined; + appId: any | undefined; + clusterId: any | undefined; +} diff --git a/packages/node/src/integrations/tracing/amqplib/vendored/utils.ts b/packages/node/src/integrations/tracing/amqplib/vendored/utils.ts new file mode 100644 index 000000000000..5b682634b078 --- /dev/null +++ b/packages/node/src/integrations/tracing/amqplib/vendored/utils.ts @@ -0,0 +1,202 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NOTICE from the Sentry authors: + * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-amqplib + * - Upstream version: @opentelemetry/instrumentation-amqplib@0.65.0 + * - Some types vendored from @types/amqplib with simplifications + */ +/* eslint-disable */ + +import { Context, createContextKey, diag, HrTime, Span, Attributes, AttributeValue } from '@opentelemetry/api'; +import { SemconvStability } from '@opentelemetry/instrumentation'; +import { ATTR_SERVER_ADDRESS, ATTR_SERVER_PORT } from '@opentelemetry/semantic-conventions'; +import { ATTR_MESSAGING_SYSTEM, ATTR_NET_PEER_NAME, ATTR_NET_PEER_PORT } from './semconv'; +import { ATTR_MESSAGING_PROTOCOL, ATTR_MESSAGING_PROTOCOL_VERSION, ATTR_MESSAGING_URL } from './semconv-obsolete'; +import type { Connection, Channel, ConfirmChannel, Options } from './amqplib-types'; +import type { ConsumeMessage, Message } from './types'; + +export const MESSAGE_STORED_SPAN: unique symbol = Symbol('opentelemetry.amqplib.message.stored-span'); +export const CHANNEL_SPANS_NOT_ENDED: unique symbol = Symbol('opentelemetry.amqplib.channel.spans-not-ended'); +export const CHANNEL_CONSUME_TIMEOUT_TIMER: unique symbol = Symbol( + 'opentelemetry.amqplib.channel.consumer-timeout-timer', +); +export const CONNECTION_ATTRIBUTES: unique symbol = Symbol('opentelemetry.amqplib.connection.attributes'); + +export type InstrumentationConnection = Connection & { + [CONNECTION_ATTRIBUTES]?: Attributes; +}; +export type InstrumentationPublishChannel = (Channel | ConfirmChannel) & { + connection: InstrumentationConnection; +}; +export type InstrumentationConsumeChannel = Channel & { + connection: InstrumentationConnection; + [CHANNEL_SPANS_NOT_ENDED]?: { + msg: ConsumeMessage; + timeOfConsume: HrTime; + }[]; + [CHANNEL_CONSUME_TIMEOUT_TIMER]?: NodeJS.Timeout; +}; +export type InstrumentationMessage = Message & { + [MESSAGE_STORED_SPAN]?: Span; +}; +export type InstrumentationConsumeMessage = ConsumeMessage & { + [MESSAGE_STORED_SPAN]?: Span; +}; + +const IS_CONFIRM_CHANNEL_CONTEXT_KEY: symbol = createContextKey('opentelemetry.amqplib.channel.is-confirm-channel'); + +export const normalizeExchange = (exchangeName: string) => (exchangeName !== '' ? exchangeName : ''); + +const censorPassword = (url: string): string => { + return url.replace(/:[^:@/]*@/, ':***@'); +}; + +const getPort = (portFromUrl: number | undefined, resolvedProtocol: string): number => { + // we are using the resolved protocol which is upper case + // this code mimic the behavior of the amqplib which is used to set connection params + return portFromUrl || (resolvedProtocol === 'AMQP' ? 5672 : 5671); +}; + +const getProtocol = (protocolFromUrl: string | undefined): string => { + const resolvedProtocol = protocolFromUrl || 'amqp'; + // the substring removed the ':' part of the protocol ('amqp:' -> 'amqp') + const noEndingColon = resolvedProtocol.endsWith(':') + ? resolvedProtocol.substring(0, resolvedProtocol.length - 1) + : resolvedProtocol; + // upper cases to match spec + return noEndingColon.toUpperCase(); +}; + +const getHostname = (hostnameFromUrl: string | undefined): string => { + // if user supplies empty hostname, it gets forwarded to 'net' package which default it to localhost. + // https://nodejs.org/docs/latest-v12.x/api/net.html#net_socket_connect_options_connectlistener + return hostnameFromUrl || 'localhost'; +}; + +const extractConnectionAttributeOrLog = ( + url: string | Options.Connect, + attributeKey: string, + attributeValue: AttributeValue, + nameForLog: string, +): Attributes => { + if (attributeValue) { + return { [attributeKey]: attributeValue }; + } else { + diag.error(`amqplib instrumentation: could not extract connection attribute ${nameForLog} from user supplied url`, { + url, + }); + return {}; + } +}; + +export const getConnectionAttributesFromServer = (conn: Connection): Attributes => { + const product = conn.serverProperties.product?.toLowerCase?.(); + if (product) { + return { + [ATTR_MESSAGING_SYSTEM]: product, + }; + } else { + return {}; + } +}; + +export const getConnectionAttributesFromUrl = ( + url: string | Options.Connect, + netSemconvStability: SemconvStability, +): Attributes => { + const attributes: Attributes = { + [ATTR_MESSAGING_PROTOCOL_VERSION]: '0.9.1', // this is the only protocol supported by the instrumented library + }; + + url = url || 'amqp://localhost'; + if (typeof url === 'object') { + const connectOptions = url as Options.Connect; + + const protocol = getProtocol(connectOptions?.protocol); + Object.assign(attributes, { + ...extractConnectionAttributeOrLog(url, ATTR_MESSAGING_PROTOCOL, protocol, 'protocol'), + }); + + const hostname = getHostname(connectOptions?.hostname); + if (netSemconvStability & SemconvStability.OLD) { + Object.assign(attributes, { + ...extractConnectionAttributeOrLog(url, ATTR_NET_PEER_NAME, hostname, 'hostname'), + }); + } + if (netSemconvStability & SemconvStability.STABLE) { + Object.assign(attributes, { + ...extractConnectionAttributeOrLog(url, ATTR_SERVER_ADDRESS, hostname, 'hostname'), + }); + } + + const port = getPort(connectOptions.port, protocol); + if (netSemconvStability & SemconvStability.OLD) { + Object.assign(attributes, extractConnectionAttributeOrLog(url, ATTR_NET_PEER_PORT, port, 'port')); + } + if (netSemconvStability & SemconvStability.STABLE) { + Object.assign(attributes, extractConnectionAttributeOrLog(url, ATTR_SERVER_PORT, port, 'port')); + } + } else { + const censoredUrl = censorPassword(url); + attributes[ATTR_MESSAGING_URL] = censoredUrl; + try { + const urlParts = new URL(censoredUrl); + + const protocol = getProtocol(urlParts.protocol); + Object.assign(attributes, { + ...extractConnectionAttributeOrLog(censoredUrl, ATTR_MESSAGING_PROTOCOL, protocol, 'protocol'), + }); + + const hostname = getHostname(urlParts.hostname); + if (netSemconvStability & SemconvStability.OLD) { + Object.assign(attributes, { + ...extractConnectionAttributeOrLog(censoredUrl, ATTR_NET_PEER_NAME, hostname, 'hostname'), + }); + } + if (netSemconvStability & SemconvStability.STABLE) { + Object.assign(attributes, { + ...extractConnectionAttributeOrLog(censoredUrl, ATTR_SERVER_ADDRESS, hostname, 'hostname'), + }); + } + + const port = getPort(urlParts.port ? parseInt(urlParts.port) : undefined, protocol); + if (netSemconvStability & SemconvStability.OLD) { + Object.assign(attributes, extractConnectionAttributeOrLog(censoredUrl, ATTR_NET_PEER_PORT, port, 'port')); + } + if (netSemconvStability & SemconvStability.STABLE) { + Object.assign(attributes, extractConnectionAttributeOrLog(censoredUrl, ATTR_SERVER_PORT, port, 'port')); + } + } catch (err) { + diag.error('amqplib instrumentation: error while extracting connection details from connection url', { + censoredUrl, + err, + }); + } + } + return attributes; +}; + +export const markConfirmChannelTracing = (context: Context) => { + return context.setValue(IS_CONFIRM_CHANNEL_CONTEXT_KEY, true); +}; + +export const unmarkConfirmChannelTracing = (context: Context) => { + return context.deleteValue(IS_CONFIRM_CHANNEL_CONTEXT_KEY); +}; + +export const isConfirmChannelTracing = (context: Context) => { + return context.getValue(IS_CONFIRM_CHANNEL_CONTEXT_KEY) === true; +}; diff --git a/yarn.lock b/yarn.lock index 7337d680cc8f..25fdde3ae6c8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6058,15 +6058,6 @@ "@opentelemetry/resources" "2.6.1" "@opentelemetry/sdk-trace-base" "2.6.1" -"@opentelemetry/instrumentation-amqplib@0.61.0": - version "0.61.0" - resolved "https://registry.yarnpkg.com/@opentelemetry/instrumentation-amqplib/-/instrumentation-amqplib-0.61.0.tgz#e9d52f56dfc4cb8a26837f31c1832af18859f1f2" - integrity sha512-mCKoyTGfRNisge4br0NpOFSy2Z1NnEW8hbCJdUDdJFHrPqVzc4IIBPA/vX0U+LUcQqrQvJX+HMIU0dbDRe0i0Q== - dependencies: - "@opentelemetry/core" "^2.0.0" - "@opentelemetry/instrumentation" "^0.214.0" - "@opentelemetry/semantic-conventions" "^1.33.0" - "@opentelemetry/instrumentation-aws-sdk@0.69.0": version "0.69.0" resolved "https://registry.yarnpkg.com/@opentelemetry/instrumentation-aws-sdk/-/instrumentation-aws-sdk-0.69.0.tgz#461de2337b1931c195f0d284760206a657bdee06"