Skip to content

Commit fa07ecf

Browse files
authored
fix amqplib flaky dsm tests (#5445)
1 parent 9d3ec95 commit fa07ecf

File tree

1 file changed

+67
-53
lines changed

1 file changed

+67
-53
lines changed

packages/datadog-plugin-amqplib/test/index.spec.js

Lines changed: 67 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,24 @@
22

33
const agent = require('../../dd-trace/test/plugins/agent')
44
const { ERROR_MESSAGE, ERROR_STACK, ERROR_TYPE } = require('../../dd-trace/src/constants')
5+
const id = require('../../dd-trace/src/id')
6+
const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor')
7+
const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
58

69
const { expectedSchema, rawExpectedSchema } = require('./naming')
710

811
describe('Plugin', () => {
912
let tracer
1013
let connection
1114
let channel
15+
let queue
1216

1317
describe('amqplib', () => {
1418
withVersions('amqplib', 'amqplib', version => {
1519
beforeEach(() => {
1620
process.env.DD_DATA_STREAMS_ENABLED = 'true'
1721
tracer = require('../../dd-trace')
22+
queue = `test-${id()}`
1823
})
1924

2025
afterEach(() => {
@@ -40,26 +45,22 @@ describe('Plugin', () => {
4045
})
4146
})
4247

48+
afterEach(() => {
49+
return agent.close({ ritmReset: false })
50+
})
51+
4352
describe('without plugin', () => {
4453
it('should run commands normally', done => {
45-
channel.assertQueue('test', {}, () => { done() })
54+
channel.assertQueue(queue, {}, () => { done() })
4655
})
4756
})
4857

4958
describe('when using a callback', () => {
50-
before(() => {
51-
return agent.load('amqplib')
52-
})
53-
54-
after(() => {
55-
return agent.close({ ritmReset: false })
56-
})
57-
5859
describe('when sending commands', () => {
5960
withPeerService(
6061
() => tracer,
6162
'amqplib',
62-
() => channel.assertQueue('test', {}, () => {}),
63+
() => channel.assertQueue(queue, {}, () => {}),
6364
'localhost',
6465
'out.host'
6566
)
@@ -70,7 +71,7 @@ describe('Plugin', () => {
7071
const span = traces[0][0]
7172
expect(span).to.have.property('name', expectedSchema.controlPlane.opName)
7273
expect(span).to.have.property('service', expectedSchema.controlPlane.serviceName)
73-
expect(span).to.have.property('resource', 'queue.declare test')
74+
expect(span).to.have.property('resource', `queue.declare ${queue}`)
7475
expect(span).to.not.have.property('type')
7576
expect(span.meta).to.have.property('span.kind', 'client')
7677
expect(span.meta).to.have.property('out.host', 'localhost')
@@ -80,7 +81,7 @@ describe('Plugin', () => {
8081
.then(done)
8182
.catch(done)
8283

83-
channel.assertQueue('test', {}, () => {})
84+
channel.assertQueue(queue, {}, () => {})
8485
})
8586

8687
it('should do automatic instrumentation for queued commands', done => {
@@ -90,7 +91,7 @@ describe('Plugin', () => {
9091

9192
expect(span).to.have.property('name', expectedSchema.controlPlane.opName)
9293
expect(span).to.have.property('service', expectedSchema.controlPlane.serviceName)
93-
expect(span).to.have.property('resource', 'queue.delete test')
94+
expect(span).to.have.property('resource', `queue.delete ${queue}`)
9495
expect(span).to.not.have.property('type')
9596
expect(span.meta).to.have.property('span.kind', 'client')
9697
expect(span.meta).to.have.property('out.host', 'localhost')
@@ -100,8 +101,8 @@ describe('Plugin', () => {
100101
.then(done)
101102
.catch(done)
102103

103-
channel.assertQueue('test', {}, () => {})
104-
channel.deleteQueue('test', () => {})
104+
channel.assertQueue(queue, {}, () => {})
105+
channel.deleteQueue(queue, () => {})
105106
})
106107

107108
it('should handle errors', done => {
@@ -128,7 +129,7 @@ describe('Plugin', () => {
128129
})
129130

130131
withNamingSchema(
131-
() => channel.assertQueue('test', {}, () => {}),
132+
() => channel.assertQueue(queue, {}, () => {}),
132133
rawExpectedSchema.controlPlane
133134
)
134135
})
@@ -137,7 +138,7 @@ describe('Plugin', () => {
137138
withPeerService(
138139
() => tracer,
139140
'amqplib',
140-
() => channel.assertQueue('test', {}, () => {}),
141+
() => channel.assertQueue(queue, {}, () => {}),
141142
'localhost',
142143
'out.host'
143144
)
@@ -181,7 +182,7 @@ describe('Plugin', () => {
181182
.catch(done)
182183

183184
try {
184-
channel.sendToQueue('test', 'invalid')
185+
channel.sendToQueue(queue, 'invalid')
185186
} catch (e) {
186187
error = e
187188
}
@@ -293,7 +294,7 @@ describe('Plugin', () => {
293294
})
294295

295296
it('should run the callback in the parent context', done => {
296-
channel.assertQueue('test', {})
297+
channel.assertQueue(queue, {})
297298
.then(() => {
298299
expect(tracer.scope().active()).to.be.null
299300
done()
@@ -305,22 +306,32 @@ describe('Plugin', () => {
305306
describe('when data streams monitoring is enabled', function () {
306307
this.timeout(10000)
307308

308-
const expectedProducerHashWithTopic = '16804605750389532869'
309-
const expectedProducerHashWithExchange = '2722596631431228032'
310-
311-
const expectedConsumerHash = '17529824252700998941'
312-
313-
before(() => {
314-
tracer = require('../../dd-trace')
315-
tracer.use('amqplib')
316-
})
309+
let expectedProducerHashWithTopic
310+
let expectedProducerHashWithExchange
311+
let expectedConsumerHash
317312

318-
before(async () => {
319-
return agent.load('amqplib')
320-
})
321-
322-
after(() => {
323-
return agent.close({ ritmReset: false })
313+
beforeEach(() => {
314+
const producerHashWithTopic = computePathwayHash('test', 'tester', [
315+
'direction:out',
316+
'has_routing_key:true',
317+
`topic:${queue}`,
318+
'type:rabbitmq'
319+
], ENTRY_PARENT_HASH)
320+
321+
expectedProducerHashWithTopic = producerHashWithTopic.readBigUInt64BE(0).toString()
322+
323+
expectedProducerHashWithExchange = computePathwayHash('test', 'tester', [
324+
'direction:out',
325+
'exchange:namedExchange',
326+
'has_routing_key:true',
327+
'type:rabbitmq'
328+
], ENTRY_PARENT_HASH).readBigUInt64BE(0).toString()
329+
330+
expectedConsumerHash = computePathwayHash('test', 'tester', [
331+
'direction:in',
332+
`topic:${queue}`,
333+
'type:rabbitmq'
334+
], producerHashWithTopic).readBigUInt64BE(0).toString()
324335
})
325336

326337
it('Should emit DSM stats to the agent when sending a message on an unnamed exchange', done => {
@@ -338,13 +349,13 @@ describe('Plugin', () => {
338349
expect(statsPointsReceived[0].EdgeTags).to.deep.equal([
339350
'direction:out',
340351
'has_routing_key:true',
341-
'topic:testDSM',
352+
`topic:${queue}`,
342353
'type:rabbitmq'
343354
])
344355
expect(agent.dsmStatsExist(agent, expectedProducerHashWithTopic)).to.equal(true)
345356
}, { timeoutMs: 10000 }).then(done, done)
346357

347-
channel.assertQueue('testDSM', {}, (err, ok) => {
358+
channel.assertQueue(queue, {}, (err, ok) => {
348359
if (err) return done(err)
349360

350361
channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
@@ -390,15 +401,16 @@ describe('Plugin', () => {
390401
})
391402
}
392403
})
393-
expect(statsPointsReceived.length).to.be.at.least(1)
394-
expect(statsPointsReceived[0].EdgeTags).to.deep.equal(
395-
['direction:in', 'topic:testDSM', 'type:rabbitmq'])
404+
expect(statsPointsReceived.length).to.equal(2)
405+
expect(statsPointsReceived[1].EdgeTags).to.deep.equal(
406+
['direction:in', `topic:${queue}`, 'type:rabbitmq'])
396407
expect(agent.dsmStatsExist(agent, expectedConsumerHash)).to.equal(true)
397408
}, { timeoutMs: 10000 }).then(done, done)
398409

399-
channel.assertQueue('testDSM', {}, (err, ok) => {
410+
channel.assertQueue(queue, {}, (err, ok) => {
400411
if (err) return done(err)
401412

413+
channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
402414
channel.consume(ok.queue, () => {}, {}, (err, ok) => {
403415
if (err) done(err)
404416
})
@@ -416,17 +428,17 @@ describe('Plugin', () => {
416428
})
417429
}
418430
})
419-
expect(statsPointsReceived.length).to.be.at.least(1)
431+
expect(statsPointsReceived.length).to.equal(1)
420432
expect(statsPointsReceived[0].EdgeTags).to.deep.equal([
421433
'direction:out',
422434
'has_routing_key:true',
423-
'topic:testDSM',
435+
`topic:${queue}`,
424436
'type:rabbitmq'
425437
])
426438
expect(agent.dsmStatsExist(agent, expectedProducerHashWithTopic)).to.equal(true)
427439
}, { timeoutMs: 10000 }).then(done, done)
428440

429-
channel.assertQueue('testDSM', {}, (err, ok) => {
441+
channel.assertQueue(queue, {}, (err, ok) => {
430442
if (err) return done(err)
431443

432444
channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
@@ -444,23 +456,24 @@ describe('Plugin', () => {
444456
})
445457
}
446458
})
447-
expect(statsPointsReceived.length).to.be.at.least(1)
448-
expect(statsPointsReceived[0].EdgeTags).to.deep.equal(
449-
['direction:in', 'topic:testDSM', 'type:rabbitmq'])
459+
expect(statsPointsReceived.length).to.equal(2)
460+
expect(statsPointsReceived[1].EdgeTags).to.deep.equal(
461+
['direction:in', `topic:${queue}`, 'type:rabbitmq'])
450462
expect(agent.dsmStatsExist(agent, expectedConsumerHash)).to.equal(true)
451463
}, { timeoutMs: 10000 }).then(done, done)
452464

453-
channel.assertQueue('testDSM', {}, (err, ok) => {
465+
channel.assertQueue(queue, {}, (err, ok) => {
454466
if (err) return done(err)
455467

468+
channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
456469
channel.get(ok.queue, {}, (err, ok) => {
457470
if (err) done(err)
458471
})
459472
})
460473
})
461474

462475
it('Should set pathway hash tag on a span when producing', (done) => {
463-
channel.assertQueue('testDSM', {}, (err, ok) => {
476+
channel.assertQueue(queue, {}, (err, ok) => {
464477
if (err) return done(err)
465478

466479
channel.sendToQueue(ok.queue, Buffer.from('dsm test'))
@@ -481,9 +494,10 @@ describe('Plugin', () => {
481494
})
482495

483496
it('Should set pathway hash tag on a span when consuming', (done) => {
484-
channel.assertQueue('testDSM', {}, (err, ok) => {
497+
channel.assertQueue(queue, {}, (err, ok) => {
485498
if (err) return done(err)
486499

500+
channel.sendToQueue(ok.queue, Buffer.from('dsm test'))
487501
channel.consume(ok.queue, () => {}, {}, (err, ok) => {
488502
if (err) return done(err)
489503

@@ -506,7 +520,7 @@ describe('Plugin', () => {
506520
})
507521

508522
describe('with configuration', () => {
509-
after(() => {
523+
afterEach(() => {
510524
return agent.close({ ritmReset: false })
511525
})
512526

@@ -531,16 +545,16 @@ describe('Plugin', () => {
531545
agent
532546
.use(traces => {
533547
expect(traces[0][0]).to.have.property('service', 'test-custom-service')
534-
expect(traces[0][0]).to.have.property('resource', 'queue.declare test')
548+
expect(traces[0][0]).to.have.property('resource', `queue.declare ${queue}`)
535549
}, 2)
536550
.then(done)
537551
.catch(done)
538552

539-
channel.assertQueue('test', {}, () => {})
553+
channel.assertQueue(queue, {}, () => {})
540554
})
541555

542556
withNamingSchema(
543-
() => channel.assertQueue('test', {}, () => {}),
557+
() => channel.assertQueue(queue, {}, () => {}),
544558
{
545559
v0: {
546560
opName: 'amqp.command',

0 commit comments

Comments
 (0)