diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryConstants.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryConstants.java index 3644b74f3c..0f10b9d180 100644 --- a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryConstants.java +++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryConstants.java @@ -14,6 +14,8 @@ import io.opentelemetry.api.common.AttributeKey; public class TelemetryConstants { + // The maximum number of combinations of attributes for a single metric + public static final int CARDINALITY_LIMIT = 10000; public static final String COMMON_JMX_YAML_CONFIG_PATH = "/jmx/rules/common.yaml"; public static final String BROKER_JMX_YAML_CONFIG_PATH = "/jmx/rules/broker.yaml"; public static final String CONTROLLER_JMX_YAML_CONFIG_PATH = "/jmx/rules/controller.yaml"; diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java index 8f21e55412..56e5768c96 100644 --- a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java @@ -42,6 +42,7 @@ import io.opentelemetry.sdk.metrics.export.MetricReader; import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; import io.opentelemetry.sdk.metrics.export.PeriodicMetricReaderBuilder; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.SpanProcessor; @@ -289,7 +290,8 @@ private void initOTLPExporter(SdkMeterProviderBuilder sdkMeterProviderBuilder, K MetricReader periodicReader = builder.setInterval(Duration.ofMillis(kafkaConfig.s3ExporterReportIntervalMs())).build(); metricReaderList.add(periodicReader); - sdkMeterProviderBuilder.registerMetricReader(periodicReader); + SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector(sdkMeterProviderBuilder, periodicReader, + instrumentType -> TelemetryConstants.CARDINALITY_LIMIT); LOGGER.info("OTLP exporter registered, endpoint: {}, protocol: {}", otlpExporterHost, protocol); } @@ -302,7 +304,8 @@ private void initLogExporter(SdkMeterProviderBuilder sdkMeterProviderBuilder, Ka metricReaderList.add(periodicReader); metricsLogger = java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()); metricsLogger.setLevel(Level.FINEST); - sdkMeterProviderBuilder.registerMetricReader(periodicReader); + SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector(sdkMeterProviderBuilder, periodicReader, + instrumentType -> TelemetryConstants.CARDINALITY_LIMIT); LOGGER.info("Log exporter registered"); } @@ -317,7 +320,8 @@ private void initPrometheusExporter(SdkMeterProviderBuilder sdkMeterProviderBuil .setHost(promExporterHost) .setPort(promExporterPort) .build(); - sdkMeterProviderBuilder.registerMetricReader(prometheusHttpServer); + SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector(sdkMeterProviderBuilder, prometheusHttpServer, + instrumentType -> TelemetryConstants.CARDINALITY_LIMIT); LOGGER.info("Prometheus exporter registered, host: {}, port: {}", promExporterHost, promExporterPort); }