diff --git a/dev-packages/node-integration-tests/suites/tracing/kafkajs/scenario.mjs b/dev-packages/node-integration-tests/suites/tracing/kafkajs/scenario.mjs index b518ac6f19fb..821f5073bdba 100644 --- a/dev-packages/node-integration-tests/suites/tracing/kafkajs/scenario.mjs +++ b/dev-packages/node-integration-tests/suites/tracing/kafkajs/scenario.mjs @@ -23,15 +23,23 @@ async function run() { await consumer.connect(); await consumer.subscribe({ topic: 'test-topic', fromBeginning: true }); - consumer.run({ + // Resolve once the consumer has actually joined its group. A fixed sleep + // here was racy on slow CI runners: if the producer sent before the + // consumer joined, the consumer transaction sometimes wasn't created + // within the test timeout. Register the listener before `run()` so the + // event can't fire before we're listening. + const groupJoined = new Promise(resolve => { + consumer.on(consumer.events.GROUP_JOIN, () => resolve()); + }); + + await consumer.run({ eachMessage: async ({ message }) => { // eslint-disable-next-line no-console console.debug('Received message', message.value.toString()); }, }); - // Wait for the consumer to be ready - await new Promise(resolve => setTimeout(resolve, 4000)); + await groupJoined; await producer.send({ topic: 'test-topic',