Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tinaselenge committed Jul 17, 2023
1 parent 5f9307f commit af2388b
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 237 deletions.
Expand Up @@ -23,8 +23,6 @@
import java.util.List;
import java.util.Map;

import static org.apache.kafka.connect.mirror.MirrorUtils.mergeConnectorConfigDef;

public class MirrorCheckpointConfig extends MirrorConnectorConfig {

protected static final String REFRESH_GROUPS = "refresh.groups";
Expand Down Expand Up @@ -167,96 +165,98 @@ Duration consumerPollTimeout() {
return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
}

protected static final ConfigDef CHECKPOINT_CONFIG_DEF = new ConfigDef()
.define(
CONSUMER_POLL_TIMEOUT_MILLIS,
ConfigDef.Type.LONG,
CONSUMER_POLL_TIMEOUT_MILLIS_DEFAULT,
ConfigDef.Importance.LOW,
CONSUMER_POLL_TIMEOUT_MILLIS_DOC)
.define(
GROUPS,
ConfigDef.Type.LIST,
GROUPS_DEFAULT,
ConfigDef.Importance.HIGH,
GROUPS_DOC)
.define(
GROUPS_EXCLUDE,
ConfigDef.Type.LIST,
GROUPS_EXCLUDE_DEFAULT,
ConfigDef.Importance.HIGH,
GROUPS_EXCLUDE_DOC)
.define(
GROUPS_EXCLUDE_ALIAS,
ConfigDef.Type.LIST,
null,
ConfigDef.Importance.HIGH,
"Deprecated. Use " + GROUPS_EXCLUDE + " instead.")
.define(
GROUP_FILTER_CLASS,
ConfigDef.Type.CLASS,
GROUP_FILTER_CLASS_DEFAULT,
ConfigDef.Importance.LOW,
GROUP_FILTER_CLASS_DOC)
.define(
REFRESH_GROUPS_ENABLED,
ConfigDef.Type.BOOLEAN,
REFRESH_GROUPS_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
REFRESH_GROUPS_ENABLED_DOC)
.define(
REFRESH_GROUPS_INTERVAL_SECONDS,
ConfigDef.Type.LONG,
REFRESH_GROUPS_INTERVAL_SECONDS_DEFAULT,
ConfigDef.Importance.LOW,
REFRESH_GROUPS_INTERVAL_SECONDS_DOC)
.define(
EMIT_CHECKPOINTS_ENABLED,
ConfigDef.Type.BOOLEAN,
EMIT_CHECKPOINTS_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
EMIT_CHECKPOINTS_ENABLED_DOC)
.define(
EMIT_CHECKPOINTS_INTERVAL_SECONDS,
ConfigDef.Type.LONG,
EMIT_CHECKPOINTS_INTERVAL_SECONDS_DEFAULT,
ConfigDef.Importance.LOW,
EMIT_CHECKPOINTS_INTERVAL_SECONDS_DOC)
.define(
SYNC_GROUP_OFFSETS_ENABLED,
ConfigDef.Type.BOOLEAN,
SYNC_GROUP_OFFSETS_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
SYNC_GROUP_OFFSETS_ENABLED_DOC)
.define(
SYNC_GROUP_OFFSETS_INTERVAL_SECONDS,
ConfigDef.Type.LONG,
SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DEFAULT,
ConfigDef.Importance.LOW,
SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DOC)
.define(
CHECKPOINTS_TOPIC_REPLICATION_FACTOR,
ConfigDef.Type.SHORT,
CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DEFAULT,
ConfigDef.Importance.LOW,
CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DOC)
.define(
OFFSET_SYNCS_TOPIC_LOCATION,
ConfigDef.Type.STRING,
OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT,
ConfigDef.ValidString.in(SOURCE_CLUSTER_ALIAS_DEFAULT, TARGET_CLUSTER_ALIAS_DEFAULT),
ConfigDef.Importance.LOW,
OFFSET_SYNCS_TOPIC_LOCATION_DOC)
.define(
TOPIC_FILTER_CLASS,
ConfigDef.Type.CLASS,
TOPIC_FILTER_CLASS_DEFAULT,
ConfigDef.Importance.LOW,
TOPIC_FILTER_CLASS_DOC);
private static ConfigDef defineCheckpointConfig(ConfigDef baseConfig) {
return baseConfig
.define(
CONSUMER_POLL_TIMEOUT_MILLIS,
ConfigDef.Type.LONG,
CONSUMER_POLL_TIMEOUT_MILLIS_DEFAULT,
ConfigDef.Importance.LOW,
CONSUMER_POLL_TIMEOUT_MILLIS_DOC)
.define(
GROUPS,
ConfigDef.Type.LIST,
GROUPS_DEFAULT,
ConfigDef.Importance.HIGH,
GROUPS_DOC)
.define(
GROUPS_EXCLUDE,
ConfigDef.Type.LIST,
GROUPS_EXCLUDE_DEFAULT,
ConfigDef.Importance.HIGH,
GROUPS_EXCLUDE_DOC)
.define(
GROUPS_EXCLUDE_ALIAS,
ConfigDef.Type.LIST,
null,
ConfigDef.Importance.HIGH,
"Deprecated. Use " + GROUPS_EXCLUDE + " instead.")
.define(
GROUP_FILTER_CLASS,
ConfigDef.Type.CLASS,
GROUP_FILTER_CLASS_DEFAULT,
ConfigDef.Importance.LOW,
GROUP_FILTER_CLASS_DOC)
.define(
REFRESH_GROUPS_ENABLED,
ConfigDef.Type.BOOLEAN,
REFRESH_GROUPS_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
REFRESH_GROUPS_ENABLED_DOC)
.define(
REFRESH_GROUPS_INTERVAL_SECONDS,
ConfigDef.Type.LONG,
REFRESH_GROUPS_INTERVAL_SECONDS_DEFAULT,
ConfigDef.Importance.LOW,
REFRESH_GROUPS_INTERVAL_SECONDS_DOC)
.define(
EMIT_CHECKPOINTS_ENABLED,
ConfigDef.Type.BOOLEAN,
EMIT_CHECKPOINTS_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
EMIT_CHECKPOINTS_ENABLED_DOC)
.define(
EMIT_CHECKPOINTS_INTERVAL_SECONDS,
ConfigDef.Type.LONG,
EMIT_CHECKPOINTS_INTERVAL_SECONDS_DEFAULT,
ConfigDef.Importance.LOW,
EMIT_CHECKPOINTS_INTERVAL_SECONDS_DOC)
.define(
SYNC_GROUP_OFFSETS_ENABLED,
ConfigDef.Type.BOOLEAN,
SYNC_GROUP_OFFSETS_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
SYNC_GROUP_OFFSETS_ENABLED_DOC)
.define(
SYNC_GROUP_OFFSETS_INTERVAL_SECONDS,
ConfigDef.Type.LONG,
SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DEFAULT,
ConfigDef.Importance.LOW,
SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DOC)
.define(
CHECKPOINTS_TOPIC_REPLICATION_FACTOR,
ConfigDef.Type.SHORT,
CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DEFAULT,
ConfigDef.Importance.LOW,
CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DOC)
.define(
OFFSET_SYNCS_TOPIC_LOCATION,
ConfigDef.Type.STRING,
OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT,
ConfigDef.ValidString.in(SOURCE_CLUSTER_ALIAS_DEFAULT, TARGET_CLUSTER_ALIAS_DEFAULT),
ConfigDef.Importance.LOW,
OFFSET_SYNCS_TOPIC_LOCATION_DOC)
.define(
TOPIC_FILTER_CLASS,
ConfigDef.Type.CLASS,
TOPIC_FILTER_CLASS_DEFAULT,
ConfigDef.Importance.LOW,
TOPIC_FILTER_CLASS_DOC);
}

protected final static ConfigDef CONNECTOR_CONFIG_DEF = new ConfigDef(mergeConnectorConfigDef(CHECKPOINT_CONFIG_DEF));
protected final static ConfigDef CONNECTOR_CONFIG_DEF = defineCheckpointConfig(new ConfigDef(BASE_CONNECTOR_CONFIG_DEF));

public static void main(String[] args) {
System.out.println(CHECKPOINT_CONFIG_DEF.toHtml(4, config -> "mirror_checkpoint_" + config));
System.out.println(defineCheckpointConfig(new ConfigDef()).toHtml(4, config -> "mirror_checkpoint_" + config));
}
}
Expand Up @@ -21,8 +21,6 @@
import java.time.Duration;
import java.util.Map;

import static org.apache.kafka.connect.mirror.MirrorUtils.mergeConnectorConfigDef;

public class MirrorHeartbeatConfig extends MirrorConnectorConfig {

protected static final String EMIT_HEARTBEATS = "emit.heartbeats";
Expand Down Expand Up @@ -59,29 +57,31 @@ short heartbeatsTopicReplicationFactor() {
return getShort(HEARTBEATS_TOPIC_REPLICATION_FACTOR);
}

protected static final ConfigDef HEARTBEAT_CONFIG_DEF = new ConfigDef()
.define(
private static ConfigDef defineHeartbeatConfig(ConfigDef baseConfig) {
return baseConfig
.define(
EMIT_HEARTBEATS_ENABLED,
ConfigDef.Type.BOOLEAN,
EMIT_HEARTBEATS_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
EMIT_HEARTBEATS_ENABLED_DOC)
.define(
.define(
EMIT_HEARTBEATS_INTERVAL_SECONDS,
ConfigDef.Type.LONG,
EMIT_HEARTBEATS_INTERVAL_SECONDS_DEFAULT,
ConfigDef.Importance.LOW,
EMIT_HEARTBEATS_INTERVAL_SECONDS_DOC)
.define(
.define(
HEARTBEATS_TOPIC_REPLICATION_FACTOR,
ConfigDef.Type.SHORT,
HEARTBEATS_TOPIC_REPLICATION_FACTOR_DEFAULT,
ConfigDef.Importance.LOW,
HEARTBEATS_TOPIC_REPLICATION_FACTOR_DOC);
}

protected final static ConfigDef CONNECTOR_CONFIG_DEF = new ConfigDef(mergeConnectorConfigDef(HEARTBEAT_CONFIG_DEF));
protected final static ConfigDef CONNECTOR_CONFIG_DEF = defineHeartbeatConfig(new ConfigDef(BASE_CONNECTOR_CONFIG_DEF));

public static void main(String[] args) {
System.out.println(HEARTBEAT_CONFIG_DEF.toHtml(4, config -> "mirror_heartbeat_" + config));
System.out.println(defineHeartbeatConfig(new ConfigDef()).toHtml(4, config -> "mirror_heartbeat_" + config));
}
}

0 comments on commit af2388b

Please sign in to comment.