Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Salesforce Camel Kafka Source Connector not converting to JSON #1271

Closed
hakidehari opened this issue Oct 12, 2021 · 13 comments
Closed

Salesforce Camel Kafka Source Connector not converting to JSON #1271

hakidehari opened this issue Oct 12, 2021 · 13 comments

Comments

@hakidehari
Copy link

We are producing events from Salesforce to Kafka via something in SF called platform events. To get these events into Kafka, we are using the Salesforce Kafka Source Connector from Camel. Documentation for this Kafka connector can be found here:

https://camel.apache.org/camel-kafka-connector/latest/reference/connectors/camel-salesforce-kafka-source-connector.html

The events generated from Salesforce are in JSON format. The format looks something like this:

{ "data": { "schema": "NhgeDyLTvEyVPQ9uOzDqeQ", "payload": { "AccountId__c": "00119000013q2g3AAA", "AccountUUID__c": "4654fefb-e3d1-4b08-a4e2-5dabaa504abb", "GUID__c": null, "CreatedById": "0056g000005YeBAAA0", "CreatedDate": "2021-10-10T15:24:43.819Z" }, "event": { "replayId": "1256093" } }, "channel": "/event/Order_Completed__e" }

This leads us to our issue. We are using the following configuration for our source connector:

{ "name": "sf_order_p_event_connector", "config": { "key.converter": "org.apache.kafka.connect.storage.StringConverter", "key.converter.schemas.enable": "true", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "true", "connector.class": "org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSourceConnector", "camel.component.salesforce.loginUrl": "<redacted>", "camel.component.salesforce.instanceUrl": "<redacted>", "topics": "<redacted>", "camel.source.endpoint.rawPayload": "true", "camel.source.path.topicName": "/event/Order_Completed__e", "camel.source.endpoint.replayId": "-1", "camel.component.salesforce.authenticationType": "USERNAME_PASSWORD", "camel.component.salesforce.clientId": "<redacted", "camel.component.salesforce.clientSecret": "<redacted>", "camel.component.salesforce.password": "<redacted5", "camel.component.salesforce.userName": "<redacted>", "camel.source.endpoint.apiVersion": "52.0" } }

When using this value.converter from the configuration above, we get the following exception:

ERROR [sf_account_change_connector|task-0] WorkerSourceTask{id=sf_account_change_connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184) org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132) at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:298) at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:324) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NullPointerException at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:677) at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:592) at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:346) at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63) at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:298) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190) ... 11 more

It seems it cannot convert whatever value is coming through to Kafka into JSON. I changed value.converter to "org.apache.kafka.connect.storage.StringConverter" to see what I am actually getting in the topic. Once I did that, this is the message I saw coming through to Kafka from Salesforce:

{data={schema=NhgeDyLTvEyVPQ9uOzDqeQ, payload={AccountId__c=00119000013q2g3AAA, AccountUUID__c=4654fefb-e3d1-4b08-a4e2-5dabaa504abb, GUID__c=null, CreatedById=0056g000005YeBAAA0, CreatedDate=2021-10-10T15:24:43.819Z}, event= {replayId=1256093}}, channel=/event/Order_Completed__e}

It seems Kafka is not processing this as JSON but as a key=value (whatever this is) type of value. My question is, why am I seeing this type of payload and not JSON? Also, what configuration (if any) for the source connector can I use to get past this and perhaps convert the value into JSON? I need this value in JSON for my faust agent to properly process it. I have tried multiple different configurations for the source connector but nothing seems to be working.

Any help would be appreciated. Thank you in advance!

@zregvart
Copy link
Member

You need to set camel.source.endpoint.rawPayload=true to get JSON payloads, otherwise you're seeing toString() representation of a Java object org.apache.camel.component.salesforce.api.dto.PlatformEvent.

@hakidehari
Copy link
Author

hakidehari commented Oct 12, 2021

Sorry I should have formatted my config above better. I am setting in the config the "camel.source.endpoint.rawPayload": "true" value. I am still seeing the same error though. @zregvart

Current config I tried looks like this

{ "name": "sf_account_change_connector", "config": { "camel.source.endpoint.rawPayload": true, "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable":false, "key.converter.schemas.enable":false, "key.converter":"org.apache.kafka.connect.storage.StringConverter", "connector.class":"org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSourceConnector", "camel.component.salesforce.loginUrl":"redacted", "camel.component.salesforce.instanceUrl":"redacted", "topics": "CISL_SF_ASSET_INFO_INPUT", "camel.source.path.topicName":"/data/AccountChangeEvent", "camel.source.endpoint.replayId":"-1", "camel.component.salesforce.authenticationType":"USERNAME_PASSWORD", "camel.component.salesforce.clientId":"redacted", "camel.component.salesforce.clientSecret":"redacted", "camel.component.salesforce.password":"redacted", "camel.component.salesforce.userName":"redacted" } }

@zregvart
Copy link
Member

@hakidehari
Copy link
Author

@zregvart Is there any workaround that you could think of for this?

@zregvart
Copy link
Member

zregvart commented Oct 12, 2021

@zregvart Is there any workaround that you could think of for this?

Apart from marshalling to JSON in a data transformation step, I can't think of any.

@oscerd
Copy link
Contributor

oscerd commented Oct 12, 2021

Create an SMT as workaround.

@hakidehari
Copy link
Author

I have created a pull request for the change here apache/camel#6260. Sorry, @oscerd , what is a SMT?
@zregvart Do you have any documentation on how to marshal to JSON?

@oscerd
Copy link
Contributor

oscerd commented Oct 12, 2021

An SMT is a single message transformation. If you look in the codebase you'll find some of them

@hakidehari
Copy link
Author

@zregvart @oscerd Thanks to both of you for the quick responses! Sorry for all of the questions. I am completely new to Kafka and Camel. Is there a particular type I should transform to in the SMT? I have tried Map and HashMap with a jsonconverter but no luck so far.

@oscerd
Copy link
Contributor

oscerd commented Oct 13, 2021

If you know your message is an instance of Message class (org.cometd.bayeux.Message), you can do like the following SMT https://github.com/apache/camel-kafka-connector/blob/main/connectors/camel-sftp-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/sftp/transformers/SftpRemoteFileTransforms.java#L41

and invoke a message.getJson in the newRecord method invocation.

@hakidehari
Copy link
Author

@oscerd In this case, I would need to create my own transform in Java correct? Would I then need to put the jar file somewhere specific to call via my configuration for the connector?

@hakidehari
Copy link
Author

I managed to resolve my issue by using a solution provided here #1082

I added these 2 lines to my configuration for the connector and I am now seeing the payload come into Kafka in JSON format:

"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "camel.source.marshal":"json-jackson",

Thank you both for the help!

@apoorvmintri
Copy link

For anyone else that has run into this issue and trying to use "camel.source.marshal":"json-jackson" but its throwing exceptions on the later kafka connector versions. Use this instead -
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter", "camel.main.streamCachingEnabled": "false"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants