This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Context propagation between producer and consumer doesn't work. #88

Closed
IvanovOleg opened this issue Mar 25, 2021 · 10 comments

Comments

@IvanovOleg

Hello,
I've created two simple apps, a producer and a consumer, and enabled auto-instrumentation. I am getting spans from both apps in Jaeger, but they are not connected. I added debug logging to the console and can see that on the producer side opentelemetry-instrumentation doesn't add any headers. Are there any specific steps required to enable context propagation?
Thanks

@blumamir
Contributor

Hi @IvanovOleg,
Thanks for opening the issue.
Which instrumentation library are you using: aws-sdk, kafkajs, or amqplib?

Can you share more of your setup?

@IvanovOleg
Author

IvanovOleg commented Mar 25, 2021

@blumamir kafkajs

console.log(message.headers);
api_1.propagation.inject(api_1.setSpan(api_1.context.active(), span), message.headers);
console.log(message.headers);

Both console.log calls print an empty array (I modified the instrumentation code for debugging).
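For reference, here is a rough sketch of what a working inject would be expected to leave in message.headers. It assumes the W3C Trace Context propagator is the active one (the `traceparent` header, formatted as version-traceId-spanId-flags). `buildTraceparent` and `injectInto` are hypothetical helpers written for illustration only; they are not part of @opentelemetry/api or the kafkajs instrumentation.

```javascript
// Minimal sketch, NOT the OpenTelemetry implementation. It only shows the
// shape of the W3C `traceparent` header a successful inject should add.
const { randomBytes } = require('crypto');

// Hypothetical helper: formats a traceparent value from hex ids.
function buildTraceparent(traceId, spanId, sampled = true) {
  if (!/^[0-9a-f]{32}$/.test(traceId) || /^0+$/.test(traceId)) {
    throw new Error('traceId must be 32 lowercase hex chars and not all zeros');
  }
  if (!/^[0-9a-f]{16}$/.test(spanId) || /^0+$/.test(spanId)) {
    throw new Error('spanId must be 16 lowercase hex chars and not all zeros');
  }
  return `00-${traceId}-${spanId}-${sampled ? '01' : '00'}`;
}

// Hypothetical stand-in for propagation.inject: mutates the carrier in place.
function injectInto(headers, traceId, spanId) {
  headers.traceparent = buildTraceparent(traceId, spanId);
  return headers;
}

const message = { value: 'Hello Oleg', headers: {} };
injectInto(
  message.headers,
  randomBytes(16).toString('hex'), // 16 bytes -> 32 hex chars
  randomBytes(8).toString('hex')   // 8 bytes  -> 16 hex chars
);
console.log(message.headers);
```

If the headers are still empty after the real inject call, the propagator seen by the instrumentation is most likely a no-op one, which points at a setup or version problem rather than at the message itself.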

producer app:

const opentelemetry = require('@opentelemetry/api');
const { registerInstrumentations } = require('@opentelemetry/instrumentation');
const { BasicTracerProvider, ConsoleSpanExporter, SimpleSpanProcessor } = require('@opentelemetry/tracing');
const { NodeTracerProvider } = require('@opentelemetry/node');
const { CollectorTraceExporter } = require('@opentelemetry/exporter-collector');
const { KafkaJsInstrumentation } = require('opentelemetry-instrumentation-kafkajs');

const collectorOptions = {
  serviceName: 'producer-service'
};

// Create and configure NodeTracerProvider
const traceProvider = new NodeTracerProvider({
  // be sure to disable old plugin
  plugins: {
    kafkajs: { enabled: false, path: 'opentelemetry-plugin-kafkajs' }
  }
});

const exporter = new CollectorTraceExporter(collectorOptions);
traceProvider.addSpanProcessor(new SimpleSpanProcessor(exporter));
traceProvider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()));

traceProvider.register()

registerInstrumentations({
  traceProvider,
  instrumentations: [
    new KafkaJsInstrumentation({
      // see under for available configuration
    })
  ]
});

const tracer = opentelemetry.trace.getTracer('producer-service');

const { Kafka, logLevel } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'producer',
  brokers: ['kafka:9092'],
  logLevel: logLevel.DEBUG
});

const producer = kafka.producer();

const run = async () => {
  await producer.connect()
  await producer.send({
    topic: 'staff',
    messages: [
      { value: 'Hello Oleg' }
    ]
  })
  await producer.disconnect()
}

run().catch(console.error)

consumer app:

const opentelemetry = require('@opentelemetry/api');
const { registerInstrumentations } = require('@opentelemetry/instrumentation');
const { BasicTracerProvider, ConsoleSpanExporter, SimpleSpanProcessor } = require('@opentelemetry/tracing');
const { NodeTracerProvider } = require('@opentelemetry/node');
const { CollectorTraceExporter } = require('@opentelemetry/exporter-collector');
const { KafkaJsInstrumentation } = require('opentelemetry-instrumentation-kafkajs');

const collectorOptions = {
  serviceName: 'consumer-service'
};

// Create and configure NodeTracerProvider
const traceProvider = new NodeTracerProvider({
  // be sure to disable old plugin
  plugins: {
    kafkajs: { enabled: false, path: 'opentelemetry-plugin-kafkajs' }
  }
});

const exporter = new CollectorTraceExporter(collectorOptions);
traceProvider.addSpanProcessor(new SimpleSpanProcessor(exporter));
traceProvider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()));

traceProvider.register()

registerInstrumentations({
  traceProvider,
  instrumentations: [
    new KafkaJsInstrumentation({
      // see under for available configuration
    })
  ]
});

const tracer = opentelemetry.trace.getTracer('consumer-service');

const { Kafka, logLevel } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'consumer',
  brokers: ['kafka:9092'],
  logLevel: logLevel.DEBUG
})

const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
  // Consuming
  await consumer.connect()
  await consumer.subscribe({ topic: 'staff', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        message,
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
}

run().catch(console.error)
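To check from the consumer side whether any context arrived, a small helper like the one below could be called inside eachMessage. This is only a sketch: `readTraceparent` is a hypothetical name, and it assumes the W3C `traceparent` header is in use and that header values may arrive either as Buffers or as strings.

```javascript
// Sketch for debugging only; assumes the W3C Trace Context header name.
const TRACEPARENT_PATTERN = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

// Hypothetical helper: returns the parsed trace context, or null if absent or malformed.
function readTraceparent(headers = {}) {
  const raw = headers.traceparent;
  if (raw === undefined || raw === null) return null;
  // Header values may be Buffers, so decode before matching.
  const text = Buffer.isBuffer(raw) ? raw.toString('utf8') : String(raw);
  const match = TRACEPARENT_PATTERN.exec(text);
  if (!match) return null;
  const [, version, traceId, spanId, flags] = match;
  // Bit 0 of the flags byte is the "sampled" flag.
  return { version, traceId, spanId, sampled: (parseInt(flags, 16) & 1) === 1 };
}

// Example with a hand-written header value:
const sampleHeaders = {
  traceparent: Buffer.from('00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01'),
};
console.log(readTraceparent(sampleHeaders));
console.log(readTraceparent({}));
```

A null result for every consumed message means the producer never injected the context, so the producer setup is the place to look first.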

@blumamir
Contributor

blumamir commented Mar 25, 2021

Can you please paste your package.json dependencies here? Specifically @opentelemetry/api and opentelemetry-instrumentation-kafkajs.

Are you configuring any special propagators in opentelemetry, or are you using the default propagators that ship with NodeTracerProvider?

@IvanovOleg
Author

producer:

{
  "name": "producer",
  "version": "0.1.0",
  "description": "A simple kafka producer.",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "repository": {
    "type": "git",
    "url": "git+https://github.com/IvanovOleg/nodejs-otel-sample.git"
  },
  "keywords": [
    "opentelemetry"
  ],
  "author": "Oleg Ivanov",
  "license": "ISC",
  "bugs": {
    "url": "https://github.com/IvanovOleg/nodejs-otel-sample/issues"
  },
  "homepage": "https://github.com/IvanovOleg/nodejs-otel-sample#readme",
  "dependencies": {
    "@opentelemetry/api": "^1.0.0-rc.0",
    "@opentelemetry/exporter-collector": "^0.18.0",
    "@opentelemetry/instrumentation": "^0.18.0",
    "@opentelemetry/node": "^0.18.0",
    "@opentelemetry/tracing": "^0.18.0",
    "kafkajs": "^1.15.0",
    "log4js": "^6.3.0",
    "opentelemetry-instrumentation-kafkajs": "^0.2.3"
  }
}

consumer:

{
  "name": "consumer",
  "version": "0.1.0",
  "description": "A simple kafka consumer app.",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "repository": {
    "type": "git",
    "url": "git+https://github.com/IvanovOleg/nodejs-otel-sample.git"
  },
  "keywords": [
    "opentelemetry"
  ],
  "author": "Oleg Ivanov",
  "license": "ISC",
  "bugs": {
    "url": "https://github.com/IvanovOleg/nodejs-otel-sample/issues"
  },
  "homepage": "https://github.com/IvanovOleg/nodejs-otel-sample#readme",
  "dependencies": {
    "@opentelemetry/api": "^1.0.0-rc.0",
    "@opentelemetry/exporter-collector": "^0.18.0",
    "@opentelemetry/instrumentation": "^0.18.0",
    "@opentelemetry/node": "^0.18.0",
    "@opentelemetry/tracing": "^0.18.0",
    "kafkajs": "^1.15.0",
    "log4js": "^6.3.0",
    "opentelemetry-instrumentation-kafkajs": "^0.3.0"
  }
}

@blumamir
Contributor

I'm not sure about this case, but there have been cases of incompatibility with the @opentelemetry/api versions between the plugin and the provider.

The instrumentation library depends on "@opentelemetry/api": "^0.18.0", which grabs a different version than the one used by the provider.
I'm hoping to get to testing it soon. In the meantime, you can try to change the dependency in your application to "@opentelemetry/api": "^0.18.0". It might be a temporary workaround until we release a version compatible with the new api major version.
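One way to see whether more than one copy of @opentelemetry/api is installed is to walk node_modules and list every copy found. The sketch below is an illustration only: `findPackageVersions` is a hypothetical helper, it assumes a plain npm-style nested node_modules layout, and it skips symlinked directories, so results may be incomplete with pnpm-style installs.

```javascript
// Sketch: list every installed copy of a package under a project root.
// Assumes a plain nested node_modules layout; symlinked dirs are skipped.
const fs = require('fs');
const path = require('path');

// Hypothetical helper: returns [{ path, version }] for each copy found.
function findPackageVersions(rootDir, pkgName, found = []) {
  const modulesDir = path.join(rootDir, 'node_modules');
  if (!fs.existsSync(modulesDir)) return found;

  // Record the copy installed directly at this level, if any.
  const manifest = path.join(modulesDir, pkgName, 'package.json');
  if (fs.existsSync(manifest)) {
    const { version } = JSON.parse(fs.readFileSync(manifest, 'utf8'));
    found.push({ path: path.dirname(manifest), version });
  }

  // Recurse into each installed package to find nested copies.
  for (const entry of fs.readdirSync(modulesDir, { withFileTypes: true })) {
    if (!entry.isDirectory() || entry.name.startsWith('.')) continue;
    const dir = path.join(modulesDir, entry.name);
    if (entry.name.startsWith('@')) {
      // Scoped packages live one directory deeper (for example @scope/name).
      for (const scoped of fs.readdirSync(dir, { withFileTypes: true })) {
        if (scoped.isDirectory()) {
          findPackageVersions(path.join(dir, scoped.name), pkgName, found);
        }
      }
    } else {
      findPackageVersions(dir, pkgName, found);
    }
  }
  return found;
}

console.log(findPackageVersions(process.cwd(), '@opentelemetry/api'));
```

More than one entry with different versions would suggest that the instrumentation and the provider are not sharing the same API package. Running `npm ls @opentelemetry/api` gives similar information without any code.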

@IvanovOleg
Author

@blumamir Will try. Thanks

@IvanovOleg
Author

@blumamir I tried "@opentelemetry/api": "^0.18.0" and it doesn't work either.

@blumamir
Contributor

Cool, thanks for letting us know.
I'll need to dive deeper into this issue, probably one day next week.
I will post updates here on the progress.

Please share if you find other issues with the plugin's functionality or documentation.

@blumamir
Contributor

blumamir commented Mar 25, 2021

@IvanovOleg, I took a second look at your setup. Your producer is using "opentelemetry-instrumentation-kafkajs": "^0.2.3", which is compatible with opentelemetry v0.17.0.
You should upgrade to "opentelemetry-instrumentation-kafkajs": "^0.3.0", which is compatible with opentelemetry v0.18.0.

Until opentelemetry apis stabilize, we need to publish a separate version of instrumentations which are compatible with each breaking change in the core packages.
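The pairing above can be checked against a package.json with a few lines of code. The sketch below uses only the two compatibility pairs mentioned in this thread (0.2.x with core 0.17.x, 0.3.x with core 0.18.x); anything else is reported as unknown. `checkKafkajsInstrumentation` is a hypothetical helper, and using @opentelemetry/node as the reference core package is an assumption.

```javascript
// Compatibility pairs taken from this thread only; other versions are unknown.
const KNOWN_PAIRS = { '0.2': '0.17', '0.3': '0.18' };

// Extracts "major.minor" from a range such as "^0.18.0"; returns null if absent.
function minorLine(range) {
  const match = /(\d+)\.(\d+)\.\d+/.exec(range || '');
  return match ? `${match[1]}.${match[2]}` : null;
}

// Hypothetical helper: compares the instrumentation version with the core version.
function checkKafkajsInstrumentation(dependencies) {
  const instr = minorLine(dependencies['opentelemetry-instrumentation-kafkajs']);
  const core = minorLine(dependencies['@opentelemetry/node']);
  if (!instr || !core) {
    return { ok: false, reason: 'missing dependency' };
  }
  const expected = KNOWN_PAIRS[instr];
  if (!expected) {
    return { ok: null, reason: `no known pairing for instrumentation ${instr}.x` };
  }
  return expected === core
    ? { ok: true, reason: `instrumentation ${instr}.x matches core ${core}.x` }
    : { ok: false, reason: `instrumentation ${instr}.x expects core ${expected}.x, found ${core}.x` };
}

// Dependencies as posted for the producer app in this thread:
const producerDeps = {
  '@opentelemetry/node': '^0.18.0',
  'opentelemetry-instrumentation-kafkajs': '^0.2.3',
};
console.log(checkKafkajsInstrumentation(producerDeps));
```

Because caret ranges on 0.x versions stay within the same minor line, comparing only major.minor is enough for this check.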

@IvanovOleg
Author

IvanovOleg commented Mar 26, 2021

@blumamir Yes, 0.3.0 fixes the issue
