Merged
19 changes: 19 additions & 0 deletions docs/ecosystem/doris-kafka-connector.md
@@ -222,6 +222,7 @@ errors.deadletterqueue.topic.replication.factor=1
| debezium.schema.evolution | `none`,<br/> `basic` | none | N | When Debezium captures changes from an upstream database system (such as MySQL) and the upstream structure changes, the added fields can be synchronized to Doris. <br/>`none` means that structure changes in the upstream database system are not synchronized to Doris. <br/> `basic` means that changes from the upstream database are synchronized. Because changing the column structure is a dangerous operation (it may accidentally delete columns from the Doris table), only upstream column additions are currently supported. When a column is renamed, the old column is left unchanged; the Connector adds a new column to the target table and sinks the data for the renamed column into it. |
| database.time_zone | - | UTC | N | When `converter.mode` is not `normal`, specifies the time zone used to convert date and time data types (such as datetime, date, and timestamp). The default is UTC. |
| avro.topic2schema.filepath | - | - | N | Parses the Avro content in a Topic by reading a locally provided Avro Schema file, which decouples the Connector from the Schema Registry provided by Confluent. <br/> This configuration must be used with the `key.converter` or `value.converter` prefix. For example, the local Avro Schema files for the avro-user and avro-product Topics are configured as follows: `"value.converter.avro.topic2schema.filepath":"avro-user:file:///opt/avro_user.avsc, avro-product:file:///opt/avro_product.avsc"` <br/> For specific usage, refer to: [#32](https://github.com/apache/doris-kafka-connector/pull/32) |
| record.tablename.field | - | - | N | When this parameter is configured, data from a single Kafka topic can flow to multiple Doris tables. For configuration details, refer to: [#58](https://github.com/apache/doris-kafka-connector/pull/58) |

For other common Kafka Connect Sink configuration items, refer to: [connect_configuring](https://kafka.apache.org/documentation/#connect_configuring)

@@ -392,3 +393,21 @@ Increase `max.poll.interval.ms` in Kafka according to the scenario. The default
- If it is started in Distributed mode, add the `max.poll.interval.ms` and `consumer.max.poll.interval.ms` parameters to `config/connect-distributed.properties` and set their values.

After adjusting the parameters, restart kafka-connect.
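
The Distributed-mode adjustment described above could be scripted roughly as follows. This is only a sketch under assumptions: the `set_prop` helper is hypothetical (it is not part of Kafka or the connector), and the value `600000` used in the usage note is an illustrative choice, not a recommendation from this document.

```shell
#!/usr/bin/env bash
# Hypothetical helper: set key=value in a properties file,
# replacing an existing entry or appending a new one.
set_prop() {
  local file="$1" key="$2" value="$3"
  # Escape dots so the key is matched literally by grep and sed.
  local pattern="^${key//./\\.}="
  touch "$file"
  if grep -q "$pattern" "$file"; then
    # Rewrite the existing line through a temp file (portable across sed flavours).
    sed "s|${pattern}.*|${key}=${value}|" "$file" > "${file}.tmp" \
      && mv "${file}.tmp" "$file"
  else
    # Key not present yet: append it as a new line.
    printf '%s=%s\n' "$key" "$value" >> "$file"
  fi
}
```

For example, `set_prop config/connect-distributed.properties max.poll.interval.ms 600000` followed by the same call for `consumer.max.poll.interval.ms` would apply both settings; kafka-connect still has to be restarted afterwards.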

**3. Doris-kafka-connector reports an error when upgrading from version 1.0.0 or 1.1.0 to 24.0.0**
```
org.apache.kafka.common.config.ConfigException: Topic 'connect-status' supplied via the 'status.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of connector and task statuses, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing connector and task statuses and problems restarting this Connect cluster in the future. Change the 'status.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'.
at org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:581)
at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:47)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
at org.apache.kafka.connect.storage.KafkaStatusBackingStore.start(KafkaStatusBackingStore.java:228)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:164)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run
```
**Solution:**
Change the cleanup policy of the `connect-configs` and `connect-status` Topics to `compact`:
```
$KAFKA_HOME/bin/kafka-configs.sh --alter --entity-type topics --entity-name connect-configs --add-config cleanup.policy=compact --bootstrap-server 127.0.0.1:9092
$KAFKA_HOME/bin/kafka-configs.sh --alter --entity-type topics --entity-name connect-status --add-config cleanup.policy=compact --bootstrap-server 127.0.0.1:9092
```
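
To preview the commands before touching a live cluster, and to check the result afterwards, the two steps can be wrapped in a small script. This is a sketch under assumptions: the `run` and `compact_topic` functions, the `DRY_RUN` and `BOOTSTRAP` variables, and the `/opt/kafka` fallback path are hypothetical names introduced here. It relies only on the `--alter`, `--describe`, `--entity-type`, `--entity-name`, `--add-config`, and `--bootstrap-server` options of `kafka-configs.sh`.

```shell
#!/usr/bin/env bash
# Print (DRY_RUN=1, the default in this sketch) or run the kafka-configs.sh
# commands that set and then describe cleanup.policy for the Connect topics.
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"      # assumed install path
BOOTSTRAP="${BOOTSTRAP:-127.0.0.1:9092}"    # broker address used in this document
DRY_RUN="${DRY_RUN:-1}"

# Echo the command in dry-run mode, execute it otherwise.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Set cleanup.policy=compact on one topic, then show its config overrides.
compact_topic() {
  local topic="$1"
  run "$KAFKA_HOME/bin/kafka-configs.sh" --bootstrap-server "$BOOTSTRAP" \
    --alter --entity-type topics --entity-name "$topic" \
    --add-config cleanup.policy=compact
  run "$KAFKA_HOME/bin/kafka-configs.sh" --bootstrap-server "$BOOTSTRAP" \
    --describe --entity-type topics --entity-name "$topic"
}

for topic in connect-configs connect-status; do
  compact_topic "$topic"
done
```

With `DRY_RUN=0` the commands are executed instead of printed, and the `--describe` output for each topic should then list `cleanup.policy=compact`.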
@@ -223,6 +223,7 @@ errors.deadletterqueue.topic.replication.factor=1
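| debezium.schema.evolution | `none`,<br/> `basic` | none | N | When Debezium captures changes from an upstream database system (such as MySQL) and the upstream structure changes, the added fields can be synchronized to Doris. <br/>`none` means that structure changes in the upstream database system are not synchronized to Doris. <br/> `basic` means that changes from the upstream database are synchronized. Because changing the column structure is a dangerous operation (it may accidentally delete columns from the Doris table), only upstream column additions are currently supported. When a column is renamed, the old column is left unchanged; the Connector adds a new column to the target table and sinks the data for the renamed column into it. |
| database.time_zone | - | UTC | N | When `converter.mode` is not `normal`, specifies the time zone used to convert date and time data types (such as datetime, date, and timestamp). The default is UTC. |
| avro.topic2schema.filepath | - | - | N | Parses the Avro content in a Topic by reading a locally provided Avro Schema file, which decouples the Connector from the Schema Registry provided by Confluent. <br/> This configuration must be used with the `key.converter` or `value.converter` prefix. For example, the local Avro Schema files for the avro-user and avro-product Topics are configured as follows: `"value.converter.avro.topic2schema.filepath":"avro-user:file:///opt/avro_user.avsc, avro-product:file:///opt/avro_product.avsc"` <br/> For specific usage, refer to: [#32](https://github.com/apache/doris-kafka-connector/pull/32) |
| record.tablename.field | - | - | N | When this parameter is enabled, data from a single Topic can flow to multiple Doris tables. For configuration details, refer to: [#58](https://github.com/apache/doris-kafka-connector/pull/58) |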

For other common Kafka Connect Sink configuration items, refer to: [connect_configuring](https://kafka.apache.org/documentation/#connect_configuring)

@@ -393,3 +394,21 @@ org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be

After adjusting the parameters, restart kafka-connect.

**3. Doris-kafka-connector reports an error when upgrading from version 1.0.0 or 1.1.0 to 24.0.0**
```
org.apache.kafka.common.config.ConfigException: Topic 'connect-status' supplied via the 'status.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of connector and task statuses, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing connector and task statuses and problems restarting this Connect cluster in the future. Change the 'status.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'.
at org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:581)
at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:47)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
at org.apache.kafka.connect.storage.KafkaStatusBackingStore.start(KafkaStatusBackingStore.java:228)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:164)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run
```
**Solution:**
Change the cleanup policy of the `connect-configs` and `connect-status` Topics to `compact`:
```
$KAFKA_HOME/bin/kafka-configs.sh --alter --entity-type topics --entity-name connect-configs --add-config cleanup.policy=compact --bootstrap-server 127.0.0.1:9092
$KAFKA_HOME/bin/kafka-configs.sh --alter --entity-type topics --entity-name connect-status --add-config cleanup.policy=compact --bootstrap-server 127.0.0.1:9092
```
