From e3762ff9c63be9099318d1758ce78830a5052ecc Mon Sep 17 00:00:00 2001 From: SSpirits Date: Thu, 30 May 2024 13:05:56 +0800 Subject: [PATCH 1/4] feat(shell): add jitter for log and metrics report Signed-off-by: SSpirits --- .../main/java/com/automq/shell/log/LogUploader.java | 13 +++++++++---- .../log/stream/s3/telemetry/TelemetryManager.java | 3 ++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java b/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java index d80c1a5ce9..55b39e9580 100644 --- a/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java +++ b/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java @@ -27,6 +27,7 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.Random; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -43,13 +44,16 @@ public class LogUploader implements LogRecorder { public static final int DEFAULT_MAX_QUEUE_SIZE = 64 * 1024; public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; - public static final int MAX_UPLOAD_INTERVAL = 60 * 1000; + public static final int MAX_UPLOAD_INTERVAL = 2 * 60 * 1000; + public static final int MAX_JITTER_INTERVAL = 60 * 1000; private static final LogUploader INSTANCE = new LogUploader(); private final BlockingQueue queue = new LinkedBlockingQueue<>(DEFAULT_MAX_QUEUE_SIZE); private final ByteBuf uploadBuffer = Unpooled.directBuffer(DEFAULT_BUFFER_SIZE); + private final Random random = new Random(); private volatile long lastUploadTimestamp = 0L; + private volatile long nextUploadInterval = MAX_UPLOAD_INTERVAL + random.nextInt(MAX_JITTER_INTERVAL); private volatile boolean closed; @@ -174,14 +178,14 @@ public void run() { } byte[] bytes = logLine.toString().getBytes(StandardCharsets.UTF_8); - if (uploadBuffer.writableBytes() < bytes.length || now - lastUploadTimestamp > MAX_UPLOAD_INTERVAL) { + if (uploadBuffer.writableBytes() < bytes.length || now - lastUploadTimestamp > nextUploadInterval) { upload(now); } uploadBuffer.writeBytes(bytes); } else if (closed) { upload(now); break; - } else if (now - lastUploadTimestamp > MAX_UPLOAD_INTERVAL) { + } else if (now - lastUploadTimestamp > nextUploadInterval) { upload(now); } } catch (InterruptedException e) { @@ -216,6 +220,7 @@ private void upload(long now) { } uploadBuffer.clear(); lastUploadTimestamp = now; + nextUploadInterval = MAX_UPLOAD_INTERVAL + random.nextInt(MAX_JITTER_INTERVAL); } } } @@ -230,7 +235,7 @@ public void run() { Thread.sleep(Duration.ofMinutes(1).toMillis()); continue; } - long expiredTime = System.currentTimeMillis() - MAX_UPLOAD_INTERVAL * 3; + long expiredTime = System.currentTimeMillis() - MAX_UPLOAD_INTERVAL; List> pairList = s3Operator.list(String.format("automq/logs/%s", config.clusterId())).join(); diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java index 84c4da2874..d1f327ae46 100644 --- a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java @@ -59,6 +59,7 @@ import java.util.List; import java.util.Locale; import java.util.Optional; +import java.util.Random; import java.util.concurrent.TimeUnit; import kafka.server.KafkaConfig; import kafka.server.KafkaRaftServer; @@ -318,7 +319,7 @@ public boolean s3PathStyle() { } }); PeriodicMetricReaderBuilder builder = PeriodicMetricReader.builder(s3MetricsExporter); - MetricReader periodicReader = builder.setInterval(Duration.ofMinutes(1)).build(); + MetricReader periodicReader = builder.setInterval(Duration.ofSeconds(120 + new Random().nextInt(60))).build(); metricReaderList.add(periodicReader); SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector(sdkMeterProviderBuilder, periodicReader, From 2e5041a07f42aea0986634b51dd8677ab7099222 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Thu, 30 May 2024 14:51:04 +0800 Subject: [PATCH 2/4] feat(shell): add jitter for log and metrics report Signed-off-by: SSpirits --- .../com/automq/shell/log/LogUploader.java | 11 +-- .../shell/metrics/S3MetricsExporter.java | 67 ++++++++++++++++++- .../stream/s3/telemetry/TelemetryManager.java | 3 +- 3 files changed, 71 insertions(+), 10 deletions(-) diff --git a/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java b/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java index 55b39e9580..3342f379f3 100644 --- a/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java +++ b/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java @@ -44,7 +44,8 @@ public class LogUploader implements LogRecorder { public static final int DEFAULT_MAX_QUEUE_SIZE = 64 * 1024; public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; - public static final int MAX_UPLOAD_INTERVAL = 2 * 60 * 1000; + public static final int UPLOAD_INTERVAL = System.getenv("AUTOMQ_OBSERVABILITY_UPLOAD_INTERVAL") != null ? Integer.parseInt(System.getenv("AUTOMQ_OBSERVABILITY_UPLOAD_INTERVAL")) : 60 * 1000; + public static final int CLEANUP_INTERVAL = System.getenv("AUTOMQ_OBSERVABILITY_CLEANUP_INTERVAL") != null ? Integer.parseInt(System.getenv("AUTOMQ_OBSERVABILITY_CLEANUP_INTERVAL")) : 2 * 60 * 1000; public static final int MAX_JITTER_INTERVAL = 60 * 1000; private static final LogUploader INSTANCE = new LogUploader(); @@ -52,8 +53,8 @@ public class LogUploader implements LogRecorder { private final BlockingQueue queue = new LinkedBlockingQueue<>(DEFAULT_MAX_QUEUE_SIZE); private final ByteBuf uploadBuffer = Unpooled.directBuffer(DEFAULT_BUFFER_SIZE); private final Random random = new Random(); - private volatile long lastUploadTimestamp = 0L; - private volatile long nextUploadInterval = MAX_UPLOAD_INTERVAL + random.nextInt(MAX_JITTER_INTERVAL); + private volatile long lastUploadTimestamp = System.currentTimeMillis(); + private volatile long nextUploadInterval = UPLOAD_INTERVAL + random.nextInt(MAX_JITTER_INTERVAL); private volatile boolean closed; @@ -220,7 +221,7 @@ private void upload(long now) { } uploadBuffer.clear(); lastUploadTimestamp = now; - nextUploadInterval = MAX_UPLOAD_INTERVAL + random.nextInt(MAX_JITTER_INTERVAL); + nextUploadInterval = UPLOAD_INTERVAL + random.nextInt(MAX_JITTER_INTERVAL); } } } @@ -235,7 +236,7 @@ public void run() { Thread.sleep(Duration.ofMinutes(1).toMillis()); continue; } - long expiredTime = System.currentTimeMillis() - MAX_UPLOAD_INTERVAL; + long expiredTime = System.currentTimeMillis() - CLEANUP_INTERVAL; List> pairList = s3Operator.list(String.format("automq/logs/%s", config.clusterId())).join(); diff --git a/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java b/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java index 01be7d78fb..e880913f15 100644 --- a/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java +++ b/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java @@ -12,7 +12,7 @@ package com.automq.shell.metrics; import com.automq.shell.auth.CredentialsProviderHolder; -import com.automq.shell.log.LogUploader; +import com.automq.stream.s3.network.ThrottleStrategy; import com.automq.stream.s3.operator.DefaultS3Operator; import com.automq.stream.s3.operator.S3Operator; import com.fasterxml.jackson.databind.ObjectMapper; @@ -38,6 +38,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -50,12 +51,24 @@ public class S3MetricsExporter implements MetricExporter { private static final Logger LOGGER = LoggerFactory.getLogger(S3MetricsExporter.class); private static final String TOTAL_SUFFIX = "_total"; + public static final int UPLOAD_INTERVAL = System.getenv("AUTOMQ_OBSERVABILITY_UPLOAD_INTERVAL") != null ? Integer.parseInt(System.getenv("AUTOMQ_OBSERVABILITY_UPLOAD_INTERVAL")) : 60 * 1000; + public static final int CLEANUP_INTERVAL = System.getenv("AUTOMQ_OBSERVABILITY_CLEANUP_INTERVAL") != null ? Integer.parseInt(System.getenv("AUTOMQ_OBSERVABILITY_CLEANUP_INTERVAL")) : 2 * 60 * 1000; + public static final int MAX_JITTER_INTERVAL = 60 * 1000; + public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; + private final S3MetricsConfig config; private final Map defalutTagMap = new HashMap<>(); + + private final ByteBuf uploadBuffer = Unpooled.directBuffer(DEFAULT_BUFFER_SIZE); + private final Random random = new Random(); + private volatile long lastUploadTimestamp = System.currentTimeMillis(); + private volatile long nextUploadInterval = UPLOAD_INTERVAL + random.nextInt(MAX_JITTER_INTERVAL); + private final S3Operator s3Operator; private final ObjectMapper objectMapper = new ObjectMapper(); private volatile boolean closed; + private final Thread uploadThread; private final Thread cleanupThread; public S3MetricsExporter(S3MetricsConfig config) { @@ -69,6 +82,11 @@ public S3MetricsExporter(S3MetricsConfig config) { defalutTagMap.put("service_instance_id", String.valueOf(config.nodeId())); defalutTagMap.put("instance", String.valueOf(config.nodeId())); + uploadThread = new Thread(new UploadTask()); + uploadThread.setName("s3-metrics-exporter-upload-thread"); + uploadThread.setDaemon(true); + uploadThread.start(); + cleanupThread = new Thread(new CleanupTask()); cleanupThread.setName("s3-metrics-exporter-cleanup-thread"); cleanupThread.setDaemon(true); @@ -80,9 +98,28 @@ public void close() { MetricExporter.super.close(); closed = true; cleanupThread.interrupt(); + uploadThread.interrupt(); + flush(); LOGGER.info("S3MetricsExporter is closed"); } + private class UploadTask implements Runnable { + + @Override + public void run() { + while (!closed && !uploadThread.isInterrupted()) { + try { + if (uploadBuffer.readableBytes() > 0 && System.currentTimeMillis() - lastUploadTimestamp > nextUploadInterval) { + flush(); + } + Thread.sleep(1000); + } catch (InterruptedException e) { + break; + } + } + } + } + private class CleanupTask implements Runnable { @Override @@ -93,7 +130,7 @@ public void run() { Thread.sleep(Duration.ofMinutes(1).toMillis()); continue; } - long expiredTime = System.currentTimeMillis() - LogUploader.MAX_UPLOAD_INTERVAL * 3; + long expiredTime = System.currentTimeMillis() - CLEANUP_INTERVAL; List> pairList = s3Operator.list(String.format("automq/metrics/%s", config.clusterId())).join(); @@ -126,6 +163,10 @@ private String getHostName() { @Override public CompletableResultCode export(Collection metrics) { + if (closed) { + return CompletableResultCode.ofFailure(); + } + try { List lineList = new ArrayList<>(); // TODO: transfer metrics name into prometheus format @@ -171,7 +212,13 @@ public CompletableResultCode export(Collection metrics) { buffer.writeCharSequence(line, Charset.defaultCharset()); buffer.writeCharSequence("\n", Charset.defaultCharset()); }); - s3Operator.write(getObjectKey(), buffer); + synchronized (uploadBuffer) { + if (uploadBuffer.writableBytes() < buffer.readableBytes()) { + // Upload the buffer immediately + flush(); + } + uploadBuffer.writeBytes(buffer); + } } catch (Exception e) { LOGGER.error("Export metrics to S3 failed", e); return CompletableResultCode.ofFailure(); @@ -182,6 +229,20 @@ public CompletableResultCode export(Collection metrics) { @Override public CompletableResultCode flush() { + synchronized (uploadBuffer) { + if (uploadBuffer.readableBytes() > 0) { + try { + s3Operator.write(getObjectKey(), uploadBuffer.retainedSlice().asReadOnly(), ThrottleStrategy.BYPASS).get(); + } catch (Exception e) { + LOGGER.error("Failed to upload metrics to s3", e); + return CompletableResultCode.ofFailure(); + } finally { + lastUploadTimestamp = System.currentTimeMillis(); + nextUploadInterval = UPLOAD_INTERVAL + random.nextInt(MAX_JITTER_INTERVAL); + uploadBuffer.clear(); + } + } + } return CompletableResultCode.ofSuccess(); } diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java index d1f327ae46..69c0f243db 100644 --- a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java @@ -59,7 +59,6 @@ import java.util.List; import java.util.Locale; import java.util.Optional; -import java.util.Random; import java.util.concurrent.TimeUnit; import kafka.server.KafkaConfig; import kafka.server.KafkaRaftServer; @@ -319,7 +318,7 @@ public boolean s3PathStyle() { } }); PeriodicMetricReaderBuilder builder = PeriodicMetricReader.builder(s3MetricsExporter); - MetricReader periodicReader = builder.setInterval(Duration.ofSeconds(120 + new Random().nextInt(60))).build(); + MetricReader periodicReader = builder.setInterval(Duration.ofMillis(kafkaConfig.s3ExporterReportIntervalMs())).build(); metricReaderList.add(periodicReader); SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector(sdkMeterProviderBuilder, periodicReader, From c634157396ec06a5fb636990fdfc5b162854ae4e Mon Sep 17 00:00:00 2001 From: SSpirits Date: Thu, 30 May 2024 17:09:03 +0800 Subject: [PATCH 3/4] feat(shell): suppress spotbugs warning Signed-off-by: SSpirits --- .../shell/metrics/S3MetricsExporter.java | 6 ++- .../stream/s3/telemetry/TelemetryManager.java | 1 + gradle/spotbugs-exclude.xml | 41 +++++++++++-------- 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java b/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java index e880913f15..05268a9c54 100644 --- a/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java +++ b/automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java @@ -85,12 +85,16 @@ public S3MetricsExporter(S3MetricsConfig config) { uploadThread = new Thread(new UploadTask()); uploadThread.setName("s3-metrics-exporter-upload-thread"); uploadThread.setDaemon(true); - uploadThread.start(); cleanupThread = new Thread(new CleanupTask()); cleanupThread.setName("s3-metrics-exporter-cleanup-thread"); cleanupThread.setDaemon(true); + } + + public void start() { + uploadThread.start(); cleanupThread.start(); + LOGGER.info("S3MetricsExporter is started"); } @Override diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java index 69c0f243db..ccdceef7ad 100644 --- a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java @@ -317,6 +317,7 @@ public boolean s3PathStyle() { return kafkaConfig.s3PathStyle(); } }); + s3MetricsExporter.start(); PeriodicMetricReaderBuilder builder = PeriodicMetricReader.builder(s3MetricsExporter); MetricReader periodicReader = builder.setInterval(Duration.ofMillis(kafkaConfig.s3ExporterReportIntervalMs())).build(); metricReaderList.add(periodicReader); diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 2042f3317b..57de48e8f8 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -32,7 +32,8 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - + @@ -69,7 +70,8 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read PI_DO_NOT_REUSE_PUBLIC_IDENTIFIERS_FIELD_NAMES: Do not reuse public identifiers from JSL as field name PI_DO_NOT_REUSE_PUBLIC_IDENTIFIERS_METHOD_NAMES: Do not reuse public identifiers from JSL as method name --> - + @@ -98,7 +100,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read IC_INIT_CIRCULARITY: Initialization circularity SE_NO_SUITABLE_CONSTRUCTOR: Class is Serializable but its superclass doesn't define a void constructor PA_PUBLIC_MUTABLE_OBJECT_ATTRIBUTE: Mutable object-type field is public --> - + @@ -304,9 +306,9 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - - - + + + @@ -472,9 +474,9 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read any other threads are created, but is used in synchronized and unsynchronized methods because it comes from the configs, passed through rewriteTopology. --> - - - + + + @@ -489,9 +491,9 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - - - + + + @@ -523,11 +525,12 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read -- + - + - - + + @@ -586,12 +589,16 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - + - + + + + + From 80a9022c50a85099d8421d5e04a2231234610d0f Mon Sep 17 00:00:00 2001 From: SSpirits Date: Thu, 30 May 2024 17:14:35 +0800 Subject: [PATCH 4/4] feat(shell): suppress spotbugs warning Signed-off-by: SSpirits --- gradle/spotbugs-exclude.xml | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 57de48e8f8..d5d08c308c 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -32,8 +32,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - + @@ -70,8 +69,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read PI_DO_NOT_REUSE_PUBLIC_IDENTIFIERS_FIELD_NAMES: Do not reuse public identifiers from JSL as field name PI_DO_NOT_REUSE_PUBLIC_IDENTIFIERS_METHOD_NAMES: Do not reuse public identifiers from JSL as method name --> - + @@ -100,7 +98,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read IC_INIT_CIRCULARITY: Initialization circularity SE_NO_SUITABLE_CONSTRUCTOR: Class is Serializable but its superclass doesn't define a void constructor PA_PUBLIC_MUTABLE_OBJECT_ATTRIBUTE: Mutable object-type field is public --> - + @@ -306,9 +304,9 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - - - + + + @@ -474,9 +472,9 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read any other threads are created, but is used in synchronized and unsynchronized methods because it comes from the configs, passed through rewriteTopology. --> - - - + + + @@ -491,9 +489,9 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - - - + + + @@ -525,12 +523,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - - - +- - - + + @@ -589,7 +586,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - +