com/automq/shell/log/LogUploader.java
@@ -27,6 +27,7 @@
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
@@ -43,13 +44,17 @@ public class LogUploader implements LogRecorder {

public static final int DEFAULT_MAX_QUEUE_SIZE = 64 * 1024;
public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024;
public static final int MAX_UPLOAD_INTERVAL = 60 * 1000;
public static final int UPLOAD_INTERVAL = System.getenv("AUTOMQ_OBSERVABILITY_UPLOAD_INTERVAL") != null ? Integer.parseInt(System.getenv("AUTOMQ_OBSERVABILITY_UPLOAD_INTERVAL")) : 60 * 1000;
public static final int CLEANUP_INTERVAL = System.getenv("AUTOMQ_OBSERVABILITY_CLEANUP_INTERVAL") != null ? Integer.parseInt(System.getenv("AUTOMQ_OBSERVABILITY_CLEANUP_INTERVAL")) : 2 * 60 * 1000;
public static final int MAX_JITTER_INTERVAL = 60 * 1000;

private static final LogUploader INSTANCE = new LogUploader();

private final BlockingQueue<LogEvent> queue = new LinkedBlockingQueue<>(DEFAULT_MAX_QUEUE_SIZE);
private final ByteBuf uploadBuffer = Unpooled.directBuffer(DEFAULT_BUFFER_SIZE);
private volatile long lastUploadTimestamp = 0L;
private final Random random = new Random();
private volatile long lastUploadTimestamp = System.currentTimeMillis();
private volatile long nextUploadInterval = UPLOAD_INTERVAL + random.nextInt(MAX_JITTER_INTERVAL);

private volatile boolean closed;

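The hunk above replaces the fixed MAX_UPLOAD_INTERVAL with an environment-overridable UPLOAD_INTERVAL plus a per-flush random jitter, so brokers started together do not all flush to S3 in lockstep. A minimal standalone sketch of that pattern (the class and helper names are illustrative, not part of the patch):

import java.util.Random;

// Sketch of the jittered-deadline pattern introduced above: a base interval,
// overridable via an environment variable, plus a fresh random offset per flush.
class JitteredFlushDeadline {
    static final int UPLOAD_INTERVAL = envOrDefault("AUTOMQ_OBSERVABILITY_UPLOAD_INTERVAL", 60 * 1000);
    static final int MAX_JITTER_INTERVAL = 60 * 1000;

    private final Random random = new Random();
    private volatile long lastUploadTimestamp = System.currentTimeMillis();
    private volatile long nextUploadInterval = UPLOAD_INTERVAL + random.nextInt(MAX_JITTER_INTERVAL);

    boolean due(long now) {
        return now - lastUploadTimestamp > nextUploadInterval;
    }

    void onFlushed(long now) {
        lastUploadTimestamp = now;
        // Re-roll the jitter after every flush so uploaders stay spread out over time.
        nextUploadInterval = UPLOAD_INTERVAL + random.nextInt(MAX_JITTER_INTERVAL);
    }

    private static int envOrDefault(String name, int defaultValue) {
        String value = System.getenv(name);
        return value != null ? Integer.parseInt(value) : defaultValue;
    }
}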
@@ -174,14 +179,14 @@ public void run() {
}

byte[] bytes = logLine.toString().getBytes(StandardCharsets.UTF_8);
if (uploadBuffer.writableBytes() < bytes.length || now - lastUploadTimestamp > MAX_UPLOAD_INTERVAL) {
if (uploadBuffer.writableBytes() < bytes.length || now - lastUploadTimestamp > nextUploadInterval) {
upload(now);
}
uploadBuffer.writeBytes(bytes);
} else if (closed) {
upload(now);
break;
} else if (now - lastUploadTimestamp > MAX_UPLOAD_INTERVAL) {
} else if (now - lastUploadTimestamp > nextUploadInterval) {
upload(now);
}
} catch (InterruptedException e) {
@@ -216,6 +221,7 @@ private void upload(long now) {
}
uploadBuffer.clear();
lastUploadTimestamp = now;
nextUploadInterval = UPLOAD_INTERVAL + random.nextInt(MAX_JITTER_INTERVAL);
}
}
}
@@ -230,7 +236,7 @@ public void run() {
Thread.sleep(Duration.ofMinutes(1).toMillis());
continue;
}
long expiredTime = System.currentTimeMillis() - MAX_UPLOAD_INTERVAL * 3;
long expiredTime = System.currentTimeMillis() - CLEANUP_INTERVAL;

List<Pair<String, Long>> pairList = s3Operator.list(String.format("automq/logs/%s", config.clusterId())).join();

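Both cleanup tasks (here and in S3MetricsExporter below) now derive their expiry cutoff from the configurable CLEANUP_INTERVAL instead of three times the old upload interval. A sketch of the delete-expired pass, with the list/delete signatures assumed from how the hunk above uses them:

// Sketch: remove uploaded objects older than the cleanup cutoff.
// Assumes s3Operator.list(prefix) yields CompletableFuture<List<Pair<String, Long>>>
// of (objectKey, lastModifiedMillis) and s3Operator.delete(keys) takes a key list.
private void cleanupExpired(S3Operator s3Operator, String prefix, long cleanupIntervalMs) {
    long expiredTime = System.currentTimeMillis() - cleanupIntervalMs;
    List<Pair<String, Long>> objects = s3Operator.list(prefix).join();
    List<String> expiredKeys = objects.stream()
        .filter(pair -> pair.getRight() < expiredTime) // older than the cutoff
        .map(Pair::getLeft)
        .collect(Collectors.toList());
    if (!expiredKeys.isEmpty()) {
        s3Operator.delete(expiredKeys);
    }
}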
com/automq/shell/metrics/S3MetricsExporter.java
@@ -12,7 +12,7 @@
package com.automq.shell.metrics;

import com.automq.shell.auth.CredentialsProviderHolder;
import com.automq.shell.log.LogUploader;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.operator.DefaultS3Operator;
import com.automq.stream.s3.operator.S3Operator;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -38,6 +38,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -50,12 +51,24 @@ public class S3MetricsExporter implements MetricExporter {
private static final Logger LOGGER = LoggerFactory.getLogger(S3MetricsExporter.class);
private static final String TOTAL_SUFFIX = "_total";

public static final int UPLOAD_INTERVAL = System.getenv("AUTOMQ_OBSERVABILITY_UPLOAD_INTERVAL") != null ? Integer.parseInt(System.getenv("AUTOMQ_OBSERVABILITY_UPLOAD_INTERVAL")) : 60 * 1000;
public static final int CLEANUP_INTERVAL = System.getenv("AUTOMQ_OBSERVABILITY_CLEANUP_INTERVAL") != null ? Integer.parseInt(System.getenv("AUTOMQ_OBSERVABILITY_CLEANUP_INTERVAL")) : 2 * 60 * 1000;
public static final int MAX_JITTER_INTERVAL = 60 * 1000;
public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024;

private final S3MetricsConfig config;
private final Map<String, String> defalutTagMap = new HashMap<>();

private final ByteBuf uploadBuffer = Unpooled.directBuffer(DEFAULT_BUFFER_SIZE);
private final Random random = new Random();
private volatile long lastUploadTimestamp = System.currentTimeMillis();
private volatile long nextUploadInterval = UPLOAD_INTERVAL + random.nextInt(MAX_JITTER_INTERVAL);

private final S3Operator s3Operator;
private final ObjectMapper objectMapper = new ObjectMapper();

private volatile boolean closed;
private final Thread uploadThread;
private final Thread cleanupThread;

public S3MetricsExporter(S3MetricsConfig config) {
@@ -69,20 +82,48 @@ public S3MetricsExporter(S3MetricsConfig config) {
defalutTagMap.put("service_instance_id", String.valueOf(config.nodeId()));
defalutTagMap.put("instance", String.valueOf(config.nodeId()));

uploadThread = new Thread(new UploadTask());
uploadThread.setName("s3-metrics-exporter-upload-thread");
uploadThread.setDaemon(true);

cleanupThread = new Thread(new CleanupTask());
cleanupThread.setName("s3-metrics-exporter-cleanup-thread");
cleanupThread.setDaemon(true);
}

public void start() {
uploadThread.start();
cleanupThread.start();
LOGGER.info("S3MetricsExporter is started");
}

@Override
public void close() {
MetricExporter.super.close();
closed = true;
cleanupThread.interrupt();
uploadThread.interrupt();
flush();
LOGGER.info("S3MetricsExporter is closed");
}

private class UploadTask implements Runnable {

@Override
public void run() {
while (!closed && !uploadThread.isInterrupted()) {
try {
if (uploadBuffer.readableBytes() > 0 && System.currentTimeMillis() - lastUploadTimestamp > nextUploadInterval) {
flush();
}
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
}
}

private class CleanupTask implements Runnable {

@Override
@@ -93,7 +134,7 @@ public void run() {
Thread.sleep(Duration.ofMinutes(1).toMillis());
continue;
}
long expiredTime = System.currentTimeMillis() - LogUploader.MAX_UPLOAD_INTERVAL * 3;
long expiredTime = System.currentTimeMillis() - CLEANUP_INTERVAL;

List<Pair<String, Long>> pairList = s3Operator.list(String.format("automq/metrics/%s", config.clusterId())).join();

@@ -126,6 +167,10 @@ private String getHostName() {

@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
if (closed) {
return CompletableResultCode.ofFailure();
}

try {
List<String> lineList = new ArrayList<>();
// TODO: transfer metrics name into prometheus format
@@ -171,7 +216,13 @@ public CompletableResultCode export(Collection<MetricData> metrics) {
buffer.writeCharSequence(line, Charset.defaultCharset());
buffer.writeCharSequence("\n", Charset.defaultCharset());
});
s3Operator.write(getObjectKey(), buffer);
synchronized (uploadBuffer) {
if (uploadBuffer.writableBytes() < buffer.readableBytes()) {
// Not enough room for this batch: upload the accumulated buffer first
flush();
}
uploadBuffer.writeBytes(buffer);
}
} catch (Exception e) {
LOGGER.error("Export metrics to S3 failed", e);
return CompletableResultCode.ofFailure();
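Instead of writing each export batch straight to S3 (the removed s3Operator.write call), export() now appends batches to the shared uploadBuffer and lets buffer size or the jittered deadline trigger the actual upload. The synchronized block is what makes this safe: export() runs on the PeriodicMetricReader thread while UploadTask and close() flush from other threads. Condensed into a hypothetical helper:

// Sketch of the append-or-flush step above; appendBatch is a hypothetical name.
// All buffer access is serialized on uploadBuffer's monitor, shared with flush().
private void appendBatch(ByteBuf batch) {
    synchronized (uploadBuffer) {
        if (uploadBuffer.writableBytes() < batch.readableBytes()) {
            flush(); // batch would not fit: upload the accumulated metrics first
        }
        uploadBuffer.writeBytes(batch);
    }
}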
@@ -182,6 +233,20 @@ public CompletableResultCode export(Collection<MetricData> metrics) {

@Override
public CompletableResultCode flush() {
synchronized (uploadBuffer) {
if (uploadBuffer.readableBytes() > 0) {
try {
s3Operator.write(getObjectKey(), uploadBuffer.retainedSlice().asReadOnly(), ThrottleStrategy.BYPASS).get();
} catch (Exception e) {
LOGGER.error("Failed to upload metrics to s3", e);
return CompletableResultCode.ofFailure();
} finally {
lastUploadTimestamp = System.currentTimeMillis();
nextUploadInterval = UPLOAD_INTERVAL + random.nextInt(MAX_JITTER_INTERVAL);
uploadBuffer.clear();
}
}
}
return CompletableResultCode.ofSuccess();
}

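flush() hands the S3 operator a retained, read-only slice of the buffer. Assuming s3Operator.write(...) releases the buffer it is given, the extra retain keeps the long-lived uploadBuffer alive, and get() blocks until the upload completes so the finally block can safely reset the buffer and re-roll the jitter. A small demo of the Netty refcount behavior this relies on:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

// Sketch: a retained slice shares the parent's refcount, so a consumer releasing
// its slice leaves the parent alive; clear() then only resets reader/writer indices.
public class RetainedSliceDemo {
    public static void main(String[] args) {
        ByteBuf parent = Unpooled.directBuffer(16);
        parent.writeByte(1);
        ByteBuf slice = parent.retainedSlice().asReadOnly();
        System.out.println(parent.refCnt()); // 2: parent + retained slice
        slice.release();                     // consumer done with its view
        System.out.println(parent.refCnt()); // 1: parent still owned here
        parent.clear();                      // reuse the same memory for the next window
    }
}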
@@ -317,8 +317,9 @@ public boolean s3PathStyle() {
return kafkaConfig.s3PathStyle();
}
});
s3MetricsExporter.start();
PeriodicMetricReaderBuilder builder = PeriodicMetricReader.builder(s3MetricsExporter);
MetricReader periodicReader = builder.setInterval(Duration.ofMinutes(1)).build();
MetricReader periodicReader = builder.setInterval(Duration.ofMillis(kafkaConfig.s3ExporterReportIntervalMs())).build();
metricReaderList.add(periodicReader);

SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector(sdkMeterProviderBuilder, periodicReader,
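This last hunk, in the file that wires up telemetry, starts the exporter's upload and cleanup threads before registering the reader, and replaces the hard-coded one-minute interval with the broker config value. A condensed view of the wiring, reusing the names from the hunk:

// Sketch: start the exporter's background threads, then register a
// PeriodicMetricReader whose interval comes from broker configuration.
s3MetricsExporter.start();
MetricReader periodicReader = PeriodicMetricReader.builder(s3MetricsExporter)
    .setInterval(Duration.ofMillis(kafkaConfig.s3ExporterReportIntervalMs()))
    .build();
metricReaderList.add(periodicReader);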
6 changes: 5 additions & 1 deletion gradle/spotbugs-exclude.xml
@@ -591,7 +591,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
</Match>

<Match>
<Class name="org.apache.kafka.jmh.util.ByteUtilsBenchmark$BaseBenchmarkState" />
<Or>
<Class name="org.apache.kafka.jmh.util.ByteUtilsBenchmark$BaseBenchmarkState"/>
<Class name="com.automq.shell.log.LogUploader"/>
<Class name="com.automq.shell.metrics.S3MetricsExporter"/>
</Or>
<!-- Suppress warning: the JMH benchmark instantiates Random() once per Trial so that
all benchmarks run with the same seed, and LogUploader/S3MetricsExporter keep a
single Random per instance to jitter their upload intervals. -->
<Bug pattern="DMI_RANDOM_USED_ONLY_ONCE"/>
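SpotBugs' DMI_RANDOM_USED_ONLY_ONCE flags code that constructs a Random and draws from it only once, since that defeats seeding; LogUploader and S3MetricsExporter are added to the exclusion because each keeps one Random in a field and re-rolls jitter from it on every flush, which the detector appears to misread. A minimal illustration of the flagged pattern versus the one used (class and method names are illustrative):

import java.util.Random;

class JitterStyles {
    // The pattern DMI_RANDOM_USED_ONLY_ONCE targets: a throwaway Random per call.
    static long jitterFlagged(long base, int maxJitter) {
        return base + new Random().nextInt(maxJitter);
    }

    // The pattern used in this PR: one long-lived Random reused for every roll.
    private final Random random = new Random();
    long jitter(long base, int maxJitter) {
        return base + random.nextInt(maxJitter);
    }
}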