aws2s3 sink connector cannot access kafka headers #1200

Closed
venkat-oss opened this issue Jun 9, 2021 · 6 comments

@venkat-oss

Hi @oscerd, I'm unable to use a Simple language expression to read the Kafka headers in the sink configuration.

My sink configuration looks like this:

{
  "connector.class": "org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector",
  "camel.component.aws2-s3.region": "us-east-1",
  "camel.sink.endpoint.keyName": "${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}-${header.kafka.TOPIC}",
  "topics": "s3.source",
  "camel.sink.path.bucketNameOrArn": "bucket-to-hold-more-data-auto",
  "camel.beans.aggregate": "#class:org.apache.camel.kafkaconnector.aggregator.StringAggregator",
  "camel.aggregation.size": "1",
  "camel.sink.endpoint.useDefaultCredentialsProvider": "true",
  "name": "source_kafka_dest_s3_connector",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "camel.aggregation.timeout": "1000"
}

If I take out the ${header.kafka.TOPIC} part, it works and populates the date and exchangeId. If I add it, the connector fails with a Simple language exception. I also tried ${headers[kafka.TOPIC]}, but it fails with the same exception. I couldn't work out what went wrong.

"trace": "org.apache.kafka.connect.errors.ConnectException: Failed to create and start Camel context\n\tat org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:152)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:308)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.camel.FailedToCreateRouteException: Failed to create route route26 at: >>> Aggregate[org.apache.camel.builder.ExpressionClause@11734ac4 -> [DynamicTo[aws2-s3://bucket-to-hold-more-data-auto?keyName=${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}-${header.kafka.TOPIC}&useDefaultCredentialsProvider=true]]] <<< in route: Route(route26)[From[direct:start] -> [Aggregate[org.apache.c... 
because of No language could be found for: bean\n\tat org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:240)\n\tat org.apache.camel.reifier.RouteReifier.createRoute(RouteReifier.java:74)\n\tat org.apache.camel.impl.DefaultModelReifierFactory.createRoute(DefaultModelReifierFactory.java:49)\n\tat org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:826)\n\tat org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:716)\n\tat org.apache.camel.impl.engine.AbstractCamelContext.doInit(AbstractCamelContext.java:2756)\n\tat org.apache.camel.support.service.BaseService.init(BaseService.java:83)\n\tat org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2475)\n\tat org.apache.camel.support.service.BaseService.start(BaseService.java:111)\n\tat org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2494)\n\tat org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:245)\n\tat org.apache.camel.main.SimpleMain.doStart(SimpleMain.java:43)\n\tat org.apache.camel.support.service.BaseService.start(BaseService.java:119)\n\tat org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:145)\n\t... 
9 more\nCaused by: org.apache.camel.language.simple.types.SimpleIllegalSyntaxException: No language could be found for: bean\n\tat org.apache.camel.language.simple.SimpleExpressionParser.parseExpression(SimpleExpressionParser.java:62)\n\tat org.apache.camel.language.simple.SimpleLanguage.createExpression(SimpleLanguage.java:199)\n\tat org.apache.camel.reifier.ToDynamicReifier.createExpression(ToDynamicReifier.java:80)\n\tat org.apache.camel.reifier.ToDynamicReifier.createProcessor(ToDynamicReifier.java:45)\n\tat org.apache.camel.reifier.ProcessorReifier.createProcessor(ProcessorReifier.java:815)\n\tat org.apache.camel.reifier.ProcessorReifier.createOutputsProcessor(ProcessorReifier.java:770)\n\tat org.apache.camel.reifier.ProcessorReifier.createOutputsProcessor(ProcessorReifier.java:548)\n\tat org.apache.camel.reifier.ProcessorReifier.createChildProcessor(ProcessorReifier.java:569)\n\tat org.apache.camel.reifier.AggregateReifier.createAggregator(AggregateReifier.java:56)\n\tat org.apache.camel.reifier.AggregateReifier.createProcessor(AggregateReifier.java:52)\n\tat org.apache.camel.reifier.ProcessorReifier.makeProcessor(ProcessorReifier.java:838)\n\tat org.apache.camel.reifier.ProcessorReifier.addRoutes(ProcessorReifier.java:579)\n\tat org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:236)\n\t... 
22 more\nCaused by: org.apache.camel.NoSuchLanguageException: No language could be found for: bean\n\tat org.apache.camel.impl.engine.DefaultLanguageResolver.noSpecificLanguageFound(DefaultLanguageResolver.java:89)\n\tat org.apache.camel.impl.engine.DefaultLanguageResolver.resolveLanguage(DefaultLanguageResolver.java:63)\n\tat org.apache.camel.impl.engine.AbstractCamelContext$4.apply(AbstractCamelContext.java:1722)\n\tat org.apache.camel.impl.engine.AbstractCamelContext$4.apply(AbstractCamelContext.java:1705)\n\tat java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)\n\tat org.apache.camel.impl.engine.AbstractCamelContext.resolveLanguage(AbstractCamelContext.java:1705)\n\tat org.apache.camel.language.simple.SimpleExpressionBuilder$KeyedOgnlExpressionAdapter.init(SimpleExpressionBuilder.java:939)\n\tat org.apache.camel.language.simple.ast.SimpleFunctionExpression.createExpression(SimpleFunctionExpression.java:63)\n\tat org.apache.camel.language.simple.ast.SimpleFunctionStart.doCreateLiteralExpression(SimpleFunctionStart.java:74)\n\tat org.apache.camel.language.simple.ast.SimpleFunctionStart.createExpression(SimpleFunctionStart.java:64)\n\tat org.apache.camel.language.simple.SimpleExpressionParser.createExpressions(SimpleExpressionParser.java:193)\n\tat org.apache.camel.language.simple.SimpleExpressionParser.doParseExpression(SimpleExpressionParser.java:116)\n\tat org.apache.camel.language.simple.SimpleExpressionParser.parseExpression(SimpleExpressionParser.java:56)\n\t... 34 more\n"

@oscerd
Contributor

oscerd commented Jun 9, 2021

What is the connector version?

The error says there is no bean language jar in your connector folder.

@oscerd
Contributor

oscerd commented Jun 9, 2021

When you consume from a topic through Kafka Connect, the kafka.topic header is not set the way it would be when consuming from Kafka through the camel-kafka component. The header you are referencing has no value.

@oscerd
Contributor

oscerd commented Jun 9, 2021

So essentially you need to create an SMT that sets a kafka.topic header, and apply that SMT before the record reaches the sink.
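
For illustration, an SMT is wired into a connector through the `transforms` keys of its configuration. The fragment below is only a sketch under stated assumptions: it uses the stock `org.apache.kafka.connect.transforms.InsertHeader` transform instead of a custom SMT, which requires a Kafka Connect runtime that ships that class, and it writes a fixed literal (the single topic `s3.source` from the configuration above) rather than reading the topic from each record. The alias `addTopicHeader` is a made-up name. Whether the connector then exposes this record header to the Camel exchange under the same name, and whether the Simple expression in `keyName` resolves it, is not confirmed in this thread.

```json
{
  "transforms": "addTopicHeader",
  "transforms.addTopicHeader.type": "org.apache.kafka.connect.transforms.InsertHeader",
  "transforms.addTopicHeader.header": "kafka.topic",
  "transforms.addTopicHeader.value.literal": "s3.source"
}
```

These keys would be merged into the existing sink configuration. A connector subscribed to several topics would need a custom SMT that copies the record's topic into the header, since a literal value cannot vary per record.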

@venkat-oss
Author

venkat-oss commented Jun 9, 2021

Thanks @oscerd. I built the package from the main branch and unpacked the tar.gz into the Connect folder; it is version 0.11.0-Snapshot. I'll add the bean language jar and try with an SMT. One last question: how can we determine which headers are available in the config?

@oscerd
Contributor

oscerd commented Jun 9, 2021

The bean language is not needed. Since the header is not present, it won't be replaced with a value, and the Simple language will evaluate the expression as if it were accessing a kafka field on a bean called header. So you don't need the bean language; you need to set the Kafka topic as a header.

@venkat-oss
Author

Got it. Understood.

@oscerd oscerd closed this as completed Aug 23, 2021