Support Avro schema from Debezium source connector #5633

Closed
raghavi92 opened this issue Nov 12, 2019 · 9 comments · Fixed by #6034
Labels
doc, type/bug

Comments

@raghavi92

Describe the bug
Currently, the Debezium Postgres source produces messages with a KeyValueSchema on its topics. Support for the Avro schema type is required.

To Reproduce
Steps to reproduce the behavior:

  1. The connector configuration I used for the Debezium source connector is:

tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-postgres-topic"
archive: "connectors/pulsar-io-debezium-postgres-2.5.0-SNAPSHOT.nar"
schemaType: "avro"
parallelism: 1
configs:
  database.hostname: "localhost"
  database.port: "5432"
  database.user: "hq_gosecure"
  database.password: "postgres"
  database.dbname: "hq_gosecure"
  database.server.name: "dbserver1"
  plugin.name: "wal2json"
  pulsar.service.url: "pulsar://127.0.0.1:6650"

  2. Start the connector using the following command:
    bin/pulsar-admin sources localrun --source-config-file conf/debezium-postgres-source-config.yaml
  3. See the error.

Expected behavior
The connector should start producing messages with an Avro schema. Instead, I get the following error:
19:15:12.353 [main] INFO org.apache.pulsar.functions.LocalRunner - RuntimeSpawner quit because of org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroTypeException: Unknown type: K
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.pulsar.client.impl.schema.StructSchema.createAvroSchema(StructSchema.java:136) ~[pulsar-client-original-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.client.impl.schema.StructSchema.parseSchemaInfo(StructSchema.java:149) ~[pulsar-client-original-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.client.impl.schema.AvroSchema.of(AvroSchema.java:90) ~[pulsar-client-original-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:144) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:189) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:209) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.source.TopicSchema.lambda$getSchema$0(TopicSchema.java:65) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at java.util.HashMap.computeIfAbsent(HashMap.java:1127) ~[?:1.8.0_201]
    at org.apache.pulsar.functions.source.TopicSchema.getSchema(TopicSchema.java:65) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.sink.PulsarSink.initializeSchema(PulsarSink.java:327) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.sink.PulsarSink.open(PulsarSink.java:255) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:787) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupJavaInstance(JavaInstanceRunnable.java:213) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:244) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]
Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroTypeException: Unknown type: K
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    ... 15 more
Caused by: org.apache.avro.AvroTypeException: Unknown type: K
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:292) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:646) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.reflect.ReflectData$AllowNull.createFieldSchema(ReflectData.java:81) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    ... 15 more

Desktop:

  • OS: Ubuntu 18.04
raghavi92 added the type/bug label on Nov 12, 2019
@jiazhai
Member

jiazhai commented Nov 13, 2019

@raghavi92 In Debezium, we have to use KeyValueSchema; inside the KeyValueSchema, the key and the value can each be of Avro type.
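To illustrate this point: a KeyValueSchema wraps two independent schemas, one for the key and one for the value, and each of them can be Avro. With the INLINE encoding type, both serialized parts travel in a single message payload. The following is a minimal, JDK-only sketch of such a layout, assuming a 4-byte length prefix before the key bytes and before the value bytes, modeled on Pulsar's `KeyValue.encode`. The class and method names are hypothetical, and the Pulsar source remains the reference for the exact format (for example, how null keys or values are written).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of an INLINE key/value payload:
// [key length][key bytes][value length][value bytes]
// The layout is an assumption modeled on Pulsar's KeyValue.encode, not a copy of it.
final class InlineKeyValueSketch {

    // Packs already-serialized key and value bytes (e.g. output of two Avro schemas).
    static byte[] encode(byte[] keyBytes, byte[] valueBytes) {
        ByteBuffer buf = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length);
        buf.putInt(keyBytes.length).put(keyBytes);
        buf.putInt(valueBytes.length).put(valueBytes);
        return buf.array();
    }

    // Splits an INLINE payload back into {keyBytes, valueBytes}.
    static byte[][] decode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        byte[] key = new byte[buf.getInt()];
        buf.get(key);
        byte[] value = new byte[buf.getInt()];
        buf.get(value);
        return new byte[][] {key, value};
    }

    public static void main(String[] args) {
        // Stand-ins for the bytes an Avro key schema and an Avro value schema would produce.
        byte[] key = "id=1".getBytes(StandardCharsets.UTF_8);
        byte[] value = "name=alice".getBytes(StandardCharsets.UTF_8);

        byte[] payload = encode(key, value);
        byte[][] parts = decode(payload);
        System.out.println(payload.length); // 4 + 4 (key) + 4 + 10 (value) = 22
        System.out.println(Arrays.equals(parts[0], key) && Arrays.equals(parts[1], value)); // true
    }
}
```

Because the two parts are serialized separately, the key and value schemas can evolve independently while still sharing one message.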

@jiazhai
Member

jiazhai commented Nov 13, 2019

Please refer to KeyValueSchema.java for more details.

@jiazhai
Member

jiazhai commented Nov 13, 2019

@tuteng Could we add some documentation on how to configure and use schemas in the Debezium connector?

jiazhai added the triage/week-45 and doc labels on Nov 13, 2019
@raghavi92
Author

@jiazhai Yes, I went through the KeyValueSchema code. As mentioned in the original issue, I set "schemaType" to Avro in my source config YAML file, but the Avro type exception is still thrown at this line:

return new KeyValueSchema<>(AvroSchema.of(key), AvroSchema.of(value), KeyValueEncodingType.INLINE);

I guess it's because of this issue in the Avro library itself.
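The innermost "Caused by" frames in the stack trace run through `ReflectData.createFieldSchema`, which suggests Avro's reflection was walking the fields of a class whose field types are still the type parameters `K` and `V`. Because of type erasure, a class literal alone carries no concrete type for `K`, so reflection sees only the type variable, and there is no Avro type to map it to. The sketch below uses plain JDK reflection (no Avro) and hypothetical class names to show what such fields look like to a reflection-based schema generator.

```java
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

final class TypeVariableDemo {

    // Hypothetical generic holder, standing in for a key/value class declared with <K, V>.
    static class GenericPair<K, V> {
        K key;
        V value;
        String label; // a concrete field, for contrast
    }

    // Returns the declared (generic) type of a field, which is what a
    // reflection-based schema generator inspects for each field.
    static Type declaredType(Class<?> clazz, String fieldName) {
        try {
            return clazz.getDeclaredField(fieldName).getGenericType();
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No field named " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        for (String name : new String[] {"key", "value", "label"}) {
            Type t = declaredType(GenericPair.class, name);
            // A TypeVariable has no concrete class that could be mapped to a schema type.
            System.out.println(name + " -> " + t.getTypeName()
                    + (t instanceof TypeVariable ? " (unresolved type variable)" : ""));
        }
    }
}
```

Running it prints `K` and `V` as unresolved type variables and `java.lang.String` for the concrete field. In general, the way around this is to build the schema from concrete classes or explicit schema definitions rather than from the generic holder class.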

@jiazhai
Member

jiazhai commented Nov 13, 2019

@raghavi92 This requires configuring key.converter and value.converter for the Debezium connector. The default converter is the JSON converter, org.apache.kafka.connect.json.JsonConverter.
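Converters are chosen separately for the key and the value through the connector's `configs` map, and the default for both is `org.apache.kafka.connect.json.JsonConverter`. The sketch below models that fallback with a hypothetical helper, `withConverterDefaults`, which is not a Pulsar or Kafka API; it assumes that user-supplied `key.converter` and `value.converter` entries simply override the JSON default.

```java
import java.util.HashMap;
import java.util.Map;

final class ConverterConfigSketch {

    // Default converter for both key and value, as stated in the comment above.
    static final String JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";

    // Hypothetical helper: start from the JSON defaults, then apply the user's `configs` entries.
    static Map<String, String> withConverterDefaults(Map<String, String> userConfigs) {
        Map<String, String> effective = new HashMap<>();
        effective.put("key.converter", JSON_CONVERTER);
        effective.put("value.converter", JSON_CONVERTER);
        effective.putAll(userConfigs); // explicit settings win over the defaults
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> configs = new HashMap<>();
        configs.put("database.hostname", "localhost");
        // Hypothetical converter class name, used only to show an override.
        configs.put("value.converter", "com.example.MyValueConverter");

        Map<String, String> effective = withConverterDefaults(configs);
        System.out.println("key.converter   = " + effective.get("key.converter"));
        System.out.println("value.converter = " + effective.get("value.converter"));
    }
}
```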

@raghavi92
Author

raghavi92 commented Nov 14, 2019

@jiazhai, I tried that too. It looks like the Avro converter is not supported yet? The io.confluent Avro converter relies on Confluent's own schema registry, so it cannot be used with Pulsar.
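Part of the reason the Confluent converter does not fit here: messages written in Confluent's wire format do not carry the full Avro schema. Each one starts with a magic byte (`0`) and a 4-byte big-endian schema ID, followed by the Avro binary body, and a reader must fetch the schema for that ID from Confluent Schema Registry before it can decode the body. A minimal sketch of reading that header (class and method names are hypothetical):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class ConfluentHeaderSketch {

    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    // Returns the schema ID that must be resolved against Confluent Schema Registry.
    static int schemaId(byte[] message) {
        if (message.length < HEADER_SIZE || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not in Confluent wire format");
        }
        return ByteBuffer.wrap(message, 1, 4).getInt(); // ByteBuffer is big-endian by default
    }

    // Returns the Avro-encoded body, which cannot be decoded without the registry schema.
    static byte[] avroBody(byte[] message) {
        schemaId(message); // validates the header
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    public static void main(String[] args) {
        // Illustrative bytes only: magic byte, schema ID 42, then a two-byte body.
        byte[] message = {0, 0, 0, 0, 42, 2, 4};
        System.out.println("schema id = " + schemaId(message));        // 42
        System.out.println("body length = " + avroBody(message).length); // 2
    }
}
```

Without access to the registry that issued the ID, the body bytes alone are not enough for a Pulsar consumer to reconstruct the schema.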

@jiazhai
Member

jiazhai commented Nov 14, 2019

@raghavi92, it seems we need to include the related jar in the pom.xml file of pulsar-debezium to support it.

@jiazhai
Member

jiazhai commented Nov 14, 2019

@tuteng, could you give it a try and help provide a doc on how to do it?

@jiazhai
Member

jiazhai commented Nov 25, 2019

This involves converting the Avro schema stored in the Kafka schema registry into data that Pulsar can read. We will provide a fix for this issue.

codelipenghui pushed a commit that referenced this issue Apr 30, 2020

Fixes #5633 


### Motivation

Currently, some users want Avro schema support in the Debezium connector, so this PR adds that feature.
Kafka's Avro converter depends on Avro 1.8, but Pulsar has just upgraded to Avro 1.9, so the converter needs to be shaded to avoid errors where the `addProp` function cannot be found.

### Modifications

* Add a package `kafka-connect-avro-converter-shaded`
* Add class KafkaSchema to convert Kafka's Avro schema to Pulsar's schema
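For context on what such a conversion has to cover: Debezium emits Kafka Connect records, and the Confluent Avro converter translates Kafka Connect schema types into Avro types roughly as below. This sketch is not the PR's `KafkaSchema` code; it uses a local enum mirroring the names of Kafka Connect's `Schema.Type` values so it compiles without Kafka, and it shows only the top-level type mapping (logical types, optional fields, and the extra properties the converter attaches are omitted).

```java
import java.util.EnumMap;
import java.util.Map;

final class ConnectToAvroTypeSketch {

    // Local stand-in mirroring the value names of Kafka Connect's Schema.Type (no Kafka dependency).
    enum ConnectType { INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, STRUCT }

    static final Map<ConnectType, String> AVRO_TYPE = new EnumMap<>(ConnectType.class);

    static {
        AVRO_TYPE.put(ConnectType.INT8, "int");   // Avro has no 8-bit integer type
        AVRO_TYPE.put(ConnectType.INT16, "int");  // nor a 16-bit one
        AVRO_TYPE.put(ConnectType.INT32, "int");
        AVRO_TYPE.put(ConnectType.INT64, "long");
        AVRO_TYPE.put(ConnectType.FLOAT32, "float");
        AVRO_TYPE.put(ConnectType.FLOAT64, "double");
        AVRO_TYPE.put(ConnectType.BOOLEAN, "boolean");
        AVRO_TYPE.put(ConnectType.STRING, "string");
        AVRO_TYPE.put(ConnectType.BYTES, "bytes");
        AVRO_TYPE.put(ConnectType.ARRAY, "array");
        AVRO_TYPE.put(ConnectType.MAP, "map");    // Avro map keys are strings; other key types need another encoding
        AVRO_TYPE.put(ConnectType.STRUCT, "record");
    }

    static String avroTypeFor(ConnectType type) {
        return AVRO_TYPE.get(type);
    }

    public static void main(String[] args) {
        for (ConnectType t : ConnectType.values()) {
            System.out.println(t + " -> " + avroTypeFor(t));
        }
    }
}
```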

### Verifying this change 

Unit tests and integration tests.
Huanli-Meng pushed a commit to Huanli-Meng/pulsar that referenced this issue May 27, 2020
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this issue Aug 24, 2020