From 6336e896a969add0b9e59593a4194aa6eca94164 Mon Sep 17 00:00:00 2001
From: Shichao Nie
Date: Thu, 28 Dec 2023 15:55:38 +0800
Subject: [PATCH 1/5] feat(core): introduce s3stream tracing

Signed-off-by: Shichao Nie
---
 build.gradle | 8 +-
 .../kafka/log/stream/s3/DefaultS3Client.java | 4 -
 .../stream/s3/metrics/MetricsExporter.java | 174 -----------
 .../log/stream/s3/telemetry/ContextUtils.java | 39 +++
 .../s3/telemetry/TelemetryConstants.java | 29 ++
 .../stream/s3/telemetry/TelemetryManager.java | 270 ++++++++++++++++++
 .../log/streamaspect/AlwaysSuccessClient.java | 29 +-
 .../DefaultElasticStreamSlice.java | 11 +-
 .../streamaspect/ElasticLogFileRecords.java | 45 ++-
 .../log/streamaspect/ElasticStreamSlice.java | 13 +-
 .../kafka/log/streamaspect/LazyStream.java | 15 +-
 .../kafka/log/streamaspect/MemoryClient.java | 7 +-
 .../kafka/log/streamaspect/MetaStream.java | 11 +-
 .../scala/kafka/server/BrokerServer.scala | 10 +-
 .../main/scala/kafka/server/KafkaConfig.scala | 42 ++-
 .../scala/kafka/server/SharedServer.scala | 2 -
 .../streamaspect/AlwaysSuccessClientTest.java | 7 +-
 gradle/dependencies.gradle | 9 +-
 18 files changed, 479 insertions(+), 246 deletions(-)
 delete mode 100644 core/src/main/scala/kafka/log/stream/s3/metrics/MetricsExporter.java
 create mode 100644 core/src/main/scala/kafka/log/stream/s3/telemetry/ContextUtils.java
 create mode 100644 core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryConstants.java
 create mode 100644 core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java

diff --git a/build.gradle b/build.gradle
index 99b4a70623..7d090a858a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -960,14 +960,18 @@ project(':core') {
       exclude group: 'org.slf4j', module: '*'
       exclude group: 'net.sourceforge.argparse4j', module: '*'
     }
-    implementation libs.opentelemetryApi
+    implementation libs.opentelemetryJava17
+    implementation libs.opentelemetryOshi
     implementation libs.opentelemetrySdk
     implementation libs.opentelemetrySdkMetrics
     implementation libs.opentelemetryExporterLogging
-    implementation libs.opentelemetrySemconv
     implementation libs.opentelemetryExporterProm
     implementation libs.opentelemetryExporterOTLP
+    implementation(libs.oshi) {
+      exclude group: 'org.slf4j', module: '*'
+    }
+

     // https://github.com/netty/netty-tcnative/issues/716
     // After 2.0.48, gradle project need explicitly declare the tcnative dependencies with classifiers
     implementation 'io.netty:netty-tcnative-boringssl-static:2.0.48.Final'
diff --git a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
index 0525c49f6e..04667a7b38 100644
--- a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
+++ b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
@@ -38,7 +38,6 @@
 import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor;
 import com.automq.stream.utils.LogContext;
 import kafka.log.stream.s3.metadata.StreamMetadataManager;
-import kafka.log.stream.s3.metrics.MetricsExporter;
 import kafka.log.stream.s3.network.ControllerRequestSender;
 import kafka.log.stream.s3.objects.ControllerObjectManager;
 import kafka.log.stream.s3.streams.ControllerStreamManager;
@@ -75,7 +74,6 @@ public class DefaultS3Client implements Client {
     private final AsyncNetworkBandwidthLimiter networkInboundLimiter;
     private final AsyncNetworkBandwidthLimiter networkOutboundLimiter;
-    private final MetricsExporter metricsExporter;

     public DefaultS3Client(BrokerServer brokerServer, KafkaConfig kafkaConfig) {
         this.config =
ConfigUtils.to(kafkaConfig); @@ -112,7 +110,6 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig kafkaConfig) { } S3StreamThreadPoolMonitor.config(new LogContext("ThreadPoolMonitor").logger("s3.threads.logger"), TimeUnit.SECONDS.toMillis(5)); S3StreamThreadPoolMonitor.init(); - this.metricsExporter = new MetricsExporter(kafkaConfig); } @Override @@ -130,7 +127,6 @@ public void shutdown() { this.networkInboundLimiter.shutdown(); this.networkOutboundLimiter.shutdown(); this.requestSender.shutdown(); - this.metricsExporter.shutdown(); LOGGER.info("S3Client shutdown successfully"); } diff --git a/core/src/main/scala/kafka/log/stream/s3/metrics/MetricsExporter.java b/core/src/main/scala/kafka/log/stream/s3/metrics/MetricsExporter.java deleted file mode 100644 index 7c0fbc28b2..0000000000 --- a/core/src/main/scala/kafka/log/stream/s3/metrics/MetricsExporter.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log.stream.s3.metrics; - -import com.automq.stream.s3.metrics.S3StreamMetricsManager; -import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.api.metrics.Meter; -import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; -import io.opentelemetry.context.propagation.ContextPropagators; -import io.opentelemetry.context.propagation.TextMapPropagator; -import io.opentelemetry.exporter.logging.LoggingMetricExporter; -import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; -import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; -import io.opentelemetry.exporter.prometheus.PrometheusHttpServer; -import io.opentelemetry.sdk.OpenTelemetrySdk; -import io.opentelemetry.sdk.metrics.SdkMeterProvider; -import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; -import io.opentelemetry.sdk.metrics.data.AggregationTemporality; -import io.opentelemetry.sdk.metrics.export.MetricReader; -import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; -import io.opentelemetry.sdk.resources.Resource; -import io.opentelemetry.semconv.ResourceAttributes; -import kafka.server.KafkaConfig; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.bridge.SLF4JBridgeHandler; - -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.function.Supplier; -import java.util.logging.Level; - -public class MetricsExporter { - private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporter.class); - private static final String KAFKA_METRICS_PREFIX = 
"kafka_stream_"; - private static String clusterId = "default"; - private static java.util.logging.Logger metricsLogger; - private final KafkaConfig kafkaConfig; - private final Map, String> labelMap; - private final Supplier attributesBuilderSupplier; - private OpenTelemetrySdk openTelemetrySdk; - private PrometheusHttpServer prometheusHttpServer; - - public MetricsExporter(KafkaConfig kafkaConfig) { - this.kafkaConfig = kafkaConfig; - this.labelMap = new HashMap<>(); - this.attributesBuilderSupplier = Attributes::builder; - init(); - } - - public static void setClusterId(String clusterId) { - MetricsExporter.clusterId = clusterId; - } - - private void init() { - Resource resource = Resource.getDefault().toBuilder() - .put(ResourceAttributes.HOST_ID, String.valueOf(kafkaConfig.brokerId())) - .build(); - - labelMap.put(AttributeKey.stringKey("cluster_id"), clusterId); - labelMap.put(AttributeKey.stringKey("node_id"), String.valueOf(kafkaConfig.nodeId())); - labelMap.put(AttributeKey.stringKey("node_type"), StringUtils.join(kafkaConfig.getList(KafkaConfig.ProcessRolesProp()), ",")); - - SdkMeterProviderBuilder sdkMeterProvider = SdkMeterProvider.builder() - .setResource(resource); - - String exporterTypes = kafkaConfig.s3MetricsExporterType(); - if (StringUtils.isBlank(exporterTypes)) { - return; - } - String[] exporterTypeArray = exporterTypes.split(","); - for (String exporterType : exporterTypeArray) { - switch (exporterType) { - case "otlp": - initOTLPExporter(sdkMeterProvider, kafkaConfig); - break; - case "log": - initLogExporter(sdkMeterProvider, kafkaConfig); - break; - case "prometheus": - initPrometheusExporter(sdkMeterProvider, kafkaConfig); - break; - default: - LOGGER.error("illegal metrics exporter type: {}", exporterType); - break; - } - } - - openTelemetrySdk = OpenTelemetrySdk.builder() - .setMeterProvider(sdkMeterProvider.build()) - .setPropagators(ContextPropagators.create(TextMapPropagator.composite( - W3CTraceContextPropagator.getInstance(), W3CBaggagePropagator.getInstance()))) - .buildAndRegisterGlobal(); - Meter meter = openTelemetrySdk.getMeter("automq-for-kafka"); - S3StreamMetricsManager.initMetrics(meter, KAFKA_METRICS_PREFIX); - S3StreamMetricsManager.initAttributesBuilder(() -> { - AttributesBuilder builder = attributesBuilderSupplier.get(); - labelMap.forEach(builder::put); - return builder; - }); - } - - private void initOTLPExporter(SdkMeterProviderBuilder sdkMeterProvider, KafkaConfig kafkaConfig) { - String otlpExporterHost = kafkaConfig.s3MetricsExporterOTLPEndpoint(); - if (StringUtils.isBlank(otlpExporterHost)) { - LOGGER.error("illegal OTLP collector endpoint: {}", otlpExporterHost); - return; - } - if (!otlpExporterHost.startsWith("http://")) { - otlpExporterHost = "https://" + otlpExporterHost; - } - OtlpGrpcMetricExporterBuilder otlpExporterBuilder = OtlpGrpcMetricExporter.builder() - .setEndpoint(otlpExporterHost) - .setTimeout(Duration.ofMillis(30000)); - MetricReader periodicReader = PeriodicMetricReader.builder(otlpExporterBuilder.build()) - .setInterval(Duration.ofMillis(kafkaConfig.s3MetricsExporterReportIntervalMs())) - .build(); - sdkMeterProvider.registerMetricReader(periodicReader); - } - - private void initLogExporter(SdkMeterProviderBuilder sdkMeterProvider, KafkaConfig kafkaConfig) { - SLF4JBridgeHandler.removeHandlersForRootLogger(); - SLF4JBridgeHandler.install(); - MetricReader periodicReader = PeriodicMetricReader.builder(LoggingMetricExporter.create(AggregationTemporality.DELTA)) - 
.setInterval(Duration.ofMillis(kafkaConfig.s3MetricsExporterReportIntervalMs())) - .build(); - metricsLogger = java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()); - metricsLogger.setLevel(Level.FINEST); - sdkMeterProvider.registerMetricReader(periodicReader); - } - - private void initPrometheusExporter(SdkMeterProviderBuilder sdkMeterProvider, KafkaConfig kafkaConfig) { - String promExporterHost = kafkaConfig.s3MetricsExporterPromHost(); - int promExporterPort = kafkaConfig.s3MetricsExporterPromPort(); - if (StringUtils.isBlank(promExporterHost) || promExporterPort <= 0) { - LOGGER.error("illegal prometheus server address, host: {}, port: {}", promExporterHost, promExporterPort); - return; - } - prometheusHttpServer = PrometheusHttpServer.builder() - .setHost(promExporterHost) - .setPort(promExporterPort) - .build(); - sdkMeterProvider.registerMetricReader(prometheusHttpServer); - } - - public void shutdown() { - if (openTelemetrySdk != null) { - openTelemetrySdk.shutdown(); - } - if (prometheusHttpServer != null) { - prometheusHttpServer.shutdown(); - } - } -} diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/ContextUtils.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/ContextUtils.java new file mode 100644 index 0000000000..cd8335db57 --- /dev/null +++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/ContextUtils.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.log.stream.s3.telemetry; + +import com.automq.stream.s3.context.AppendContext; +import com.automq.stream.s3.context.FetchContext; +import com.automq.stream.s3.trace.context.TraceContext; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.context.Context; + +public class ContextUtils { + public static FetchContext creaetFetchContext() { + return new FetchContext(TelemetryManager.isTracerEnabled, GlobalOpenTelemetry.getTracer(TelemetryConstants.TELEMETRY_SCOPE_NAME), Context.current()); + } + + public static AppendContext createAppendContext() { + return new AppendContext(TelemetryManager.isTracerEnabled, GlobalOpenTelemetry.getTracer(TelemetryConstants.TELEMETRY_SCOPE_NAME), Context.current()); + } + + public static TraceContext createTraceContext() { + return new TraceContext(TelemetryManager.isTracerEnabled, GlobalOpenTelemetry.getTracer(TelemetryConstants.TELEMETRY_SCOPE_NAME), Context.current()); + } + +} diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryConstants.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryConstants.java new file mode 100644 index 0000000000..af0d1d1664 --- /dev/null +++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryConstants.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.stream.s3.telemetry; + +import io.opentelemetry.api.common.AttributeKey; + +public class TelemetryConstants { + public static final String TELEMETRY_SCOPE_NAME = "automq_for_kafka"; + public static final String KAFKA_METRICS_PREFIX = "kafka_stream_"; + public static final AttributeKey STREAM_ID_NAME = AttributeKey.longKey("streamId"); + public static final AttributeKey START_OFFSET_NAME = AttributeKey.longKey("startOffset"); + public static final AttributeKey END_OFFSET_NAME = AttributeKey.longKey("endOffset"); + public static final AttributeKey MAX_BYTES_NAME = AttributeKey.longKey("maxBytes"); +} diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java new file mode 100644 index 0000000000..57bc79efb7 --- /dev/null +++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.stream.s3.telemetry; + +import com.automq.stream.s3.metrics.S3StreamMetricsManager; +import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.exporter.logging.LoggingMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.exporter.prometheus.PrometheusHttpServer; +import io.opentelemetry.instrumentation.runtimemetrics.java17.JfrFeature; +import io.opentelemetry.instrumentation.runtimemetrics.java17.RuntimeMetrics; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.OpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.semconv.ResourceAttributes; +import kafka.server.KafkaConfig; +import kafka.server.KafkaRaftServer; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.bridge.SLF4JBridgeHandler; +import scala.collection.immutable.Set; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.logging.Level; + +public class TelemetryManager { + private static final Logger LOGGER = LoggerFactory.getLogger(TelemetryManager.class); + private static final Integer EXPORTER_TIMEOUT_MS = 5000; + public static boolean isTracerEnabled = false; + private static java.util.logging.Logger metricsLogger; + private final KafkaConfig kafkaConfig; + private final String clusterId; + private final Map, String> labelMap; + private final Supplier attributesBuilderSupplier; + private final List metricReaderList; + private RuntimeMetrics runtimeMetrics; + private OpenTelemetrySdk openTelemetrySdk; + private PrometheusHttpServer prometheusHttpServer; + + public TelemetryManager(KafkaConfig kafkaConfig, String clusterId) { + this.kafkaConfig = kafkaConfig; + this.clusterId = clusterId; + this.labelMap = new HashMap<>(); + this.metricReaderList = new 
ArrayList<>(); + this.attributesBuilderSupplier = Attributes::builder; + isTracerEnabled = kafkaConfig.s3TracerEnable(); + init(); + } + + private String buildServiceName() { + return clusterId + "_" + getNodeType() + "_" + kafkaConfig.nodeId(); + } + + private String getNodeType() { + Set roles = kafkaConfig.processRoles(); + if (roles.size() == 1) { + return roles.last().toString(); + } + return "server"; + } + + private void init() { + Resource resource = Resource.getDefault().toBuilder() + .put(ResourceAttributes.SERVICE_NAMESPACE, clusterId) + .put(ResourceAttributes.SERVICE_NAME, getNodeType()) + .put(ResourceAttributes.SERVICE_INSTANCE_ID, String.valueOf(kafkaConfig.nodeId())) + .build(); + + labelMap.put(AttributeKey.stringKey("cluster_id"), clusterId); + labelMap.put(AttributeKey.stringKey("node_type"), getNodeType()); + labelMap.put(AttributeKey.stringKey("node_id"), String.valueOf(kafkaConfig.nodeId())); + + OpenTelemetrySdkBuilder openTelemetrySdkBuilder = OpenTelemetrySdk.builder(); + + if (kafkaConfig.s3MetricsEnable()) { + SdkMeterProvider sdkMeterProvider = getMetricsProvider(resource); + if (sdkMeterProvider != null) { + openTelemetrySdkBuilder.setMeterProvider(sdkMeterProvider); + } + } + if (kafkaConfig.s3TracerEnable()) { + openTelemetrySdkBuilder.setTracerProvider(getTraceProvider(resource)); + } + + openTelemetrySdk = openTelemetrySdkBuilder + .setPropagators(ContextPropagators.create(TextMapPropagator.composite( + W3CTraceContextPropagator.getInstance(), W3CBaggagePropagator.getInstance()))) + .buildAndRegisterGlobal(); + + if (kafkaConfig.s3MetricsEnable()) { + // set JVM metrics opt-in to prevent metrics conflict. + System.setProperty("otel.semconv-stability.opt-in", "jvm"); + // JVM metrics + runtimeMetrics = RuntimeMetrics.builder(openTelemetrySdk) + .enableFeature(JfrFeature.GC_DURATION_METRICS) + .enableFeature(JfrFeature.CPU_UTILIZATION_METRICS) + .enableFeature(JfrFeature.MEMORY_POOL_METRICS) + .enableFeature(JfrFeature.MEMORY_ALLOCATION_METRICS) + .enableFeature(JfrFeature.NETWORK_IO_METRICS) + .build(); + + Meter meter = openTelemetrySdk.getMeter(TelemetryConstants.TELEMETRY_SCOPE_NAME); + S3StreamMetricsManager.initMetrics(meter, TelemetryConstants.KAFKA_METRICS_PREFIX); + S3StreamMetricsManager.initAttributesBuilder(() -> { + //TODO: cache and reuse attributes + AttributesBuilder builder = attributesBuilderSupplier.get(); + labelMap.forEach(builder::put); + return builder; + }); + } + + LOGGER.info("Instrument manager initialized with metrics: {}, trace: {} report interval: {}", + kafkaConfig.s3MetricsEnable(), kafkaConfig.s3TracerEnable(), kafkaConfig.s3ExporterReportIntervalMs()); + } + + private SdkTracerProvider getTraceProvider(Resource resource) { + OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder() + .setEndpoint(kafkaConfig.s3ExporterOTLPEndpoint()) + .setTimeout(EXPORTER_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .build(); + + SpanProcessor spanProcessor = BatchSpanProcessor.builder(spanExporter) + .setExporterTimeout(EXPORTER_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .setScheduleDelay(kafkaConfig.s3SpanScheduledDelayMs(), TimeUnit.MILLISECONDS) + .setMaxExportBatchSize(kafkaConfig.s3SpanMaxBatchSize()) + .setMaxQueueSize(kafkaConfig.s3SpanMaxQueueSize()) + .build(); + + return SdkTracerProvider.builder() + .addSpanProcessor(spanProcessor) + .setResource(resource) + .build(); + } + + private SdkMeterProvider getMetricsProvider(Resource resource) { + SdkMeterProviderBuilder sdkMeterProviderBuilder = 
SdkMeterProvider.builder().setResource(resource); + String exporterTypes = kafkaConfig.s3MetricsExporterType(); + if (StringUtils.isBlank(exporterTypes)) { + LOGGER.info("Metrics exporter not configured"); + return null; + } + String[] exporterTypeArray = exporterTypes.split(","); + for (String exporterType : exporterTypeArray) { + switch (exporterType) { + case "otlp": + initOTLPExporter(sdkMeterProviderBuilder, kafkaConfig); + break; + case "log": + initLogExporter(sdkMeterProviderBuilder, kafkaConfig); + break; + case "prometheus": + initPrometheusExporter(sdkMeterProviderBuilder, kafkaConfig); + break; + default: + LOGGER.error("illegal metrics exporter type: {}", exporterType); + break; + } + } + return sdkMeterProviderBuilder.build(); + } + + private void initOTLPExporter(SdkMeterProviderBuilder sdkMeterProviderBuilder, KafkaConfig kafkaConfig) { + String otlpExporterHost = kafkaConfig.s3ExporterOTLPEndpoint(); + if (StringUtils.isBlank(otlpExporterHost)) { + LOGGER.error("illegal OTLP collector endpoint: {}", otlpExporterHost); + return; + } + if (!otlpExporterHost.startsWith("http://")) { + otlpExporterHost = "https://" + otlpExporterHost; + } + OtlpGrpcMetricExporterBuilder otlpExporterBuilder = OtlpGrpcMetricExporter.builder() + .setEndpoint(otlpExporterHost) + .setTimeout(Duration.ofMillis(30000)); + MetricReader periodicReader = PeriodicMetricReader.builder(otlpExporterBuilder.build()) + .setInterval(Duration.ofMillis(kafkaConfig.s3ExporterReportIntervalMs())) + .build(); + metricReaderList.add(periodicReader); + sdkMeterProviderBuilder.registerMetricReader(periodicReader); + LOGGER.info("OTLP exporter registered, endpoint: {}", otlpExporterHost); + } + + private void initLogExporter(SdkMeterProviderBuilder sdkMeterProviderBuilder, KafkaConfig kafkaConfig) { + SLF4JBridgeHandler.removeHandlersForRootLogger(); + SLF4JBridgeHandler.install(); + MetricReader periodicReader = PeriodicMetricReader.builder(LoggingMetricExporter.create(AggregationTemporality.DELTA)) + .setInterval(Duration.ofMillis(kafkaConfig.s3ExporterReportIntervalMs())) + .build(); + metricReaderList.add(periodicReader); + metricsLogger = java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()); + metricsLogger.setLevel(Level.FINEST); + sdkMeterProviderBuilder.registerMetricReader(periodicReader); + LOGGER.info("Log exporter registered"); + } + + private void initPrometheusExporter(SdkMeterProviderBuilder sdkMeterProviderBuilder, KafkaConfig kafkaConfig) { + String promExporterHost = kafkaConfig.s3MetricsExporterPromHost(); + int promExporterPort = kafkaConfig.s3MetricsExporterPromPort(); + if (StringUtils.isBlank(promExporterHost) || promExporterPort <= 0) { + LOGGER.error("illegal prometheus server address, host: {}, port: {}", promExporterHost, promExporterPort); + return; + } + prometheusHttpServer = PrometheusHttpServer.builder() + .setHost(promExporterHost) + .setPort(promExporterPort) + .build(); + sdkMeterProviderBuilder.registerMetricReader(prometheusHttpServer); + LOGGER.info("Prometheus exporter registered, host: {}, port: {}", promExporterHost, promExporterPort); + } + + public void shutdown() { + if (prometheusHttpServer != null) { + prometheusHttpServer.forceFlush(); + prometheusHttpServer.close(); + } + for (MetricReader metricReader : metricReaderList) { + metricReader.forceFlush(); + try { + metricReader.close(); + } catch (IOException ignored) { + + } + } + if (runtimeMetrics != null) { + runtimeMetrics.close(); + } + if (openTelemetrySdk != null) { + openTelemetrySdk.close(); + 
} + } +} diff --git a/core/src/main/scala/kafka/log/streamaspect/AlwaysSuccessClient.java b/core/src/main/scala/kafka/log/streamaspect/AlwaysSuccessClient.java index 14fc590315..999a47d6e2 100644 --- a/core/src/main/scala/kafka/log/streamaspect/AlwaysSuccessClient.java +++ b/core/src/main/scala/kafka/log/streamaspect/AlwaysSuccessClient.java @@ -23,13 +23,14 @@ import com.automq.stream.api.FetchResult; import com.automq.stream.api.KVClient; import com.automq.stream.api.OpenStreamOptions; -import com.automq.stream.api.ReadOptions; import com.automq.stream.api.RecordBatch; import com.automq.stream.api.Stream; import com.automq.stream.api.StreamClient; import com.automq.stream.api.exceptions.ErrorCode; import com.automq.stream.api.exceptions.FastReadFailFastException; import com.automq.stream.api.exceptions.StreamClientException; +import com.automq.stream.s3.context.AppendContext; +import com.automq.stream.s3.context.FetchContext; import com.automq.stream.utils.FutureUtil; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; @@ -253,22 +254,22 @@ public long nextOffset() { } @Override - public CompletableFuture append(RecordBatch recordBatch) { + public CompletableFuture append(AppendContext context, RecordBatch recordBatch) { CompletableFuture cf = new CompletableFuture<>(); if (appendCallbackAsync) { - append0(recordBatch, cf); + append0(context, recordBatch, cf); } else { - append0WithSyncCallback(recordBatch, cf); + append0WithSyncCallback(context, recordBatch, cf); } return cf; } - private void append0(RecordBatch recordBatch, CompletableFuture cf) { - stream.append(recordBatch).whenCompleteAsync((rst, ex) -> FutureUtil.suppress(() -> { + private void append0(AppendContext context, RecordBatch recordBatch, CompletableFuture cf) { + stream.append(context, recordBatch).whenCompleteAsync((rst, ex) -> FutureUtil.suppress(() -> { if (ex != null) { if (!maybeHaltAndCompleteWaitingFuture(ex, cf)) { LOGGER.error("Appending to stream[{}] failed, retry later", streamId(), ex); - appendRetryScheduler.schedule(() -> append0(recordBatch, cf), 3, TimeUnit.SECONDS); + appendRetryScheduler.schedule(() -> append0(context, recordBatch, cf), 3, TimeUnit.SECONDS); } } else { cf.complete(rst); @@ -280,12 +281,12 @@ private void append0(RecordBatch recordBatch, CompletableFuture cf * Append to stream without using async callback threadPools. * Used for tests only. 
*/ - private void append0WithSyncCallback(RecordBatch recordBatch, CompletableFuture cf) { - stream.append(recordBatch).whenComplete((rst, ex) -> FutureUtil.suppress(() -> { + private void append0WithSyncCallback(AppendContext context, RecordBatch recordBatch, CompletableFuture cf) { + stream.append(context, recordBatch).whenComplete((rst, ex) -> FutureUtil.suppress(() -> { if (ex != null) { if (!maybeHaltAndCompleteWaitingFuture(ex, cf)) { LOGGER.error("Appending to stream[{}] failed, retry later", streamId(), ex); - appendRetryScheduler.schedule(() -> append0(recordBatch, cf), 3, TimeUnit.SECONDS); + appendRetryScheduler.schedule(() -> append0(context, recordBatch, cf), 3, TimeUnit.SECONDS); } } else { cf.complete(rst); @@ -294,15 +295,15 @@ private void append0WithSyncCallback(RecordBatch recordBatch, CompletableFuture< } @Override - public CompletableFuture fetch(long startOffset, long endOffset, int maxBytesHint, ReadOptions readOptions) { + public CompletableFuture fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint) { CompletableFuture cf = new CompletableFuture<>(); Timeout timeout = fetchTimeout.newTimeout(t -> LOGGER.warn("fetch timeout, stream[{}] [{}, {})", streamId(), startOffset, endOffset), 1, TimeUnit.MINUTES); - fetch0(startOffset, endOffset, maxBytesHint, readOptions, cf); + fetch0(context, startOffset, endOffset, maxBytesHint, cf); return cf.whenComplete((rst, ex) -> timeout.cancel()); } - private void fetch0(long startOffset, long endOffset, int maxBytesHint, ReadOptions readOptions, CompletableFuture cf) { - stream.fetch(startOffset, endOffset, maxBytesHint, readOptions).whenCompleteAsync((rst, e) -> FutureUtil.suppress(() -> { + private void fetch0(FetchContext context, long startOffset, long endOffset, int maxBytesHint, CompletableFuture cf) { + stream.fetch(context, startOffset, endOffset, maxBytesHint).whenCompleteAsync((rst, e) -> FutureUtil.suppress(() -> { Throwable ex = FutureUtil.cause(e); if (ex != null) { if (!(ex instanceof FastReadFailFastException)) { diff --git a/core/src/main/scala/kafka/log/streamaspect/DefaultElasticStreamSlice.java b/core/src/main/scala/kafka/log/streamaspect/DefaultElasticStreamSlice.java index bfe11a7674..3da9ead788 100644 --- a/core/src/main/scala/kafka/log/streamaspect/DefaultElasticStreamSlice.java +++ b/core/src/main/scala/kafka/log/streamaspect/DefaultElasticStreamSlice.java @@ -19,10 +19,11 @@ import com.automq.stream.api.AppendResult; import com.automq.stream.api.FetchResult; -import com.automq.stream.api.ReadOptions; import com.automq.stream.api.RecordBatch; import com.automq.stream.api.RecordBatchWithContext; import com.automq.stream.api.Stream; +import com.automq.stream.s3.context.AppendContext; +import com.automq.stream.s3.context.FetchContext; import com.automq.stream.utils.FutureUtil; import org.apache.kafka.common.utils.Utils; @@ -71,18 +72,18 @@ public DefaultElasticStreamSlice(Stream stream, SliceRange sliceRange, ExecutorS } @Override - public CompletableFuture append(RecordBatch recordBatch) { + public CompletableFuture append(AppendContext context, RecordBatch recordBatch) { if (sealed) { return FutureUtil.failedFuture(new IllegalStateException("stream segment " + this + " is sealed")); } nextOffset += recordBatch.count(); - return stream.append(recordBatch).thenApply(AppendResultWrapper::new); + return stream.append(context, recordBatch).thenApply(AppendResultWrapper::new); } @Override - public CompletableFuture fetch(long startOffset, long endOffset, int maxBytesHint, ReadOptions 
readOptions) { + public CompletableFuture fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint) { long fixedStartOffset = Utils.max(startOffset, 0); - return stream.fetch(startOffsetInStream + fixedStartOffset, startOffsetInStream + endOffset, maxBytesHint, readOptions) + return stream.fetch(context, startOffsetInStream + fixedStartOffset, startOffsetInStream + endOffset, maxBytesHint) .thenApplyAsync(FetchResultWrapper::new, executorService); } diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java index b17daece55..da2acfe537 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java @@ -19,12 +19,18 @@ import com.automq.stream.api.ReadOptions; import com.automq.stream.s3.DirectByteBufAlloc; +import com.automq.stream.s3.context.AppendContext; +import com.automq.stream.s3.context.FetchContext; +import com.automq.stream.s3.trace.TraceUtils; import com.automq.stream.utils.FutureUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import com.automq.stream.api.FetchResult; import com.automq.stream.api.RecordBatchWithContext; +import io.opentelemetry.api.common.Attributes; +import kafka.log.stream.s3.telemetry.ContextUtils; +import kafka.log.stream.s3.telemetry.TelemetryConstants; import org.apache.kafka.common.network.TransferableChannel; import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.record.ConvertedRecords; @@ -105,29 +111,41 @@ public long appendedOffset() { public CompletableFuture read(long startOffset, long maxOffset, int maxSize) { if (ReadHint.isReadAll()) { ReadOptions readOptions = ReadOptions.builder().fastRead(ReadHint.isFastRead()).pooledBuf(true).build(); - return readAll0(startOffset, maxOffset, maxSize, readOptions); + FetchContext fetchContext = ContextUtils.creaetFetchContext(); + fetchContext.setReadOptions(readOptions); + Attributes attributes = Attributes.builder() + .put(TelemetryConstants.START_OFFSET_NAME, startOffset) + .put(TelemetryConstants.END_OFFSET_NAME, maxOffset) + .put(TelemetryConstants.MAX_BYTES_NAME, maxSize) + .build(); + try { + return TraceUtils.runWithSpanAsync(fetchContext, attributes, "ElasticLogFileRecords::read", + () -> readAll0(fetchContext, startOffset, maxOffset, maxSize)); + } catch (Throwable ex) { + return CompletableFuture.failedFuture(ex); + } } else { return CompletableFuture.completedFuture(new BatchIteratorRecordsAdaptor(this, startOffset, maxOffset, maxSize)); } } - private CompletableFuture readAll0(long startOffset, long maxOffset, int maxSize, ReadOptions readOptions) { + private CompletableFuture readAll0(FetchContext context, long startOffset, long maxOffset, int maxSize) { // calculate the relative offset in the segment, which may start from 0. 
long nextFetchOffset = startOffset - baseOffset; long endOffset = Utils.min(this.committedOffset.get(), maxOffset) - baseOffset; if (nextFetchOffset >= endOffset) { return CompletableFuture.completedFuture(null); } - return fetch0(nextFetchOffset, endOffset, maxSize, readOptions) - .thenApply(rst -> PooledMemoryRecords.of(rst, readOptions.pooledBuf())); + return fetch0(context, nextFetchOffset, endOffset, maxSize) + .thenApply(rst -> PooledMemoryRecords.of(rst, context.readOptions().pooledBuf())); } - private CompletableFuture> fetch0(long startOffset, long endOffset, int maxSize, ReadOptions readOptions) { + private CompletableFuture> fetch0(FetchContext context, long startOffset, long endOffset, int maxSize) { if (startOffset >= endOffset || maxSize <= 0) { return CompletableFuture.completedFuture(new LinkedList<>()); } int adjustedMaxSize = Math.min(maxSize, 1024 * 1024); - return streamSlice.fetch(startOffset, endOffset, adjustedMaxSize, readOptions) + return streamSlice.fetch(context, startOffset, endOffset, adjustedMaxSize) .thenCompose(rst -> { long nextFetchOffset = startOffset; int readSize = 0; @@ -141,7 +159,7 @@ private CompletableFuture> fetch0(long startOffset, long } readSize += recordBatchWithContext.rawPayload().remaining(); } - return fetch0(nextFetchOffset, endOffset, maxSize - readSize, readOptions) + return fetch0(context, nextFetchOffset, endOffset, maxSize - readSize) .thenApply(rstList -> { // add to first since we need to reverse the order. rstList.addFirst(rst); @@ -171,7 +189,16 @@ public int append(MemoryRecords records, long lastOffset) throws IOException { // Note that the calculation of count requires strong consistency between nextOffset and the baseOffset of records. int count = (int) (lastOffset - nextOffset.get()); com.automq.stream.DefaultRecordBatch batch = new com.automq.stream.DefaultRecordBatch(count, 0, Collections.emptyMap(), records.buffer()); - CompletableFuture cf = streamSlice.append(batch); + + AppendContext context = ContextUtils.createAppendContext(); + CompletableFuture cf; + try { + cf = TraceUtils.runWithSpanAsync(context, Attributes.empty(), "ElasticLogFileRecords::append", + () -> streamSlice.append(context, batch)); + } catch (Throwable ex) { + throw new IOException("Failed to append to stream " + streamSlice.stream().streamId(), ex); + } + nextOffset.set(lastOffset); size.getAndAdd(appendSize); cf.whenComplete((rst, e) -> { @@ -485,7 +512,7 @@ private void ensureAllLoaded() throws IOException { } Records records = null; try { - records = elasticLogFileRecords.readAll0(startOffset, maxOffset, fetchSize, ReadOptions.DEFAULT).get(); + records = elasticLogFileRecords.readAll0(FetchContext.DEFAULT, startOffset, maxOffset, fetchSize).get(); } catch (Throwable t) { throw new IOException(FutureUtil.cause(t)); } diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticStreamSlice.java b/core/src/main/scala/kafka/log/streamaspect/ElasticStreamSlice.java index ae28190389..8a132c4788 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticStreamSlice.java +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticStreamSlice.java @@ -19,9 +19,10 @@ import com.automq.stream.api.AppendResult; import com.automq.stream.api.FetchResult; -import com.automq.stream.api.ReadOptions; import com.automq.stream.api.RecordBatch; import com.automq.stream.api.Stream; +import com.automq.stream.s3.context.AppendContext; +import com.automq.stream.s3.context.FetchContext; import java.util.concurrent.CompletableFuture; @@ -37,7 +38,11 @@ public interface 
ElasticStreamSlice { * @param recordBatch {@link RecordBatch} * @return {@link AppendResult} */ - CompletableFuture append(RecordBatch recordBatch); + CompletableFuture append(AppendContext context, RecordBatch recordBatch); + + default CompletableFuture append(RecordBatch recordBatch) { + return append(AppendContext.DEFAULT, recordBatch); + } /** * Fetch record batch from stream slice. @@ -47,10 +52,10 @@ public interface ElasticStreamSlice { * @param maxBytesHint max fetch data size hint, the real return data size may be larger than maxBytesHint. * @return {@link FetchResult} */ - CompletableFuture fetch(long startOffset, long endOffset, int maxBytesHint, ReadOptions readOptions); + CompletableFuture fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint); default CompletableFuture fetch(long startOffset, long endOffset, int maxBytesHint) { - return fetch(startOffset, endOffset, maxBytesHint, ReadOptions.DEFAULT); + return fetch(FetchContext.DEFAULT, startOffset, endOffset, maxBytesHint); } default CompletableFuture fetch(long startOffset, long endOffset) { diff --git a/core/src/main/scala/kafka/log/streamaspect/LazyStream.java b/core/src/main/scala/kafka/log/streamaspect/LazyStream.java index 75145d8720..6846303fc6 100644 --- a/core/src/main/scala/kafka/log/streamaspect/LazyStream.java +++ b/core/src/main/scala/kafka/log/streamaspect/LazyStream.java @@ -21,10 +21,11 @@ import com.automq.stream.api.CreateStreamOptions; import com.automq.stream.api.FetchResult; import com.automq.stream.api.OpenStreamOptions; -import com.automq.stream.api.ReadOptions; import com.automq.stream.api.RecordBatch; import com.automq.stream.api.Stream; import com.automq.stream.api.StreamClient; +import com.automq.stream.s3.context.AppendContext; +import com.automq.stream.s3.context.FetchContext; import com.automq.stream.utils.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,7 +107,7 @@ public long nextOffset() { @Override - public synchronized CompletableFuture append(RecordBatch recordBatch) { + public synchronized CompletableFuture append(AppendContext context, RecordBatch recordBatch) { if (this.inner == NOOP_STREAM) { try { this.inner = client.createAndOpenStream(CreateStreamOptions.newBuilder().replicaCount(replicaCount) @@ -117,7 +118,7 @@ public synchronized CompletableFuture append(RecordBatch recordBat return FutureUtil.failedFuture(new IOException(e)); } } - return inner.append(recordBatch); + return inner.append(context, recordBatch); } @Override @@ -126,8 +127,8 @@ public CompletableFuture trim(long newStartOffset) { } @Override - public CompletableFuture fetch(long startOffset, long endOffset, int maxBytesHint, ReadOptions readOptions) { - return inner.fetch(startOffset, endOffset, maxBytesHint, readOptions); + public CompletableFuture fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint) { + return inner.fetch(context, startOffset, endOffset, maxBytesHint); } @Override @@ -179,7 +180,7 @@ public long nextOffset() { } @Override - public CompletableFuture append(RecordBatch recordBatch) { + public CompletableFuture append(AppendContext context, RecordBatch recordBatch) { return FutureUtil.failedFuture(new UnsupportedOperationException("noop stream")); } @@ -189,7 +190,7 @@ public CompletableFuture trim(long newStartOffset) { } @Override - public CompletableFuture fetch(long startOffset, long endOffset, int maxBytesHint, ReadOptions readOptions) { + public CompletableFuture fetch(FetchContext context, long startOffset, long 
endOffset, int maxBytesHint) { return CompletableFuture.completedFuture(Collections::emptyList); } diff --git a/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java b/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java index 99e5bfea21..95bef9fa19 100644 --- a/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java +++ b/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java @@ -27,11 +27,12 @@ import com.automq.stream.api.KeyValue.Key; import com.automq.stream.api.KeyValue.Value; import com.automq.stream.api.OpenStreamOptions; -import com.automq.stream.api.ReadOptions; import com.automq.stream.api.RecordBatch; import com.automq.stream.api.RecordBatchWithContext; import com.automq.stream.api.Stream; import com.automq.stream.api.StreamClient; +import com.automq.stream.s3.context.AppendContext; +import com.automq.stream.s3.context.FetchContext; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -98,14 +99,14 @@ public long nextOffset() { } @Override - public synchronized CompletableFuture append(RecordBatch recordBatch) { + public synchronized CompletableFuture append(AppendContext context, RecordBatch recordBatch) { long baseOffset = nextOffsetAlloc.getAndAdd(recordBatch.count()); recordMap.put(baseOffset, new RecordBatchWithContextWrapper(recordBatch, baseOffset)); return CompletableFuture.completedFuture(() -> baseOffset); } @Override - public CompletableFuture fetch(long startOffset, long endOffset, int maxSizeHint, ReadOptions readOptions) { + public CompletableFuture fetch(FetchContext context, long startOffset, long endOffset, int maxSizeHint) { Long floorKey = recordMap.floorKey(startOffset); if (floorKey == null) { return CompletableFuture.completedFuture(ArrayList::new); diff --git a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java index f39921b27d..8e4cdb86b3 100644 --- a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java +++ b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java @@ -17,14 +17,15 @@ package kafka.log.streamaspect; +import com.automq.stream.s3.context.AppendContext; +import com.automq.stream.s3.context.FetchContext; +import io.netty.buffer.Unpooled; import com.automq.stream.DefaultRecordBatch; import com.automq.stream.api.AppendResult; import com.automq.stream.api.FetchResult; -import com.automq.stream.api.ReadOptions; import com.automq.stream.api.RecordBatch; import com.automq.stream.api.RecordBatchWithContext; import com.automq.stream.api.Stream; -import io.netty.buffer.Unpooled; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,7 +112,7 @@ public long nextOffset() { } @Override - public CompletableFuture append(RecordBatch batch) { + public CompletableFuture append(AppendContext context, RecordBatch batch) { throw new UnsupportedOperationException("append record batch is not supported in meta stream"); } @@ -148,8 +149,8 @@ private synchronized CompletableFuture append0(MetaKeyValue kv) { } @Override - public CompletableFuture fetch(long startOffset, long endOffset, int maxBytesHint, ReadOptions readOptions) { - return innerStream.fetch(startOffset, endOffset, maxBytesHint, readOptions); + public CompletableFuture fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint) { + return innerStream.fetch(context, startOffset, endOffset, maxBytesHint); } @Override diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index f7116dd5ff..0c212886cc 
100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -26,6 +26,7 @@ import kafka.cluster.Broker.ServerInfo import kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorAdapter} import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator} import kafka.log.LogManager +import kafka.log.stream.s3.telemetry.TelemetryManager import kafka.log.streamaspect.ElasticLogManager import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.raft.KafkaRaftManager @@ -151,6 +152,8 @@ class BrokerServer( var controllerNodeProvider: RaftControllerNodeProvider = _ + var telemetryManager: TelemetryManager = _ + def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = { @@ -207,6 +210,7 @@ class BrokerServer( // AutoMQ for Kafka inject start // ElasticLogManager should be marked before LogManager is created. ElasticLogManager.enable(config.elasticStreamEnabled) + telemetryManager = new TelemetryManager(config, sharedServer.metaProps.clusterId) // AutoMQ for Kafka inject end // Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery @@ -603,8 +607,12 @@ class BrokerServer( CoreUtils.swallow(logManager.shutdown(), this) // log manager need clientToControllerChannelManager to send request to controller. - if (clientToControllerChannelManager != null) + if (clientToControllerChannelManager != null) { CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this) + } + if (telemetryManager != null) { + CoreUtils.swallow(telemetryManager.shutdown(), this) + } // AutoMQ for Kafka inject end if (quotaManagers != null) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 8588a87f2d..e5885ddbf6 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -322,6 +322,9 @@ object Defaults { val S3NetworkBaselineBandwidth: Long = 100 * 1024 * 1024 // 100MB/s val S3RefillPeriodMs: Int = 1000 // 1s val S3MetricsExporterReportIntervalMs = 60000 // 1min + val S3SpanScheduledDelayMs = 1000 // 1s + val S3SpanMaxQueueSize = 5120 + val S3SpanMaxBatchSize = 1024 } object KafkaConfig { @@ -722,11 +725,16 @@ object KafkaConfig { val S3NetworkBaselineBandwidthProp = "s3.network.baseline.bandwidth" val S3RefillPeriodMsProp = "s3.network.refill.period.ms" val S3FailoverEnableProp = "s3.failover.enable" - val S3MetricsExporterTypeProp = "s3.metrics.exporter.type" - val S3MetricsExporterOTLPEndpointProp = "s3.metrics.exporter.otlp.endpoint" + val S3MetricsEnableProp = "s3.telemetry.metrics.enable" + val S3TracerEnableProp = "s3.telemetry.tracer.enable" + val S3ExporterOTLPEndpointProp = "s3.telemetry.exporter.otlp.endpoint" + val S3ExporterReportIntervalMsProp = "s3.telemetry.exporter.report.interval.ms" + val S3MetricsExporterTypeProp = "s3.telemetry.metrics.exporter.type" val S3MetricsExporterPromHostProp = "s3.metrics.exporter.prom.host" val S3MetricsExporterPromPortProp = "s3.metrics.exporter.prom.port" - val S3MetricsExporterReportIntervalMsProp = "s3.metrics.exporter.report.interval.ms" + val S3SpanScheduledDelayMsProp = "s3.telemetry.tracer.span.scheduled.delay.ms" + val S3SpanMaxQueueSizeProp = "s3.telemetry.tracer.span.max.queue.size" + val S3SpanMaxBatchSizeProp = "s3.telemetry.tracer.span.max.batch.size" val S3EndpointDoc = "The S3 endpoint, 
ex. https://s3.{region}.amazonaws.com." val S3RegionDoc = "The S3 region, ex. us-east-1." @@ -765,11 +773,16 @@ object KafkaConfig { val S3NetworkBaselineBandwidthDoc = "The network baseline bandwidth in Bytes/s." val S3RefillPeriodMsDoc = "The network bandwidth token refill period in milliseconds." val S3FailoverEnableDoc = "Failover mode: if enable, the controller will scan failed node and failover the failed node" + val S3MetricsEnableDoc = "Whether to enable metrics exporter for s3stream." + val S3TracerEnableDoc = "Whether to enable tracer exporter for s3stream." val S3MetricsExporterTypeDoc = "The enabled S3 metrics exporters type, seperated by comma. Supported type: otlp, prometheus, log" - val S3MetricsExporterOTLPEndpointDoc = "The endpoint of OTLP collector" + val S3ExporterOTLPEndpointDoc = "The endpoint of OTLP collector" + val S3ExporterReportIntervalMsDoc = "The interval in milliseconds to report telemetry" val S3MetricsExporterPromHostDoc = "The host address of Prometheus http server to expose the metrics" val S3MetricsExporterPromPortDoc = "The port of Prometheus http server to expose the metrics" - val S3MetricsExporterReportIntervalMsDoc = "The interval in milliseconds to report metrics" + val S3SpanScheduledDelayMsDoc = "The delay in milliseconds to export queued spans" + val S3SpanMaxQueueSizeDoc = "The max number of spans that can be queued before dropped" + val S3SpanMaxBatchSizeDoc = "The max number of spans that can be exported in a single batch" // AutoMQ for Kafka inject end @@ -1605,11 +1618,16 @@ object KafkaConfig { .define(S3NetworkBaselineBandwidthProp, LONG, Defaults.S3NetworkBaselineBandwidth, MEDIUM, S3NetworkBaselineBandwidthDoc) .define(S3RefillPeriodMsProp, INT, Defaults.S3RefillPeriodMs, MEDIUM, S3RefillPeriodMsDoc) .define(S3FailoverEnableProp, BOOLEAN, false, MEDIUM, S3FailoverEnableDoc) + .define(S3MetricsEnableProp, BOOLEAN, true, MEDIUM, S3MetricsEnableDoc) + .define(S3TracerEnableProp, BOOLEAN, false, MEDIUM, S3TracerEnableDoc) .define(S3MetricsExporterTypeProp, STRING, null, MEDIUM, S3MetricsExporterTypeDoc) - .define(S3MetricsExporterOTLPEndpointProp, STRING, null, MEDIUM, S3MetricsExporterOTLPEndpointDoc) + .define(S3ExporterOTLPEndpointProp, STRING, null, MEDIUM, S3ExporterOTLPEndpointDoc) .define(S3MetricsExporterPromHostProp, STRING, null, MEDIUM, S3MetricsExporterPromHostDoc) .define(S3MetricsExporterPromPortProp, INT, 0, MEDIUM, S3MetricsExporterPromPortDoc) - .define(S3MetricsExporterReportIntervalMsProp, INT, Defaults.S3MetricsExporterReportIntervalMs, MEDIUM, S3MetricsExporterReportIntervalMsDoc) + .define(S3ExporterReportIntervalMsProp, INT, Defaults.S3MetricsExporterReportIntervalMs, MEDIUM, S3ExporterReportIntervalMsDoc) + .define(S3SpanScheduledDelayMsProp, INT, Defaults.S3SpanScheduledDelayMs, MEDIUM, S3SpanScheduledDelayMsDoc) + .define(S3SpanMaxQueueSizeProp, INT, Defaults.S3SpanMaxQueueSize, MEDIUM, S3SpanMaxQueueSizeDoc) + .define(S3SpanMaxBatchSizeProp, INT, Defaults.S3SpanMaxBatchSize, MEDIUM, S3SpanMaxBatchSizeDoc) // AutoMQ for Kafka inject end } @@ -2182,11 +2200,17 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val s3NetworkBaselineBandwidthProp = getLong(KafkaConfig.S3NetworkBaselineBandwidthProp) val s3RefillPeriodMsProp = getInt(KafkaConfig.S3RefillPeriodMsProp) val s3FailoverEnable = getBoolean(KafkaConfig.S3FailoverEnableProp) + val s3MetricsEnable = getBoolean(KafkaConfig.S3MetricsEnableProp) + val s3TracerEnable = getBoolean(KafkaConfig.S3TracerEnableProp) val 
s3MetricsExporterType = getString(KafkaConfig.S3MetricsExporterTypeProp) - val s3MetricsExporterOTLPEndpoint = getString(KafkaConfig.S3MetricsExporterOTLPEndpointProp) + val s3ExporterOTLPEndpoint = getString(KafkaConfig.S3ExporterOTLPEndpointProp) + val s3ExporterReportIntervalMs = getInt(KafkaConfig.S3ExporterReportIntervalMsProp) val s3MetricsExporterPromHost = getString(KafkaConfig.S3MetricsExporterPromHostProp) val s3MetricsExporterPromPort = getInt(KafkaConfig.S3MetricsExporterPromPortProp) - val s3MetricsExporterReportIntervalMs = getInt(KafkaConfig.S3MetricsExporterReportIntervalMsProp) + val s3SpanScheduledDelayMs = getInt(KafkaConfig.S3SpanScheduledDelayMsProp) + val s3SpanMaxQueueSize = getInt(KafkaConfig.S3SpanMaxQueueSizeProp) + val s3SpanMaxBatchSize = getInt(KafkaConfig.S3SpanMaxBatchSizeProp) + // AutoMQ for Kafka inject end def addReconfigurable(reconfigurable: Reconfigurable): Unit = { diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 665c57846b..b759bb2c2a 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -17,7 +17,6 @@ package kafka.server -import kafka.log.stream.s3.metrics.MetricsExporter import kafka.raft.KafkaRaftManager import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole} import kafka.server.Server.MetricsPrefix @@ -101,7 +100,6 @@ class SharedServer( val controllerConfig = new KafkaConfig(sharedServerConfig.props, false, None) // AutoMQ for Kafka injection start ElasticStreamSwitch.setSwitch(sharedServerConfig.elasticStreamEnabled) - MetricsExporter.setClusterId(metaProps.clusterId) // AutoMQ for Kafka injection end @volatile var metrics: Metrics = _metrics @volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _ diff --git a/core/src/test/java/kafka/log/streamaspect/AlwaysSuccessClientTest.java b/core/src/test/java/kafka/log/streamaspect/AlwaysSuccessClientTest.java index e32fcf2ce0..306109320d 100644 --- a/core/src/test/java/kafka/log/streamaspect/AlwaysSuccessClientTest.java +++ b/core/src/test/java/kafka/log/streamaspect/AlwaysSuccessClientTest.java @@ -22,12 +22,13 @@ import com.automq.stream.api.CreateStreamOptions; import com.automq.stream.api.FetchResult; import com.automq.stream.api.OpenStreamOptions; -import com.automq.stream.api.ReadOptions; import com.automq.stream.api.RecordBatch; import com.automq.stream.api.RecordBatchWithContext; import com.automq.stream.api.Stream; import com.automq.stream.api.StreamClient; import com.automq.stream.api.exceptions.StreamClientException; +import com.automq.stream.s3.context.AppendContext; +import com.automq.stream.s3.context.FetchContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -296,7 +297,7 @@ public long nextOffset() { } @Override - public synchronized CompletableFuture append(RecordBatch recordBatch) { + public synchronized CompletableFuture append(AppendContext context, RecordBatch recordBatch) { Exception exception = exceptionHint.generateException(); if (exception != null) { exceptionHint = exceptionHint.moveToNext(); @@ -308,7 +309,7 @@ public synchronized CompletableFuture append(RecordBatch recordBat } @Override - public CompletableFuture fetch(long startOffset, long endOffset, int maxSizeHint, ReadOptions readOptions) { + public CompletableFuture fetch(FetchContext context, long startOffset, long endOffset, int maxSizeHint) { Exception exception = 
exceptionHint.generateException(); if (exception != null) { exceptionHint = exceptionHint.moveToNext(); diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index c2466966ab..971e3a1303 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -128,10 +128,10 @@ versions += [ zookeeper: "3.6.3", zstd: "1.5.2-1", commonLang: "3.12.0", - s3stream: "0.8.0-SNAPSHOT", + s3stream: "0.10.0-SNAPSHOT", opentelemetry: "1.32.0", opentelemetryAlpha: "1.32.0-alpha", - opentelemetrySemconv: "1.30.1-alpha" + oshi: "6.4.7" ] libs += [ activation: "javax.activation:activation:$versions.activation", @@ -227,11 +227,12 @@ libs += [ commonLang: "org.apache.commons:commons-lang3:$versions.commonLang", nettyHttp2: "io.netty:netty-codec-http2:$versions.netty", s3stream: "com.automq.elasticstream:s3stream:$versions.s3stream", - opentelemetryApi: "io.opentelemetry:opentelemetry-api:$versions.opentelemetry", + opentelemetryJava17: "io.opentelemetry.instrumentation:opentelemetry-runtime-telemetry-java17:$versions.opentelemetryAlpha", + opentelemetryOshi: "io.opentelemetry.instrumentation:opentelemetry-oshi:$versions.opentelemetryAlpha", opentelemetrySdk: "io.opentelemetry:opentelemetry-sdk:$versions.opentelemetry", opentelemetrySdkMetrics: "io.opentelemetry:opentelemetry-sdk-metrics:$versions.opentelemetry", opentelemetryExporterLogging: "io.opentelemetry:opentelemetry-exporter-logging:$versions.opentelemetry", - opentelemetrySemconv: "io.opentelemetry:opentelemetry-semconv:$versions.opentelemetrySemconv", opentelemetryExporterProm: "io.opentelemetry:opentelemetry-exporter-prometheus:$versions.opentelemetryAlpha", opentelemetryExporterOTLP: "io.opentelemetry:opentelemetry-exporter-otlp:$versions.opentelemetry", + oshi: "com.github.oshi:oshi-core-java11:$versions.oshi" ] From 086d13e04e7a085de5d6f43d7c0086dec48e3f21 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Thu, 28 Dec 2023 16:43:29 +0800 Subject: [PATCH 2/5] feat(core): prevent multiple initialization of the OpenTelemetry SDK in a single process Signed-off-by: Shichao Nie --- .../log/stream/s3/telemetry/ContextUtils.java | 15 +++++++++++---- .../log/stream/s3/telemetry/TelemetryManager.java | 12 ++++++------ 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/ContextUtils.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/ContextUtils.java index cd8335db57..1362507b89 100644 --- a/core/src/main/scala/kafka/log/stream/s3/telemetry/ContextUtils.java +++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/ContextUtils.java @@ -20,20 +20,27 @@ import com.automq.stream.s3.context.AppendContext; import com.automq.stream.s3.context.FetchContext; import com.automq.stream.s3.trace.context.TraceContext; -import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.OpenTelemetrySdk; public class ContextUtils { public static FetchContext creaetFetchContext() { - return new FetchContext(TelemetryManager.isTracerEnabled, GlobalOpenTelemetry.getTracer(TelemetryConstants.TELEMETRY_SCOPE_NAME), Context.current()); + return new FetchContext(createTraceContext()); } public static AppendContext createAppendContext() { - return new AppendContext(TelemetryManager.isTracerEnabled, GlobalOpenTelemetry.getTracer(TelemetryConstants.TELEMETRY_SCOPE_NAME), Context.current()); + return new AppendContext(createTraceContext()); } public static TraceContext createTraceContext() { - return new 
TraceContext(TelemetryManager.isTracerEnabled, GlobalOpenTelemetry.getTracer(TelemetryConstants.TELEMETRY_SCOPE_NAME), Context.current()); + OpenTelemetrySdk openTelemetrySdk = TelemetryManager.getOpenTelemetrySdk(); + boolean isTraceEnabled = openTelemetrySdk != null && TelemetryManager.isTracerEnabled; + Tracer tracer = null; + if (isTraceEnabled) { + tracer = openTelemetrySdk.getTracer(TelemetryConstants.TELEMETRY_SCOPE_NAME); + } + return new TraceContext(isTraceEnabled, tracer, Context.current()); } } diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java index 57bc79efb7..d8342b7cfe 100644 --- a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java @@ -68,13 +68,13 @@ public class TelemetryManager { private static final Integer EXPORTER_TIMEOUT_MS = 5000; public static boolean isTracerEnabled = false; private static java.util.logging.Logger metricsLogger; + private static OpenTelemetrySdk openTelemetrySdk; private final KafkaConfig kafkaConfig; private final String clusterId; private final Map, String> labelMap; private final Supplier attributesBuilderSupplier; private final List metricReaderList; private RuntimeMetrics runtimeMetrics; - private OpenTelemetrySdk openTelemetrySdk; private PrometheusHttpServer prometheusHttpServer; public TelemetryManager(KafkaConfig kafkaConfig, String clusterId) { @@ -87,10 +87,6 @@ public TelemetryManager(KafkaConfig kafkaConfig, String clusterId) { init(); } - private String buildServiceName() { - return clusterId + "_" + getNodeType() + "_" + kafkaConfig.nodeId(); - } - private String getNodeType() { Set roles = kafkaConfig.processRoles(); if (roles.size() == 1) { @@ -125,7 +121,7 @@ private void init() { openTelemetrySdk = openTelemetrySdkBuilder .setPropagators(ContextPropagators.create(TextMapPropagator.composite( W3CTraceContextPropagator.getInstance(), W3CBaggagePropagator.getInstance()))) - .buildAndRegisterGlobal(); + .build(); if (kafkaConfig.s3MetricsEnable()) { // set JVM metrics opt-in to prevent metrics conflict. 
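For orientation, the hunks above rewire ContextUtils to take its tracer from the TelemetryManager-owned SDK instead of the global registry, and the updated AlwaysSuccessClientTest shows the context-accepting append/fetch signatures. Below is a minimal sketch of how a caller might combine the two; the Stream instance, record batch, and offsets are assumptions introduced here for illustration and are not part of the patch.

    import com.automq.stream.api.RecordBatch;
    import com.automq.stream.api.Stream;
    import com.automq.stream.s3.context.AppendContext;
    import com.automq.stream.s3.context.FetchContext;
    import kafka.log.stream.s3.telemetry.ContextUtils;

    public class ContextPassingSketch {
        // Hypothetical helper: stream, recordBatch and the offsets come from the caller.
        static void appendThenFetch(Stream stream, RecordBatch recordBatch,
                                    long startOffset, long endOffset, int maxSizeHint) {
            AppendContext appendContext = ContextUtils.createAppendContext();
            FetchContext fetchContext = ContextUtils.creaetFetchContext(); // method name as spelled in the patch
            stream.append(appendContext, recordBatch) // per-call contexts replace the old ReadOptions-style argument
                    .thenCompose(appendResult -> stream.fetch(fetchContext, startOffset, endOffset, maxSizeHint))
                    .thenAccept(fetchResult -> { /* consume the fetched record batches */ });
        }
    }

When tracing is disabled, createTraceContext() returns a context without a tracer, so call sites like the sketch above stay unchanged either way.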
@@ -153,6 +149,10 @@ private void init() { kafkaConfig.s3MetricsEnable(), kafkaConfig.s3TracerEnable(), kafkaConfig.s3ExporterReportIntervalMs()); } + public static OpenTelemetrySdk getOpenTelemetrySdk() { + return openTelemetrySdk; + } + private SdkTracerProvider getTraceProvider(Resource resource) { OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder() .setEndpoint(kafkaConfig.s3ExporterOTLPEndpoint()) From c9d5fdd2131c908cdbeea279497fbaae4a36dd53 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Thu, 28 Dec 2023 18:38:26 +0800 Subject: [PATCH 3/5] feat(core): downgrade to otel instrument java8 for compatibility Signed-off-by: Shichao Nie --- build.gradle | 3 +- .../stream/s3/telemetry/TelemetryManager.java | 36 ++++++++++--------- gradle/dependencies.gradle | 1 + 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/build.gradle b/build.gradle index 7d090a858a..71eb0c5aba 100644 --- a/build.gradle +++ b/build.gradle @@ -960,7 +960,8 @@ project(':core') { exclude group: 'org.slf4j', module: '*' exclude group: 'net.sourceforge.argparse4j', module: '*' } - implementation libs.opentelemetryJava17 +// implementation libs.opentelemetryJava17 + implementation libs.opentelemetryJava8 implementation libs.opentelemetryOshi implementation libs.opentelemetrySdk implementation libs.opentelemetrySdkMetrics diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java index d8342b7cfe..a5bf2702cc 100644 --- a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java @@ -31,8 +31,9 @@ import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; import io.opentelemetry.exporter.prometheus.PrometheusHttpServer; -import io.opentelemetry.instrumentation.runtimemetrics.java17.JfrFeature; -import io.opentelemetry.instrumentation.runtimemetrics.java17.RuntimeMetrics; +import io.opentelemetry.instrumentation.runtimemetrics.java8.Cpu; +import io.opentelemetry.instrumentation.runtimemetrics.java8.GarbageCollector; +import io.opentelemetry.instrumentation.runtimemetrics.java8.MemoryPools; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.OpenTelemetrySdkBuilder; import io.opentelemetry.sdk.metrics.SdkMeterProvider; @@ -74,7 +75,7 @@ public class TelemetryManager { private final Map, String> labelMap; private final Supplier attributesBuilderSupplier; private final List metricReaderList; - private RuntimeMetrics runtimeMetrics; + private final List autoCloseables; private PrometheusHttpServer prometheusHttpServer; public TelemetryManager(KafkaConfig kafkaConfig, String clusterId) { @@ -82,6 +83,7 @@ public TelemetryManager(KafkaConfig kafkaConfig, String clusterId) { this.clusterId = clusterId; this.labelMap = new HashMap<>(); this.metricReaderList = new ArrayList<>(); + this.autoCloseables = new ArrayList<>(); this.attributesBuilderSupplier = Attributes::builder; isTracerEnabled = kafkaConfig.s3TracerEnable(); init(); @@ -127,13 +129,9 @@ private void init() { // set JVM metrics opt-in to prevent metrics conflict. 
System.setProperty("otel.semconv-stability.opt-in", "jvm"); // JVM metrics - runtimeMetrics = RuntimeMetrics.builder(openTelemetrySdk) - .enableFeature(JfrFeature.GC_DURATION_METRICS) - .enableFeature(JfrFeature.CPU_UTILIZATION_METRICS) - .enableFeature(JfrFeature.MEMORY_POOL_METRICS) - .enableFeature(JfrFeature.MEMORY_ALLOCATION_METRICS) - .enableFeature(JfrFeature.NETWORK_IO_METRICS) - .build(); + autoCloseables.addAll(MemoryPools.registerObservers(openTelemetrySdk)); + autoCloseables.addAll(Cpu.registerObservers(openTelemetrySdk)); + autoCloseables.addAll(GarbageCollector.registerObservers(openTelemetrySdk)); Meter meter = openTelemetrySdk.getMeter(TelemetryConstants.TELEMETRY_SCOPE_NAME); S3StreamMetricsManager.initMetrics(meter, TelemetryConstants.KAFKA_METRICS_PREFIX); @@ -248,21 +246,25 @@ private void initPrometheusExporter(SdkMeterProviderBuilder sdkMeterProviderBuil } public void shutdown() { + autoCloseables.forEach(autoCloseable -> { + try { + autoCloseable.close(); + } catch (Exception e) { + LOGGER.error("Failed to close auto closeable", e); + } + }); if (prometheusHttpServer != null) { prometheusHttpServer.forceFlush(); prometheusHttpServer.close(); } - for (MetricReader metricReader : metricReaderList) { + metricReaderList.forEach(metricReader -> { metricReader.forceFlush(); try { metricReader.close(); - } catch (IOException ignored) { - + } catch (IOException e) { + LOGGER.error("Failed to close metric reader", e); } - } - if (runtimeMetrics != null) { - runtimeMetrics.close(); - } + }); if (openTelemetrySdk != null) { openTelemetrySdk.close(); } diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 971e3a1303..0b8ddb8c01 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -228,6 +228,7 @@ libs += [ nettyHttp2: "io.netty:netty-codec-http2:$versions.netty", s3stream: "com.automq.elasticstream:s3stream:$versions.s3stream", opentelemetryJava17: "io.opentelemetry.instrumentation:opentelemetry-runtime-telemetry-java17:$versions.opentelemetryAlpha", + opentelemetryJava8: "io.opentelemetry.instrumentation:opentelemetry-runtime-telemetry-java8:$versions.opentelemetryAlpha", opentelemetryOshi: "io.opentelemetry.instrumentation:opentelemetry-oshi:$versions.opentelemetryAlpha", opentelemetrySdk: "io.opentelemetry:opentelemetry-sdk:$versions.opentelemetry", opentelemetrySdkMetrics: "io.opentelemetry:opentelemetry-sdk-metrics:$versions.opentelemetry", From 9ca16782f4301fa7164f21d495132a6770cd31f9 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Thu, 28 Dec 2023 18:46:03 +0800 Subject: [PATCH 4/5] fix(core): remove uneccessary dependencies Signed-off-by: Shichao Nie --- build.gradle | 2 +- gradle/dependencies.gradle | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 71eb0c5aba..dd5f84de0c 100644 --- a/build.gradle +++ b/build.gradle @@ -960,7 +960,7 @@ project(':core') { exclude group: 'org.slf4j', module: '*' exclude group: 'net.sourceforge.argparse4j', module: '*' } -// implementation libs.opentelemetryJava17 + implementation libs.opentelemetryJava8 implementation libs.opentelemetryOshi implementation libs.opentelemetrySdk diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 0b8ddb8c01..8c63eb81ea 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -227,7 +227,6 @@ libs += [ commonLang: "org.apache.commons:commons-lang3:$versions.commonLang", nettyHttp2: "io.netty:netty-codec-http2:$versions.netty", s3stream: 
"com.automq.elasticstream:s3stream:$versions.s3stream", - opentelemetryJava17: "io.opentelemetry.instrumentation:opentelemetry-runtime-telemetry-java17:$versions.opentelemetryAlpha", opentelemetryJava8: "io.opentelemetry.instrumentation:opentelemetry-runtime-telemetry-java8:$versions.opentelemetryAlpha", opentelemetryOshi: "io.opentelemetry.instrumentation:opentelemetry-oshi:$versions.opentelemetryAlpha", opentelemetrySdk: "io.opentelemetry:opentelemetry-sdk:$versions.opentelemetry", From e33b3c1b500bd09dba8d044a7f1175d367346f64 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Thu, 28 Dec 2023 19:37:06 +0800 Subject: [PATCH 5/5] fix(core): fix spotBugs check dependencies Signed-off-by: Shichao Nie --- .../kafka/log/stream/s3/telemetry/ContextUtils.java | 2 +- .../log/stream/s3/telemetry/TelemetryManager.java | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/ContextUtils.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/ContextUtils.java index 1362507b89..09b7336277 100644 --- a/core/src/main/scala/kafka/log/stream/s3/telemetry/ContextUtils.java +++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/ContextUtils.java @@ -35,7 +35,7 @@ public static AppendContext createAppendContext() { public static TraceContext createTraceContext() { OpenTelemetrySdk openTelemetrySdk = TelemetryManager.getOpenTelemetrySdk(); - boolean isTraceEnabled = openTelemetrySdk != null && TelemetryManager.isTracerEnabled; + boolean isTraceEnabled = openTelemetrySdk != null && TelemetryManager.isTraceEnable(); Tracer tracer = null; if (isTraceEnabled) { tracer = openTelemetrySdk.getTracer(TelemetryConstants.TELEMETRY_SCOPE_NAME); diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java index a5bf2702cc..f9c6e8247e 100644 --- a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java @@ -67,9 +67,9 @@ public class TelemetryManager { private static final Logger LOGGER = LoggerFactory.getLogger(TelemetryManager.class); private static final Integer EXPORTER_TIMEOUT_MS = 5000; - public static boolean isTracerEnabled = false; private static java.util.logging.Logger metricsLogger; private static OpenTelemetrySdk openTelemetrySdk; + private static boolean traceEnable = false; private final KafkaConfig kafkaConfig; private final String clusterId; private final Map, String> labelMap; @@ -85,7 +85,6 @@ public TelemetryManager(KafkaConfig kafkaConfig, String clusterId) { this.metricReaderList = new ArrayList<>(); this.autoCloseables = new ArrayList<>(); this.attributesBuilderSupplier = Attributes::builder; - isTracerEnabled = kafkaConfig.s3TracerEnable(); init(); } @@ -97,6 +96,10 @@ private String getNodeType() { return "server"; } + public static boolean isTraceEnable() { + return traceEnable; + } + private void init() { Resource resource = Resource.getDefault().toBuilder() .put(ResourceAttributes.SERVICE_NAMESPACE, clusterId) @@ -110,6 +113,8 @@ private void init() { OpenTelemetrySdkBuilder openTelemetrySdkBuilder = OpenTelemetrySdk.builder(); + traceEnable = kafkaConfig.s3TracerEnable(); + if (kafkaConfig.s3MetricsEnable()) { SdkMeterProvider sdkMeterProvider = getMetricsProvider(resource); if (sdkMeterProvider != null) { @@ -137,6 +142,7 @@ private void init() { S3StreamMetricsManager.initMetrics(meter, TelemetryConstants.KAFKA_METRICS_PREFIX); 
S3StreamMetricsManager.initAttributesBuilder(() -> { //TODO: cache and reuse attributes + //TODO: support metrics level AttributesBuilder builder = attributesBuilderSupplier.get(); labelMap.forEach(builder::put); return builder; @@ -254,7 +260,6 @@ public void shutdown() { } }); if (prometheusHttpServer != null) { - prometheusHttpServer.forceFlush(); prometheusHttpServer.close(); } metricReaderList.forEach(metricReader -> {
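Taken together, the series leaves a single static SDK owned by TelemetryManager: ContextUtils reads it through getOpenTelemetrySdk(), and shutdown() closes the JVM-metrics observers, the Prometheus server, the metric readers, and finally the SDK. A hedged sketch of that lifecycle follows; the kafkaConfig and clusterId inputs are assumed to come from the broker wiring rather than from this patch.

    import io.opentelemetry.sdk.OpenTelemetrySdk;
    import kafka.log.stream.s3.telemetry.TelemetryManager;
    import kafka.server.KafkaConfig;

    public class TelemetryLifecycleSketch {
        // Assumed inputs: kafkaConfig and clusterId are supplied by the broker at startup.
        static TelemetryManager start(KafkaConfig kafkaConfig, String clusterId) {
            TelemetryManager telemetryManager = new TelemetryManager(kafkaConfig, clusterId);
            OpenTelemetrySdk sdk = TelemetryManager.getOpenTelemetrySdk(); // same instance ContextUtils reads
            return telemetryManager;
        }

        static void stop(TelemetryManager telemetryManager) {
            telemetryManager.shutdown(); // observers, Prometheus server, metric readers, then the SDK
        }
    }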