Kafka Component Writing Messages with ProducerTemplate #743

Closed
bframke opened this issue Feb 21, 2020 · 28 comments
@bframke

bframke commented Feb 21, 2020

We are trying to send a high volume of messages from a test through a Camel application that uses the Kafka component. We use the ProducerTemplate to send the body and headers to the route for the topic, but most of the time it gets stuck. It is as if the ProducerTemplate never finishes its task.

We also tried moving the sendBodyAndHeaders calls into separate threads so that we don't clog a single thread, but it still got stuck after a few messages.

Test-CURL
for i in {1..100}; do echo "$i"; curl -v -d '{"Event": "Event $i" }' -H "content-type: application/json" http://localhost:8081/topic/curltopic3/event; done

Could the problem also be that we use just one route with an ever-changing list of topics?

Camel-Route Building

    public void onInit(@Observes StartupEvent evt) {
        _currentContext = _camelMain.getCamelContext();
        _logger.info("context : {}", _currentContext);
    }

    public void camelStarted(@Observes CamelMainEvents.BeforeStart evt) throws Exception {
        _currentContext = _camelMain.getCamelContext();

        RoutesBuilder builder = new RouteBuilder() {
            @Override
            public void configure() {
                from("direct:start").
                        id("toKafka").
                        log("${body}").
                        to("kafka:"+_topicList+"?brokers="+_brokerList);
            }
        };

        _currentContext.addRoutes(builder);
    }

REST-Endpoint we call to send Events

    @Path("topic/{topicId}/event")
    @Consumes(MediaType.APPLICATION_JSON)
    public Response sendEventToTopic(String body,
                         @PathParam("topicId") String topic,
                         @Context HttpHeaders httpHeaders) {
        ProducerTemplate producerTemplate = _currentContext.createProducerTemplate();
        Map<String, Object> headers = new HashMap<>();
        Map<String, Object> innerHeaders = new HashMap<>();

        filterKafkaHeaders(httpHeaders, innerHeaders);
        createStandardHeadersIfNeeded(innerHeaders);

        headers.put(KafkaConstants.PARTITION_KEY, "0");
        headers.put(KafkaConstants.KEY, "1");
        headers.put(KafkaConstants.OVERRIDE_TOPIC, topic);

        _logger.info("headers for kafka: {}", headers);
        _logger.info("headers for payload: {}", innerHeaders);

        producerTemplate.sendBodyAndHeaders("direct:start", new MessageEnvelope(innerHeaders, body), headers);

        return Response.ok(Response.Status.OK.toString()).build();
    }
@davsclaus
Contributor

You should not create a new producer template per request, see the FAQ
https://camel.apache.org/manual/latest/faq/why-does-camel-use-too-many-threads-with-producertemplate.html
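
A minimal sketch of the difference, using only the JDK. `FakeTemplate` is a hypothetical stand-in, not a Camel class: it only assumes that a template-like service may own a worker thread that lives until the service is stopped. In the real application the shared instance would be a `ProducerTemplate` that is created once (for example via `createProducerTemplate()` at startup, or by injection) and reused by every request.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class TemplateReuseSketch {

    /** Counts worker threads started by all FakeTemplate instances. */
    static final AtomicInteger WORKERS_STARTED = new AtomicInteger();

    /** Hypothetical stand-in for a template-like service; NOT a Camel class. */
    static final class FakeTemplate implements AutoCloseable {
        private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            WORKERS_STARTED.incrementAndGet();
            Thread t = new Thread(r, "fake-template-worker");
            t.setDaemon(true); // daemon, so leaked workers cannot block JVM exit
            return t;
        });

        void send(String body) throws Exception {
            Callable<Integer> task = body::length; // placeholder for real work
            worker.submit(task).get();             // wait until the worker is done
        }

        @Override
        public void close() {
            worker.shutdown();
        }
    }

    /** Anti-pattern: a new template per request, never closed. */
    static int workersStartedPerRequest(int requests) throws Exception {
        int before = WORKERS_STARTED.get();
        for (int i = 0; i < requests; i++) {
            FakeTemplate template = new FakeTemplate(); // its worker thread leaks
            template.send("Event " + i);
        }
        return WORKERS_STARTED.get() - before;
    }

    /** Preferred: one template for all requests, closed when done. */
    static int workersStartedShared(int requests) throws Exception {
        int before = WORKERS_STARTED.get();
        try (FakeTemplate shared = new FakeTemplate()) {
            for (int i = 0; i < requests; i++) {
                shared.send("Event " + i);
            }
        }
        return WORKERS_STARTED.get() - before;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("per request: " + workersStartedPerRequest(5));
        System.out.println("shared:      " + workersStartedShared(5));
    }
}
```

With five requests, the per-request variant starts five worker threads and the shared variant starts one. Whether this is the cause of the hang reported here is a separate question; the sketch only illustrates the resource pattern the FAQ warns about.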

@ppalaga
Contributor

ppalaga commented Feb 24, 2020

@bframke
Author

bframke commented Feb 25, 2020

Changed it to injection, but the same problem still appears. Just trying to send 200 messages, it gets stuck on the 46th message.

@bframke
Author

bframke commented Feb 25, 2020

Small update: our project is built with Quarkus, and the build as a native image does not have this problem. As a JAR, the error always happens.

@lburgazzoli
Contributor

@bframke would it be possible to isolate the issue in a pure Java/Camel project?
It does not look like a camel-quarkus specific thing to me.

@bframke
Author

bframke commented Feb 25, 2020

I'll try to do that.

@bframke
Author

bframke commented Feb 25, 2020

Okay, so I built a basic Java application with the Camel dependencies, and sending 1000 messages via producerTemplate is not a problem.

@bframke
Author

bframke commented Feb 25, 2020

So sending the messages via ProducerTemplate without going through a RESTEasy endpoint also works in the Quarkus application.

@lburgazzoli
Contributor

I was about to suggest removing RESTEasy. Would you mind opening an issue on Quarkus and referencing this one?

@bframke
Author

bframke commented Feb 25, 2020

Just created an issue there.

@lburgazzoli
Contributor

I noticed that the example provided there has a loop inside sendEventToTopic, which may be misleading.

@bframke
Author

bframke commented Feb 25, 2020

Thanks, that was left over from a test 👍

@bframke
Author

bframke commented Feb 25, 2020

Made another test with REST DSL and Netty HTTP, both of which are Camel components, and it also works fine. So calling a RESTEasy endpoint to create a route and then calling the created Camel route also works.

@bframke
Author

bframke commented Feb 25, 2020

Using "from" with a "processor" for the message and a "to" to Kafka, it gets stuck.

With "rest", "post", "consumes" and "to" Kafka it works, but then we have no processor available for our messages.

@bframke
Author

bframke commented Feb 25, 2020

Okay, "from" and "rest" combined also work without a problem.

edit: okay, it worked once, and on every later attempt it no longer does.
edit2: what is conspicuous is that the thread numbers keep climbing, and then one of them, like thread-41, just gets stuck.

@bframke
Author

bframke commented Feb 25, 2020

Well, it looks like logging is the problem here. After rebuilding the route with only a processor, we checked with VisualVM and saw that one thread just gets stuck writing to the log.

We tried the Camel log component and an SLF4J logger, and both got the thread stuck at some point. Without logging it works fine.

@ppalaga
Contributor

ppalaga commented Feb 25, 2020

Out of curiosity: would you mind testing whether the same problem occurs with an org.jboss.logging.Logger? Here is an example of how to create it:
https://github.com/apache/camel-quarkus/blob/master/integration-tests/base64/src/main/java/org/apache/camel/quarkus/component/base64/it/Base64Resource.java#L37

@bframke
Author

bframke commented Feb 26, 2020

Tried it out and it got stuck. VisualVM gave me this information about how it gets stuck. The state is RUNNABLE, but you can see that nothing is happening on the thread.

java.lang.Thread.State: RUNNABLE
        at java.io.FileOutputStream.writeBytes(Native Method)
        at java.io.FileOutputStream.write(FileOutputStream.java:326)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
        - locked <0x000000064bc3b1a8> (a java.io.BufferedOutputStream)
        at java.io.PrintStream.write(PrintStream.java:480)
        - locked <0x000000064bc3b188> (a java.io.PrintStream)
        at org.jboss.logmanager.handlers.UncloseableOutputStream.write(UncloseableOutputStream.java:41)
        at org.jboss.logmanager.handlers.UncloseableOutputStream.write(UncloseableOutputStream.java:41)
        at org.jboss.logmanager.handlers.UninterruptibleOutputStream.write(UninterruptibleOutputStream.java:81)
        at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
        at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
        at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
        at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
        - locked <0x000000064bca1d20> (a java.io.OutputStreamWriter)
        at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
        at java.io.BufferedWriter.flush(BufferedWriter.java:254)
        - locked <0x000000064bca1d20> (a java.io.OutputStreamWriter)
        at org.jboss.logmanager.handlers.WriterHandler.safeFlush(WriterHandler.java:165)
        at org.jboss.logmanager.handlers.WriterHandler.flush(WriterHandler.java:135)
        - locked <0x000000064bca1ce8> (a java.lang.Object)
        at org.jboss.logmanager.ExtHandler.doPublish(ExtHandler.java:94)
        at org.jboss.logmanager.handlers.WriterHandler.doPublish(WriterHandler.java:64)
        - locked <0x000000064bca1ce8> (a java.lang.Object)
        at org.jboss.logmanager.ExtHandler.publish(ExtHandler.java:66)
        at org.jboss.logmanager.handlers.DelayedHandler.publishToChildren(DelayedHandler.java:208)
        at org.jboss.logmanager.handlers.DelayedHandler.doPublish(DelayedHandler.java:49)
        at org.jboss.logmanager.ExtHandler.publish(ExtHandler.java:66)
        at org.jboss.logmanager.LoggerNode.publish(LoggerNode.java:316)
        at org.jboss.logmanager.LoggerNode.publish(LoggerNode.java:324)
        at org.jboss.logmanager.LoggerNode.publish(LoggerNode.java:324)
        at org.jboss.logmanager.LoggerNode.publish(LoggerNode.java:324)
        at org.jboss.logmanager.LoggerNode.publish(LoggerNode.java:324)
        at org.jboss.logmanager.Logger.logRaw(Logger.java:748)
        at org.jboss.logmanager.Logger.log(Logger.java:706)
        at org.jboss.logging.JBossLogManagerLogger.doLogf(JBossLogManagerLogger.java:53)
        at org.jboss.logging.Logger.infof(Logger.java:1136)
        at de.telekom.camel.ProducerContextListener$1.lambda$configure$0(ProducerContextListener.java:74)
        at de.telekom.camel.ProducerContextListener$1$$Lambda$513/322112198.process(Unknown Source)
        at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:64)
        at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryState.run(RedeliveryErrorHandler.java:476)
        at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:185)
        at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:87)
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:228)
        at org.apache.camel.component.netty.handlers.ServerChannelHandler.processAsynchronously(ServerChannelHandler.java:141)
        at org.apache.camel.component.netty.handlers.ServerChannelHandler.channelRead0(ServerChannelHandler.java:112)
        at org.apache.camel.component.netty.http.handlers.HttpServerChannelHandler.channelRead0(HttpServerChannelHandler.java:219)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at org.apache.camel.component.netty.http.handlers.HttpServerMultiplexChannelHandler.channelRead0(HttpServerMultiplexChannelHandler.java:156)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:56)
        at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:365)
        at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.lang.Thread.run(Thread.java:748)
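
For context on the trace above: the JVM reports a thread that is blocked inside a native I/O call as RUNNABLE, so a thread sitting in `FileOutputStream.writeBytes` can look active while it is actually waiting on the operating system. The following is a rough sketch, not part of the reproducer, of how the same check could be scripted with plain JDK calls instead of VisualVM. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class StuckWriterFinder {

    /** True if the top frame is a native write in java.io.FileOutputStream. */
    static boolean isInNativeFileWrite(StackTraceElement[] stack) {
        if (stack.length == 0) {
            return false; // no frames available for this thread
        }
        StackTraceElement top = stack[0];
        return top.isNativeMethod()
                && "java.io.FileOutputStream".equals(top.getClassName())
                && top.getMethodName().startsWith("write");
    }

    /** Names and states of all threads in the dump that match the check above. */
    static List<String> threadsInNativeFileWrite(Map<Thread, StackTraceElement[]> dump) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> entry : dump.entrySet()) {
            if (isInNativeFileWrite(entry.getValue())) {
                Thread thread = entry.getKey();
                matches.add(thread.getName() + " [" + thread.getState() + "]");
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Snapshot of every live thread in this JVM.
        for (String match : threadsInNativeFileWrite(Thread.getAllStackTraces())) {
            System.out.println(match);
        }
    }
}
```

A single snapshot cannot tell a stuck thread from one that happens to be writing at that moment. Taking several snapshots a few seconds apart and looking for a thread that matches every time would be a more convincing sign.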

@ppalaga
Contributor

ppalaga commented Feb 26, 2020

Thanks a lot for the info, @bframke! Do you happen to have a shareable reproducer at hand?

@bframke
Author

bframke commented Feb 26, 2020

I can create one. I also tested logging via an extra route in Camel, but this also got stuck, after 534 messages.

@ppalaga
Contributor

ppalaga commented Feb 26, 2020

> I can create one.

That would be awesome, thanks!

@bframke
Author

bframke commented Feb 26, 2020

Created a reproducer, you can find it here: https://github.com/bframke/quarkus-examples/tree/master/camel-logging

@JiriOndrusek
Contributor

@ppalaga Please, assign to me.

@JiriOndrusek
Contributor

Hi @bframke, I'm trying to reproduce the error with your reproducer. Unfortunately there is no error. Here are my execution steps. Would it be possible to validate whether I'm using it in the same way?

I've also modified your script to show the number of requests correctly:
for i in {1..9999}; do echo "$i"; curl -v -d "{'Event': 'Event $i' }" -H "content-type: application/json" http://localhost:8081/topic/curltopic3/event; done

Do you have any idea what I am doing differently?

@bframke
Author

bframke commented Jun 17, 2020

Hi @JiriOndrusek, sorry for the late response, there was a lot to do in the last few days. And no, I always got the thread stuck when there was traffic on it. I've tried it with a built Quarkus image and a built native image.

@ppalaga
Contributor

ppalaga commented Jul 13, 2020

@bframke on which operating system are you reproducing this?

@bframke
Author

bframke commented Jul 13, 2020

We could reproduce this on Arch-Linux Manjaro and Ubuntu 16 & 18.04, but with our new application, which uses newer Camel and Quarkus versions, it no longer happens. So I'm not sure whether this topic still needs to be addressed.

@ppalaga
Contributor

ppalaga commented Jul 13, 2020

Happy to hear that, thanks for the info! Let's close based on the fact that @JiriOndrusek could not reproduce either.

@ppalaga ppalaga closed this as completed Jul 13, 2020
@ppalaga ppalaga added this to the No fix/wont't fix milestone Aug 10, 2020
5 participants