
[Pulsar IO][Issue 5633] Support avro schema for debezium connector #6034

Merged

Conversation

tuteng
Member

@tuteng tuteng commented Jan 12, 2020

Fixes #5633

Motivation

Currently, some users want Avro schema support in Debezium, so this PR adds that feature.
Kafka's Avro converter depends on Avro 1.8, but Pulsar has just been upgraded to Avro 1.9, so shading is needed to avoid conflicts such as the addProp method not being found.

Modifications

  • Add a package kafka-connect-avro-converter-shaded
  • Add a KafkaSchema class to convert Kafka's Avro schema to Pulsar's schema

Verifying this change

Unit test and integration tests

Member

@sijie sijie left a comment


@tuteng overall looks good. Can you add an integration test for it?

@tuteng
Member Author

tuteng commented Feb 3, 2020

You are right; the current integration tests already cover this, and the feature supports both JSONSchema and AVROSchema. @sijie

@@ -188,15 +198,48 @@ public void close() {
@Getter
Optional<String> destinationTopic;

private final AvroData avroData;

private org.apache.pulsar.kafka.shade.avro.Schema keyAvroSchema;
Member


I don't think this needs to be a class variable. What method uses this variable other than the constructor?
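
For illustration, a minimal sketch of the suggested change, assuming the schema is only consumed inside the constructor; the constructor shape and the avroData.fromConnectSchema call are my assumptions about the surrounding code:

    // Sketch: keep the converted Avro schema as a constructor-local variable
    // instead of a field, since nothing outside the constructor reads it.
    KafkaSourceRecord(SourceRecord srcRecord) {
        org.apache.pulsar.kafka.shade.avro.Schema keyAvroSchema =
                avroData.fromConnectSchema(srcRecord.keySchema());
        // ... build the Pulsar key schema from keyAvroSchema, then let it go out of scope
    }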


private org.apache.pulsar.kafka.shade.avro.Schema keyAvroSchema;

private org.apache.pulsar.kafka.shade.avro.Schema valueAvroSchema;
Member


Same comment as above.


private final KafkaSchema valueSchema;

private byte[] keyBytes;
Member


Why does this need to be a class variable? It is only used in the constructor.


private byte[] keyBytes;

private byte[] valueBytes;
Member


Same comment as above.

String keyName = this.topicName.get() + "-key";
String valueName = this.topicName.get() + "-value";

if (readerCache.getIfPresent(keyName) == null
Member


Actually, I think the cache here is incorrect. You need a cache from the Kafka Connect schema to the Pulsar schema:

Cache<org.apache.kafka.connect.data.Schema, KafkaSchema>
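
For example, a minimal sketch using Guava's cache; the KafkaSchema constructor call is hypothetical and stands in for however the Pulsar schema is actually built from the Connect schema:

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import java.util.concurrent.ExecutionException;

    private final Cache<org.apache.kafka.connect.data.Schema, KafkaSchema> schemaCache =
            CacheBuilder.newBuilder().maximumSize(1000).build();

    private KafkaSchema getPulsarSchema(org.apache.kafka.connect.data.Schema connectSchema)
            throws ExecutionException {
        // Convert each distinct Kafka Connect schema to a Pulsar schema once, then reuse it.
        return schemaCache.get(connectSchema, () -> new KafkaSchema(connectSchema));
    }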

@dennisylyung

dennisylyung commented Mar 25, 2020

I have tried to build and run it.
When I use the JsonConverter (org.apache.kafka.connect.json.JsonConverter),
I get the following error:

12:29:55.891 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO  io.debezium.connector.mysql.SnapshotReader - Step 9: scanned 40 rows in 1 tables in 00:00:00.228
12:29:55.891 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO  io.debezium.connector.mysql.SnapshotReader - Step 10: committing transaction
12:29:55.939 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO  io.debezium.connector.mysql.SnapshotReader - Step 11: releasing table read locks to enable MySQL writes
12:29:56.044 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO  io.debezium.connector.mysql.SnapshotReader - Writes to MySQL prevented for a total of 00:00:02.03
12:29:56.044 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO  io.debezium.connector.mysql.SnapshotReader - Completed snapshot in 00:00:03.829
12:29:56.095 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [ztore-data/debezium-local/ztoreSalesDb] [null] Creating producer on cnx [id: 0x7d5e380e, L:/172.27.240.97:60775 - R:pulsar.data.ztore.com/34.71.116.218:6650]
12:29:56.336 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [ztore-data/debezium-local/ztoreSalesDb] [pulsar-117-17] Created producer on cnx [id: 0x7d5e380e, L:/172.27.240.97:60775 - R:pulsar.data.ztore.com/34.71.116.218:6650]
12:29:56.634 [pulsar-client-io-1-1] WARN  org.apache.pulsar.client.impl.ProducerImpl - [ztore-data/debezium-local/ztoreSalesDb] [pulsar-117-17] GetOrCreateSchema error
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Key schemas or Value schemas are different schema type, from key schema type is BYTES and to key schema is JSON, from value schema is BYTES and to value schema is JSON
        at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:997) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
        at org.apache.pulsar.client.impl.ClientCnx.lambda$sendGetOrCreateSchema$22(ClientCnx.java:839) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) [?:1.8.0_221]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) [?:1.8.0_221]
        at org.apache.pulsar.client.impl.ClientCnx.handleGetOrCreateSchemaResponse(ClientCnx.java:733) [pulsar-client-original.jar:2.6.0-SNAPSHOT]
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:329) [pulsar-common.jar:2.6.0-SNAPSHOT]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) [netty-codec-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300) [netty-codec-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) [netty-common-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.43.Final.jar:4.1.43.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]

I made sure to delete all the existing schemas before running, so the schema mutation error does not stem from leftover schemas from previous runs.

And if I use the AvroConverter (org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter),
I get this error:

12:24:17,895 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] Threads - Requested thread factory for connector MySqlConnector, id = ztoreSalesDb named = binlog-client
12:24:17,907 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] Threads - Requested thread factory for connector MySqlConnector, id = ztoreSalesDb named = snapshot
12:24:17,908 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] Threads - Creating thread debezium-mysqlconnector-ztoreSalesDb-snapshot
12:24:22,622 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] AvroDataConfig - AvroDataConfig values: 
	connect.meta.data = true
	enhanced.avro.schema.support = false
	schemas.cache.config = 1000

12:24:24,488 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Encountered exception in sink write: 
org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: (byte[])"������"; line: 1, column: 2]
Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: (byte[])"������"; line: 1, column: 2]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) ~[jackson-core-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712) ~[jackson-core-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:690) ~[jackson-core-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2373) ~[jackson-core-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:672) ~[jackson-core-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4231) ~[jackson-databind-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2725) ~[jackson-databind-2.10.1.jar:2.10.1]
	at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:45) ~[?:?]
	at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:90) ~[?:?]
	at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:43) ~[?:?]
	at org.apache.pulsar.common.schema.KeyValue.encode(KeyValue.java:99) ~[pulsar-client-api.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:128) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:37) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:155) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:297) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:444) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:427) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:282) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
12:24:24,499 WARN [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Failed to process result of message org.apache.pulsar.io.kafka.connect.KafkaConnectSource$KafkaSourceRecord@37912f1e
java.lang.RuntimeException: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: (byte[])"������"; line: 1, column: 2]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:448) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:427) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:282) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: (byte[])"������"; line: 1, column: 2]
Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: (byte[])"������"; line: 1, column: 2]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) ~[jackson-core-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712) ~[jackson-core-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:690) ~[jackson-core-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2373) ~[jackson-core-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:672) ~[jackson-core-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4231) ~[jackson-databind-2.10.1.jar:2.10.1]
	at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2725) ~[jackson-databind-2.10.1.jar:2.10.1]
	at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:45) ~[?:?]
	at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:90) ~[?:?]
	at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:43) ~[?:?]
	at org.apache.pulsar.common.schema.KeyValue.encode(KeyValue.java:99) ~[pulsar-client-api.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:128) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:37) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:155) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:297) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:444) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	... 3 more
12:24:24,501 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Encountered exception in source read: 
java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_221]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_221]
	at org.apache.pulsar.io.kafka.connect.KafkaConnectSource.read(KafkaConnectSource.java:165) ~[pulsar-io-kafka-connect-adaptor-2.6.0-SNAPSHOT.jar:?]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:460) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: java.lang.Exception: Sink Error
	at org.apache.pulsar.io.kafka.connect.KafkaConnectSource$KafkaSourceRecord.fail(KafkaConnectSource.java:322) ~[pulsar-io-kafka-connect-adaptor-2.6.0-SNAPSHOT.jar:?]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:285) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	... 1 more
12:24:24,502 ERROR [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - [ztore-data/debezium-local/debezium-mysql-source:0] Uncaught exception in Java Instance
java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:464) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_221]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_221]
	at org.apache.pulsar.io.kafka.connect.KafkaConnectSource.read(KafkaConnectSource.java:165) ~[?:?]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:460) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	... 2 more
Caused by: java.lang.Exception: Sink Error
	at org.apache.pulsar.io.kafka.connect.KafkaConnectSource$KafkaSourceRecord.fail(KafkaConnectSource.java:322) ~[?:?]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:285) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
	... 1 more
12:24:24,503 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Closing instance

@jiazhai
Member

jiazhai commented Apr 2, 2020

@tuteng What is the status for this PR?

@jiazhai
Member

jiazhai commented Apr 10, 2020

@gaoran10

@gaoran10
Contributor

gaoran10 commented Apr 13, 2020

@dennisylyung Hi, could you provide some configuration details for those error logs, such as the Pulsar source config and the steps to reproduce? Thanks.

These are my test steps, following the demo at http://pulsar.apache.org/docs/en/io-debezium-source/#example-of-mysql, but I could not reproduce your problem.

# start the MySQL example database
docker run -it --rm \
--name mysql \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.8

# start pulsar
bin/pulsar standalone

# run the source connector locally with the yaml config
bin/pulsar-admin source localrun \
--source-config-file debezium-mysql-source-config.yaml

# subscribe to the topic
bin/pulsar-client consume -s "sub-products" public/default/dbserver1.inventory.products -n 0

# start a mysql client
docker run -it --rm \
--name mysqlterm \
--link mysql \
--rm mysql:5.7 sh \
-c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

@jiazhai
Member

jiazhai commented Apr 14, 2020

@gaoran10 is helping on this issue.

@dennisylyung

dennisylyung commented Apr 14, 2020

@dennisylyung Hi, could you provide some configuration details for those error logs, such as the Pulsar source config and the steps to reproduce? Thanks.

This is the config I was using.
When I tested with the JSON converter, I just changed both the key and value converters to org.apache.kafka.connect.json.JsonConverter.
I also checked the schema registry REST API, and the returned schemas are KeyValue schemas of <byte, byte>.
@gaoran10

tenant: "my-tenant"
namespace: "debezium"
name: "debezium-mysql-source"
topicName: "my-tenant/debezium/debezium-mysql-topic"
archive: "connectors/pulsar-io-debezium-mysql-2.6.0-SNAPSHOT.nar"

parallelism: 1

resources:
  cpu: 0.2
  ram: 1073741824

configs:
  ## sourceTask
  task.class: "io.debezium.connector.mysql.MySqlConnectorTask"

  ## config for mysql, docker image: debezium/example-mysql:0.8
  database.hostname: "db.hostname"
  database.port: "3306"
  database.user: "kafka_connect"
  database.password: "password"
  database.server.id: "89447"
  database.server.name: "myname"
  database.whitelist: "myschema"
  table.whitelist: "myschema.mytable"

  database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
  database.history.pulsar.topic: "my-tenant/debezium/history-topic"
  database.history.pulsar.service.url: "pulsar://pulsar-broker.dev-pulsar.svc.cluster.local:6650"
  ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
  key.converter: "org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter"
  value.converter: "org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter"
  schema.registry.url: "http://schema-registry.kafka.internal.data.mydomain.com:8081"
  ## PULSAR_SERVICE_URL_CONFIG
  pulsar.service.url: "pulsar://pulsar-broker.dev-pulsar.svc.cluster.local:6650"
  ## OFFSET_STORAGE_TOPIC_CONFIG
  offset.storage.topic: "my-tenant/debezium/offset-topic"
  ## Extra settings
  # database.history.store.only.monitored.tables.ddl: "true"
  database.serverTimezone: "Asia/Hong_Kong"

@gaoran10 gaoran10 force-pushed the feature/debezium-support-avro-schema branch from f3f1974 to a94785f Compare April 17, 2020 03:12
@gaoran10
Contributor

gaoran10 commented Apr 17, 2020

I have made some changes. The JsonConverter is used as before; the AvroConverter behaves differently from the JsonConverter, and users have two ways to consume its output.

  1. With the schema below, we can process the records as GenericRecord. Tip: use the KeyValueEncodingType.SEPARATED encoding type.

    Schema<KeyValue<GenericRecord, GenericRecord>> schema = Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED);

  2. We can get the original byte arrays through the getKeyBytes() and getData() methods of the Message; these byte arrays are the output of the AvroConverter. (See the consumer sketch below.)
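
For example, a minimal consumer sketch covering both options; the service URL is illustrative, and the topic and subscription names are taken from the repro steps above:

    import org.apache.pulsar.client.api.*;
    import org.apache.pulsar.client.api.schema.GenericRecord;
    import org.apache.pulsar.common.schema.KeyValue;
    import org.apache.pulsar.common.schema.KeyValueEncodingType;

    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .build();

    // Option 1: let Pulsar decode key and value into GenericRecord.
    Consumer<KeyValue<GenericRecord, GenericRecord>> consumer = client
            .newConsumer(Schema.KeyValue(
                    Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(),
                    KeyValueEncodingType.SEPARATED))
            .topic("public/default/dbserver1.inventory.products")
            .subscriptionName("sub-products")
            .subscribe();

    Message<KeyValue<GenericRecord, GenericRecord>> msg = consumer.receive();
    GenericRecord value = msg.getValue().getValue();

    // Option 2: read the raw AvroConverter output directly.
    byte[] rawKey = msg.getKeyBytes();
    byte[] rawValue = msg.getData();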

@tuteng @sijie @jiazhai @codelipenghui Please have a review. Thanks.

Contributor

@codelipenghui codelipenghui left a comment


I don't see how users pass the offset to the schema info. Currently, users use SchemaDefinition to create schemas, so it's better to add a method to the SchemaDefinitionBuilder.

We also need a doc telling users how to consume messages from the topics that Debezium writes to. If we already have Debezium connector docs, it's better to add it there directly; otherwise, you can create an issue or a new doc for the Debezium connector.
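
For example, a hedged sketch of passing the offset property through the existing builder today; withProperties is the generic map-based setter, and MyPojo plus the property value are illustrative:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pulsar.client.api.schema.SchemaDefinition;

    Map<String, String> props = new HashMap<>();
    props.put("AVRO_READ_OFFSET", "5"); // OFFSET_PROP from this PR; value is illustrative

    SchemaDefinition<MyPojo> def = SchemaDefinition.<MyPojo>builder()
            .withPojo(MyPojo.class)
            .withProperties(props) // a dedicated builder method would replace this
            .build();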

@@ -47,6 +47,10 @@
private final List<Field> fields;
private final Schema schema;
private final byte[] schemaVersion;

private int offset;
public final static String OFFSET_PROP = "AVRO_READ_OFFSET";
Contributor


There are two property-name definitions in the SchemaDefinitionBuilderImpl. I think you can move all of them to SchemaDefinition and keep them with the prefix "__".
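
A minimal sketch of that suggestion; the two existing constant names are my reading of SchemaDefinitionBuilderImpl and should be treated as illustrative:

    public interface SchemaDefinition<T> {
        // Property names consolidated here, all keeping the "__" prefix:
        String ALWAYS_ALLOW_NULL = "__alwaysAllowNull";                 // assumed existing property
        String JSR310_CONVERSION_ENABLED = "__jsr310ConversionEnabled"; // assumed existing property
        String AVRO_READ_OFFSET = "__AVRO_READ_OFFSET";                 // offset property from this PR
        // ... existing interface methods unchanged
    }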

@codelipenghui codelipenghui added the doc-required Your PR changes impact docs and you will update later. label Apr 18, 2020
@gaoran10
Contributor

/pulsarbot run-failure-checks

@tuteng tuteng closed this Apr 24, 2020
@tuteng tuteng reopened this Apr 24, 2020
@gaoran10 gaoran10 force-pushed the feature/debezium-support-avro-schema branch from 1e71e0a to 4e077b5 Compare April 27, 2020 03:58
@gaoran10
Contributor

/pulsarbot run-failure-checks

@gaoran10
Contributor

/pulsarbot run-failure-checks

@gaoran10
Contributor

/pulsarbot run-failure-checks

@codelipenghui codelipenghui merged commit e08be96 into apache:master Apr 30, 2020
Huanli-Meng pushed a commit to Huanli-Meng/pulsar that referenced this pull request May 27, 2020
…ache#6034)

@Anonymitaet Anonymitaet removed the doc-required Your PR changes impact docs and you will update later. label Jun 10, 2020

huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
…ache#6034)
