diff --git a/packages/instrumentation-amqplib/src/amqplib.ts b/packages/instrumentation-amqplib/src/amqplib.ts index a2c45385..c4e6afff 100644 --- a/packages/instrumentation-amqplib/src/amqplib.ts +++ b/packages/instrumentation-amqplib/src/amqplib.ts @@ -1,4 +1,4 @@ -import { context, diag, propagation, trace, Span, SpanKind, SpanStatusCode } from '@opentelemetry/api'; +import { context, diag, propagation, trace, Span, SpanKind, SpanStatusCode, Link } from '@opentelemetry/api'; import { InstrumentationBase, InstrumentationModuleDefinition, @@ -16,12 +16,15 @@ import type amqp from 'amqplib'; import { AmqplibInstrumentationConfig, DEFAULT_CONFIG, EndOperation } from './types'; import { CHANNEL_CONSUME_TIMEOUT_TIMER, - CHANNEL_SPANS_NOT_ENDED, + CHANNEL_MESSAGES_NOT_SETTLED, CONNECTION_ATTRIBUTES, + extractConsumerMessageAttributes, getConnectionAttributesFromUrl, + getSettlementLinks, isConfirmChannelTracing, markConfirmChannelTracing, - MESSAGE_STORED_SPAN, + MESSAGE_SETTLE_INFO, + MsgInfoForSettlement, normalizeExchange, unmarkConfirmChannelTracing, } from './utils'; @@ -182,25 +185,72 @@ export class AmqplibInstrumentation extends InstrumentationBase { private getChannelEmitPatch(original: (eventName: string, ...args: unknown[]) => void) { const self = this; return function emit(eventName: string) { + let endOperation: string | undefined; + let links: Link[] | undefined; if (eventName === 'close') { - self.endAllSpansOnChannel(this, true, EndOperation.ChannelClosed); + endOperation = EndOperation.ChannelClosed; + links = self.settleAllSpansOnChannel(this); const activeTimer = this[CHANNEL_CONSUME_TIMEOUT_TIMER]; if (activeTimer) { clearInterval(activeTimer); } delete this[CHANNEL_CONSUME_TIMEOUT_TIMER]; } else if (eventName === 'error') { - self.endAllSpansOnChannel(this, true, EndOperation.ChannelError); + endOperation = EndOperation.ChannelError; + links = self.settleAllSpansOnChannel(this); + } + + if (endOperation) { + return self.tracer.startActiveSpan( + endOperation, + { + kind: SpanKind.CLIENT, + attributes: { + [SemanticAttributes.MESSAGING_OPERATION]: 'settle', + }, + links, + }, + (span) => { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: endOperation, + }); + const res = original.apply(this, arguments); + span.end(); + return res; + } + ); + } else { + return original.apply(this, arguments); } - return original.apply(this, arguments); }; } private getAckAllPatch(isRejected: boolean, endOperation: EndOperation, original: () => void) { const self = this; return function ackAll(): void { - self.endAllSpansOnChannel(this, isRejected, endOperation); - return original.apply(this, arguments); + const links = self.settleAllSpansOnChannel(this); + return self.tracer.startActiveSpan( + endOperation, + { + kind: SpanKind.CLIENT, + attributes: { + [SemanticAttributes.MESSAGING_OPERATION]: 'settle', + }, + links, + }, + (span: Span) => { + if (isRejected) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: `${endOperation} called on channel with requeue`, + }); + } + const res = original.apply(this, arguments); + span.end(); + return res; + } + ); }; } @@ -215,22 +265,41 @@ export class AmqplibInstrumentation extends InstrumentationBase { // we use this patch in reject function as well, but it has different signature const requeueResolved = endOperation === EndOperation.Reject ? allUpTo : requeue; - const spansNotEnded: { msg: amqp.Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; - const msgIndex = spansNotEnded.findIndex((msgDetails) => msgDetails.msg === message); + let links; + const messagesNotSettled: MsgInfoForSettlement[] = channel[CHANNEL_MESSAGES_NOT_SETTLED] ?? []; + const msgIndex = messagesNotSettled.findIndex((msgDetails) => msgDetails.msg === message); if (msgIndex < 0) { // should not happen in happy flow // but possible if user is calling the api function ack twice with same message - self.endConsumerSpan(message, isRejected, endOperation, requeueResolved); + links = getSettlementLinks(message); } else if (endOperation !== EndOperation.Reject && allUpTo) { - for (let i = 0; i <= msgIndex; i++) { - self.endConsumerSpan(spansNotEnded[i].msg, isRejected, endOperation, requeueResolved); - } - spansNotEnded.splice(0, msgIndex + 1); + const settledMessages = messagesNotSettled.splice(0, msgIndex + 1); + links = settledMessages.map((info) => getSettlementLinks(info.msg)).flat(); } else { - self.endConsumerSpan(message, isRejected, endOperation, requeueResolved); - spansNotEnded.splice(msgIndex, 1); + links = getSettlementLinks(message); + messagesNotSettled.splice(msgIndex, 1); } - return original.apply(this, arguments); + return self.tracer.startActiveSpan( + endOperation, + { + kind: SpanKind.CLIENT, + attributes: { + [SemanticAttributes.MESSAGING_OPERATION]: 'settle', + }, + links, + }, + (span: Span) => { + if (isRejected) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: `${endOperation} called on message ${requeue ? 'with' : 'without'} requeue`, + }); + } + const res = original.apply(this, arguments); + span.end(); + return res; + } + ); }; } @@ -249,7 +318,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { options?: amqp.Options.Consume ): Promise { const channel = this; - if (!channel.hasOwnProperty(CHANNEL_SPANS_NOT_ENDED)) { + if (!channel.hasOwnProperty(CHANNEL_MESSAGES_NOT_SETTLED)) { if (self._config.consumeTimeoutMs) { const timer = setInterval(() => { self.checkConsumeTimeoutOnChannel(channel); @@ -261,7 +330,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { configurable: true, }); } - Object.defineProperty(channel, CHANNEL_SPANS_NOT_ENDED, { + Object.defineProperty(channel, CHANNEL_MESSAGES_NOT_SETTLED, { value: [], enumerable: false, configurable: true, @@ -277,23 +346,29 @@ export class AmqplibInstrumentation extends InstrumentationBase { } const headers = msg.properties.headers ?? {}; - const parentContext = propagation.extract(context.active(), headers); - const exchange = msg.fields?.exchange; + const senderContext = propagation.extract(context.active(), headers); + const senderSpanContext = trace.getSpanContext(senderContext); + const msgAttributes = extractConsumerMessageAttributes(msg); + + const senderLink: Link = { + context: senderSpanContext, + attributes: { ...msgAttributes, [SemanticAttributes.MESSAGING_OPERATION]: 'send' }, + }; + const links = [senderLink]; + const span = self.tracer.startSpan( - `${queue} process`, + `${queue} deliver`, { kind: SpanKind.CONSUMER, attributes: { ...channel?.connection?.[CONNECTION_ATTRIBUTES], - [SemanticAttributes.MESSAGING_DESTINATION]: exchange, - [SemanticAttributes.MESSAGING_DESTINATION_KIND]: MessagingDestinationKindValues.TOPIC, - [SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY]: msg.fields?.routingKey, - [SemanticAttributes.MESSAGING_OPERATION]: MessagingOperationValues.PROCESS, - [SemanticAttributes.MESSAGING_MESSAGE_ID]: msg?.properties.messageId, - [SemanticAttributes.MESSAGING_CONVERSATION_ID]: msg?.properties.correlationId, + [SemanticAttributes.MESSAGING_DESTINATION]: queue, + [SemanticAttributes.MESSAGING_DESTINATION_KIND]: MessagingDestinationKindValues.QUEUE, + [SemanticAttributes.MESSAGING_OPERATION]: 'deliver', }, + links, }, - parentContext + context.active() ); if (self._config.moduleVersionAttributeName) { @@ -313,25 +388,30 @@ export class AmqplibInstrumentation extends InstrumentationBase { } if (!options?.noAck) { + const settlementInfo: MsgInfoForSettlement = { + deliverContext: span.spanContext(), + senderContext: senderSpanContext, + msgAttributes, + msg, + timeOfConsume: new Date(), + }; // store the message on the channel so we can close the span on ackAll etc - channel[CHANNEL_SPANS_NOT_ENDED].push({ msg, timeOfConsume: new Date() }); + channel[CHANNEL_MESSAGES_NOT_SETTLED].push(settlementInfo); // store the span on the message, so we can end it when user call 'ack' on it - Object.defineProperty(msg, MESSAGE_STORED_SPAN, { - value: span, + Object.defineProperty(msg, MESSAGE_SETTLE_INFO, { + value: settlementInfo, enumerable: false, configurable: true, }); } - context.with(trace.setSpan(context.active(), span), () => { - onMessage.call(this, msg); - }); - - if (options?.noAck) { - self.callConsumeEndHook(span, msg, false, EndOperation.AutoAck); + const onMessageRes = context.with(trace.setSpan(context.active(), span), onMessage, this, msg); + // always end the span when the callback returns + Promise.resolve(onMessageRes).then(() => { span.end(); - } + }); + return onMessageRes; }; arguments[1] = patchedOnMessage; return original.apply(this, arguments); @@ -494,7 +574,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { ) { const normalizedExchange = normalizeExchange(exchange); - const span = self.tracer.startSpan(`${normalizedExchange} -> ${routingKey} send`, { + const span = self.tracer.startSpan(`${normalizedExchange} publish`, { kind: SpanKind.PRODUCER, attributes: { ...channel.connection[CONNECTION_ATTRIBUTES], @@ -516,61 +596,36 @@ export class AmqplibInstrumentation extends InstrumentationBase { return { span, modifiedOptions }; } - private endConsumerSpan(message: amqp.Message, isRejected: boolean, operation: EndOperation, requeue: boolean) { - const storedSpan: Span = message[MESSAGE_STORED_SPAN]; - if (!storedSpan) return; - if (isRejected !== false) { - storedSpan.setStatus({ - code: SpanStatusCode.ERROR, - message: - operation !== EndOperation.ChannelClosed && operation !== EndOperation.ChannelError - ? `${operation} called on message ${requeue ? 'with' : 'without'} requeue` - : operation, - }); - } - this.callConsumeEndHook(storedSpan, message, isRejected, operation); - storedSpan.end(); - delete message[MESSAGE_STORED_SPAN]; - } - - private endAllSpansOnChannel(channel: amqp.Channel, isRejected: boolean, operation: EndOperation) { - const spansNotEnded: { msg: amqp.Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; - spansNotEnded.forEach((msgDetails) => { - this.endConsumerSpan(msgDetails.msg, isRejected, operation, null); - }); - Object.defineProperty(channel, CHANNEL_SPANS_NOT_ENDED, { + private settleAllSpansOnChannel(channel: amqp.Channel): Link[] { + const msgsNotSettled: MsgInfoForSettlement[] = channel[CHANNEL_MESSAGES_NOT_SETTLED] ?? []; + const links = msgsNotSettled.map((info) => getSettlementLinks(info.msg)).flat(); + Object.defineProperty(channel, CHANNEL_MESSAGES_NOT_SETTLED, { value: [], enumerable: false, configurable: true, }); - } - - private callConsumeEndHook(span: Span, msg: amqp.ConsumeMessage, rejected: boolean, endOperation: EndOperation) { - if (!this._config.consumeEndHook) return; - - safeExecuteInTheMiddle( - () => this._config.consumeEndHook(span, msg, rejected, endOperation), - (e) => { - if (e) { - diag.error('amqplib instrumentation: consumerEndHook error', e); - } - }, - true - ); + return links; } private checkConsumeTimeoutOnChannel(channel: amqp.Channel) { const currentTime = new Date().getTime(); - const spansNotEnded: { msg: amqp.Message; timeOfConsume: Date }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; + const spansNotSettled: MsgInfoForSettlement[] = channel[CHANNEL_MESSAGES_NOT_SETTLED] ?? []; let i: number; - for (i = 0; i < spansNotEnded.length; i++) { - const currMessage = spansNotEnded[i]; + for (i = 0; i < spansNotSettled.length; i++) { + const currMessage = spansNotSettled[i]; const timeFromConsume = currentTime - currMessage.timeOfConsume.getTime(); if (timeFromConsume < this._config.consumeTimeoutMs) { break; } - this.endConsumerSpan(currMessage.msg, null, EndOperation.InstrumentationTimeout, true); } - spansNotEnded.splice(0, i); + + // common case - no messages + if (i === 0) { + return; + } + + const timedoutMsgs = spansNotSettled.splice(0, i); + // TODO: how should we report these messages being timed out? + // should we create settlement span for them? just not settle them? } } diff --git a/packages/instrumentation-amqplib/src/types.ts b/packages/instrumentation-amqplib/src/types.ts index 2263f358..db5aaef9 100644 --- a/packages/instrumentation-amqplib/src/types.ts +++ b/packages/instrumentation-amqplib/src/types.ts @@ -22,10 +22,6 @@ export interface AmqplibConsumerCustomAttributeFunction { (span: Span, msg: amqp.ConsumeMessage): void; } -export interface AmqplibConsumerEndCustomAttributeFunction { - (span: Span, msg: amqp.ConsumeMessage, rejected: boolean, endOperation: EndOperation): void; -} - export enum EndOperation { AutoAck = 'auto ack', Ack = 'ack', @@ -48,9 +44,6 @@ export interface AmqplibInstrumentationConfig extends InstrumentationConfig { /** hook for adding custom attributes before consumer message is processed */ consumeHook?: AmqplibConsumerCustomAttributeFunction; - /** hook for adding custom attributes after consumer message is acked to server */ - consumeEndHook?: AmqplibConsumerEndCustomAttributeFunction; - /** * If passed, a span attribute will be added to all spans with key of the provided "moduleVersionAttributeName" * and value of the module version. diff --git a/packages/instrumentation-amqplib/src/utils.ts b/packages/instrumentation-amqplib/src/utils.ts index edc75a91..e85516b5 100644 --- a/packages/instrumentation-amqplib/src/utils.ts +++ b/packages/instrumentation-amqplib/src/utils.ts @@ -1,9 +1,17 @@ -import { Context, createContextKey, diag, SpanAttributes, SpanAttributeValue } from '@opentelemetry/api'; +import { + Context, + createContextKey, + diag, + SpanAttributes, + SpanAttributeValue, + Link, + SpanContext, +} from '@opentelemetry/api'; import { SemanticAttributes } from '@opentelemetry/semantic-conventions'; import type amqp from 'amqplib'; -export const MESSAGE_STORED_SPAN: unique symbol = Symbol('opentelemetry.amqplib.message.stored-span'); -export const CHANNEL_SPANS_NOT_ENDED: unique symbol = Symbol('opentelemetry.amqplib.channel.spans-not-ended'); +export const MESSAGE_SETTLE_INFO: unique symbol = Symbol('opentelemetry.amqplib.message.settle-info'); +export const CHANNEL_MESSAGES_NOT_SETTLED: unique symbol = Symbol('opentelemetry.amqplib.channel.messages-not-settled'); export const CHANNEL_CONSUME_TIMEOUT_TIMER: unique symbol = Symbol( 'opentelemetry.amqplib.channel.consumer-timeout-timer' ); @@ -11,6 +19,14 @@ export const CONNECTION_ATTRIBUTES: unique symbol = Symbol('opentelemetry.amqpli const IS_CONFIRM_CHANNEL_CONTEXT_KEY: symbol = createContextKey('opentelemetry.amqplib.channel.is-confirm-channel'); +export interface MsgInfoForSettlement { + deliverContext: SpanContext; + senderContext: SpanContext; + msgAttributes: SpanAttributes; + msg: amqp.Message; + timeOfConsume: Date; +} + export const normalizeExchange = (exchangeName: string) => (exchangeName !== '' ? exchangeName : ''); const censorPassword = (url: string): string => { @@ -135,3 +151,33 @@ export const unmarkConfirmChannelTracing = (context: Context) => { export const isConfirmChannelTracing = (context: Context) => { return context.getValue(IS_CONFIRM_CHANNEL_CONTEXT_KEY) === true; }; + +// those are attributes in the scope of a single message +export const extractConsumerMessageAttributes = (msg: amqp.ConsumeMessage): SpanAttributes => { + return { + ['messagin.rabbitmq.exchange_name']: msg.fields?.exchange, + [SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY]: msg.fields?.routingKey, + [SemanticAttributes.MESSAGING_MESSAGE_ID]: msg.properties.messageId, + [SemanticAttributes.MESSAGING_CONVERSATION_ID]: msg.properties.correlationId, + }; +}; + +export const getSettlementLinks = (message: amqp.Message): Link[] => { + const settlementInfo: MsgInfoForSettlement = message[MESSAGE_SETTLE_INFO]; + if (!settlementInfo) return []; + delete message[MESSAGE_SETTLE_INFO]; + const deliverLink = { + context: settlementInfo.deliverContext, + attributes: { + ...settlementInfo.msgAttributes, + [SemanticAttributes.MESSAGING_OPERATION]: 'deliver', + }, + }; + const senderLink = { + context: settlementInfo.senderContext, + attributes: { + [SemanticAttributes.MESSAGING_OPERATION]: 'send', + }, + }; + return [deliverLink, senderLink]; +};