Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14669: Use the generated docs for MirrorMaker configs in the doc #13658

Merged
merged 4 commits into from Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 8 additions & 1 deletion build.gradle
Expand Up @@ -1063,7 +1063,7 @@ project(':core') {
':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs', 'genProducerMetricsDocs',
':connect:runtime:genConnectMetricsDocs', ':connect:runtime:genConnectOpenAPIDocs',
':connect:mirror:genMirrorSourceConfigDocs', ':connect:mirror:genMirrorCheckpointConfigDocs',
':connect:mirror:genMirrorHeartbeatConfigDocs'], type: Tar) {
':connect:mirror:genMirrorHeartbeatConfigDocs', ':connect:mirror:genMirrorConnectorConfigDocs'], type: Tar) {
archiveClassifier = 'site-docs'
compression = Compression.GZIP
from project.file("$rootDir/docs")
Expand Down Expand Up @@ -2985,6 +2985,13 @@ project(':connect:mirror') {
duplicatesStrategy 'exclude'
}

task genMirrorConnectorConfigDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
mainClass = 'org.apache.kafka.connect.mirror.MirrorConnectorConfig'
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "mirror_connector_config.html").newOutputStream()
}

task genMirrorSourceConfigDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
mainClass = 'org.apache.kafka.connect.mirror.MirrorSourceConfig'
Expand Down
Expand Up @@ -23,6 +23,8 @@
import java.util.List;
import java.util.Map;

import static org.apache.kafka.connect.mirror.MirrorUtils.mergeConnectorConfigDef;

public class MirrorCheckpointConfig extends MirrorConnectorConfig {

protected static final String REFRESH_GROUPS = "refresh.groups";
Expand Down Expand Up @@ -165,7 +167,7 @@ Duration consumerPollTimeout() {
return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
}

protected static final ConfigDef CONNECTOR_CONFIG_DEF = new ConfigDef(BASE_CONNECTOR_CONFIG_DEF)
protected static final ConfigDef CHECKPOINT_CONFIG_DEF = new ConfigDef()
.define(
CONSUMER_POLL_TIMEOUT_MILLIS,
ConfigDef.Type.LONG,
Expand Down Expand Up @@ -252,7 +254,9 @@ Duration consumerPollTimeout() {
ConfigDef.Importance.LOW,
TOPIC_FILTER_CLASS_DOC);

protected final static ConfigDef CONNECTOR_CONFIG_DEF = new ConfigDef(mergeConnectorConfigDef(CHECKPOINT_CONFIG_DEF));

public static void main(String[] args) {
System.out.println(CONNECTOR_CONFIG_DEF.toHtml(4, config -> "mirror_checkpoint_" + config));
System.out.println(CHECKPOINT_CONFIG_DEF.toHtml(4, config -> "mirror_checkpoint_" + config));
}
}
Expand Up @@ -306,4 +306,8 @@ String entityLabel() {
)
.withClientSslSupport()
.withClientSaslSupport();

public static void main(String[] args) {
System.out.println(BASE_CONNECTOR_CONFIG_DEF.toHtml(4, config -> "mirror_connector_" + config));
}
}
Expand Up @@ -21,6 +21,8 @@
import java.time.Duration;
import java.util.Map;

import static org.apache.kafka.connect.mirror.MirrorUtils.mergeConnectorConfigDef;

public class MirrorHeartbeatConfig extends MirrorConnectorConfig {

protected static final String EMIT_HEARTBEATS = "emit.heartbeats";
Expand Down Expand Up @@ -57,7 +59,7 @@ short heartbeatsTopicReplicationFactor() {
return getShort(HEARTBEATS_TOPIC_REPLICATION_FACTOR);
}

protected static final ConfigDef CONNECTOR_CONFIG_DEF = new ConfigDef(BASE_CONNECTOR_CONFIG_DEF)
protected static final ConfigDef HEARTBEAT_CONFIG_DEF = new ConfigDef()
tinaselenge marked this conversation as resolved.
Show resolved Hide resolved
.define(
EMIT_HEARTBEATS_ENABLED,
ConfigDef.Type.BOOLEAN,
Expand All @@ -77,7 +79,9 @@ short heartbeatsTopicReplicationFactor() {
ConfigDef.Importance.LOW,
HEARTBEATS_TOPIC_REPLICATION_FACTOR_DOC);

protected final static ConfigDef CONNECTOR_CONFIG_DEF = new ConfigDef(mergeConnectorConfigDef(HEARTBEAT_CONFIG_DEF));
C0urante marked this conversation as resolved.
Show resolved Hide resolved

public static void main(String[] args) {
System.out.println(CONNECTOR_CONFIG_DEF.toHtml(4, config -> "mirror_heartbeat_" + config));
System.out.println(HEARTBEAT_CONFIG_DEF.toHtml(4, config -> "mirror_heartbeat_" + config));
tinaselenge marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand Up @@ -26,6 +26,7 @@
import java.util.stream.Collectors;

import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.connect.mirror.MirrorUtils.mergeConnectorConfigDef;

public class MirrorSourceConfig extends MirrorConnectorConfig {

Expand Down Expand Up @@ -220,7 +221,7 @@ boolean addSourceAliasToMetrics() {
return getBoolean(ADD_SOURCE_ALIAS_TO_METRICS);
}

protected static final ConfigDef CONNECTOR_CONFIG_DEF = new ConfigDef(BASE_CONNECTOR_CONFIG_DEF)
protected static final ConfigDef SOURCE_CONFIG_DEF = new ConfigDef()
.define(
TOPICS,
ConfigDef.Type.LIST,
Expand Down Expand Up @@ -344,7 +345,9 @@ boolean addSourceAliasToMetrics() {
ConfigDef.Importance.LOW,
ADD_SOURCE_ALIAS_TO_METRICS_DOC);

public static void main(String[] args) {
System.out.println(CONNECTOR_CONFIG_DEF.toHtml(4, config -> "mirror_source_" + config));
protected final static ConfigDef CONNECTOR_CONFIG_DEF = new ConfigDef(mergeConnectorConfigDef(SOURCE_CONFIG_DEF));

public static void main(String[] args) {
System.out.println(SOURCE_CONFIG_DEF.toHtml(4, config -> "mirror_source_" + config));
}
}
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.TimeoutException;
Expand Down Expand Up @@ -51,6 +52,14 @@ final class MirrorUtils {
// utility class
private MirrorUtils() {}

static ConfigDef mergeConnectorConfigDef(ConfigDef configDef) {
ConfigDef connectorConfigDef = new ConfigDef(MirrorConnectorConfig.BASE_CONNECTOR_CONFIG_DEF);
configDef.configKeys().entrySet().forEach(config -> {
connectorConfigDef.define(config.getValue().displayName, config.getValue().type(), config.getValue().defaultValue, config.getValue().validator, config.getValue().importance, config.getValue().documentation);
});
return connectorConfigDef;
}

static KafkaProducer<byte[], byte[]> newProducer(Map<String, Object> props) {
return new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
}
Expand Down
36 changes: 22 additions & 14 deletions docs/configuration.html
Expand Up @@ -267,23 +267,31 @@ <h3 class="anchor-heading"><a id="adminclientconfigs" class="anchor-link"></a><a
Below is the configuration of the Kafka Admin client library.
<!--#include virtual="generated/admin_client_config.html" -->

<h3 class="anchor-heading"><a id="systemproperties" class="anchor-link"></a><a href="#systemproperties">3.8 System Properties</a></h3>

tinaselenge marked this conversation as resolved.
Show resolved Hide resolved
<h3 class="anchor-heading"><a id="mirrormakerconfigs" class="anchor-link"></a><a href="#mirrormakerconfigs">3.8 MirrorMaker Configs</a></h3>
Below is the configuration of MirrorMaker.
C0urante marked this conversation as resolved.
Show resolved Hide resolved
<!--#include virtual="generated/mirror_connector_config.html" -->
tinaselenge marked this conversation as resolved.
Show resolved Hide resolved
<!--#include virtual="generated/mirror_source_config.html" -->
tinaselenge marked this conversation as resolved.
Show resolved Hide resolved
<!--#include virtual="generated/mirror_checkpoint_config.html" -->
<!--#include virtual="generated/mirror_heartbeat_config.html" -->

<h3 class="anchor-heading"><a id="systemproperties" class="anchor-link"></a><a href="#systemproperties">3.9 System Properties</a></h3>
Kafka supports some configuration that can be enabled through Java system properties. System properties are usually set by passing the -D flag to the Java virtual machine in which Kafka components are running.
Below are the supported system properties.
<ul class="config-list">
<li>
<h4><a id="org.apache.kafka.disallowed.login.modules"></a><a id="systemproperties_org.apache.kafka.disallowed.login.modules" href="#systemproperties_org.apache.kafka.disallowed.login.modules">org.apache.kafka.disallowed.login.modules</a></h4>
<p>This system property is used to disable the problematic login modules usage in SASL JAAS configuration. This property accepts comma-separated list of loginModule names. By default <b>com.sun.security.auth.module.JndiLoginModule</b> loginModule is disabled.
<p>If users want to enable JndiLoginModule, users need to explicitly reset the system property like below. We advise the users to validate configurations and only allow trusted JNDI configurations. For more details <a href="https://nvd.nist.gov/vuln/detail/CVE-2023-25194">CVE-2023-25194</a>.
<p><pre class="brush: bash;"> -Dorg.apache.kafka.disallowed.login.modules=</pre>
<p>To disable more loginModules, update the system property with comma-separated loginModule names. Make sure to explicitly add <b>JndiLoginModule</b> module name to the comma-separated list like below.
<p><pre class="brush: bash;"> -Dorg.apache.kafka.disallowed.login.modules=com.sun.security.auth.module.JndiLoginModule,com.ibm.security.auth.module.LdapLoginModule,com.ibm.security.auth.module.Krb5LoginModule</pre>
<table><tbody>
<tr><th>Since:</th><td>3.4.0</td></tr>
<tr><th>Default Value:</th><td>com.sun.security.auth.module.JndiLoginModule</td></tr>
</tbody></table>
</li>
</ul>
<li>
<h4><a id="org.apache.kafka.disallowed.login.modules"></a><a id="systemproperties_org.apache.kafka.disallowed.login.modules" href="#systemproperties_org.apache.kafka.disallowed.login.modules">org.apache.kafka.disallowed.login.modules</a></h4>
<p>This system property is used to disable the problematic login modules usage in SASL JAAS configuration. This property accepts comma-separated list of loginModule names. By default <b>com.sun.security.auth.module.JndiLoginModule</b> loginModule is disabled.
<p>If users want to enable JndiLoginModule, users need to explicitly reset the system property like below. We advise the users to validate configurations and only allow trusted JNDI configurations. For more details <a href="https://nvd.nist.gov/vuln/detail/CVE-2023-25194">CVE-2023-25194</a>.
<p><pre class="brush: bash;"> -Dorg.apache.kafka.disallowed.login.modules=</pre>
<p>To disable more loginModules, update the system property with comma-separated loginModule names. Make sure to explicitly add <b>JndiLoginModule</b> module name to the comma-separated list like below.
<p><pre class="brush: bash;"> -Dorg.apache.kafka.disallowed.login.modules=com.sun.security.auth.module.JndiLoginModule,com.ibm.security.auth.module.LdapLoginModule,com.ibm.security.auth.module.Krb5LoginModule</pre>
<table><tbody>
<tr><th>Since:</th><td>3.4.0</td></tr>
<tr><th>Default Value:</th><td>com.sun.security.auth.module.JndiLoginModule</td></tr>
</tbody></table>
</li>
</ul>
</script>

<div class="p-configuration"></div>
20 changes: 1 addition & 19 deletions docs/ops.html
Expand Up @@ -766,27 +766,9 @@ <h5 class="anchor-heading"><a id="georeplication-flow-configure" class="anchor-l
</code></pre>

<p>
Additional configuration settings are supported, some of which are listed below. In most cases, you can leave these settings at their default values. See <a href="https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java">MirrorMakerConfig</a> and <a href="https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java">MirrorConnectorConfig</a> for further details.
Additional configuration settings are supported which can be left with their default values in most cases. See <a href="https://kafka.apache.org/documentation/#mirrormakerconfigs">MirrorMaker Configs</a>.
tinaselenge marked this conversation as resolved.
Show resolved Hide resolved
</p>

<ul>
<li><code>refresh.topics.enabled</code>: whether to check for new topics in the source cluster periodically (default: true)
<li><code>refresh.topics.interval.seconds</code>: frequency of checking for new topics in the source cluster; lower values than the default may lead to performance degradation (default: 600, every ten minutes)
<li><code>refresh.groups.enabled</code>: whether to check for new consumer groups in the source cluster periodically (default: true)
<li><code>refresh.groups.interval.seconds</code>: frequency of checking for new consumer groups in the source cluster; lower values than the default may lead to performance degradation (default: 600, every ten minutes)
<li><code>sync.topic.configs.enabled</code>: whether to replicate topic configurations from the source cluster (default: true)
<li><code>sync.topic.acls.enabled</code>: whether to sync ACLs from the source cluster (default: true)
<li><code>emit.heartbeats.enabled</code>: whether to emit heartbeats periodically (default: true)
<li><code>emit.heartbeats.interval.seconds</code>: frequency at which heartbeats are emitted (default: 1, every one seconds)
<li><code>heartbeats.topic.replication.factor</code>: replication factor of MirrorMaker's internal heartbeat topics (default: 3)
<li><code>emit.checkpoints.enabled</code>: whether to emit MirrorMaker's consumer offsets periodically (default: true)
<li><code>emit.checkpoints.interval.seconds</code>: frequency at which checkpoints are emitted (default: 60, every minute)
<li><code>checkpoints.topic.replication.factor</code>: replication factor of MirrorMaker's internal checkpoints topics (default: 3)
<li><code>sync.group.offsets.enabled</code>: whether to periodically write the translated offsets of replicated consumer groups (in the source cluster) to <code>__consumer_offsets</code> topic in target cluster, as long as no active consumers in that group are connected to the target cluster (default: false)
<li><code>sync.group.offsets.interval.seconds</code>: frequency at which consumer group offsets are synced (default: 60, every minute)
<li><code>offset-syncs.topic.replication.factor</code>: replication factor of MirrorMaker's internal offset-sync topics (default: 3)
</ul>

<h5 class="anchor-heading"><a id="georeplication-flow-secure" class="anchor-link"></a><a href="#georeplication-flow-secure">Securing Replication Flows</a></h5>

<p>
Expand Down