@@ -36,6 +36,7 @@

import org.apache.kafka.storage.internals.log.LogConfig;

import com.automq.stream.s3.metrics.Metrics;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.utils.Systems;
import com.automq.stream.utils.Threads;
@@ -61,13 +62,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -82,24 +80,8 @@ public class TableCoordinator implements Closeable {
private static final String SNAPSHOT_COMMIT_ID = "automq.commit.id";
private static final String WATERMARK = "automq.watermark";
private static final UUID NOOP_UUID = new UUID(0, 0);
private static final Map<String, Long> WATERMARK_METRICS = new ConcurrentHashMap<>();
private static final Map<String, Double> FIELD_PER_SECONDS_METRICS = new ConcurrentHashMap<>();
private static final long NOOP_WATERMARK = -1L;

static {
TableTopicMetricsManager.setDelaySupplier(() -> {
Map<String, Long> delay = new HashMap<>(WATERMARK_METRICS.size());
long now = System.currentTimeMillis();
WATERMARK_METRICS.forEach((topic, watermark) -> {
if (watermark != NOOP_WATERMARK) {
delay.put(topic, now - watermark);
}
});
return delay;
});
TableTopicMetricsManager.setFieldsPerSecondSupplier(() -> FIELD_PER_SECONDS_METRICS);
}

private final Catalog catalog;
private final String topic;
private final String name;
@@ -113,9 +95,11 @@ public class TableCoordinator implements Closeable {
private final long commitTimeout = TimeUnit.SECONDS.toMillis(30);
private volatile boolean closed = false;
private final Supplier<LogConfig> config;
private final Metrics.LongGaugeBundle.LongGauge delayMetric;
private final Metrics.DoubleGaugeBundle.DoubleGauge fieldsPerSecondMetric;

public TableCoordinator(Catalog catalog, String topic, MetaStream metaStream, Channel channel,
EventLoop eventLoop, MetadataCache metadataCache, Supplier<LogConfig> config) {
EventLoop eventLoop, MetadataCache metadataCache, Supplier<LogConfig> config) {
this.catalog = catalog;
this.topic = topic;
this.name = topic;
@@ -125,13 +109,15 @@ public TableCoordinator(Catalog catalog, String topic, MetaStream metaStream, Ch
this.metadataCache = metadataCache;
this.config = config;
this.tableIdentifier = TableIdentifierUtil.of(config.get().tableTopicNamespace, topic);
this.delayMetric = TableTopicMetricsManager.registerDelay(topic);
this.fieldsPerSecondMetric = TableTopicMetricsManager.registerFieldsPerSecond(topic);
}

private CommitStatusMachine commitStatusMachine;

public void start() {
WATERMARK_METRICS.put(topic, -1L);
FIELD_PER_SECONDS_METRICS.put(topic, 0.0);
delayMetric.clear();
fieldsPerSecondMetric.record(0.0);

// Wait for a while to avoid concurrent commits from multiple coordinators.
SCHEDULER.schedule(() -> {
@@ -157,8 +143,8 @@ public void start() {
public void close() {
// quick close
closed = true;
WATERMARK_METRICS.remove(topic);
FIELD_PER_SECONDS_METRICS.remove(topic);
delayMetric.close();
fieldsPerSecondMetric.close();
eventLoop.execute(() -> {
if (commitStatusMachine != null) {
commitStatusMachine.close();
@@ -474,9 +460,15 @@ private void handlePartitionNumChange(int partitionNum) {
}

private void recordMetrics() {
double fps = commitFieldCount * 1000.0 / Math.max(System.currentTimeMillis() - lastCommitTimestamp, 1);
FIELD_PER_SECONDS_METRICS.computeIfPresent(topic, (k, v) -> fps);
WATERMARK_METRICS.computeIfPresent(topic, (k, v) -> watermark(partitionWatermarks));
long now = System.currentTimeMillis();
double fps = commitFieldCount * 1000.0 / Math.max(now - lastCommitTimestamp, 1);
fieldsPerSecondMetric.record(fps);
long watermarkTimestamp = watermark(partitionWatermarks);
if (watermarkTimestamp == NOOP_WATERMARK) {
delayMetric.clear();
} else {
delayMetric.record(Math.max(now - watermarkTimestamp, 0));
}
}

private boolean tryEvolvePartition() {
@@ -19,49 +19,45 @@

package kafka.automq.table.metric;

import com.automq.stream.s3.metrics.Metrics;
import com.automq.stream.s3.metrics.MetricsLevel;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
import io.opentelemetry.api.metrics.ObservableLongGauge;

public class TableTopicMetricsManager {
public final class TableTopicMetricsManager {
private static final Cache<String, Attributes> TOPIC_ATTRIBUTE_CACHE = CacheBuilder.newBuilder()
.expireAfterAccess(Duration.ofMinutes(1)).build();
private static Supplier<Map<String, Long>> delaySupplier = Collections::emptyMap;
private static Supplier<Map<String, Double>> fieldsPerSecondSupplier = Collections::emptyMap;
private static ObservableLongGauge delay;
private static ObservableDoubleGauge fieldsPerSecond;
private static final Metrics.LongGaugeBundle DELAY_GAUGES = Metrics.instance()
.longGauge("kafka_tabletopic_delay", "Table topic commit delay", "ms");
private static final Metrics.DoubleGaugeBundle FIELDS_PER_SECOND_GAUGES = Metrics.instance()
.doubleGauge("kafka_tabletopic_fps", "Table topic fields per second", "fields/s");
private static final Metrics.DoubleGaugeBundle EVENT_LOOP_BUSY_GAUGES = Metrics.instance()
.doubleGauge("kafka_tableworker_eventloop_busy_ratio", "Table worker event loop busy ratio", "%");

private TableTopicMetricsManager() {
}

public static void initMetrics(Meter meter) {
String prefix = "kafka_tabletopic_";
delay = meter.gaugeBuilder(prefix + "delay").ofLongs().setUnit("ms")
.buildWithCallback(recorder ->
delaySupplier.get().forEach((topic, delay) -> {
if (delay >= 0) {
recorder.record(delay, getTopicAttribute(topic));
}
}));
fieldsPerSecond = meter.gaugeBuilder(prefix + "fps")
.buildWithCallback(recorder ->
fieldsPerSecondSupplier.get().forEach((topic, fps) -> recorder.record(fps, getTopicAttribute(topic))));
// Metrics instruments are registered via Metrics.instance(); no additional setup required.
}

public static void setDelaySupplier(Supplier<Map<String, Long>> supplier) {
delaySupplier = supplier;
public static Metrics.LongGaugeBundle.LongGauge registerDelay(String topic) {
return DELAY_GAUGES.register(MetricsLevel.INFO, getTopicAttribute(topic));
}

public static void setFieldsPerSecondSupplier(Supplier<Map<String, Double>> supplier) {
fieldsPerSecondSupplier = supplier;
public static Metrics.DoubleGaugeBundle.DoubleGauge registerFieldsPerSecond(String topic) {
return FIELDS_PER_SECOND_GAUGES.register(MetricsLevel.INFO, getTopicAttribute(topic));
}

public static Metrics.DoubleGaugeBundle.DoubleGauge registerEventLoopBusy(String loop) {
return EVENT_LOOP_BUSY_GAUGES.register(MetricsLevel.INFO, getLoopAttribute(loop));
}

private static Attributes getTopicAttribute(String topic) {
@@ -72,4 +68,7 @@ private static Attributes getTopicAttribute(String topic) {
}
}

private static Attributes getLoopAttribute(String loop) {
return Attributes.of(AttributeKey.stringKey("event_loop"), loop);
}
}
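
A minimal sketch of how the new per-topic gauge API is meant to be driven, based only on the registerDelay/registerFieldsPerSecond methods and the record/clear/close calls visible in this diff; the topic name and the recorded values are hypothetical.

import com.automq.stream.s3.metrics.Metrics;

import kafka.automq.table.metric.TableTopicMetricsManager;

public class GaugeLifecycleSketch {
    public static void main(String[] args) {
        // Register once per topic; each bundle keeps one gauge per topic attribute set.
        Metrics.LongGaugeBundle.LongGauge delay = TableTopicMetricsManager.registerDelay("orders");
        Metrics.DoubleGaugeBundle.DoubleGauge fps = TableTopicMetricsManager.registerFieldsPerSecond("orders");

        // Record on each commit cycle; clear() drops the sample when there is no valid watermark.
        delay.record(1_500L); // commit delay in ms
        fps.record(42.0);     // fields per second
        delay.clear();        // e.g. watermark == NOOP_WATERMARK

        // Close when the coordinator shuts down so the topic stops being reported.
        delay.close();
        fps.close();
    }
}
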
20 changes: 19 additions & 1 deletion core/src/main/java/kafka/automq/table/worker/EventLoops.java
@@ -19,6 +19,9 @@

package kafka.automq.table.worker;

import kafka.automq.table.metric.TableTopicMetricsManager;

import com.automq.stream.s3.metrics.Metrics;
import com.automq.stream.utils.Threads;
import com.automq.stream.utils.threads.EventLoop;

@@ -90,21 +93,36 @@ void logStats() {
lastRecordNanoTimes[i] = recordNanoTime;

long elapseDelta = Math.max(recordNanoTime - lastRecordNanoTime, 1);
sb.append(eventLoop.eventLoop.getName()).append(String.format(": %.1f", (double) busyTimeDelta / elapseDelta * 100)).append("%, ");
double busyRatio = (double) busyTimeDelta / elapseDelta * 100;
eventLoop.lastBusyRatio = busyRatio;
eventLoop.busyRatioGauge.record(busyRatio);
sb.append(eventLoop.eventLoop.getName()).append(String.format(": %.1f", busyRatio)).append("%, ");
}
LOGGER.info(sb.toString());
}

double busyRatio(EventLoop eventLoop) {
for (EventLoopWrapper wrapper : eventLoops) {
if (wrapper.eventLoop == eventLoop) {
return wrapper.lastBusyRatio;
}
}
return 0.0;
}

public static class EventLoopWrapper {
final EventLoop eventLoop;

final PriorityBlockingQueue<PriorityTask> tasks = new PriorityBlockingQueue<>();
final AtomicInteger inflight = new AtomicInteger();
volatile long runningTaskStartTime = -1;
volatile long totalBusyTime = 0;
volatile double lastBusyRatio = 0.0;
final Metrics.DoubleGaugeBundle.DoubleGauge busyRatioGauge;

public EventLoopWrapper(EventLoop eventLoop) {
this.eventLoop = eventLoop;
this.busyRatioGauge = TableTopicMetricsManager.registerEventLoopBusy(eventLoop.getName());
}
}

73 changes: 67 additions & 6 deletions core/src/main/java/kafka/automq/table/worker/IcebergWriter.java
@@ -30,11 +30,17 @@
import org.apache.kafka.server.record.ErrorsTolerance;

import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.network.GlobalNetworkBandwidthLimiters;
import com.automq.stream.s3.network.NetworkBandwidthLimiter;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.LogSuppressor;

import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -59,7 +65,9 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -87,13 +95,16 @@ public class IcebergWriter implements Writer {
private final boolean deltaWrite;
private RecordBinder binder;
private String lastSchemaIdentity;
private final NetworkBandwidthLimiter outboundLimiter;


public IcebergWriter(IcebergTableManager icebergTableManager, RecordProcessor processor, WorkerConfig config) {
this.tableId = icebergTableManager.tableId();
this.icebergTableManager = icebergTableManager;
this.processor = processor;
this.config = config;
this.deltaWrite = StringUtils.isNoneBlank(config.cdcField()) || config.upsertEnable();
this.outboundLimiter = GlobalNetworkBandwidthLimiters.instance().get(AsyncNetworkBandwidthLimiter.Type.OUTBOUND);
}

@Override
@@ -130,7 +141,7 @@ public String toString() {
}

protected boolean write0(int partition,
org.apache.kafka.common.record.Record kafkaRecord) throws IOException, RecordProcessorException {
org.apache.kafka.common.record.Record kafkaRecord) throws IOException, RecordProcessorException {
ProcessingResult result = processor.process(partition, kafkaRecord);

if (!result.isSuccess()) {
@@ -175,7 +186,11 @@ protected boolean write0(int partition,

recordMetric(partition, kafkaRecord.timestamp());

waitForNetworkPermit(1);

TaskWriter<Record> writer = getWriter(record.struct());


if (deltaWrite) {
writer.write(new RecordWrapper(record, config.cdcField(), config.upsertEnable()));
} else {
@@ -422,7 +437,9 @@ private boolean flush() throws IOException {
return false;
}
// Complete writer first, then collect statistics only if successful
results.add(this.writer.complete());
WriteResult writeResult = this.writer.complete();
results.add(writeResult);
recordNetworkCost(writeResult);

// Collect field count statistics from the binder after successful completion
if (binder != null) {
@@ -439,6 +456,50 @@
}
}

private void recordNetworkCost(WriteResult writeResult) {
final long totalBytes = calculateWriteResultBytes(writeResult);
if (totalBytes <= 0) {
LOGGER.warn("[NETWORK_LIMITER_RECORD_INVALID_BYTES],{},bytes={}", this, totalBytes);
return;
}
try {
waitForNetworkPermit(totalBytes);
} catch (IOException e) {
LOGGER.warn("[NETWORK_LIMITER_RECORD_FAIL],{},bytes={}", this, totalBytes, e);
}
}

private long calculateWriteResultBytes(WriteResult writeResult) {
long bytes = 0L;
for (DataFile file : writeResult.dataFiles()) {
bytes += Math.max(file.fileSizeInBytes(), 0L);
}
for (DeleteFile file : writeResult.deleteFiles()) {
bytes += Math.max(file.fileSizeInBytes(), 0L);
}
return bytes;
}

private void waitForNetworkPermit(long size) throws IOException {
try {
outboundLimiter.consumeBlocking(ThrottleStrategy.ICEBERG_WRITE, size);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("[NETWORK_LIMITER_PERMIT_FAIL],{}", this, e);
throw new IOException("Failed to acquire outbound network permit", e);
} catch (ExecutionException e) {
Throwable cause = e.getCause() == null ? e : e.getCause();
LOGGER.warn("[NETWORK_LIMITER_PERMIT_ERROR],{}", this, cause);
throw new IOException("Failed to acquire outbound network permit", cause);
} catch (CancellationException e) {
LOGGER.warn("[NETWORK_LIMITER_PERMIT_ERROR],{}", this, e);
throw new IOException("Failed to acquire outbound network permit", e);
} catch (RuntimeException e) {
LOGGER.warn("[NETWORK_LIMITER_PERMIT_FAIL],{}", this, e);
throw new IOException("Failed to acquire outbound network permit", e);
}
}

private void recordMetric(int partition, long timestamp) {
Metric metric = metrics.get(partition);
if (metric.watermark < timestamp) {
@@ -451,10 +512,10 @@ private void recordMetric(int partition, long timestamp) {
*/
private String buildRecordContext(int partition, org.apache.kafka.common.record.Record kafkaRecord) {
return String.format("topic=%s, partition=%d, offset=%d, timestamp=%d",
tableId.name(),
partition,
kafkaRecord.offset(),
kafkaRecord.timestamp());
tableId.name(),
partition,
kafkaRecord.offset(),
kafkaRecord.timestamp());
}

static class Metric {
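
A standalone sketch of the outbound throttling pattern added to IcebergWriter, assuming only the limiter calls visible in this diff (GlobalNetworkBandwidthLimiters, ThrottleStrategy.ICEBERG_WRITE); the 64 MiB flush size is a made-up example.

import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.network.GlobalNetworkBandwidthLimiters;
import com.automq.stream.s3.network.NetworkBandwidthLimiter;
import com.automq.stream.s3.network.ThrottleStrategy;

public class OutboundThrottleSketch {
    public static void main(String[] args) throws Exception {
        // Look up the shared outbound limiter, as IcebergWriter's constructor does.
        NetworkBandwidthLimiter outbound = GlobalNetworkBandwidthLimiters.instance()
            .get(AsyncNetworkBandwidthLimiter.Type.OUTBOUND);
        long flushedBytes = 64L * 1024 * 1024; // hypothetical total size of the flushed data/delete files
        // Block until the outbound budget grants enough permits for this flush.
        outbound.consumeBlocking(ThrottleStrategy.ICEBERG_WRITE, flushedBytes);
    }
}
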