Skip to content

Commit

Permalink
Instrument AMQP batched publishing (#2659)
Browse files Browse the repository at this point in the history
This fixes #2644 by configuring the extension to trace the necessary
methods.
  • Loading branch information
lcobucci committed May 13, 2024
1 parent d60b961 commit 2e11319
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 0 deletions.
56 changes: 56 additions & 0 deletions src/DDTrace/Integrations/AMQP/AMQPIntegration.php
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,62 @@ function ($This) use ($integration) {
]
);

trace_method(
"PhpAmqpLib\Channel\AMQPChannel",
"batch_basic_publish",
[
'prehook' => function (SpanData $span, $args) use ($integration) {
/** @var AMQPMessage $message */
$message = $args[0];
if (!is_null($message)) {
$integration->injectContext($message);
}
},
'posthook' => function (SpanData $span, $args, $exception) use ($integration) {
/** @var AMQPMessage $message */
$message = $args[0];
/** @var string $exchange */
$exchange = $args[1];
/** @var string $routing_key */
$routingKey = $args[2] ?? '';

$exchangeDisplayName = $integration->formatExchangeName($exchange);
$routingKeyDisplayName = $integration->formatRoutingKey($routingKey);

$integration->setGenericTags(
$span,
'batch_basic.add',
'producer',
"$exchangeDisplayName -> $routingKeyDisplayName",
$exception
);

$span->meta[Tag::RABBITMQ_ROUTING_KEY] = $routingKeyDisplayName;
$span->meta[Tag::RABBITMQ_EXCHANGE] = $exchangeDisplayName;

if (!is_null($message)) {
$span->meta[Tag::MQ_MESSAGE_PAYLOAD_SIZE] = strlen($message->getBody());
$integration->setOptionalMessageTags($span, $message);
}
}
]
);

trace_method(
"PhpAmqpLib\Channel\AMQPChannel",
"publish_batch",
function (SpanData $span, $args, $exception) use ($integration) {
$integration->setGenericTags(
$span,
'publish_batch',
'producer',
null,
$exception
);
$span->meta[Tag::MQ_OPERATION] = 'send';
}
);

trace_method(
"PhpAmqpLib\Channel\AMQPChannel",
"basic_consume",
Expand Down
110 changes: 110 additions & 0 deletions tests/Integrations/AMQP/AMQPTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -955,4 +955,114 @@ public function testDistributedTracingIsNotPropagatedIfDisabled()
$this->assertNotSame($basicPublishSpan['trace_id'], $basicDeliverSpan['trace_id']);
$this->assertArrayNotHasKey('parent_id', $basicDeliverSpan);
}

public function testBatchedPublishing()
{
$traces = $this->isolateTracer(function () {
$producerConnection = $this->connectionToServer();
$producerChannel = $producerConnection->channel();
$producerChannel->queue_declare('hello-1', false, false, false, false);
$producerChannel->queue_declare('hello-2', false, false, false, false);
$producerChannel->batch_basic_publish(new AMQPMessage('hello there'), '', 'hello-1');
$producerChannel->batch_basic_publish(new AMQPMessage('world'), '', 'hello-2');
$producerChannel->publish_batch();

// Wait for the server to ack the messages, but no more than 3 seconds
// Note: Blocking call
$producerChannel->wait_for_pending_acks(3);

$producerChannel->close();
$producerConnection->close();
});

$this->assertFlameGraph($traces, [
SpanAssertion::build(
'amqp.connect',
'amqp',
'queue',
'connect'
)->withExactTags([
Tag::SPAN_KIND => 'client',
Tag::COMPONENT => 'amqp',
Tag::MQ_SYSTEM => 'rabbitmq',
Tag::MQ_DESTINATION_KIND => 'queue',
Tag::MQ_PROTOCOL => 'AMQP',
Tag::MQ_PROTOCOL_VERSION => AMQPChannel::getProtocolVersion(),
]),
SpanAssertion::build(
'amqp.queue.declare',
'amqp',
'queue',
'queue.declare hello-1'
)->withExactTags([
Tag::SPAN_KIND => 'client',
Tag::COMPONENT => 'amqp',
Tag::MQ_SYSTEM => 'rabbitmq',
Tag::MQ_DESTINATION_KIND => 'queue',
Tag::MQ_PROTOCOL => 'AMQP',
Tag::MQ_PROTOCOL_VERSION => AMQPChannel::getProtocolVersion(),
Tag::MQ_DESTINATION => 'hello-1',
]),
SpanAssertion::build(
'amqp.queue.declare',
'amqp',
'queue',
'queue.declare hello-2'
)->withExactTags([
Tag::SPAN_KIND => 'client',
Tag::COMPONENT => 'amqp',
Tag::MQ_SYSTEM => 'rabbitmq',
Tag::MQ_DESTINATION_KIND => 'queue',
Tag::MQ_PROTOCOL => 'AMQP',
Tag::MQ_PROTOCOL_VERSION => AMQPChannel::getProtocolVersion(),
Tag::MQ_DESTINATION => 'hello-2',
]),
SpanAssertion::build(
'amqp.batch_basic.add',
'amqp',
'queue',
'batch_basic.add <default> -> hello-1'
)->withExactTags([
Tag::SPAN_KIND => 'producer',
Tag::COMPONENT => 'amqp',
Tag::MQ_SYSTEM => 'rabbitmq',
Tag::RABBITMQ_ROUTING_KEY => 'hello-1',
Tag::MQ_DESTINATION_KIND => 'queue',
Tag::MQ_PROTOCOL => 'AMQP',
Tag::MQ_PROTOCOL_VERSION => AMQPChannel::getProtocolVersion(),
Tag::MQ_MESSAGE_PAYLOAD_SIZE => 11,
Tag::RABBITMQ_EXCHANGE => '<default>',
]),
SpanAssertion::build(
'amqp.batch_basic.add',
'amqp',
'queue',
'batch_basic.add <default> -> hello-2'
)->withExactTags([
Tag::SPAN_KIND => 'producer',
Tag::COMPONENT => 'amqp',
Tag::MQ_SYSTEM => 'rabbitmq',
Tag::RABBITMQ_ROUTING_KEY => 'hello-2',
Tag::MQ_DESTINATION_KIND => 'queue',
Tag::MQ_PROTOCOL => 'AMQP',
Tag::MQ_PROTOCOL_VERSION => AMQPChannel::getProtocolVersion(),
Tag::MQ_MESSAGE_PAYLOAD_SIZE => 5,
Tag::RABBITMQ_EXCHANGE => '<default>',
]),
SpanAssertion::build(
'amqp.publish_batch',
'amqp',
'queue',
'publish_batch'
)->withExactTags([
Tag::SPAN_KIND => 'producer',
Tag::COMPONENT => 'amqp',
Tag::MQ_SYSTEM => 'rabbitmq',
Tag::MQ_DESTINATION_KIND => 'queue',
Tag::MQ_PROTOCOL => 'AMQP',
Tag::MQ_PROTOCOL_VERSION => AMQPChannel::getProtocolVersion(),
Tag::MQ_OPERATION => 'send',
])
]);
}
}

0 comments on commit 2e11319

Please sign in to comment.