Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15294: Publish remote storage configs #14266

Merged
merged 2 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 16 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,8 @@ project(':core') {
':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs', 'genProducerMetricsDocs',
':connect:runtime:genConnectMetricsDocs', ':connect:runtime:genConnectOpenAPIDocs',
':connect:mirror:genMirrorSourceConfigDocs', ':connect:mirror:genMirrorCheckpointConfigDocs',
':connect:mirror:genMirrorHeartbeatConfigDocs', ':connect:mirror:genMirrorConnectorConfigDocs'], type: Tar) {
':connect:mirror:genMirrorHeartbeatConfigDocs', ':connect:mirror:genMirrorConnectorConfigDocs',
':storage:genRemoteLogManagerConfigDoc', ':storage:genRemoteLogMetadataManagerConfigDoc'], type: Tar) {
archiveClassifier = 'site-docs'
compression = Compression.GZIP
from project.file("$rootDir/docs")
Expand Down Expand Up @@ -1766,6 +1767,20 @@ project(':storage') {
outputs.dir("src/generated/java/org/apache/kafka/server/log/remote/metadata/storage/generated")
}

task genRemoteLogManagerConfigDoc(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
mainClass = 'org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig'
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "remote_log_manager_config.html").newOutputStream()
}

task genRemoteLogMetadataManagerConfigDoc(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
mainClass = 'org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig'
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "remote_log_metadata_manager_config.html").newOutputStream()
}

sourceSets {
main {
java {
Expand Down
7 changes: 6 additions & 1 deletion docs/configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,12 @@ <h4 class="anchor-heading"><a id="mirrormakerheartbeatconfigs" class="anchor-lin
Below is the configuration of MirrorMaker 2 heartbeat connector for checking connectivity between connectors and clusters.
<!--#include virtual="generated/mirror_heartbeat_config.html" -->

<h3 class="anchor-heading"><a id="systemproperties" class="anchor-link"></a><a href="#systemproperties">3.9 System Properties</a></h3>
<h3 class="anchor-heading"><a id="remotestorageconfigs" class="anchor-link"></a><a href="#remotestorageconfigs">3.9 Tier Storage Configs</a></h3>
Below is the configuration of Tier Storage.
tinaselenge marked this conversation as resolved.
Show resolved Hide resolved
<!--#include virtual="generated/remote_log_manager_config.html" -->
<!--#include virtual="generated/remote_log_metadata_manager_config.html" -->

<h3 class="anchor-heading"><a id="systemproperties" class="anchor-link"></a><a href="#systemproperties">3.10 System Properties</a></h3>
Kafka supports some configuration that can be enabled through Java system properties. System properties are usually set by passing the -D flag to the Java virtual machine in which Kafka components are running.
Below are the supported system properties.
<ul class="config-list">
Expand Down
3 changes: 2 additions & 1 deletion docs/toc.html
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@
<li><a href="#streamsconfigs">3.6 Kafka Streams Configs</a>
<li><a href="#adminclientconfigs">3.7 AdminClient Configs</a>
<li><a href="#mirrormakerconfigs">3.8 MirrorMaker Configs</a>
<li><a href="#systemproperties">3.9 System Properties</a>
<li><a href="#remotestorageconfigs">3.9 Tier Storage Configs</a></li>
tinaselenge marked this conversation as resolved.
Show resolved Hide resolved
tinaselenge marked this conversation as resolved.
Show resolved Hide resolved
<li><a href="#systemproperties">3.10 System Properties</a>
</ul>
</li>
<li><a href="#design">4. Design</a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,26 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP = "remote.log.metadata.initialization.retry.interval.ms";

public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS = 50;
public static final long DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS = -1L;
public static final long DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MS = -1L;
public static final short DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3;
public static final long DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS = 2 * 60 * 1000L;
public static final long DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS = 2 * 60 * 1000L;
public static final long DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS = 5 * 1000L;

public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_DOC = "Replication factor of remote log metadata Topic.";
public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC = "The number of partitions for remote log metadata Topic.";
public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_DOC = "Remote log metadata topic log retention in milli seconds." +
public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_DOC = "Replication factor of remote log metadata topic.";
public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC = "The number of partitions for remote log metadata topic.";
public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_DOC = "Retention of remote log metadata topic in milliseconds. " +
"Default: -1, that means unlimited. Users can configure this value based on their use cases. " +
"To avoid any data loss, this value should be more than the maximum retention period of any topic enabled with " +
"tiered storage in the cluster.";
public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC = "The amount of time in milli seconds to wait for the local consumer to " +
public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC = "The amount of time in milliseconds to wait for the local consumer to " +
"receive the published event.";
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_DOC = "The retry interval in milli seconds for " +
" retrying RemoteLogMetadataManager resources initialization again.";
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_DOC = "The retry interval in milliseconds for " +
"retrying RemoteLogMetadataManager resources initialization again.";

public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_DOC = "The maximum amount of time in milli seconds " +
" for retrying RemoteLogMetadataManager resources initialization. When total retry intervals reach this timeout, initialization" +
" is considered as failed and broker starts shutting down.";
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds " +
"for retrying RemoteLogMetadataManager resources initialization. When total retry intervals reach this timeout, initialization " +
"is considered as failed and broker starts shutting down.";

public static final String REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX = "remote.log.metadata.common.client.";
public static final String REMOTE_LOG_METADATA_PRODUCER_PREFIX = "remote.log.metadata.producer.";
Expand All @@ -84,7 +84,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_DOC)
.define(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, INT, DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS, atLeast(1), LOW,
REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC)
.define(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, LONG, DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS, LOW,
.define(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, LONG, DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MS, LOW,
REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_DOC)
.define(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP, LONG, DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS, atLeast(0), LOW,
REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC)
Expand Down Expand Up @@ -244,4 +244,8 @@ public String toString() {
", producerProps=" + producerProps +
'}';
}

public static void main(String[] args) {
System.out.println(CONFIG.toHtml(4, config -> "remote_log_metadata_manager_" + config));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,20 @@ public final class RemoteLogManagerConfig {
public static final String REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC = "Fully qualified class name of `RemoteLogStorageManager` implementation.";

public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP = "remote.log.storage.manager.class.path";
public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC = "Class path of the `RemoteLogStorageManager` implementation." +
"If specified, the RemoteLogStorageManager implementation and its dependent libraries will be loaded by a dedicated" +
"classloader which searches this class path before the Kafka broker class path. The syntax of this parameter is same" +
"with the standard Java class path string.";
public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC = "Class path of the `RemoteLogStorageManager` implementation. " +
"If specified, the RemoteLogStorageManager implementation and its dependent libraries will be loaded by a dedicated " +
"classloader which searches this class path before the Kafka broker class path. The syntax of this parameter is same " +
"as the standard Java class path string.";

public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP = "remote.log.metadata.manager.class.name";
public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC = "Fully qualified class name of `RemoteLogMetadataManager` implementation.";
public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME = "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager";

public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP = "remote.log.metadata.manager.class.path";
public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC = "Class path of the `RemoteLogMetadataManager` implementation." +
"If specified, the RemoteLogMetadataManager implementation and its dependent libraries will be loaded by a dedicated" +
"classloader which searches this class path before the Kafka broker class path. The syntax of this parameter is same" +
"with the standard Java class path string.";
public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC = "Class path of the `RemoteLogMetadataManager` implementation. " +
"If specified, the RemoteLogMetadataManager implementation and its dependent libraries will be loaded by a dedicated " +
"classloader which searches this class path before the Kafka broker class path. The syntax of this parameter is same " +
"as the standard Java class path string.";

public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP = "remote.log.metadata.manager.listener.name";
public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC = "Listener name of the local broker to which it should get connected if " +
Expand Down Expand Up @@ -146,111 +146,111 @@ public final class RemoteLogManagerConfig {
public static final ConfigDef CONFIG_DEF = new ConfigDef();

static {
CONFIG_DEF.defineInternal(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
CONFIG_DEF.define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
BOOLEAN,
DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE,
null,
MEDIUM,
REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC)
.defineInternal(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
.define(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
STRING,
DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX,
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC)
.defineInternal(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
.define(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
STRING,
DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX,
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC)
.defineInternal(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, STRING,
.define(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, STRING,
null,
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC)
.defineInternal(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, STRING,
.define(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, STRING,
null,
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC)
.defineInternal(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
.define(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
STRING,
DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME,
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC)
.defineInternal(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP,
.define(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP,
STRING,
null,
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC)
.defineInternal(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, STRING,
.define(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, STRING,
null,
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC)
.defineInternal(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
.define(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
INT,
DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES,
atLeast(0),
LOW,
REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
.defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
.define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
tinaselenge marked this conversation as resolved.
Show resolved Hide resolved
LONG,
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
atLeast(1),
LOW,
REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC)
.defineInternal(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP,
.define(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP,
INT,
DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE,
atLeast(1),
MEDIUM,
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC)
.defineInternal(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
.define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
LONG,
DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS,
atLeast(1),
LOW,
REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC)
.defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP,
.define(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP,
LONG,
DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS,
atLeast(1),
LOW,
REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_DOC)
.defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP,
.define(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP,
LONG,
DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS,
atLeast(1), LOW,
REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_DOC)
.defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP,
.define(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP,
DOUBLE,
DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_JITTER,
between(0, 0.5),
LOW,
REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_DOC)
.defineInternal(REMOTE_LOG_READER_THREADS_PROP,
.define(REMOTE_LOG_READER_THREADS_PROP,
INT,
DEFAULT_REMOTE_LOG_READER_THREADS,
atLeast(1),
MEDIUM,
REMOTE_LOG_READER_THREADS_DOC)
.defineInternal(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP,
.define(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP,
INT,
DEFAULT_REMOTE_LOG_READER_MAX_PENDING_TASKS,
atLeast(1),
MEDIUM,
REMOTE_LOG_READER_MAX_PENDING_TASKS_DOC)
.defineInternal(LOG_LOCAL_RETENTION_MS_PROP,
.define(LOG_LOCAL_RETENTION_MS_PROP,
LONG,
DEFAULT_LOG_LOCAL_RETENTION_MS,
atLeast(DEFAULT_LOG_LOCAL_RETENTION_MS),
MEDIUM,
LOG_LOCAL_RETENTION_MS_DOC)
.defineInternal(LOG_LOCAL_RETENTION_BYTES_PROP,
.define(LOG_LOCAL_RETENTION_BYTES_PROP,
LONG,
DEFAULT_LOG_LOCAL_RETENTION_BYTES,
atLeast(DEFAULT_LOG_LOCAL_RETENTION_BYTES),
Expand Down Expand Up @@ -457,4 +457,8 @@ public int hashCode() {
remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps,
remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix);
}

public static void main(String[] args) {
System.out.println(CONFIG_DEF.toHtml(4, config -> "remote_log_manager_" + config));
}
}