Camel Kafka OpenTelemetry Duplicate traceId #5379

Open
avsoskerem opened this issue Sep 29, 2023 · 1 comment
avsoskerem commented Sep 29, 2023

Bug description

I have many routes like the one below. Under heavy load, that is, when a large number of messages arrive from Kafka, the route seems to get stuck: every event starts being logged with the same traceId. This happens only occasionally, sometimes after 3-4 days of running. I expect each "Received" log entry to have its own traceId.
@Inject
CamelContext camelContext;

route:

camelContext.setUseMDCLogging(true);
 from("kafka:{{kafka.test.in.topic.name}}").routeId(getRouteId("test", RouteType.KAFKA.getValue()))
                .log("Received : ${body}")
                .to("bean:EventUtil?method=createEvent(*,test,in)") // saves the event to MongoDB asynchronously
                .to("bean:testServiceImp?method=process(*)") // processes and saves to PostgreSQL; returns null if the same data already exists
                .choice().when(body().isNull())
                .log("The route was terminated because body was null")
                .endChoice().otherwise()
                .to("bean:EventUtil?method=setHeader(*,grn-pnf, com.test.event.test)") // sets the Kafka message headers
                .process(exchange -> {
                    CloudEventDTO<testDTO> message = mapper.readValue(exchange.getIn().getBody(String.class), new TypeReference<CloudEventDTO<testDTO>>() {
                    });
                    message.setId(exchange.getMessage().getHeader(CloudEvent.CE_ID.getValue(), String.class));
                    message.setSource(exchange.getMessage().getHeader(CloudEvent.CE_SOURCE.getValue(), String.class));
                    message.setType(exchange.getMessage().getHeader(CloudEvent.CE_TYPE.getValue(), String.class));
                    message.setSubject(exchange.getMessage().getHeader(CloudEvent.CE_SUBJECT.getValue(), String.class));
                    message.setSpecVersion(exchange.getMessage().getHeader(CloudEvent.CE_SPECVERSION.getValue(), String.class));
                    message.setDataContentType(exchange.getMessage().getHeader(CloudEvent.CONTENT_TYPE.getValue(), String.class));
                    message.setTime(LocalDateTime.now(Clock.systemUTC()));
                    exchange.getMessage().setBody(mapper.writeValueAsString(message));
                })
                .to("bean:EventUtil?method=createEvent(*,test,out)")
                .to("kafka:{{kafka.test.topic.name}}")
                .log("Sent: ${body} To: " + getTopicName("test", RouteDirection.OUT.getValue()))
                .end();

pom:

<properties>
      <compiler-plugin.version>3.11.0</compiler-plugin.version>
      <failsafe.useModulePath>false</failsafe.useModulePath>
      <lombok-mapstruct-binding.version>0.2.0</lombok-mapstruct-binding.version>
      <maven.compiler.release>17</maven.compiler.release>
      <org.mapstruct.version>1.5.5.Final</org.mapstruct.version>
      <org.projectlombok.version>1.18.28</org.projectlombok.version>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
      <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
      <quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
      <quarkus.platform.version>3.4.1</quarkus.platform.version>
      <quarkiverse.poi.version>2.0.3</quarkiverse.poi.version>
      <quarkiverse.tika.version>2.0.2</quarkiverse.tika.version>
      <skipITs>true</skipITs>
      <sonar.qualitygate.wait>true</sonar.qualitygate.wait>
      <surefire-plugin.version>3.1.2</surefire-plugin.version>
  </properties>
  <dependencyManagement>
      <dependencies>
          <dependency>
              <groupId>${quarkus.platform.group-id}</groupId>
              <artifactId>${quarkus.platform.artifact-id}</artifactId>
              <version>${quarkus.platform.version}</version>
              <type>pom</type>
              <scope>import</scope>
          </dependency>
          <dependency>
              <groupId>${quarkus.platform.group-id}</groupId>
              <artifactId>quarkus-camel-bom</artifactId>
              <version>${quarkus.platform.version}</version>
              <type>pom</type>
              <scope>import</scope>
          </dependency>
      </dependencies>
  </dependencyManagement>
  <dependency>
          <groupId>org.apache.camel.quarkus</groupId>
          <artifactId>camel-quarkus-opentelemetry</artifactId>
      </dependency>
      
      <dependency>
          <groupId>io.opentelemetry.instrumentation</groupId>
          <artifactId>opentelemetry-jdbc</artifactId>
      </dependency>
      <dependency>
          <groupId>org.apache.camel.quarkus</groupId>
          <artifactId>camel-quarkus-core</artifactId>
      </dependency>
      <dependency>
          <groupId>org.apache.camel.quarkus</groupId>
          <artifactId>camel-quarkus-kafka</artifactId>
      </dependency>
      <dependency>
          <groupId>org.apache.camel.quarkus</groupId>
          <artifactId>camel-quarkus-direct</artifactId>
      </dependency>
      <dependency>
          <groupId>org.apache.camel.quarkus</groupId>
          <artifactId>camel-quarkus-http</artifactId>
      </dependency>
      <dependency>
          <groupId>org.apache.camel.quarkus</groupId>
          <artifactId>camel-quarkus-jackson</artifactId>
      </dependency>
      <dependency>
          <groupId>org.apache.camel.quarkus</groupId>
          <artifactId>camel-quarkus-bean</artifactId>
      </dependency>

log example:
2023-09-29 01:14:04.376
22:14:04 INFO traceId=6012f1fb50dadbf148ff61c7a764d7f7, parentId=09e679cba592a965, spanId=2e202a2bbe6d5360, sampled=true [testInfoKafkaRoute] (Camel (camel-1) thread #6 - KafkaConsumer[com.test.topic.in.test.test]) Received : {"testJson}
2023-09-29 01:14:04.377
22:14:04 INFO traceId=6012f1fb50dadbf148ff61c7a764d7f7, parentId=2e202a2bbe6d5360, spanId=d8fb606803bb3571, sampled=true [ri.gr.wo.se.pa.im.TestInfoServiceImp] (Camel (camel-1) thread #6 - KafkaConsumer[com.test.topic.in.test.test]) Processing... Incoming Test Info
2023-09-29 01:14:04.378
22:14:04 INFO traceId=6012f1fb50dadbf148ff61c7a764d7f7, parentId=2e202a2bbe6d5360, spanId=d8fb606803bb3571, sampled=true [ri.gr.wo.se.pa.im.TestInfoSsrServiceImp] (Camel (camel-1) thread #6 - KafkaConsumer[com.test.topic.in.test.test]) Processing... Incoming Test Ssr Info
2023-09-29 01:14:04.378
22:14:04 INFO traceId=6012f1fb50dadbf148ff61c7a764d7f7, parentId=2e202a2bbe6d5360, spanId=d8fb606803bb3571, sampled=true [ri.gr.wo.se.AbstractService] (Camel (camel-1) thread #6 - KafkaConsumer[com.test.topic.in.test.test]) The same data exists. Entity id: 11,015,523
2023-09-29 01:14:04.379
22:14:04 INFO traceId=6012f1fb50dadbf148ff61c7a764d7f7, parentId=09e679cba592a965, spanId=2e202a2bbe6d5360, sampled=true [testInfoKafkaRoute] (Camel (camel-1) thread #6 - KafkaConsumer[com.test.topic.in.test.test]) The route was terminated because body was null
2023-09-29 01:14:04.379
22:14:04 INFO traceId=6012f1fb50dadbf148ff61c7a764d7f7, parentId=09e679cba592a965, spanId=1bc2b6898c093d0f, sampled=true [testInfoKafkaRoute] (Camel (camel-1) thread #6 - KafkaConsumer[com.test.topic.in.test.test]) Received : {"testJson"}
2023-09-29 01:14:04.379
22:14:04 INFO traceId=6012f1fb50dadbf148ff61c7a764d7f7, parentId=1bc2b6898c093d0f, spanId=b34387e031536af4, sampled=true [ri.gr.wo.se.pa.im.TestInfoServiceImp] (Camel (camel-1) thread #6 - KafkaConsumer[com.test.topic.in.test.test]) Processing... Incoming Test Info
2023-09-29 01:14:04.380
22:14:04 INFO traceId=6012f1fb50dadbf148ff61c7a764d7f7, parentId=1bc2b6898c093d0f, spanId=b34387e031536af4, sampled=true [ri.gr.wo.se.pa.im.TestInfoSsrServiceImp] (Camel (camel-1) thread #6 - KafkaConsumer[com.test.topic.in.test.test]) Processing... Incoming Test Ssr Info
2023-09-29 01:14:04.380
22:14:04 INFO traceId=6012f1fb50dadbf148ff61c7a764d7f7, parentId=1bc2b6898c093d0f, spanId=b34387e031536af4, sampled=true [ri.gr.wo.se.AbstractService] (Camel (camel-1) thread #6 - KafkaConsumer[com.test.topic.in.test.test]) The same data exists. Entity id: 11,015,766
2023-09-29 01:14:04.381
22:14:04 INFO traceId=6012f1fb50dadbf148ff61c7a764d7f7, parentId=09e679cba592a965, spanId=1bc2b6898c093d0f, sampled=true [testInfoKafkaRoute] (Camel (camel-1) thread #6 - KafkaConsumer[com.test.topic.in.test.test]) The route was terminated because body was null
2023-09-29 01:14:04.381
22:14:04 INFO traceId=6012f1fb50dadbf148ff61c7a764d7f7, parentId=09e679cba592a965, spanId=5b9b07ed0b44243b, sampled=true [testInfoKafkaRoute] (Camel (camel-1) thread #6 - KafkaConsumer[com.test.topic.in.test.test]) Received : {"testJson"}
2023-09-29 01:14:04.381
22:14:04 INFO traceId=6012f1fb50dadbf148ff61c7a764d7f7, parentId=5b9b07ed0b44243b, spanId=82c81dd9f9f2a097, sampled=true [ri.gr.wo.se.pa.im.TestInfoServiceImp] (Camel (camel-1) thread #6 - KafkaConsumer[com.test.topic.in.test.test]) Processing... Incoming Test Info
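
To put a number on the symptom, the "Received" lines can be counted per traceId. The following is only a minimal sketch: `TraceIdCheck` and `receivedPerTrace` are hypothetical names, not part of Camel or Quarkus, and the sample lines in `main` are shortened copies of the log lines above. It assumes the `traceId=<32 hex chars>` format shown in this log.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for inspecting exported log lines; not a Camel or Quarkus API.
class TraceIdCheck {
    // Matches the traceId field as it is printed in the log excerpt above.
    private static final Pattern TRACE_ID = Pattern.compile("traceId=([0-9a-f]{32})");

    // Counts how many "Received :" lines carry each traceId.
    static Map<String, Integer> receivedPerTrace(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            if (!line.contains("Received :")) {
                continue; // only the first log statement of the route marks a new message
            }
            Matcher m = TRACE_ID.matcher(line);
            if (m.find()) {
                counts.merge(m.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Shortened copies of three "Received" lines from the log excerpt.
        List<String> lines = List.of(
            "INFO traceId=6012f1fb50dadbf148ff61c7a764d7f7, parentId=09e679cba592a965, spanId=2e202a2bbe6d5360 Received : ...",
            "INFO traceId=6012f1fb50dadbf148ff61c7a764d7f7, parentId=09e679cba592a965, spanId=1bc2b6898c093d0f Received : ...",
            "INFO traceId=6012f1fb50dadbf148ff61c7a764d7f7, parentId=09e679cba592a965, spanId=5b9b07ed0b44243b Received : ...");

        // Report every traceId that is shared by more than one received message.
        receivedPerTrace(lines).forEach((traceId, count) -> {
            if (count > 1) {
                System.out.println(traceId + " is shared by " + count + " received messages");
            }
        });
    }
}
```

In the excerpt, the three "Received" lines have different spanId values but the same traceId and the same parentId, which is what the counter would flag.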

I don't know whether it will solve the problem or have any effect, but I recently added the following properties. I'm monitoring the application and will report back if the issue recurs. I'm not sure the sampler is the cause.

quarkus.otel.traces.sampler=traceidratio
quarkus.otel.traces.sampler.arg=1.0
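
For context, a ratio-based sampler decides whether a trace is recorded; it does not generate trace IDs, and a ratio of 1.0 keeps every trace. The sketch below illustrates that kind of decision in simplified form. It is an assumption-laden illustration, not the OpenTelemetry SDK's actual code: `RatioSamplerSketch` and `shouldSample` are hypothetical names, and the comparison against the low 64 bits of the ID is a simplification.

```java
// Hypothetical, simplified illustration of a trace-ID-ratio sampling decision.
// This is NOT the OpenTelemetry SDK implementation.
class RatioSamplerSketch {
    // Returns true if a trace with this 32-hex-character ID would be kept at the given ratio.
    static boolean shouldSample(String traceIdHex, double ratio) {
        if (ratio >= 1.0) {
            return true;  // ratio 1.0: every trace is kept
        }
        if (ratio <= 0.0) {
            return false; // ratio 0.0: no trace is kept
        }
        // Treat the low 64 bits of the ID as a pseudo-random number and
        // compare it against a bound proportional to the ratio.
        long upperBound = (long) (ratio * Long.MAX_VALUE);
        long low64 = Long.parseUnsignedLong(traceIdHex.substring(16), 16);
        return Math.abs(low64) < upperBound;
    }

    public static void main(String[] args) {
        String traceId = "6012f1fb50dadbf148ff61c7a764d7f7"; // traceId from the log excerpt
        System.out.println("ratio 1.0 -> " + shouldSample(traceId, 1.0));
        System.out.println("ratio 0.0 -> " + shouldSample(traceId, 0.0));
    }
}
```

Because the decision only takes an existing trace ID as input, these two properties on their own would not be expected to change which traceId a message gets.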

@avsoskerem
Author

I found what triggers the problem: when I scale the application to 2 or 3 pods, the traceId gets duplicated.
