Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: prototype new spec for messaging systems #210

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
221 changes: 138 additions & 83 deletions packages/instrumentation-amqplib/src/amqplib.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { context, diag, propagation, trace, Span, SpanKind, SpanStatusCode } from '@opentelemetry/api';
import { context, diag, propagation, trace, Span, SpanKind, SpanStatusCode, Link } from '@opentelemetry/api';
import {
InstrumentationBase,
InstrumentationModuleDefinition,
Expand All @@ -16,12 +16,15 @@ import type amqp from 'amqplib';
import { AmqplibInstrumentationConfig, DEFAULT_CONFIG, EndOperation } from './types';
import {
CHANNEL_CONSUME_TIMEOUT_TIMER,
CHANNEL_SPANS_NOT_ENDED,
CHANNEL_MESSAGES_NOT_SETTLED,
CONNECTION_ATTRIBUTES,
extractConsumerMessageAttributes,
getConnectionAttributesFromUrl,
getSettlementLinks,
isConfirmChannelTracing,
markConfirmChannelTracing,
MESSAGE_STORED_SPAN,
MESSAGE_SETTLE_INFO,
MsgInfoForSettlement,
normalizeExchange,
unmarkConfirmChannelTracing,
} from './utils';
Expand Down Expand Up @@ -182,25 +185,72 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
private getChannelEmitPatch(original: (eventName: string, ...args: unknown[]) => void) {
const self = this;
return function emit(eventName: string) {
let endOperation: string | undefined;
let links: Link[] | undefined;
if (eventName === 'close') {
self.endAllSpansOnChannel(this, true, EndOperation.ChannelClosed);
endOperation = EndOperation.ChannelClosed;
links = self.settleAllSpansOnChannel(this);
const activeTimer = this[CHANNEL_CONSUME_TIMEOUT_TIMER];
if (activeTimer) {
clearInterval(activeTimer);
}
delete this[CHANNEL_CONSUME_TIMEOUT_TIMER];
} else if (eventName === 'error') {
self.endAllSpansOnChannel(this, true, EndOperation.ChannelError);
endOperation = EndOperation.ChannelError;
links = self.settleAllSpansOnChannel(this);
}

if (endOperation) {
return self.tracer.startActiveSpan(
endOperation,
{
kind: SpanKind.CLIENT,
attributes: {
[SemanticAttributes.MESSAGING_OPERATION]: 'settle',
},
links,
},
(span) => {
span.setStatus({
code: SpanStatusCode.ERROR,
message: endOperation,
});
const res = original.apply(this, arguments);
span.end();
return res;
}
);
} else {
return original.apply(this, arguments);
}
return original.apply(this, arguments);
};
}

private getAckAllPatch(isRejected: boolean, endOperation: EndOperation, original: () => void) {
const self = this;
return function ackAll(): void {
self.endAllSpansOnChannel(this, isRejected, endOperation);
return original.apply(this, arguments);
const links = self.settleAllSpansOnChannel(this);
return self.tracer.startActiveSpan(
endOperation,
{
kind: SpanKind.CLIENT,
attributes: {
[SemanticAttributes.MESSAGING_OPERATION]: 'settle',
},
links,
},
(span: Span) => {
if (isRejected) {
span.setStatus({
code: SpanStatusCode.ERROR,
message: `${endOperation} called on channel with requeue`,
});
}
const res = original.apply(this, arguments);
span.end();
return res;
}
);
};
}

Expand All @@ -215,22 +265,41 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
// we use this patch in reject function as well, but it has different signature
const requeueResolved = endOperation === EndOperation.Reject ? allUpTo : requeue;

const spansNotEnded: { msg: amqp.Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? [];
const msgIndex = spansNotEnded.findIndex((msgDetails) => msgDetails.msg === message);
let links;
const messagesNotSettled: MsgInfoForSettlement[] = channel[CHANNEL_MESSAGES_NOT_SETTLED] ?? [];
const msgIndex = messagesNotSettled.findIndex((msgDetails) => msgDetails.msg === message);
if (msgIndex < 0) {
// should not happen in happy flow
// but possible if user is calling the api function ack twice with same message
self.endConsumerSpan(message, isRejected, endOperation, requeueResolved);
links = getSettlementLinks(message);
} else if (endOperation !== EndOperation.Reject && allUpTo) {
for (let i = 0; i <= msgIndex; i++) {
self.endConsumerSpan(spansNotEnded[i].msg, isRejected, endOperation, requeueResolved);
}
spansNotEnded.splice(0, msgIndex + 1);
const settledMessages = messagesNotSettled.splice(0, msgIndex + 1);
links = settledMessages.map((info) => getSettlementLinks(info.msg)).flat();
} else {
self.endConsumerSpan(message, isRejected, endOperation, requeueResolved);
spansNotEnded.splice(msgIndex, 1);
links = getSettlementLinks(message);
messagesNotSettled.splice(msgIndex, 1);
}
return original.apply(this, arguments);
return self.tracer.startActiveSpan(
endOperation,
{
kind: SpanKind.CLIENT,
attributes: {
[SemanticAttributes.MESSAGING_OPERATION]: 'settle',
},
links,
},
(span: Span) => {
if (isRejected) {
span.setStatus({
code: SpanStatusCode.ERROR,
message: `${endOperation} called on message ${requeue ? 'with' : 'without'} requeue`,
});
}
const res = original.apply(this, arguments);
span.end();
return res;
}
);
};
}

Expand All @@ -249,7 +318,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
options?: amqp.Options.Consume
): Promise<amqp.Replies.Consume> {
const channel = this;
if (!channel.hasOwnProperty(CHANNEL_SPANS_NOT_ENDED)) {
if (!channel.hasOwnProperty(CHANNEL_MESSAGES_NOT_SETTLED)) {
if (self._config.consumeTimeoutMs) {
const timer = setInterval(() => {
self.checkConsumeTimeoutOnChannel(channel);
Expand All @@ -261,7 +330,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
configurable: true,
});
}
Object.defineProperty(channel, CHANNEL_SPANS_NOT_ENDED, {
Object.defineProperty(channel, CHANNEL_MESSAGES_NOT_SETTLED, {
value: [],
enumerable: false,
configurable: true,
Expand All @@ -277,23 +346,29 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
}

const headers = msg.properties.headers ?? {};
const parentContext = propagation.extract(context.active(), headers);
const exchange = msg.fields?.exchange;
const senderContext = propagation.extract(context.active(), headers);
const senderSpanContext = trace.getSpanContext(senderContext);
const msgAttributes = extractConsumerMessageAttributes(msg);

const senderLink: Link = {
context: senderSpanContext,
attributes: { ...msgAttributes, [SemanticAttributes.MESSAGING_OPERATION]: 'send' },
};
const links = [senderLink];

const span = self.tracer.startSpan(
`${queue} process`,
`${queue} deliver`,
{
kind: SpanKind.CONSUMER,
attributes: {
...channel?.connection?.[CONNECTION_ATTRIBUTES],
[SemanticAttributes.MESSAGING_DESTINATION]: exchange,
[SemanticAttributes.MESSAGING_DESTINATION_KIND]: MessagingDestinationKindValues.TOPIC,
[SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY]: msg.fields?.routingKey,
[SemanticAttributes.MESSAGING_OPERATION]: MessagingOperationValues.PROCESS,
[SemanticAttributes.MESSAGING_MESSAGE_ID]: msg?.properties.messageId,
[SemanticAttributes.MESSAGING_CONVERSATION_ID]: msg?.properties.correlationId,
[SemanticAttributes.MESSAGING_DESTINATION]: queue,
[SemanticAttributes.MESSAGING_DESTINATION_KIND]: MessagingDestinationKindValues.QUEUE,
[SemanticAttributes.MESSAGING_OPERATION]: 'deliver',
},
links,
},
parentContext
context.active()
);

if (self._config.moduleVersionAttributeName) {
Expand All @@ -313,25 +388,30 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
}

if (!options?.noAck) {
const settlementInfo: MsgInfoForSettlement = {
deliverContext: span.spanContext(),
senderContext: senderSpanContext,
msgAttributes,
msg,
timeOfConsume: new Date(),
};
// store the message on the channel so we can close the span on ackAll etc
channel[CHANNEL_SPANS_NOT_ENDED].push({ msg, timeOfConsume: new Date() });
channel[CHANNEL_MESSAGES_NOT_SETTLED].push(settlementInfo);

// store the span on the message, so we can end it when user call 'ack' on it
Object.defineProperty(msg, MESSAGE_STORED_SPAN, {
value: span,
Object.defineProperty(msg, MESSAGE_SETTLE_INFO, {
value: settlementInfo,
enumerable: false,
configurable: true,
});
}

context.with(trace.setSpan(context.active(), span), () => {
onMessage.call(this, msg);
});

if (options?.noAck) {
self.callConsumeEndHook(span, msg, false, EndOperation.AutoAck);
const onMessageRes = context.with(trace.setSpan(context.active(), span), onMessage, this, msg);
// always end the span when the callback returns
Promise.resolve(onMessageRes).then(() => {
span.end();
}
});
return onMessageRes;
};
arguments[1] = patchedOnMessage;
return original.apply(this, arguments);
Expand Down Expand Up @@ -494,7 +574,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
) {
const normalizedExchange = normalizeExchange(exchange);

const span = self.tracer.startSpan(`${normalizedExchange} -> ${routingKey} send`, {
const span = self.tracer.startSpan(`${normalizedExchange} publish`, {
kind: SpanKind.PRODUCER,
attributes: {
...channel.connection[CONNECTION_ATTRIBUTES],
Expand All @@ -516,61 +596,36 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
return { span, modifiedOptions };
}

private endConsumerSpan(message: amqp.Message, isRejected: boolean, operation: EndOperation, requeue: boolean) {
const storedSpan: Span = message[MESSAGE_STORED_SPAN];
if (!storedSpan) return;
if (isRejected !== false) {
storedSpan.setStatus({
code: SpanStatusCode.ERROR,
message:
operation !== EndOperation.ChannelClosed && operation !== EndOperation.ChannelError
? `${operation} called on message ${requeue ? 'with' : 'without'} requeue`
: operation,
});
}
this.callConsumeEndHook(storedSpan, message, isRejected, operation);
storedSpan.end();
delete message[MESSAGE_STORED_SPAN];
}

private endAllSpansOnChannel(channel: amqp.Channel, isRejected: boolean, operation: EndOperation) {
const spansNotEnded: { msg: amqp.Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? [];
spansNotEnded.forEach((msgDetails) => {
this.endConsumerSpan(msgDetails.msg, isRejected, operation, null);
});
Object.defineProperty(channel, CHANNEL_SPANS_NOT_ENDED, {
private settleAllSpansOnChannel(channel: amqp.Channel): Link[] {
const msgsNotSettled: MsgInfoForSettlement[] = channel[CHANNEL_MESSAGES_NOT_SETTLED] ?? [];
const links = msgsNotSettled.map((info) => getSettlementLinks(info.msg)).flat();
Object.defineProperty(channel, CHANNEL_MESSAGES_NOT_SETTLED, {
value: [],
enumerable: false,
configurable: true,
});
}

private callConsumeEndHook(span: Span, msg: amqp.ConsumeMessage, rejected: boolean, endOperation: EndOperation) {
if (!this._config.consumeEndHook) return;

safeExecuteInTheMiddle(
() => this._config.consumeEndHook(span, msg, rejected, endOperation),
(e) => {
if (e) {
diag.error('amqplib instrumentation: consumerEndHook error', e);
}
},
true
);
return links;
}

private checkConsumeTimeoutOnChannel(channel: amqp.Channel) {
const currentTime = new Date().getTime();
const spansNotEnded: { msg: amqp.Message; timeOfConsume: Date }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? [];
const spansNotSettled: MsgInfoForSettlement[] = channel[CHANNEL_MESSAGES_NOT_SETTLED] ?? [];
let i: number;
for (i = 0; i < spansNotEnded.length; i++) {
const currMessage = spansNotEnded[i];
for (i = 0; i < spansNotSettled.length; i++) {
const currMessage = spansNotSettled[i];
const timeFromConsume = currentTime - currMessage.timeOfConsume.getTime();
if (timeFromConsume < this._config.consumeTimeoutMs) {
break;
}
this.endConsumerSpan(currMessage.msg, null, EndOperation.InstrumentationTimeout, true);
}
spansNotEnded.splice(0, i);

// common case - no messages
if (i === 0) {
return;
}

const timedoutMsgs = spansNotSettled.splice(0, i);
// TODO: how should we report these messages being timed out?
// should we create settlement span for them? just not settle them?
}
}
7 changes: 0 additions & 7 deletions packages/instrumentation-amqplib/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ export interface AmqplibConsumerCustomAttributeFunction {
(span: Span, msg: amqp.ConsumeMessage): void;
}

export interface AmqplibConsumerEndCustomAttributeFunction {
(span: Span, msg: amqp.ConsumeMessage, rejected: boolean, endOperation: EndOperation): void;
}

export enum EndOperation {
AutoAck = 'auto ack',
Ack = 'ack',
Expand All @@ -48,9 +44,6 @@ export interface AmqplibInstrumentationConfig extends InstrumentationConfig {
/** hook for adding custom attributes before consumer message is processed */
consumeHook?: AmqplibConsumerCustomAttributeFunction;

/** hook for adding custom attributes after consumer message is acked to server */
consumeEndHook?: AmqplibConsumerEndCustomAttributeFunction;

/**
* If passed, a span attribute will be added to all spans with key of the provided "moduleVersionAttributeName"
* and value of the module version.
Expand Down