Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions config/kraft/broker.properties
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,33 @@ s3.stream.set.object.compaction.max.num=500
# and catch up read
s3.network.baseline.bandwidth=104857600

# Set to true to enable metrics collection
s3.telemetry.metrics.enable=false

# The metrics level to record, supported values are INFO, DEBUG
s3.telemetry.metrics.level=INFO

# The metrics exporter type, supported values are otlp, prometheus, log. Use comma to separate multiple exporters.
s3.telemetry.metrics.exporter.type=otlp

# The Prometheus HTTP server host and port, if exporter type is set to prometheus
# s3.metrics.exporter.prom.host=127.0.0.1
# s3.metrics.exporter.prom.port=9090

# Set to true to enable exporting tracing data to OTel Collector
s3.telemetry.tracer.enable=false

# The OTel Collector endpoint, if exporter type is set to otlp or tracing is enabled
# s3.telemetry.exporter.otlp.endpoint=http://${your_host_name}:4317

# Set the following configurations for span batching
s3.telemetry.tracer.span.scheduled.delay.ms=1000
s3.telemetry.tracer.span.max.queue.size=5120
s3.telemetry.tracer.span.max.batch.size=1024

# Metrics report interval
s3.telemetry.exporter.report.interval.ms=5000

############################# Settings for Auto Balancer #############################
# The metric reporter to collect and report metrics for Auto Balancer
metric.reporters=kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter
Expand Down
27 changes: 27 additions & 0 deletions config/kraft/server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,33 @@ s3.stream.set.object.compaction.max.num=500
# and catch up read
s3.network.baseline.bandwidth=104857600

# Set to true to enable metrics collection
s3.telemetry.metrics.enable=false

# The metrics level to record, supported values are INFO, DEBUG
s3.telemetry.metrics.level=INFO

# The metrics exporter type, supported values are otlp, prometheus, log. Use comma to separate multiple exporters.
s3.telemetry.metrics.exporter.type=otlp

# The Prometheus HTTP server host and port, if exporter type is set to prometheus
# s3.metrics.exporter.prom.host=127.0.0.1
# s3.metrics.exporter.prom.port=9090

# Set to true to enable exporting tracing data to OTel Collector
s3.telemetry.tracer.enable=false

# The OTel Collector endpoint, if exporter type is set to otlp or tracing is enabled
# s3.telemetry.exporter.otlp.endpoint=http://${your_host_name}:4317

# Set the following configurations for span batching
s3.telemetry.tracer.span.scheduled.delay.ms=1000
s3.telemetry.tracer.span.max.queue.size=5120
s3.telemetry.tracer.span.max.batch.size=1024

# Metrics report interval
s3.telemetry.exporter.report.interval.ms=5000

############################# Settings for Auto Balancer #############################
# The metric reporter to collect and report metrics for Auto Balancer
metric.reporters=kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package kafka.log.stream.s3.telemetry;

import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.common.AttributeKey;
Expand Down Expand Up @@ -59,6 +60,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand Down Expand Up @@ -139,24 +141,37 @@ private void init() {
autoCloseables.addAll(GarbageCollector.registerObservers(openTelemetrySdk));

Meter meter = openTelemetrySdk.getMeter(TelemetryConstants.TELEMETRY_SCOPE_NAME);
S3StreamMetricsManager.setMetricsLevel(metricsLevel());
S3StreamMetricsManager.initMetrics(meter, TelemetryConstants.KAFKA_METRICS_PREFIX);
S3StreamMetricsManager.initAttributesBuilder(() -> {
//TODO: cache and reuse attributes
//TODO: support metrics level
AttributesBuilder builder = attributesBuilderSupplier.get();
labelMap.forEach(builder::put);
return builder;
});
}

LOGGER.info("Instrument manager initialized with metrics: {}, trace: {} report interval: {}",
kafkaConfig.s3MetricsEnable(), kafkaConfig.s3TracerEnable(), kafkaConfig.s3ExporterReportIntervalMs());
LOGGER.info("Instrument manager initialized with metrics: {} (level: {}), trace: {} report interval: {}",
kafkaConfig.s3MetricsEnable(), kafkaConfig.s3MetricsLevel(), kafkaConfig.s3TracerEnable(), kafkaConfig.s3ExporterReportIntervalMs());
}

/**
 * Returns the process-wide {@link OpenTelemetrySdk} instance.
 * NOTE(review): may be {@code null} until {@code init()} has assigned it — callers
 * should confirm initialization ordering before dereferencing.
 */
public static OpenTelemetrySdk getOpenTelemetrySdk() {
return openTelemetrySdk;
}

/**
 * Resolves the configured {@code s3.telemetry.metrics.level} string into a
 * {@link MetricsLevel}, falling back to {@link MetricsLevel#INFO} when the
 * value is blank/unset or not a recognized level name.
 *
 * @return the parsed metrics level, never {@code null}
 */
private MetricsLevel metricsLevel() {
String levelStr = kafkaConfig.s3MetricsLevel();
// Blank or unset configuration uses the INFO default.
if (StringUtils.isBlank(levelStr)) {
    return MetricsLevel.INFO;
}
try {
    // Tolerate surrounding whitespace and any letter case (e.g. " debug ").
    // Locale.ROOT keeps the upper-casing locale-independent.
    return MetricsLevel.valueOf(levelStr.trim().toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
    // Enum.valueOf throws IllegalArgumentException for unknown names; catch it
    // narrowly instead of Exception so programming errors are not swallowed.
    LOGGER.error("illegal metrics level: {}, falling back to INFO", levelStr);
    return MetricsLevel.INFO;
}
}

private SdkTracerProvider getTraceProvider(Resource resource) {
OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder()
.setEndpoint(kafkaConfig.s3ExporterOTLPEndpoint())
Expand Down Expand Up @@ -185,6 +200,7 @@ private SdkMeterProvider getMetricsProvider(Resource resource) {
}
String[] exporterTypeArray = exporterTypes.split(",");
for (String exporterType : exporterTypeArray) {
exporterType = exporterType.trim();
switch (exporterType) {
case "otlp":
initOTLPExporter(sdkMeterProviderBuilder, kafkaConfig);
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ object KafkaConfig {
val S3TracerEnableProp = "s3.telemetry.tracer.enable"
val S3ExporterOTLPEndpointProp = "s3.telemetry.exporter.otlp.endpoint"
val S3ExporterReportIntervalMsProp = "s3.telemetry.exporter.report.interval.ms"
val S3MetricsLevelProp = "s3.telemetry.metrics.level"
val S3MetricsExporterTypeProp = "s3.telemetry.metrics.exporter.type"
val S3MetricsExporterPromHostProp = "s3.metrics.exporter.prom.host"
val S3MetricsExporterPromPortProp = "s3.metrics.exporter.prom.port"
Expand Down Expand Up @@ -775,6 +776,7 @@ object KafkaConfig {
val S3FailoverEnableDoc = "Failover mode: if enable, the controller will scan failed node and failover the failed node"
val S3MetricsEnableDoc = "Whether to enable metrics exporter for s3stream."
val S3TracerEnableDoc = "Whether to enable tracer exporter for s3stream."
// Doc strings for the telemetry config entries; fixed grammar ("values") and
// the "seperated" -> "separated" typo so the generated config docs read cleanly.
val S3MetricsLevelDoc = "The metrics level, supported values: INFO, DEBUG"
val S3MetricsExporterTypeDoc = "The enabled S3 metrics exporters type, separated by comma. Supported type: otlp, prometheus, log"
val S3ExporterOTLPEndpointDoc = "The endpoint of OTLP collector"
val S3ExporterReportIntervalMsDoc = "The interval in milliseconds to report telemetry"
Expand Down Expand Up @@ -1620,6 +1622,7 @@ object KafkaConfig {
.define(S3FailoverEnableProp, BOOLEAN, false, MEDIUM, S3FailoverEnableDoc)
.define(S3MetricsEnableProp, BOOLEAN, true, MEDIUM, S3MetricsEnableDoc)
.define(S3TracerEnableProp, BOOLEAN, false, MEDIUM, S3TracerEnableDoc)
.define(S3MetricsLevelProp, STRING, "INFO", MEDIUM, S3MetricsLevelDoc)
.define(S3MetricsExporterTypeProp, STRING, null, MEDIUM, S3MetricsExporterTypeDoc)
.define(S3ExporterOTLPEndpointProp, STRING, null, MEDIUM, S3ExporterOTLPEndpointDoc)
.define(S3MetricsExporterPromHostProp, STRING, null, MEDIUM, S3MetricsExporterPromHostDoc)
Expand Down Expand Up @@ -2202,6 +2205,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val s3FailoverEnable = getBoolean(KafkaConfig.S3FailoverEnableProp)
val s3MetricsEnable = getBoolean(KafkaConfig.S3MetricsEnableProp)
val s3TracerEnable = getBoolean(KafkaConfig.S3TracerEnableProp)
val s3MetricsLevel = getString(KafkaConfig.S3MetricsLevelProp)
val s3MetricsExporterType = getString(KafkaConfig.S3MetricsExporterTypeProp)
val s3ExporterOTLPEndpoint = getString(KafkaConfig.S3ExporterOTLPEndpointProp)
val s3ExporterReportIntervalMs = getInt(KafkaConfig.S3ExporterReportIntervalMsProp)
Expand Down
Loading