From af2388bb15589b566ec7fc7ed5b86584673cb49a Mon Sep 17 00:00:00 2001 From: Gantigmaa Selenge Date: Mon, 17 Jul 2023 12:50:05 +0800 Subject: [PATCH] Address review comments --- .../mirror/MirrorCheckpointConfig.java | 180 ++++++------- .../connect/mirror/MirrorHeartbeatConfig.java | 16 +- .../connect/mirror/MirrorSourceConfig.java | 255 +++++++++--------- .../kafka/connect/mirror/MirrorUtils.java | 9 - docs/configuration.html | 13 +- 5 files changed, 236 insertions(+), 237 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java index e9db12db937f..e37cee4a79b8 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java @@ -23,8 +23,6 @@ import java.util.List; import java.util.Map; -import static org.apache.kafka.connect.mirror.MirrorUtils.mergeConnectorConfigDef; - public class MirrorCheckpointConfig extends MirrorConnectorConfig { protected static final String REFRESH_GROUPS = "refresh.groups"; @@ -167,96 +165,98 @@ Duration consumerPollTimeout() { return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS)); } - protected static final ConfigDef CHECKPOINT_CONFIG_DEF = new ConfigDef() - .define( - CONSUMER_POLL_TIMEOUT_MILLIS, - ConfigDef.Type.LONG, - CONSUMER_POLL_TIMEOUT_MILLIS_DEFAULT, - ConfigDef.Importance.LOW, - CONSUMER_POLL_TIMEOUT_MILLIS_DOC) - .define( - GROUPS, - ConfigDef.Type.LIST, - GROUPS_DEFAULT, - ConfigDef.Importance.HIGH, - GROUPS_DOC) - .define( - GROUPS_EXCLUDE, - ConfigDef.Type.LIST, - GROUPS_EXCLUDE_DEFAULT, - ConfigDef.Importance.HIGH, - GROUPS_EXCLUDE_DOC) - .define( - GROUPS_EXCLUDE_ALIAS, - ConfigDef.Type.LIST, - null, - ConfigDef.Importance.HIGH, - "Deprecated. Use " + GROUPS_EXCLUDE + " instead.") - .define( - GROUP_FILTER_CLASS, - ConfigDef.Type.CLASS, - GROUP_FILTER_CLASS_DEFAULT, - ConfigDef.Importance.LOW, - GROUP_FILTER_CLASS_DOC) - .define( - REFRESH_GROUPS_ENABLED, - ConfigDef.Type.BOOLEAN, - REFRESH_GROUPS_ENABLED_DEFAULT, - ConfigDef.Importance.LOW, - REFRESH_GROUPS_ENABLED_DOC) - .define( - REFRESH_GROUPS_INTERVAL_SECONDS, - ConfigDef.Type.LONG, - REFRESH_GROUPS_INTERVAL_SECONDS_DEFAULT, - ConfigDef.Importance.LOW, - REFRESH_GROUPS_INTERVAL_SECONDS_DOC) - .define( - EMIT_CHECKPOINTS_ENABLED, - ConfigDef.Type.BOOLEAN, - EMIT_CHECKPOINTS_ENABLED_DEFAULT, - ConfigDef.Importance.LOW, - EMIT_CHECKPOINTS_ENABLED_DOC) - .define( - EMIT_CHECKPOINTS_INTERVAL_SECONDS, - ConfigDef.Type.LONG, - EMIT_CHECKPOINTS_INTERVAL_SECONDS_DEFAULT, - ConfigDef.Importance.LOW, - EMIT_CHECKPOINTS_INTERVAL_SECONDS_DOC) - .define( - SYNC_GROUP_OFFSETS_ENABLED, - ConfigDef.Type.BOOLEAN, - SYNC_GROUP_OFFSETS_ENABLED_DEFAULT, - ConfigDef.Importance.LOW, - SYNC_GROUP_OFFSETS_ENABLED_DOC) - .define( - SYNC_GROUP_OFFSETS_INTERVAL_SECONDS, - ConfigDef.Type.LONG, - SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DEFAULT, - ConfigDef.Importance.LOW, - SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DOC) - .define( - CHECKPOINTS_TOPIC_REPLICATION_FACTOR, - ConfigDef.Type.SHORT, - CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DEFAULT, - ConfigDef.Importance.LOW, - CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DOC) - .define( - OFFSET_SYNCS_TOPIC_LOCATION, - ConfigDef.Type.STRING, - OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT, - ConfigDef.ValidString.in(SOURCE_CLUSTER_ALIAS_DEFAULT, TARGET_CLUSTER_ALIAS_DEFAULT), - ConfigDef.Importance.LOW, - OFFSET_SYNCS_TOPIC_LOCATION_DOC) - .define( - TOPIC_FILTER_CLASS, - ConfigDef.Type.CLASS, - TOPIC_FILTER_CLASS_DEFAULT, - ConfigDef.Importance.LOW, - TOPIC_FILTER_CLASS_DOC); + private static ConfigDef defineCheckpointConfig(ConfigDef baseConfig) { + return baseConfig + .define( + CONSUMER_POLL_TIMEOUT_MILLIS, + ConfigDef.Type.LONG, + CONSUMER_POLL_TIMEOUT_MILLIS_DEFAULT, + ConfigDef.Importance.LOW, + CONSUMER_POLL_TIMEOUT_MILLIS_DOC) + .define( + GROUPS, + ConfigDef.Type.LIST, + GROUPS_DEFAULT, + ConfigDef.Importance.HIGH, + GROUPS_DOC) + .define( + GROUPS_EXCLUDE, + ConfigDef.Type.LIST, + GROUPS_EXCLUDE_DEFAULT, + ConfigDef.Importance.HIGH, + GROUPS_EXCLUDE_DOC) + .define( + GROUPS_EXCLUDE_ALIAS, + ConfigDef.Type.LIST, + null, + ConfigDef.Importance.HIGH, + "Deprecated. Use " + GROUPS_EXCLUDE + " instead.") + .define( + GROUP_FILTER_CLASS, + ConfigDef.Type.CLASS, + GROUP_FILTER_CLASS_DEFAULT, + ConfigDef.Importance.LOW, + GROUP_FILTER_CLASS_DOC) + .define( + REFRESH_GROUPS_ENABLED, + ConfigDef.Type.BOOLEAN, + REFRESH_GROUPS_ENABLED_DEFAULT, + ConfigDef.Importance.LOW, + REFRESH_GROUPS_ENABLED_DOC) + .define( + REFRESH_GROUPS_INTERVAL_SECONDS, + ConfigDef.Type.LONG, + REFRESH_GROUPS_INTERVAL_SECONDS_DEFAULT, + ConfigDef.Importance.LOW, + REFRESH_GROUPS_INTERVAL_SECONDS_DOC) + .define( + EMIT_CHECKPOINTS_ENABLED, + ConfigDef.Type.BOOLEAN, + EMIT_CHECKPOINTS_ENABLED_DEFAULT, + ConfigDef.Importance.LOW, + EMIT_CHECKPOINTS_ENABLED_DOC) + .define( + EMIT_CHECKPOINTS_INTERVAL_SECONDS, + ConfigDef.Type.LONG, + EMIT_CHECKPOINTS_INTERVAL_SECONDS_DEFAULT, + ConfigDef.Importance.LOW, + EMIT_CHECKPOINTS_INTERVAL_SECONDS_DOC) + .define( + SYNC_GROUP_OFFSETS_ENABLED, + ConfigDef.Type.BOOLEAN, + SYNC_GROUP_OFFSETS_ENABLED_DEFAULT, + ConfigDef.Importance.LOW, + SYNC_GROUP_OFFSETS_ENABLED_DOC) + .define( + SYNC_GROUP_OFFSETS_INTERVAL_SECONDS, + ConfigDef.Type.LONG, + SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DEFAULT, + ConfigDef.Importance.LOW, + SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DOC) + .define( + CHECKPOINTS_TOPIC_REPLICATION_FACTOR, + ConfigDef.Type.SHORT, + CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DEFAULT, + ConfigDef.Importance.LOW, + CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DOC) + .define( + OFFSET_SYNCS_TOPIC_LOCATION, + ConfigDef.Type.STRING, + OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT, + ConfigDef.ValidString.in(SOURCE_CLUSTER_ALIAS_DEFAULT, TARGET_CLUSTER_ALIAS_DEFAULT), + ConfigDef.Importance.LOW, + OFFSET_SYNCS_TOPIC_LOCATION_DOC) + .define( + TOPIC_FILTER_CLASS, + ConfigDef.Type.CLASS, + TOPIC_FILTER_CLASS_DEFAULT, + ConfigDef.Importance.LOW, + TOPIC_FILTER_CLASS_DOC); + } - protected final static ConfigDef CONNECTOR_CONFIG_DEF = new ConfigDef(mergeConnectorConfigDef(CHECKPOINT_CONFIG_DEF)); + protected final static ConfigDef CONNECTOR_CONFIG_DEF = defineCheckpointConfig(new ConfigDef(BASE_CONNECTOR_CONFIG_DEF)); public static void main(String[] args) { - System.out.println(CHECKPOINT_CONFIG_DEF.toHtml(4, config -> "mirror_checkpoint_" + config)); + System.out.println(defineCheckpointConfig(new ConfigDef()).toHtml(4, config -> "mirror_checkpoint_" + config)); } } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java index 6319919ba495..4cdc37ea1219 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java @@ -21,8 +21,6 @@ import java.time.Duration; import java.util.Map; -import static org.apache.kafka.connect.mirror.MirrorUtils.mergeConnectorConfigDef; - public class MirrorHeartbeatConfig extends MirrorConnectorConfig { protected static final String EMIT_HEARTBEATS = "emit.heartbeats"; @@ -59,29 +57,31 @@ short heartbeatsTopicReplicationFactor() { return getShort(HEARTBEATS_TOPIC_REPLICATION_FACTOR); } - protected static final ConfigDef HEARTBEAT_CONFIG_DEF = new ConfigDef() - .define( + private static ConfigDef defineHeartbeatConfig(ConfigDef baseConfig) { + return baseConfig + .define( EMIT_HEARTBEATS_ENABLED, ConfigDef.Type.BOOLEAN, EMIT_HEARTBEATS_ENABLED_DEFAULT, ConfigDef.Importance.LOW, EMIT_HEARTBEATS_ENABLED_DOC) - .define( + .define( EMIT_HEARTBEATS_INTERVAL_SECONDS, ConfigDef.Type.LONG, EMIT_HEARTBEATS_INTERVAL_SECONDS_DEFAULT, ConfigDef.Importance.LOW, EMIT_HEARTBEATS_INTERVAL_SECONDS_DOC) - .define( + .define( HEARTBEATS_TOPIC_REPLICATION_FACTOR, ConfigDef.Type.SHORT, HEARTBEATS_TOPIC_REPLICATION_FACTOR_DEFAULT, ConfigDef.Importance.LOW, HEARTBEATS_TOPIC_REPLICATION_FACTOR_DOC); + } - protected final static ConfigDef CONNECTOR_CONFIG_DEF = new ConfigDef(mergeConnectorConfigDef(HEARTBEAT_CONFIG_DEF)); + protected final static ConfigDef CONNECTOR_CONFIG_DEF = defineHeartbeatConfig(new ConfigDef(BASE_CONNECTOR_CONFIG_DEF)); public static void main(String[] args) { - System.out.println(HEARTBEAT_CONFIG_DEF.toHtml(4, config -> "mirror_heartbeat_" + config)); + System.out.println(defineHeartbeatConfig(new ConfigDef()).toHtml(4, config -> "mirror_heartbeat_" + config)); } } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java index 365e2ac072d1..f9368cb7e19c 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java @@ -26,7 +26,6 @@ import java.util.stream.Collectors; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; -import static org.apache.kafka.connect.mirror.MirrorUtils.mergeConnectorConfigDef; public class MirrorSourceConfig extends MirrorConnectorConfig { @@ -221,133 +220,135 @@ boolean addSourceAliasToMetrics() { return getBoolean(ADD_SOURCE_ALIAS_TO_METRICS); } - protected static final ConfigDef SOURCE_CONFIG_DEF = new ConfigDef() - .define( - TOPICS, - ConfigDef.Type.LIST, - TOPICS_DEFAULT, - ConfigDef.Importance.HIGH, - TOPICS_DOC) - .define( - TOPICS_EXCLUDE, - ConfigDef.Type.LIST, - TOPICS_EXCLUDE_DEFAULT, - ConfigDef.Importance.HIGH, - TOPICS_EXCLUDE_DOC) - .define( - TOPICS_EXCLUDE_ALIAS, - ConfigDef.Type.LIST, - null, - ConfigDef.Importance.HIGH, - "Deprecated. Use " + TOPICS_EXCLUDE + " instead.") - .define( - CONFIG_PROPERTIES_EXCLUDE, - ConfigDef.Type.LIST, - CONFIG_PROPERTIES_EXCLUDE_DEFAULT, - ConfigDef.Importance.HIGH, - CONFIG_PROPERTIES_EXCLUDE_DOC) - .define( - CONFIG_PROPERTIES_EXCLUDE_ALIAS, - ConfigDef.Type.LIST, - null, - ConfigDef.Importance.HIGH, - "Deprecated. Use " + CONFIG_PROPERTIES_EXCLUDE + " instead.") - .define( - TOPIC_FILTER_CLASS, - ConfigDef.Type.CLASS, - TOPIC_FILTER_CLASS_DEFAULT, - ConfigDef.Importance.LOW, - TOPIC_FILTER_CLASS_DOC) - .define( - CONFIG_PROPERTY_FILTER_CLASS, - ConfigDef.Type.CLASS, - CONFIG_PROPERTY_FILTER_CLASS_DEFAULT, - ConfigDef.Importance.LOW, - CONFIG_PROPERTY_FILTER_CLASS_DOC) - .define( - CONSUMER_POLL_TIMEOUT_MILLIS, - ConfigDef.Type.LONG, - CONSUMER_POLL_TIMEOUT_MILLIS_DEFAULT, - ConfigDef.Importance.LOW, - CONSUMER_POLL_TIMEOUT_MILLIS_DOC) - .define( - REFRESH_TOPICS_ENABLED, - ConfigDef.Type.BOOLEAN, - REFRESH_TOPICS_ENABLED_DEFAULT, - ConfigDef.Importance.LOW, - REFRESH_TOPICS_ENABLED_DOC) - .define( - REFRESH_TOPICS_INTERVAL_SECONDS, - ConfigDef.Type.LONG, - REFRESH_TOPICS_INTERVAL_SECONDS_DEFAULT, - ConfigDef.Importance.LOW, - REFRESH_TOPICS_INTERVAL_SECONDS_DOC) - .define( - SYNC_TOPIC_CONFIGS_ENABLED, - ConfigDef.Type.BOOLEAN, - SYNC_TOPIC_CONFIGS_ENABLED_DEFAULT, - ConfigDef.Importance.LOW, - SYNC_TOPIC_CONFIGS_ENABLED_DOC) - .define( - SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS, - ConfigDef.Type.LONG, - SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT, - ConfigDef.Importance.LOW, - SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC) - .define( - USE_INCREMENTAL_ALTER_CONFIGS, - ConfigDef.Type.STRING, - REQUEST_INCREMENTAL_ALTER_CONFIGS, - in(REQUEST_INCREMENTAL_ALTER_CONFIGS, REQUIRE_INCREMENTAL_ALTER_CONFIGS, NEVER_USE_INCREMENTAL_ALTER_CONFIGS), - ConfigDef.Importance.LOW, - USE_INCREMENTAL_ALTER_CONFIG_DOC) - .define( - SYNC_TOPIC_ACLS_ENABLED, - ConfigDef.Type.BOOLEAN, - SYNC_TOPIC_ACLS_ENABLED_DEFAULT, - ConfigDef.Importance.LOW, - SYNC_TOPIC_ACLS_ENABLED_DOC) - .define( - SYNC_TOPIC_ACLS_INTERVAL_SECONDS, - ConfigDef.Type.LONG, - SYNC_TOPIC_ACLS_INTERVAL_SECONDS_DEFAULT, - ConfigDef.Importance.LOW, - SYNC_TOPIC_ACLS_INTERVAL_SECONDS_DOC) - .define( - REPLICATION_FACTOR, - ConfigDef.Type.INT, - REPLICATION_FACTOR_DEFAULT, - ConfigDef.Importance.LOW, - REPLICATION_FACTOR_DOC) - .define( - OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR, - ConfigDef.Type.SHORT, - OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR_DEFAULT, - ConfigDef.Importance.LOW, - OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR_DOC) - .define( - OFFSET_LAG_MAX, - ConfigDef.Type.LONG, - OFFSET_LAG_MAX_DEFAULT, - ConfigDef.Importance.LOW, - OFFSET_LAG_MAX_DOC) - .define( - OFFSET_SYNCS_TOPIC_LOCATION, - ConfigDef.Type.STRING, - OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT, - in(SOURCE_CLUSTER_ALIAS_DEFAULT, TARGET_CLUSTER_ALIAS_DEFAULT), - ConfigDef.Importance.LOW, - OFFSET_SYNCS_TOPIC_LOCATION_DOC) - .define( - ADD_SOURCE_ALIAS_TO_METRICS, - ConfigDef.Type.BOOLEAN, - ADD_SOURCE_ALIAS_TO_METRICS_DEFAULT, - ConfigDef.Importance.LOW, - ADD_SOURCE_ALIAS_TO_METRICS_DOC); - - protected final static ConfigDef CONNECTOR_CONFIG_DEF = new ConfigDef(mergeConnectorConfigDef(SOURCE_CONFIG_DEF)); + private static ConfigDef defineSourceConfig(ConfigDef baseConfig) { + return baseConfig + .define( + TOPICS, + ConfigDef.Type.LIST, + TOPICS_DEFAULT, + ConfigDef.Importance.HIGH, + TOPICS_DOC) + .define( + TOPICS_EXCLUDE, + ConfigDef.Type.LIST, + TOPICS_EXCLUDE_DEFAULT, + ConfigDef.Importance.HIGH, + TOPICS_EXCLUDE_DOC) + .define( + TOPICS_EXCLUDE_ALIAS, + ConfigDef.Type.LIST, + null, + ConfigDef.Importance.HIGH, + "Deprecated. Use " + TOPICS_EXCLUDE + " instead.") + .define( + CONFIG_PROPERTIES_EXCLUDE, + ConfigDef.Type.LIST, + CONFIG_PROPERTIES_EXCLUDE_DEFAULT, + ConfigDef.Importance.HIGH, + CONFIG_PROPERTIES_EXCLUDE_DOC) + .define( + CONFIG_PROPERTIES_EXCLUDE_ALIAS, + ConfigDef.Type.LIST, + null, + ConfigDef.Importance.HIGH, + "Deprecated. Use " + CONFIG_PROPERTIES_EXCLUDE + " instead.") + .define( + TOPIC_FILTER_CLASS, + ConfigDef.Type.CLASS, + TOPIC_FILTER_CLASS_DEFAULT, + ConfigDef.Importance.LOW, + TOPIC_FILTER_CLASS_DOC) + .define( + CONFIG_PROPERTY_FILTER_CLASS, + ConfigDef.Type.CLASS, + CONFIG_PROPERTY_FILTER_CLASS_DEFAULT, + ConfigDef.Importance.LOW, + CONFIG_PROPERTY_FILTER_CLASS_DOC) + .define( + CONSUMER_POLL_TIMEOUT_MILLIS, + ConfigDef.Type.LONG, + CONSUMER_POLL_TIMEOUT_MILLIS_DEFAULT, + ConfigDef.Importance.LOW, + CONSUMER_POLL_TIMEOUT_MILLIS_DOC) + .define( + REFRESH_TOPICS_ENABLED, + ConfigDef.Type.BOOLEAN, + REFRESH_TOPICS_ENABLED_DEFAULT, + ConfigDef.Importance.LOW, + REFRESH_TOPICS_ENABLED_DOC) + .define( + REFRESH_TOPICS_INTERVAL_SECONDS, + ConfigDef.Type.LONG, + REFRESH_TOPICS_INTERVAL_SECONDS_DEFAULT, + ConfigDef.Importance.LOW, + REFRESH_TOPICS_INTERVAL_SECONDS_DOC) + .define( + SYNC_TOPIC_CONFIGS_ENABLED, + ConfigDef.Type.BOOLEAN, + SYNC_TOPIC_CONFIGS_ENABLED_DEFAULT, + ConfigDef.Importance.LOW, + SYNC_TOPIC_CONFIGS_ENABLED_DOC) + .define( + SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS, + ConfigDef.Type.LONG, + SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT, + ConfigDef.Importance.LOW, + SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC) + .define( + USE_INCREMENTAL_ALTER_CONFIGS, + ConfigDef.Type.STRING, + REQUEST_INCREMENTAL_ALTER_CONFIGS, + in(REQUEST_INCREMENTAL_ALTER_CONFIGS, REQUIRE_INCREMENTAL_ALTER_CONFIGS, NEVER_USE_INCREMENTAL_ALTER_CONFIGS), + ConfigDef.Importance.LOW, + USE_INCREMENTAL_ALTER_CONFIG_DOC) + .define( + SYNC_TOPIC_ACLS_ENABLED, + ConfigDef.Type.BOOLEAN, + SYNC_TOPIC_ACLS_ENABLED_DEFAULT, + ConfigDef.Importance.LOW, + SYNC_TOPIC_ACLS_ENABLED_DOC) + .define( + SYNC_TOPIC_ACLS_INTERVAL_SECONDS, + ConfigDef.Type.LONG, + SYNC_TOPIC_ACLS_INTERVAL_SECONDS_DEFAULT, + ConfigDef.Importance.LOW, + SYNC_TOPIC_ACLS_INTERVAL_SECONDS_DOC) + .define( + REPLICATION_FACTOR, + ConfigDef.Type.INT, + REPLICATION_FACTOR_DEFAULT, + ConfigDef.Importance.LOW, + REPLICATION_FACTOR_DOC) + .define( + OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR, + ConfigDef.Type.SHORT, + OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR_DEFAULT, + ConfigDef.Importance.LOW, + OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR_DOC) + .define( + OFFSET_LAG_MAX, + ConfigDef.Type.LONG, + OFFSET_LAG_MAX_DEFAULT, + ConfigDef.Importance.LOW, + OFFSET_LAG_MAX_DOC) + .define( + OFFSET_SYNCS_TOPIC_LOCATION, + ConfigDef.Type.STRING, + OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT, + in(SOURCE_CLUSTER_ALIAS_DEFAULT, TARGET_CLUSTER_ALIAS_DEFAULT), + ConfigDef.Importance.LOW, + OFFSET_SYNCS_TOPIC_LOCATION_DOC) + .define( + ADD_SOURCE_ALIAS_TO_METRICS, + ConfigDef.Type.BOOLEAN, + ADD_SOURCE_ALIAS_TO_METRICS_DEFAULT, + ConfigDef.Importance.LOW, + ADD_SOURCE_ALIAS_TO_METRICS_DOC); + } + + protected final static ConfigDef CONNECTOR_CONFIG_DEF = defineSourceConfig(new ConfigDef(BASE_CONNECTOR_CONFIG_DEF)); public static void main(String[] args) { - System.out.println(SOURCE_CONFIG_DEF.toHtml(4, config -> "mirror_source_" + config)); + System.out.println(defineSourceConfig(new ConfigDef()).toHtml(4, config -> "mirror_source_" + config)); } } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java index ea8c67e5d63d..eb6bdeebb9ed 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java @@ -21,7 +21,6 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.TimeoutException; @@ -52,14 +51,6 @@ final class MirrorUtils { // utility class private MirrorUtils() {} - static ConfigDef mergeConnectorConfigDef(ConfigDef configDef) { - ConfigDef connectorConfigDef = new ConfigDef(MirrorConnectorConfig.BASE_CONNECTOR_CONFIG_DEF); - configDef.configKeys().entrySet().forEach(config -> { - connectorConfigDef.define(config.getValue().displayName, config.getValue().type(), config.getValue().defaultValue, config.getValue().validator, config.getValue().importance, config.getValue().documentation); - }); - return connectorConfigDef; - } - static KafkaProducer newProducer(Map props) { return new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer()); } diff --git a/docs/configuration.html b/docs/configuration.html index e02ad06e8534..8da916408303 100644 --- a/docs/configuration.html +++ b/docs/configuration.html @@ -267,13 +267,20 @@

-

3.8 MirrorMaker Configs

- Below is the configuration of MirrorMaker. - + Below is the configuration of the connectors that make up MirrorMaker 2. +

3.8.1 MirrorMaker Source Configs

+ Below is the configuration of MirrorMaker 2 source connector for replicating topics. +

3.8.2 MirrorMaker Checkpoint Configs

+ Below is the configuration of MirrorMaker 2 checkpoint connector for emitting consumer offset checkpoints. +

3.8.3 MirrorMaker HeartBeat Configs

+ Below is the configuration of MirrorMaker 2 heartbeat connector for checking connectivity between connectors and clusters. +

3.8.4 MirrorMaker Common Configs

+ Below is the common configuration properties that apply to all three connectors above. +

3.9 System Properties

Kafka supports some configuration that can be enabled through Java system properties. System properties are usually set by passing the -D flag to the Java virtual machine in which Kafka components are running.