Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQS Fifo Kamelet Sink #78

Merged
merged 6 commits into from Mar 23, 2021
Merged

SQS Fifo Kamelet Sink #78

merged 6 commits into from Mar 23, 2021

Conversation

oscerd
Copy link
Contributor

@oscerd oscerd commented Mar 22, 2021

Fixes #77

Copy link
Member

@nicolaferraro nicolaferraro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking loud about these two properties, to try to find a better solution.
Here you see some Camel concepts emerging in the connector description (exchange and exchange id/properties) which should be hidden inside the connector and makes a bell to ring..

If I think to the use case of a user using this Kamelet to forward application events into a SQS FIFO queue, then we should allow them to set group and ID programmatically, which in this context means setting them in a header.

The Kamelet can specify the "group" (plus "ce-group") header as the way to specify the group and the "id" (plus "ce-id") header for the deduplication strategy, then it's responsibility of the data producer to set them in the message.
The "ce-id" field is also compatible with cloud events, so messages flowing from knative will have that field set out-of-the-box, with the right semantics.

So I'd go with an approach similar to the one in telegram-sink here, removing those two options from the connector configuration and making them dynamic. Wdyt?

Comment on lines 43 to 52
messageGroupIdStrategy:
title: Message Group ID Strategy
description: Strategy for setting the messageGroupId on the message. Can be one of the following options useConstant, useExchangeId, usePropertyValue.
type: string
default: useExchangeId
messageDeduplicationIdStrategy:
title: Message Deduplication ID Strategy
description: Strategy for setting the messageDeduplicationId on the message. Can be one of the following options useExchangeId, useContentBasedDeduplication
type: string
default: useExchangeId
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was talking about these two

@nicolaferraro
Copy link
Member

I've pushed what I meant..

@oscerd
Copy link
Contributor Author

oscerd commented Mar 22, 2021

with camel-k cli 1.3.1 it is failing:

[1] 2021-03-22 17:40:22,163 ERROR [org.apa.cam.qua.mai.CamelMainRuntime] (main) Failed to start application: java.lang.IllegalArgumentException: Cannot compute endpoint URI: unable to find an EndpointUriFactory for scheme aws2-sqs
[1] 	at org.apache.camel.k.loader.yaml.support.StepParserSupport.getEndpointUriFactory(StepParserSupport.java:121)
[1] 	at org.apache.camel.k.loader.yaml.support.StepParserSupport.createEndpointUri(StepParserSupport.java:84)
[1] 	at org.apache.camel.k.loader.yaml.parser.ToStepParser.toProcessor(ToStepParser.java:34)
[1] 	at org.apache.camel.k.loader.yaml.spi.ProcessorStepParser.invoke(ProcessorStepParser.java:30)
[1] 	at org.apache.camel.k.loader.yaml.support.StepParserSupport.convertSteps(StepParserSupport.java:60)
[1] 	at org.apache.camel.k.loader.yaml.parser.ChoiceStepParser.toProcessor(ChoiceStepParser.java:45)
[1] 	at org.apache.camel.k.loader.yaml.spi.ProcessorStepParser.invoke(ProcessorStepParser.java:30)
[1] 	at org.apache.camel.k.loader.yaml.support.StepParserSupport.convertSteps(StepParserSupport.java:60)
[1] 	at org.apache.camel.k.loader.yaml.parser.FromStepParser.process(FromStepParser.java:47)
[1] 	at org.apache.camel.k.loader.yaml.spi.StartStepParser.invoke(StartStepParser.java:28)
[1] 	at org.apache.camel.k.loader.yaml.YamlSourceLoader$1.accept(YamlSourceLoader.java:80)
[1] 	at org.apache.camel.k.loader.yaml.YamlSourceLoader$1.accept(YamlSourceLoader.java:76)
[1] 	at org.apache.camel.k.support.RouteBuilders$3.configure(RouteBuilders.java:57)
[1] 	at org.apache.camel.builder.RouteBuilder.checkInitialized(RouteBuilder.java:483)
[1] 	at org.apache.camel.builder.RouteBuilder.configureRoutes(RouteBuilder.java:430)
[1] 	at org.apache.camel.builder.RouteBuilder.addRoutesToCamelContext(RouteBuilder.java:405)
[1] 	at org.apache.camel.impl.engine.AbstractCamelContext.addRoutes(AbstractCamelContext.java:1110)
[1] 	at org.apache.camel.main.RoutesConfigurer.configureRoutes(RoutesConfigurer.java:94)
[1] 	at org.apache.camel.main.BaseMainSupport.configureRoutes(BaseMainSupport.java:454)
[1] 	at org.apache.camel.main.BaseMainSupport.postProcessCamelContext(BaseMainSupport.java:474)
[1] 	at org.apache.camel.quarkus.main.CamelMain.initCamelContext(CamelMain.java:97)
[1] 	at org.apache.camel.quarkus.main.CamelMain.doInit(CamelMain.java:67)
[1] 	at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
[1] 	at org.apache.camel.quarkus.main.CamelMain.startEngine(CamelMain.java:118)
[1] 	at org.apache.camel.quarkus.main.CamelMainRuntime.start(CamelMainRuntime.java:49)
[1] 	at org.apache.camel.quarkus.core.CamelBootstrapRecorder.start(CamelBootstrapRecorder.java:45)
[1] 	at io.quarkus.deployment.steps.CamelBootstrapProcessor$boot-173480958.deploy_0(CamelBootstrapProcessor$boot-173480958.zig:101)
[1] 	at io.quarkus.deployment.steps.CamelBootstrapProcessor$boot-173480958.deploy(CamelBootstrapProcessor$boot-173480958.zig:40)
[1] 	at io.quarkus.runner.ApplicationImpl.doStart(ApplicationImpl.zig:534)
[1] 	at io.quarkus.runtime.Application.start(Application.java:90)
[1] 	at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:97)
[1] 	at io.quarkus.runtime.Quarkus.run(Quarkus.java:62)
[1] 	at io.quarkus.runtime.Quarkus.run(Quarkus.java:38)
[1] 	at io.quarkus.runtime.Quarkus.run(Quarkus.java:104)
[1] 	at io.quarkus.runner.GeneratedMain.main(GeneratedMain.zig:29)
[1] 
[1] 2021-03-22 17:40:22,169 ERROR [io.qua.run.Application] (main) Failed to start application (with profile prod): java.lang.IllegalArgumentException: Cannot compute endpoint URI: unable to find an EndpointUriFactory for scheme aws2-sqs
[1] 	at org.apache.camel.k.loader.yaml.support.StepParserSupport.getEndpointUriFactory(StepParserSupport.java:121)
[1] 	at org.apache.camel.k.loader.yaml.support.StepParserSupport.createEndpointUri(StepParserSupport.java:84)
[1] 	at org.apache.camel.k.loader.yaml.parser.ToStepParser.toProcessor(ToStepParser.java:34)
[1] 	at org.apache.camel.k.loader.yaml.spi.ProcessorStepParser.invoke(ProcessorStepParser.java:30)
[1] 	at org.apache.camel.k.loader.yaml.support.StepParserSupport.convertSteps(StepParserSupport.java:60)
[1] 	at org.apache.camel.k.loader.yaml.parser.ChoiceStepParser.toProcessor(ChoiceStepParser.java:45)
[1] 	at org.apache.camel.k.loader.yaml.spi.ProcessorStepParser.invoke(ProcessorStepParser.java:30)
[1] 	at org.apache.camel.k.loader.yaml.support.StepParserSupport.convertSteps(StepParserSupport.java:60)
[1] 	at org.apache.camel.k.loader.yaml.parser.FromStepParser.process(FromStepParser.java:47)
[1] 	at org.apache.camel.k.loader.yaml.spi.StartStepParser.invoke(StartStepParser.java:28)
[1] 	at org.apache.camel.k.loader.yaml.YamlSourceLoader$1.accept(YamlSourceLoader.java:80)
[1] 	at org.apache.camel.k.loader.yaml.YamlSourceLoader$1.accept(YamlSourceLoader.java:76)
[1] 	at org.apache.camel.k.support.RouteBuilders$3.configure(RouteBuilders.java:57)
[1] 	at org.apache.camel.builder.RouteBuilder.checkInitialized(RouteBuilder.java:483)
[1] 	at org.apache.camel.builder.RouteBuilder.configureRoutes(RouteBuilder.java:430)
[1] 	at org.apache.camel.builder.RouteBuilder.addRoutesToCamelContext(RouteBuilder.java:405)
[1] 	at org.apache.camel.impl.engine.AbstractCamelContext.addRoutes(AbstractCamelContext.java:1110)
[1] 	at org.apache.camel.main.RoutesConfigurer.configureRoutes(RoutesConfigurer.java:94)
[1] 	at org.apache.camel.main.BaseMainSupport.configureRoutes(BaseMainSupport.java:454)
[1] 	at org.apache.camel.main.BaseMainSupport.postProcessCamelContext(BaseMainSupport.java:474)
[1] 	at org.apache.camel.quarkus.main.CamelMain.initCamelContext(CamelMain.java:97)
[1] 	at org.apache.camel.quarkus.main.CamelMain.doInit(CamelMain.java:67)
[1] 	at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
[1] 	at org.apache.camel.quarkus.main.CamelMain.startEngine(CamelMain.java:118)
[1] 	at org.apache.camel.quarkus.main.CamelMainRuntime.start(CamelMainRuntime.java:49)
[1] 	at org.apache.camel.quarkus.core.CamelBootstrapRecorder.start(CamelBootstrapRecorder.java:45)
[1] 	at io.quarkus.deployment.steps.CamelBootstrapProcessor$boot-173480958.deploy_0(CamelBootstrapProcessor$boot-173480958.zig:101)
[1] 	at io.quarkus.deployment.steps.CamelBootstrapProcessor$boot-173480958.deploy(CamelBootstrapProcessor$boot-173480958.zig:40)
[1] 	at io.quarkus.runner.ApplicationImpl.doStart(ApplicationImpl.zig:534)
[1] 	at io.quarkus.runtime.Application.start(Application.java:90)
[1] 	at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:97)
[1] 	at io.quarkus.runtime.Quarkus.run(Quarkus.java:62)
[1] 	at io.quarkus.runtime.Quarkus.run(Quarkus.java:38)
[1] 	at io.quarkus.runtime.Quarkus.run(Quarkus.java:104)
[1] 	at io.quarkus.runner.GeneratedMain.main(GeneratedMain.zig:29)
[1] 

@oscerd oscerd merged commit 9ed7bde into master Mar 23, 2021
@oscerd
Copy link
Contributor Author

oscerd commented Mar 23, 2021

Looks good!

@oscerd oscerd deleted the sqs-fifo branch March 24, 2021 10:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add AWS SQS Fifo Sink Kamelet
2 participants