Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The stream has no active subscriptions when using camel-quarkus-smallrye-reactive-messaging #1851

Closed
mswiderski opened this issue Sep 27, 2020 · 11 comments · Fixed by #1866
Closed
Assignees
Labels
bug Something isn't working
Milestone

Comments

@mswiderski
Copy link

When using camel-quarkus-smallrye-reactive-messaging to simply receive messages when new files show up in a folder I get

2020-09-27 15:29:40,433 ERROR [org.apa.cam.pro.err.DefaultErrorHandler] (Camel (camel-1) thread #0 - file:///tmp/orders/) Failed delivery for (MessageId: 762AE8F6A4E12DF-0000000000000002 on ExchangeId: 762AE8F6A4E12DF-0000000000000002). Exhausted after delivery attempt: 1 caught: java.lang.IllegalStateException: The stream has no active subscriptions

Message History (complete message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [from[file:///tmp/orders/?charset=utf-8&delete=true]                           ] [         1]
	...
[route1            ] [to1               ] [reactive-streams:762AE8F6A4E12DF-0000000000000000                             ] [         0]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
: java.lang.IllegalStateException: The stream has no active subscriptions
	at org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:110)
	at org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:151)
	at org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:52)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:169)
	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:404)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:147)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:287)
	at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:483)
	at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:237)
	at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:198)
	at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190)
	at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

I tried Quarkus 1.7 and 1.8 with camel-quarkus-smallrye-reactive-messaging 1.1.0

The code is really simple as it consists of following class

package org.acme;

import java.io.File;
import java.util.concurrent.CompletionStage;

import org.apache.camel.component.file.GenericFile;
import org.eclipse.microprofile.reactive.messaging.Message;

@javax.enterprise.context.ApplicationScoped()
public class FilesMessageConsumer_1 {

    @org.eclipse.microprofile.reactive.messaging.Incoming("files")
    public CompletionStage<Void> consume(Message<GenericFile<File>> msg) {

        try {
            System.out.println(msg.getPayload());
            return msg.ack();
        } catch (Exception e) {

            return msg.nack(e);
        }
    }
}

and then application.properties have following

mp.messaging.incoming.files.connector=smallrye-camel
mp.messaging.incoming.files.endpoint-uri=file:/tmp/orders/?delete=true&charset=utf-8

as soon as file exists in that folder (/tmp/orders) it constantly throws above exception.

Any ideas what do I miss to make this work properly?

@mswiderski
Copy link
Author

further investigation shows that following code is not really respected on startup
https://github.com/apache/camel-quarkus/blob/1.1.0/extensions/smallrye-reactive-messaging/deployment/src/main/java/org/apache/camel/quarkus/component/smallrye/reactive/messaging/deployment/SmallRyeReactiveMessagingProcessor.java#L73

as the init method is still invoked and actually overrides the reactive instance which makes it to register to publishers but only one subscriber and that's why the exception is thrown as the publisher used is the one without any subscribers.

the only workaround I found is to do a null check on reactive field in Smallrye Reactive Massaging Camel component which made it work.

@jamesnetherton
Copy link
Contributor

Thanks for investigating. I'll look into seeing if there's a short term fix for SmallRyeReactiveMessagingProcessor and then a better fix in smallrye-reactive-messaging-camel.

@jamesnetherton jamesnetherton self-assigned this Sep 28, 2020
@jamesnetherton jamesnetherton added the bug Something isn't working label Sep 28, 2020
@jamesnetherton jamesnetherton added this to the 1.2.0 milestone Sep 28, 2020
@mswiderski
Copy link
Author

@jamesnetherton excellent, thanks.

let me know if you need me to give it a try or any additional info is needed.

@jamesnetherton
Copy link
Contributor

@mswiderski are you able to point me at the code for your application?

It'd help me to write an integration test that replicates the problem.

@mswiderski
Copy link
Author

have a look at the attached sample

quarkus-camel-rm.zip

@jamesnetherton
Copy link
Contributor

There's a change in smallrye-reactive-messaging-camel that should fix this problem for camel-quarkus. But, we'll have to wait until smallrye-reactive-messaging 2.4.0 is released before we can consume it.

Until then, I'll see if we can work around the problem, but it is proving to be quite tricky to do.

@mswiderski
Copy link
Author

Excellent! thanks a lot @jamesnetherton

do you know if it will make it before 1.2.0?

@jamesnetherton
Copy link
Contributor

do you know if it will make it before 1.2.0?

The good news is that 2.4.0 is planned to be released tomorrow, so it should make Quarkus 1.9.0 & Camel Quarkus 1.2.0.

@sqasim2329
Copy link

I am seeing the same issue in camel reactive streams

@ppalaga
Copy link
Contributor

ppalaga commented Dec 18, 2020

@sqasim2329 could you please file a new issue, ideally with a minimal reproducer app?

@gongqian
Copy link

gongqian commented Feb 18, 2021

I encountered the same issue with pulsar component, it is a simple project..with inbound config like the following
Inbound configuration in the application.yaml

mp.messaging.incoming.pulasr-test.connector=smallrye-camel
mp.messaging.incoming.pulasr-test.endpoint-uri: pulsar:persistent://default/poc-topic?subscriptionType=SHARED&subscriptionName=reactive

Quarkus has the following warning

2021-02-17 22:42:53,976 INFO [org.apa.cam.imp.eng.InternalRouteStartupManager] (Quarkus Main Thread) Route: route7 started and consuming from: reactive-streams://0C994938016BAD2-0000000000000002
2021-02-17 22:42:53,982 WARN [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00207: Some components are not connected to either downstream consumers or upstream producers:
- IncomingConnector{channel:'pulasr-test', attribute:'mp.messaging.incoming.pulasr-test'} has no downstream
- SubscriberMethod{method:'com.gm.devit.drp.pulsar.inbound.IncomingConsumer#consume', incoming:'pulsar-test'} has no upstream

Quarkus is the latest 1.11.3.Final

=================================
when I put an message to pulsar queue.. It throws a similar error reported

2021-02-17 23:13:43,386 ERROR [org.apa.cam.pro.err.DefaultErrorHandler] (pulsar-external-listener-3-1) Failed delivery for (MessageId: 0C994938016BAD2-0000000000000004 on ExchangeId: 0C994938016BAD2-0000000000000005). Exhausted after delivery attempt: 1 caught: java.lang.IllegalStateException: The stream has no active subscriptions

Message History (complete message history is disabled)

RouteId ProcessorId Processor Elapsed (ms)
[route5 ] [route5 ] [from[pulsar://persistent://default/poc-topic?subscription] [ 4]
...
[route5 ] [to7 ] [reactive-streams:0C994938016BAD2-0000000000000000 ] [ 0]

Stacktrace

: java.lang.IllegalStateException: The stream has no active subscriptions
at org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:110)
at org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:151)
at org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:52)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:169)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:395)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:147)
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:312)
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41)
at org.apache.camel.component.pulsar.PulsarMessageListener.process(PulsarMessageListener.java:64)
at org.apache.camel.component.pulsar.PulsarMessageListener.received(PulsarMessageListener.java:53)
at org.apache.pulsar.client.impl.ConsumerImpl.lambda$triggerListener$13(ConsumerImpl.java:1332)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants