Spring Cloud Stream - Subscribing to More Than One Topic in the Same App #2909

Closed
Vyse777 opened this issue May 21, 2024 · 4 comments

@Vyse777

Vyse777 commented May 21, 2024

I am completely confused about how to use Spring Cloud Stream with Google Pub/Sub when there are multiple topics.

Here is the landscape:
I have 1 application that needs to process messages from 3 different topics, each with its own subscriber that this app is to use.
These topics and subscribers are pre-existing (meaning they were created via infrastructure and should not be auto-created by the app).
Lastly, these topics are all correlated to the same use-case, so 1 app for processing the 3 topics makes sense, as opposed to 3 applications, one for each topic.

Based on the docs that I managed to find through insane amounts of research, I understand some of how the bindings work, and I would expect that I could do something like this in my application.yaml config (maybe I am wrong):

spring:
    cloud:
        stream:
            bindings:
                topicProcessorOneMethodName-in-0:
                    destination: name-of-topic-one-in-gcp
                    group: name-of-subscriber-to-topic-one-in-gcp
                topicProcessorTwoMethodName-in-0:
                    destination: name-of-topic-two-in-gcp
                    group: name-of-subscriber-to-topic-two-in-gcp
                topicProcessorThreeMethodName-in-0:
                    destination: name-of-topic-three-in-gcp
                    group: name-of-subscriber-to-topic-three-in-gcp

For a single topic/subscriber pair (say, just topicProcessorOneMethodName-in-0) this actually works, and I can receive messages in a single method. But for more than one, please read on...

I would also expect the following config to ensure that the app does not try to create any resources (and uses the existing ones instead):

spring:
    cloud:
        stream:
            gcp:
                pubsub:
                    bindings:
                        topicProcessorOneMethodName-in-0:
                            consumer:
                                auto-create-resources: false
                                subscription-name: name-of-subscriber-to-topic-one-in-gcp
                        topicProcessorTwoMethodName-in-0:
                            consumer:
                                auto-create-resources: false
                                subscription-name: name-of-subscriber-to-topic-two-in-gcp
                        topicProcessorThreeMethodName-in-0:
                            consumer:
                                auto-create-resources: false
                                subscription-name: name-of-subscriber-to-topic-three-in-gcp

I would then expect to be able to create three beans like so:

    @Bean
    fun topicProcessorOneMethodName(): Consumer<Message<String>> {
        return Consumer<Message<String>> { message: Message<String> ->
            // Process the message that came from "name-of-topic-one-in-gcp" - utilizing subscriber "name-of-subscriber-to-topic-one-in-gcp"
        }
    }

    @Bean
    fun topicProcessorTwoMethodName(): Consumer<Message<String>> {
        return Consumer<Message<String>> { message: Message<String> ->
            // Process the message that came from "name-of-topic-two-in-gcp" - utilizing subscriber "name-of-subscriber-to-topic-two-in-gcp"
        }
    }

// And a third similarly named according to the application.yaml config for the 3rd topic & subscriber.

However, this does not work, either because I am misunderstanding how to use this or because I am missing something else.

When starting my application I get the following WARNING (not an error/exception) stating this:
Multiple functional beans were found [topicProcessorOneMethodName, topicProcessorTwoMethodName, topicProcessorThreeMethodName], thus can't determine default function definition. Please use 'spring.cloud.function.definition' property to explicitly define it.

If I try to apply config like this, which I found researching online how to "use spring.cloud.function.definition properly":

spring:
  cloud:
    function:
      definition: topicProcessorOneMethodName,topicProcessorTwoMethodName,topicProcessorThreeMethodName

I see the message channel being created incorrectly in the startup logs:

INFO   - [main] org.springframework.integration.monitor.IntegrationMBeanExporter:649 : Registering MessageChannel topicProcessorOneMethodNametopicProcessorTwoMethodNametopicProcessorThreeMethodName-in-0

And anonymous error channels are created:

INFO   - [main] org.springframework.cloud.stream.binder.BinderErrorChannel:174 : Channel 'anonymous.topicProcessorOneMethodNametopicProcessorTwoMethodNametopicProcessorThreeMethodName-in-0.d439f252-1018-46a0-842a-9ace72a1332d.errors' has 1 subscriber(s).
INFO   - [main] org.springframework.cloud.stream.binder.BinderErrorChannel:174 : Channel 'anonymous.topicProcessorOneMethodNametopicProcessorTwoMethodNametopicProcessorThreeMethodName-in-0.d439f252-1018-46a0-842a-9ace72a1332d.errors' has 0 subscriber(s).
INFO   - [main] org.springframework.cloud.stream.binder.BinderErrorChannel:174 : Channel 'anonymous.topicProcessorOneMethodNametopicProcessorTwoMethodNametopicProcessorThreeMethodName-in-0.d439f252-1018-46a0-842a-9ace72a1332d.errors' has 1 subscriber(s).
INFO   - [main] org.springframework.cloud.stream.binder.BinderErrorChannel:174 : Channel 'anonymous.topicProcessorOneMethodNametopicProcessorTwoMethodNametopicProcessorThreeMethodName-in-0.d439f252-1018-46a0-842a-9ace72a1332d.errors'  has 2 subscriber(s).

At this point the app does not receive any messages pushed to the applicable topics, which implies the bindings are not attached correctly.

So it all boils down to:
"Am I doing something wrong here?"
"Am I misunderstanding how to configure bindings?"
"Is this concept supported to begin with (i.e. multiple topics with single subscribers in a single application)?"
"All of the above, or none of the above?"

Please help me.

Also, please let me know if there is any additional information I can provide to assist.

@meltsufin
Member

This is more of a Spring Cloud Stream question than a Spring Cloud GCP question, since we simply provide an implementation of the binder.
I believe the list of function definitions should use the ; separator.

spring:
  cloud:
    function:
      definition: topicProcessorOneMethodName;topicProcessorTwoMethodName;topicProcessorThreeMethodName
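
For reference, here is a sketch of how the pieces from this thread might fit together in a single application.yaml. It assumes the three Consumer beans are named exactly as in the binding keys, that the topics and subscriptions already exist, and that all resource names are the placeholders used earlier in the thread. In Spring Cloud Function, "," and "|" compose the listed functions into one function, which is consistent with the concatenated channel name in the startup log above; ";" declares them as separate functions, each with its own -in-0 binding.

```yaml
spring:
  cloud:
    function:
      # ";" lists independent functions; "," or "|" would compose them into one
      definition: topicProcessorOneMethodName;topicProcessorTwoMethodName;topicProcessorThreeMethodName
    stream:
      bindings:
        # One input binding per function: <beanName>-in-0
        topicProcessorOneMethodName-in-0:
          destination: name-of-topic-one-in-gcp
          group: name-of-subscriber-to-topic-one-in-gcp
        topicProcessorTwoMethodName-in-0:
          destination: name-of-topic-two-in-gcp
          group: name-of-subscriber-to-topic-two-in-gcp
        topicProcessorThreeMethodName-in-0:
          destination: name-of-topic-three-in-gcp
          group: name-of-subscriber-to-topic-three-in-gcp
      gcp:
        pubsub:
          bindings:
            # Binder-specific settings: use the pre-existing subscriptions as-is
            topicProcessorOneMethodName-in-0:
              consumer:
                auto-create-resources: false
                subscription-name: name-of-subscriber-to-topic-one-in-gcp
            topicProcessorTwoMethodName-in-0:
              consumer:
                auto-create-resources: false
                subscription-name: name-of-subscriber-to-topic-two-in-gcp
            topicProcessorThreeMethodName-in-0:
              consumer:
                auto-create-resources: false
                subscription-name: name-of-subscriber-to-topic-three-in-gcp
```

If the separator is right, the startup log should register three separate input channels (one per bean name) rather than a single channel with the three names concatenated.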

@Vyse777
Author

Vyse777 commented May 22, 2024

Thanks for the reply. I honestly had no idea where to go first.

I'll give that separator a try; I didn't read anywhere that I should try that. I've used "," and "|" based on various docs I've read.

@Vyse777
Author

Vyse777 commented May 22, 2024

Wow, so despite the documentation for Spring Cloud Function (located here: https://cloud.spring.io/spring-cloud-function/reference/html/spring-cloud-function.html#_declarative_function_composition) stating you can use "," or "|", the thing that worked for me was using ";" as you recommended, @meltsufin.

After doing this, it looks like all of the above config managed to work exactly as I expected it to.
¯\_(ツ)_/¯

Thanks for that. I guess my issue is fixed...

@Vyse777
Author

Vyse777 commented May 22, 2024

Closing, since this is covered above.

@Vyse777 Vyse777 closed this as completed May 22, 2024