IMPROVEMENT REQUEST: Add a transformer mode to the connectors to make lightweight edits to the messages while they are being produced/consumed #410

Closed
codexetreme opened this issue Sep 2, 2020 · 19 comments

Comments

@codexetreme
Contributor

Hello,
In a recent use case, we have to move messages from an AWS SQS queue to a Kafka topic. This is a perfect fit for the Camel connectors, specifically the aws2-sqs connector.

According to the docs, the connector adds the SQS messageId as a header to the Kafka record and inserts the record as-is into the specified Kafka topic.

A link to the documentation for context: Message headers set by the sqs producer

Now, in our use case, we need to set this header to something specific based on the data before it is inserted into the Kafka topic. In the sqs connector docs, I can see no mention of where a transformer can be hooked in and run before the data is sent to the target destination. The only way around this, as suggested by @oscerd in the gitter channel, was to use custom archetypes, generate the connector source code, and add functionality as required.

While this approach definitely works, in my experience a connector reading data and making light edits before inserting it into its target destination is a very common use case. In the case of Kafka specifically, its Connect API already provides a framework for adding your own transformations.

A link to the relevant Kafka documentation for context: Transformation with the Connect API
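
For context, here is a minimal sketch of how Kafka Connect chains transformations in a connector's configuration. It uses two transformations that ship with Kafka Connect (ValueToKey and ExtractField$Key); the aliases and the field name "id" are hypothetical, and the example assumes a structured record value, which may not match the plain SQS message body discussed in this issue.

```properties
# Sketch only: two built-in Kafka Connect transformations applied in order.
# The aliases (copyIdToKey, flattenKey) and the field name "id" are hypothetical.
transforms=copyIdToKey,flattenKey

# Copy the "id" field of a structured record value into the record key.
transforms.copyIdToKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.copyIdToKey.fields=id

# Replace the resulting single-field key with the bare field value.
transforms.flattenKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.flattenKey.field=id
```

Transformations run in the order listed under transforms, each receiving the record produced by the previous one.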

My request is to add functionality to these connectors so that the users of these connectors can chain/add lightweight transformers to make light edits to the data before the connector finally sends it off to its target destination.

PS: if this request is approved, I would love to help out and contribute in implementing this feature :)

@codexetreme codexetreme changed the title IMPROVEMENT REQUEST: Add a transformer mode to the connectors to make lightweight edits to the messages are they are produced/consumed IMPROVEMENT REQUEST: Add a transformer mode to the connectors to make lightweight edits to the messages while they are being produced/consumed Sep 2, 2020
@oscerd
Contributor

oscerd commented Sep 2, 2020

Basically, what I meant was just providing a specific transformer for this case in the connector. A general solution seems a bit too complex. But you're welcome to try.

@codexetreme
Contributor Author

I can try out the specific transformer for this case. You are probably right about the general solution being complex.

@codexetreme
Contributor Author

I tried creating a project from an archetype for this, following the archetype page, but I cannot seem to find the correct archetype params.

what I tried:

mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-aws2-sqs -DarchetypeVersion=0.4.0

what I got:
......
Downloading from central: https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/archetypes/camel-aws2-sqs/0.4.0/camel-aws2-sqs-0.4.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 12.035 s
[INFO] Finished at: 2020-09-02T16:48:22+05:30
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-archetype-plugin:3.2.0:generate (default-cli) on project standalone-pom: The desired archetype does not exist (org.apache.camel.kafkaconnector.archetypes:camel-aws2-sqs:0.4.0) -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException

I understand my params are incorrect. Could you tell me how to figure out the correct ones?

@oscerd
Contributor

oscerd commented Sep 2, 2020

Please read how to use the archetype: https://camel.apache.org/camel-kafka-connector/latest/archetypes.html

It's all explained there. You need to use the archetype listed on that page and then follow the instructions. In the project it creates, you'll need to create the transformer.

If you want to create the transformer in the actual codebase, you need to modify the connector under the connectors folder.

What you are trying to do cannot work. The connector artifact is not an archetype.

@oscerd
Contributor

oscerd commented Sep 2, 2020

There is only one archetype to use.

@valdar
Member

valdar commented Sep 3, 2020

@codexetreme try with: mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.4.0

Then answer the usual Maven questions, and you should have the skeleton project created.

@codexetreme
Contributor Author

codexetreme commented Sep 4, 2020 via email

@oscerd
Contributor

oscerd commented Sep 4, 2020

Hello,

  1. Basically, you'll need to build the connector, and you'll get a new package.zip. In the connector configuration, point the transforms settings at your new transformation class, for example:

transforms=S3ObjectTransformer
transforms.S3ObjectTransformer.type=org.apache.camel.kafkaconnector.transforms.S3ObjectTransformer

In this way you'll use the transformation you implemented in the extended connector.

  2. You can write a unit test modeled on this class: CamelTypeConverterTransformTest in the camel-kafka-connector repo

@oscerd
Contributor

oscerd commented Sep 4, 2020

Once the transformation class works and you're happy with the result, you could add that class to the aws2-sqs connector in the camel-kafka-connector repository and open a PR.

@codexetreme
Contributor Author

I see, this makes the whole scenario much clearer. Let me experiment with this over the weekend; if I hit any snags I will ask you for more details. One more question: right now the headers that are being sent are not part of the connector. I assume they are autogenerated and sent from somewhere upstream of the connector code. How do I set the header in the connector code?

If you can point me to sample code in one of the other connectors, that would also help.

Cheers

@oscerd
Contributor

oscerd commented Sep 4, 2020

If you are using the source connector, the headers are generated by the Camel component consumer under the hood, so you don't have to set any header yourself.

@oscerd
Contributor

oscerd commented Sep 4, 2020

It's possible I haven't fully understood what you're asking, though. What is your use case for setting a header in the connector? What do you want to set?

@codexetreme
Contributor Author

codexetreme commented Sep 4, 2020 via email

@oscerd
Contributor

oscerd commented Sep 4, 2020

I need to test the behavior to understand. I'll do that.

@oscerd
Contributor

oscerd commented Sep 7, 2020

For each connector we apply a mapping of the Camel headers and properties. So you should be able to get the message ID by looking in record.headers for the name "CamelHeader.CamelAwsSqsMessageId", and then set it as the record key if you want.
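
The lookup described here can be sketched in plain Java. This is only a stand-in that models the key-selection logic with JDK collections; it does not use the Kafka Connect API, and the class and method names (SqsMessageIdKeySketch, lastHeaderValue, chooseKey) are hypothetical. In a real transformation, this logic would presumably sit inside the apply method of a class implementing org.apache.kafka.connect.transforms.Transformation, reading from record.headers() and returning a copy of the record with the new key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Stand-in sketch: models only the key-selection logic, not the Kafka Connect API.
final class SqsMessageIdKeySketch {

    // Header name quoted in this thread for the mapped SQS message id.
    static final String MESSAGE_ID_HEADER = "CamelHeader.CamelAwsSqsMessageId";

    // Returns the value of the last header with the given name, or null if absent.
    // Headers are modeled as a list because a record may carry repeated names.
    static Object lastHeaderValue(List<Map.Entry<String, Object>> headers, String name) {
        Object found = null;
        for (Map.Entry<String, Object> header : headers) {
            if (name.equals(header.getKey())) {
                found = header.getValue();
            }
        }
        return found;
    }

    // Picks the record key: the message id header when present, else the existing key.
    static Object chooseKey(Object currentKey, List<Map.Entry<String, Object>> headers) {
        Object messageId = lastHeaderValue(headers, MESSAGE_ID_HEADER);
        return messageId != null ? messageId : currentKey;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Object>> headers = new ArrayList<>();
        headers.add(Map.entry(MESSAGE_ID_HEADER, "example-message-id"));
        // The record had no key, so the header value becomes the key.
        System.out.println(chooseKey(null, headers)); // prints example-message-id
    }
}
```

Falling back to the existing key when the header is missing is a design choice made for this sketch; a real transformer might instead fail fast or leave the record untouched.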

@codexetreme
Contributor Author

hello,

I worked that out over the weekend, and I now have the key from the SQS message as required, so our use case is satisfied. I could send a PR for that, but it is quite custom functionality: we parse the SQS message and build our key from it.

Alternatively, I can send a PR where the transformer simply sets the CamelHeader.CamelAwsSqsMessageId as the key of the message. Let me know how you want to proceed.

@oscerd
Contributor

oscerd commented Sep 7, 2020

Yes, it would be nice to have the transformer for setting the key, thanks.

@codexetreme
Contributor Author

Cool, let me open a PR.

@codexetreme
Contributor Author

Opened PR #431, please check.

@oscerd oscerd closed this as completed Sep 9, 2020