Receiving base64 encoded messages in kafka from pubsub #309

Open
hitk6 opened this issue Jan 26, 2022 · 1 comment


hitk6 commented Jan 26, 2022

When using ByteArrayConverter
key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

The output is

      State:      FAILED
      Trace:      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
                  at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
                  at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
                  at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:316)
                  at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:342)
                  at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
                  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
                  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
                  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
                  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
                  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
                  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
                  at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid schema type for ByteArrayConverter: STRUCT
  at org.apache.kafka.connect.converters.ByteArrayConverter.fromConnectData(ByteArrayConverter.java:55)
  at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
  at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:316)
  at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
  at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
  ... 11 more

When using JsonConverter
key.converter: org.apache.kafka.connect.json.JsonConverter
value.converter: org.apache.kafka.connect.json.JsonConverter

The output is

$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka1-cluster-kafka-bootstrap:9097 --topic topic1 --from-beginning --consumer.config /tmp/config.properties

{"schema":{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"message"},{"type":"string","optional":false,"field":"_namespace"},{"type":"string","optional":false,"field":"_stream"}],"optional":false},"payload":{"message":"eyJfYWlyYnl0ZV9hYl9pZCI6IjA1YjA3MzgxLWE2NmEtNDMwOC05Mjg0LWU1ZTBiZjk4NmYxNyIsIl9haXJieXRlX2RhdGEiOnsic2Vzc2lvbl9kYXRlIjoiMjAyMS0xMi0xM1QwMDowMDowMFoiLCJjb3VudHJ5IjoiSW5kaWEiLCJjaXR5IjoiSmFpcHVyIiwic2Vzc2lvbl9jb3VudCI6MzI2NTM3fSwiX2FpcmJ5dGVfZW1pdHRlZF9hdCI6MTY0MzIxNDY5NzU1NX0=","_namespace":"my-space","_stream":"pb_test_table"}}

You can find details about the code I used here

My question: how do I get the decoded value of payload["message"], i.e. the message that was sent by Pub/Sub?

@hitk6 hitk6 changed the title Receiving base64 encoded messages in kafka Receiving base64 encoded messages in kafka from pubsub Jan 26, 2022
@samarthsingal
Collaborator

Some clarifying questions: How are messages published to the Cloud Pub/Sub (CPS) topic? What schema are the messages published with? As noted in https://github.com/GoogleCloudPlatform/pubsub/tree/master/kafka-connector#cloud-pubsub-connector, the source connector returns a struct type with attributes converted to individual fields and message corresponding to the base64-encoded value of the message. Have you tried base64-decoding payload["message"]?
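
For illustration, the base64 decoding suggested above could look roughly like the following on the consumer side. This is a minimal sketch, not part of the connector: it assumes the record value is the JsonConverter envelope shown in the issue (a top-level "payload" object with a base64 string under "message"), and the function name `decode_pubsub_message` and the sample body are hypothetical.

```python
import base64
import json


def decode_pubsub_message(record_value: str) -> bytes:
    """Return the raw Pub/Sub message bytes from a JsonConverter envelope.

    Assumes the envelope layout from the issue:
    {"schema": {...}, "payload": {"message": "<base64>", ...}}
    """
    envelope = json.loads(record_value)
    encoded = envelope["payload"]["message"]
    # JsonConverter writes Connect "bytes" fields as base64 text.
    return base64.b64decode(encoded)


# Build a sample record shaped like the one in the issue (hypothetical body).
sample_body = b'{"country": "India", "city": "Jaipur"}'
sample_record = json.dumps(
    {
        "schema": {"type": "struct", "optional": False},
        "payload": {
            "message": base64.b64encode(sample_body).decode("ascii"),
            "_namespace": "my-space",
            "_stream": "pb_test_table",
        },
    }
)

decoded = decode_pubsub_message(sample_record)
# If the publisher sent JSON, the decoded bytes can be parsed one step further.
decoded_json = json.loads(decoded)
print(decoded_json["city"])
```

If the original Pub/Sub message was not UTF-8 JSON, stop at the `bytes` returned by `decode_pubsub_message` and interpret them according to whatever format the publisher used.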
