diff --git a/core/src/main/java/kafka/automq/table/coordinator/TableCoordinator.java b/core/src/main/java/kafka/automq/table/coordinator/TableCoordinator.java
index 4701a0e3ea..643083a7c2 100644
--- a/core/src/main/java/kafka/automq/table/coordinator/TableCoordinator.java
+++ b/core/src/main/java/kafka/automq/table/coordinator/TableCoordinator.java
@@ -36,6 +36,7 @@ import org.apache.kafka.storage.internals.log.LogConfig;
 
+import com.automq.stream.s3.metrics.Metrics;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.utils.Systems;
 import com.automq.stream.utils.Threads;
@@ -61,13 +62,10 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -82,24 +80,8 @@ public class TableCoordinator implements Closeable {
     private static final String SNAPSHOT_COMMIT_ID = "automq.commit.id";
     private static final String WATERMARK = "automq.watermark";
     private static final UUID NOOP_UUID = new UUID(0, 0);
-    private static final Map<String, Long> WATERMARK_METRICS = new ConcurrentHashMap<>();
-    private static final Map<String, Double> FIELD_PER_SECONDS_METRICS = new ConcurrentHashMap<>();
     private static final long NOOP_WATERMARK = -1L;
 
-    static {
-        TableTopicMetricsManager.setDelaySupplier(() -> {
-            Map<String, Long> delay = new HashMap<>(WATERMARK_METRICS.size());
-            long now = System.currentTimeMillis();
-            WATERMARK_METRICS.forEach((topic, watermark) -> {
-                if (watermark != NOOP_WATERMARK) {
-                    delay.put(topic, now - watermark);
-                }
-            });
-            return delay;
-        });
-        TableTopicMetricsManager.setFieldsPerSecondSupplier(() -> FIELD_PER_SECONDS_METRICS);
-    }
-
     private final Catalog catalog;
     private final String topic;
     private final String name;
@@ -113,9 +95,11 @@ public class TableCoordinator implements Closeable {
     private final long commitTimeout = TimeUnit.SECONDS.toMillis(30);
     private volatile boolean closed = false;
    private final Supplier<LogConfig> config;
+    private final Metrics.LongGaugeBundle.LongGauge delayMetric;
+    private final Metrics.DoubleGaugeBundle.DoubleGauge fieldsPerSecondMetric;
 
     public TableCoordinator(Catalog catalog, String topic, MetaStream metaStream, Channel channel,
-                            EventLoop eventLoop, MetadataCache metadataCache, Supplier<LogConfig> config) {
+        EventLoop eventLoop, MetadataCache metadataCache, Supplier<LogConfig> config) {
         this.catalog = catalog;
         this.topic = topic;
         this.name = topic;
@@ -125,13 +109,15 @@ public TableCoordinator(Catalog catalog, String topic, MetaStream metaStream, Ch
         this.metadataCache = metadataCache;
         this.config = config;
         this.tableIdentifier = TableIdentifierUtil.of(config.get().tableTopicNamespace, topic);
+        this.delayMetric = TableTopicMetricsManager.registerDelay(topic);
+        this.fieldsPerSecondMetric = TableTopicMetricsManager.registerFieldsPerSecond(topic);
     }
 
     private CommitStatusMachine commitStatusMachine;
 
     public void start() {
-        WATERMARK_METRICS.put(topic, -1L);
-        FIELD_PER_SECONDS_METRICS.put(topic, 0.0);
+        delayMetric.clear();
+        fieldsPerSecondMetric.record(0.0);
         // await for a while to avoid multi coordinators concurrent commit.
         SCHEDULER.schedule(() -> {
@@ -157,8 +143,8 @@ public void start() {
     public void close() {
         // quick close
         closed = true;
-        WATERMARK_METRICS.remove(topic);
-        FIELD_PER_SECONDS_METRICS.remove(topic);
+        delayMetric.close();
+        fieldsPerSecondMetric.close();
         eventLoop.execute(() -> {
             if (commitStatusMachine != null) {
                 commitStatusMachine.close();
@@ -474,9 +460,15 @@ private void handlePartitionNumChange(int partitionNum) {
     }
 
     private void recordMetrics() {
-        double fps = commitFieldCount * 1000.0 / Math.max(System.currentTimeMillis() - lastCommitTimestamp, 1);
-        FIELD_PER_SECONDS_METRICS.computeIfPresent(topic, (k, v) -> fps);
-        WATERMARK_METRICS.computeIfPresent(topic, (k, v) -> watermark(partitionWatermarks));
+        long now = System.currentTimeMillis();
+        double fps = commitFieldCount * 1000.0 / Math.max(now - lastCommitTimestamp, 1);
+        fieldsPerSecondMetric.record(fps);
+        long watermarkTimestamp = watermark(partitionWatermarks);
+        if (watermarkTimestamp == NOOP_WATERMARK) {
+            delayMetric.clear();
+        } else {
+            delayMetric.record(Math.max(now - watermarkTimestamp, 0));
+        }
     }
 
     private boolean tryEvolvePartition() {
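Note on the change above: the static `WATERMARK_METRICS`/`FIELD_PER_SECONDS_METRICS` maps give way to per-topic gauge handles with an explicit lifecycle. A minimal sketch (not part of the patch) of how a coordinator drives one handle; the topic name and values are illustrative:

```java
import kafka.automq.table.metric.TableTopicMetricsManager;

import com.automq.stream.s3.metrics.Metrics;

class DelayGaugeLifecycleSketch {
    void drive() {
        // registerDelay(): one gauge instance per topic, attributed with topic=<name>
        Metrics.LongGaugeBundle.LongGauge delay = TableTopicMetricsManager.registerDelay("orders");
        delay.clear();        // start(): no watermark yet, so emit nothing instead of a fake -1
        delay.record(1_500L); // recordMetrics(): commit watermark trails wall clock by 1.5s
        delay.close();        // close(): deregister so the OTel callback drops this series
    }
}
```

Compared with the supplier maps, a closed handle simply drops out of the collection callback, so a deleted topic stops emitting without any coordination over shared static state.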
diff --git a/core/src/main/java/kafka/automq/table/metric/TableTopicMetricsManager.java b/core/src/main/java/kafka/automq/table/metric/TableTopicMetricsManager.java
index 832fb93b98..200ec175b5 100644
--- a/core/src/main/java/kafka/automq/table/metric/TableTopicMetricsManager.java
+++ b/core/src/main/java/kafka/automq/table/metric/TableTopicMetricsManager.java
@@ -19,49 +19,45 @@
 package kafka.automq.table.metric;
 
+import com.automq.stream.s3.metrics.Metrics;
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 
 import java.time.Duration;
-import java.util.Collections;
-import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
 
 import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.metrics.Meter;
-import io.opentelemetry.api.metrics.ObservableDoubleGauge;
-import io.opentelemetry.api.metrics.ObservableLongGauge;
 
-public class TableTopicMetricsManager {
+public final class TableTopicMetricsManager {
     private static final Cache<String, Attributes> TOPIC_ATTRIBUTE_CACHE = CacheBuilder.newBuilder()
         .expireAfterAccess(Duration.ofMinutes(1)).build();
-    private static Supplier<Map<String, Long>> delaySupplier = Collections::emptyMap;
-    private static Supplier<Map<String, Double>> fieldsPerSecondSupplier = Collections::emptyMap;
-    private static ObservableLongGauge delay;
-    private static ObservableDoubleGauge fieldsPerSecond;
+    private static final Metrics.LongGaugeBundle DELAY_GAUGES = Metrics.instance()
+        .longGauge("kafka_tabletopic_delay", "Table topic commit delay", "ms");
+    private static final Metrics.DoubleGaugeBundle FIELDS_PER_SECOND_GAUGES = Metrics.instance()
+        .doubleGauge("kafka_tabletopic_fps", "Table topic fields per second", "fields/s");
+    private static final Metrics.DoubleGaugeBundle EVENT_LOOP_BUSY_GAUGES = Metrics.instance()
+        .doubleGauge("kafka_tableworker_eventloop_busy_ratio", "Table worker event loop busy ratio", "%");
+
+    private TableTopicMetricsManager() {
+    }
 
     public static void initMetrics(Meter meter) {
-        String prefix = "kafka_tabletopic_";
-        delay = meter.gaugeBuilder(prefix + "delay").ofLongs().setUnit("ms")
-            .buildWithCallback(recorder ->
-                delaySupplier.get().forEach((topic, delay) -> {
-                    if (delay >= 0) {
-                        recorder.record(delay, getTopicAttribute(topic));
-                    }
-                }));
-        fieldsPerSecond = meter.gaugeBuilder(prefix + "fps")
-            .buildWithCallback(recorder ->
-                fieldsPerSecondSupplier.get().forEach((topic, fps) -> recorder.record(fps, getTopicAttribute(topic))));
+        // Metrics instruments are registered via Metrics.instance(); no additional setup required.
     }
 
-    public static void setDelaySupplier(Supplier<Map<String, Long>> supplier) {
-        delaySupplier = supplier;
+    public static Metrics.LongGaugeBundle.LongGauge registerDelay(String topic) {
+        return DELAY_GAUGES.register(MetricsLevel.INFO, getTopicAttribute(topic));
     }
 
-    public static void setFieldsPerSecondSupplier(Supplier<Map<String, Double>> supplier) {
-        fieldsPerSecondSupplier = supplier;
+    public static Metrics.DoubleGaugeBundle.DoubleGauge registerFieldsPerSecond(String topic) {
+        return FIELDS_PER_SECOND_GAUGES.register(MetricsLevel.INFO, getTopicAttribute(topic));
+    }
+
+    public static Metrics.DoubleGaugeBundle.DoubleGauge registerEventLoopBusy(String loop) {
+        return EVENT_LOOP_BUSY_GAUGES.register(MetricsLevel.INFO, getLoopAttribute(loop));
     }
 
     private static Attributes getTopicAttribute(String topic) {
@@ -72,4 +68,7 @@ private static Attributes getTopicAttribute(String topic) {
         }
     }
 
+    private static Attributes getLoopAttribute(String loop) {
+        return Attributes.of(AttributeKey.stringKey("event_loop"), loop);
+    }
 }
diff --git a/core/src/main/java/kafka/automq/table/worker/EventLoops.java b/core/src/main/java/kafka/automq/table/worker/EventLoops.java
index 803c12ca7b..cf16f65665 100644
--- a/core/src/main/java/kafka/automq/table/worker/EventLoops.java
+++ b/core/src/main/java/kafka/automq/table/worker/EventLoops.java
@@ -19,6 +19,9 @@
 package kafka.automq.table.worker;
 
+import kafka.automq.table.metric.TableTopicMetricsManager;
+
+import com.automq.stream.s3.metrics.Metrics;
 import com.automq.stream.utils.Threads;
 import com.automq.stream.utils.threads.EventLoop;
 
@@ -90,11 +93,23 @@ void logStats() {
             lastRecordNanoTimes[i] = recordNanoTime;
 
             long elapseDelta = Math.max(recordNanoTime - lastRecordNanoTime, 1);
-            sb.append(eventLoop.eventLoop.getName()).append(String.format(": %.1f", (double) busyTimeDelta / elapseDelta * 100)).append("%, ");
+            double busyRatio = (double) busyTimeDelta / elapseDelta * 100;
+            eventLoop.lastBusyRatio = busyRatio;
+            eventLoop.busyRatioGauge.record(busyRatio);
+            sb.append(eventLoop.eventLoop.getName()).append(String.format(": %.1f", busyRatio)).append("%, ");
         }
         LOGGER.info(sb.toString());
     }
 
+    double busyRatio(EventLoop eventLoop) {
+        for (EventLoopWrapper wrapper : eventLoops) {
+            if (wrapper.eventLoop == eventLoop) {
+                return wrapper.lastBusyRatio;
+            }
+        }
+        return 0.0;
+    }
+
     public static class EventLoopWrapper {
         final EventLoop eventLoop;
@@ -102,9 +117,12 @@ public static class EventLoopWrapper {
         final AtomicInteger inflight = new AtomicInteger();
         volatile long runningTaskStartTime = -1;
         volatile long totalBusyTime = 0;
+        volatile double lastBusyRatio = 0.0;
+        final Metrics.DoubleGaugeBundle.DoubleGauge busyRatioGauge;
 
         public EventLoopWrapper(EventLoop eventLoop) {
             this.eventLoop = eventLoop;
+            this.busyRatioGauge = TableTopicMetricsManager.registerEventLoopBusy(eventLoop.getName());
         }
     }
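The busy ratio pushed into the new gauge is the same figure `logStats()` already printed to the log line. A worked example with illustrative numbers (the window length depends on how often `logStats()` runs):

```java
class BusyRatioMathSketch {
    void sample() {
        long busyTimeDelta = 750_000_000L; // 750ms of task-execution time in this window (ns)
        long elapseDelta = 1_000_000_000L; // 1s of wall clock between two logStats() runs (ns)
        double busyRatio = (double) busyTimeDelta / Math.max(elapseDelta, 1) * 100;
        assert busyRatio == 75.0; // logged as "loop-name: 75.0%" and recorded to the gauge
    }
}
```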
diff --git a/core/src/main/java/kafka/automq/table/worker/IcebergWriter.java b/core/src/main/java/kafka/automq/table/worker/IcebergWriter.java
index e0f302f6e3..448d1d1199 100644
--- a/core/src/main/java/kafka/automq/table/worker/IcebergWriter.java
+++ b/core/src/main/java/kafka/automq/table/worker/IcebergWriter.java
@@ -30,11 +30,17 @@
 import org.apache.kafka.server.record.ErrorsTolerance;
 
 import com.automq.stream.s3.metrics.TimerUtil;
+import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
+import com.automq.stream.s3.network.GlobalNetworkBandwidthLimiters;
+import com.automq.stream.s3.network.NetworkBandwidthLimiter;
+import com.automq.stream.s3.network.ThrottleStrategy;
 import com.automq.stream.utils.FutureUtil;
 import com.automq.stream.utils.LogSuppressor;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -59,7 +65,9 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -87,6 +95,8 @@ public class IcebergWriter implements Writer {
     private final boolean deltaWrite;
     private RecordBinder binder;
     private String lastSchemaIdentity;
+    private final NetworkBandwidthLimiter outboundLimiter;
+
     public IcebergWriter(IcebergTableManager icebergTableManager, RecordProcessor processor, WorkerConfig config) {
         this.tableId = icebergTableManager.tableId();
@@ -94,6 +104,7 @@ public IcebergWriter(IcebergTableManager icebergTableManager, RecordProcessor pr
         this.processor = processor;
         this.config = config;
         this.deltaWrite = StringUtils.isNoneBlank(config.cdcField()) || config.upsertEnable();
+        this.outboundLimiter = GlobalNetworkBandwidthLimiters.instance().get(AsyncNetworkBandwidthLimiter.Type.OUTBOUND);
     }
 
     @Override
@@ -130,7 +141,7 @@ public String toString() {
     }
 
     protected boolean write0(int partition,
-                             org.apache.kafka.common.record.Record kafkaRecord) throws IOException, RecordProcessorException {
+        org.apache.kafka.common.record.Record kafkaRecord) throws IOException, RecordProcessorException {
         ProcessingResult result = processor.process(partition, kafkaRecord);
 
         if (!result.isSuccess()) {
@@ -175,7 +186,11 @@ protected boolean write0(int partition,
 
         recordMetric(partition, kafkaRecord.timestamp());
 
+        waitForNetworkPermit(1);
+
         TaskWriter<Record> writer = getWriter(record.struct());
+
+
         if (deltaWrite) {
             writer.write(new RecordWrapper(record, config.cdcField(), config.upsertEnable()));
         } else {
@@ -422,7 +437,9 @@ private boolean flush() throws IOException {
             return false;
         }
         // Complete writer first, then collect statistics only if successful
-        results.add(this.writer.complete());
+        WriteResult writeResult = this.writer.complete();
+        results.add(writeResult);
+        recordNetworkCost(writeResult);
 
         // Collect field count statistics from the binder after successful completion
         if (binder != null) {
@@ -439,6 +456,50 @@
         }
     }
 
+    private void recordNetworkCost(WriteResult writeResult) {
+        final long totalBytes = calculateWriteResultBytes(writeResult);
+        if (totalBytes <= 0) {
+            LOGGER.warn("[NETWORK_LIMITER_RECORD_INVALID_BYTES],{},bytes={}", this, totalBytes);
+            return;
+        }
+        try {
+            waitForNetworkPermit(totalBytes);
+        } catch (IOException e) {
+            LOGGER.warn("[NETWORK_LIMITER_RECORD_FAIL],{},bytes={}", this, totalBytes, e);
+        }
+    }
+
+    private long calculateWriteResultBytes(WriteResult writeResult) {
+        long bytes = 0L;
+        for (DataFile file : writeResult.dataFiles()) {
+            bytes += Math.max(file.fileSizeInBytes(), 0L);
+        }
+        for (DeleteFile file : writeResult.deleteFiles()) {
+            bytes += Math.max(file.fileSizeInBytes(), 0L);
+        }
+        return bytes;
+    }
+
+    private void waitForNetworkPermit(long size) throws IOException {
+        try {
+            outboundLimiter.consumeBlocking(ThrottleStrategy.ICEBERG_WRITE, size);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOGGER.warn("[NETWORK_LIMITER_PERMIT_FAIL],{}", this, e);
+            throw new IOException("Failed to acquire outbound network permit", e);
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause() == null ? e : e.getCause();
+            LOGGER.warn("[NETWORK_LIMITER_PERMIT_ERROR],{}", this, cause);
+            throw new IOException("Failed to acquire outbound network permit", cause);
+        } catch (CancellationException e) {
+            LOGGER.warn("[NETWORK_LIMITER_PERMIT_ERROR],{}", this, e);
+            throw new IOException("Failed to acquire outbound network permit", e);
+        } catch (RuntimeException e) {
+            LOGGER.warn("[NETWORK_LIMITER_PERMIT_FAIL],{}", this, e);
+            throw new IOException("Failed to acquire outbound network permit", e);
+        }
+    }
+
     private void recordMetric(int partition, long timestamp) {
         Metric metric = metrics.get(partition);
         if (metric.watermark < timestamp) {
@@ -451,10 +512,10 @@
      */
     private String buildRecordContext(int partition, org.apache.kafka.common.record.Record kafkaRecord) {
         return String.format("topic=%s, partition=%d, offset=%d, timestamp=%d",
-            tableId.name(),
-            partition,
-            kafkaRecord.offset(),
-            kafkaRecord.timestamp());
+                tableId.name(),
+                partition,
+                kafkaRecord.offset(),
+                kafkaRecord.timestamp());
     }
 
     static class Metric {
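The writer touches the limiter at two points: a per-record `waitForNetworkPermit(1)` that stalls ingestion once tokens run out, and a per-flush `recordNetworkCost(...)` that charges the actual file sizes after `complete()`. A minimal sketch of that pattern, assuming the global OUTBOUND limiter is configured elsewhere; `IcebergThrottleSketch` and `dataFileBytes` are invented names for illustration:

```java
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.network.GlobalNetworkBandwidthLimiters;
import com.automq.stream.s3.network.NetworkBandwidthLimiter;
import com.automq.stream.s3.network.ThrottleStrategy;

import java.util.concurrent.ExecutionException;

class IcebergThrottleSketch {
    void throttle(long dataFileBytes) throws ExecutionException {
        NetworkBandwidthLimiter limiter = GlobalNetworkBandwidthLimiters.instance()
            .get(AsyncNetworkBandwidthLimiter.Type.OUTBOUND);
        try {
            // per-record 1-byte probe: blocks the writer once tokens are exhausted
            limiter.consumeBlocking(ThrottleStrategy.ICEBERG_WRITE, 1);
            // per-flush settlement: charge the real bytes once files are materialized
            limiter.consumeBlocking(ThrottleStrategy.ICEBERG_WRITE, dataFileBytes);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status, as the diff does
        }
    }
}
```

Charging after the fact means a large flush can briefly overshoot the configured rate, but subsequent 1-byte probes then block until the token bucket recovers, so the average rate still converges to the limit.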
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/Metrics.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/Metrics.java
index b334c6a2a2..5bc7d6f9ce 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/Metrics.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/Metrics.java
@@ -25,13 +25,18 @@
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.metrics.LongCounter;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.api.metrics.ObservableDoubleGauge;
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
 import io.opentelemetry.api.metrics.ObservableLongGauge;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
 import io.opentelemetry.context.Context;
 
 public class Metrics {
@@ -58,6 +63,14 @@ public LongCounter counter(Function<Meter, LongCounter> newFunc) {
         return new LazyLongCounter(newFunc);
     }
 
+    public LongGaugeBundle longGauge(String name, String desc, String unit) {
+        return new LongGaugeBundle(name, desc, unit);
+    }
+
+    public DoubleGaugeBundle doubleGauge(String name, String desc, String unit) {
+        return new DoubleGaugeBundle(name, desc, unit);
+    }
+
     private synchronized void setup0() {
         if (meter == null) {
             return;
@@ -185,15 +198,180 @@ public void setup() {
                     return;
                 }
                 this.finalAttributes = Attributes.builder()
-                        .putAll(globalConfig.getBaseAttributes())
-                        .putAll(histogramAttrs)
-                        .build();
+                    .putAll(globalConfig.getBaseAttributes())
+                    .putAll(histogramAttrs)
+                    .build();
                 this.shouldRecord = level.isWithin(globalConfig.getMetricsLevel());
                 histogram.setSnapshotInterval(globalConfig.getMetricsReportIntervalMs());
             }
         }
     }
 
+    public class LongGaugeBundle implements Setup {
+        private final List<LongGauge> gauges = new CopyOnWriteArrayList<>();
+        private final String name;
+        private final String desc;
+        private final String unit;
+
+        private ObservableLongGauge instrument;
+
+        @SuppressWarnings("this-escape")
+        public LongGaugeBundle(String name, String desc, String unit) {
+            this.name = name;
+            this.desc = desc;
+            this.unit = unit;
+            waitingSetups.add(this);
+            setup0();
+        }
+
+        public LongGauge register(MetricsLevel level, Attributes attributes) {
+            LongGauge gauge = new LongGauge(level, attributes);
+            gauges.add(gauge);
+            gauge.setup();
+            return gauge;
+        }
+
+        public synchronized void setup() {
+            gauges.forEach(LongGauge::setup);
+            this.instrument = meter.gaugeBuilder(name)
+                .setDescription(desc)
+                .setUnit(unit)
+                .ofLongs()
+                .buildWithCallback(measurement -> gauges.forEach(gauge -> gauge.record(measurement)));
+        }
+
+        public final class LongGauge implements AutoCloseable {
+            private final MetricsLevel level;
+            private final Attributes gaugeAttributes;
+            private final AtomicLong value = new AtomicLong();
+            private final AtomicBoolean hasValue = new AtomicBoolean(false);
+            private Attributes finalAttributes = Attributes.empty();
+            private volatile boolean shouldRecord = true;
+
+            private LongGauge(MetricsLevel level, Attributes attributes) {
+                this.level = level;
+                this.gaugeAttributes = attributes;
+                this.finalAttributes = attributes;
+            }
+
+            private void setup() {
+                if (meter != null && globalConfig != null) {
+                    this.finalAttributes = Attributes.builder()
+                        .putAll(globalConfig.getBaseAttributes())
+                        .putAll(gaugeAttributes)
+                        .build();
+                    this.shouldRecord = level.isWithin(globalConfig.getMetricsLevel());
+                } else {
+                    this.finalAttributes = gaugeAttributes;
+                    this.shouldRecord = true;
+                }
+            }
+
+            public void record(long newValue) {
+                value.set(newValue);
+                hasValue.set(true);
+            }
+
+            public void clear() {
+                hasValue.set(false);
+            }
+
+            private void record(ObservableLongMeasurement measurement) {
+                if (shouldRecord && hasValue.get()) {
+                    measurement.record(value.get(), finalAttributes);
+                }
+            }
+
+            @Override
+            public void close() {
+                gauges.remove(this);
+                hasValue.set(false);
+            }
+        }
+    }
+
+    public class DoubleGaugeBundle implements Setup {
+        private final List<DoubleGauge> gauges = new CopyOnWriteArrayList<>();
+        private final String name;
+        private final String desc;
+        private final String unit;
+
+        private ObservableDoubleGauge instrument;
+
+        @SuppressWarnings("this-escape")
+        public DoubleGaugeBundle(String name, String desc, String unit) {
+            this.name = name;
+            this.desc = desc;
+            this.unit = unit;
+            waitingSetups.add(this);
+            setup0();
+        }
+
+        public DoubleGauge register(MetricsLevel level, Attributes attributes) {
+            DoubleGauge gauge = new DoubleGauge(level, attributes);
+            gauges.add(gauge);
+            gauge.setup();
+            return gauge;
+        }
+
+        public synchronized void setup() {
+            gauges.forEach(DoubleGauge::setup);
+            this.instrument = meter.gaugeBuilder(name)
+                .setDescription(desc)
+                .setUnit(unit)
+                .buildWithCallback(measurement -> gauges.forEach(gauge -> gauge.record(measurement)));
+        }
+
+        public final class DoubleGauge implements AutoCloseable {
+            private final MetricsLevel level;
+            private final Attributes gaugeAttributes;
+            private final AtomicReference<Double> value = new AtomicReference<>(0.0);
+            private final AtomicBoolean hasValue = new AtomicBoolean(false);
+            private Attributes finalAttributes = Attributes.empty();
+            private volatile boolean shouldRecord = true;
+
+            private DoubleGauge(MetricsLevel level, Attributes attributes) {
+                this.level = level;
+                this.gaugeAttributes = attributes;
+                this.finalAttributes = attributes;
+            }
+
+            private void setup() {
+                if (meter != null && globalConfig != null) {
+                    this.finalAttributes = Attributes.builder()
+                        .putAll(globalConfig.getBaseAttributes())
+                        .putAll(gaugeAttributes)
+                        .build();
+                    this.shouldRecord = level.isWithin(globalConfig.getMetricsLevel());
+                } else {
+                    this.finalAttributes = gaugeAttributes;
+                    this.shouldRecord = true;
+                }
+            }
+
+            public void record(double newValue) {
+                value.set(newValue);
+                hasValue.set(true);
+            }
+
+            public void clear() {
+                hasValue.set(false);
+            }
+
+            private void record(ObservableDoubleMeasurement measurement) {
+                if (shouldRecord && hasValue.get()) {
+                    measurement.record(value.get(), finalAttributes);
+                }
+            }
+
+            @Override
+            public void close() {
+                gauges.remove(this);
+                hasValue.set(false);
+            }
+        }
+    }
+
     public class LazyLongCounter implements Setup, LongCounter {
         private final Function<Meter, LongCounter> newFunc;
         private LongCounter counter = new NoopLongCounter();
diff --git a/s3stream/src/main/java/com/automq/stream/s3/network/NetworkBandwidthLimiter.java b/s3stream/src/main/java/com/automq/stream/s3/network/NetworkBandwidthLimiter.java
index 1775375e52..e6e957c2c6 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/network/NetworkBandwidthLimiter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/network/NetworkBandwidthLimiter.java
@@ -20,12 +20,27 @@
 package com.automq.stream.s3.network;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 public interface NetworkBandwidthLimiter {
     NetworkBandwidthLimiter NOOP = new Noop();
 
     CompletableFuture<Void> consume(ThrottleStrategy throttleStrategy, long size);
 
+    default void consumeBlocking(ThrottleStrategy throttleStrategy, long size)
+        throws InterruptedException, ExecutionException {
+        CompletableFuture<Void> future = consume(throttleStrategy, size);
+        if (future == null) {
+            return;
+        }
+        try {
+            future.get();
+        } catch (InterruptedException e) {
+            future.cancel(true);
+            throw e;
+        }
+    }
+
     long getMaxTokens();
 
     long getAvailableTokens();
diff --git a/s3stream/src/main/java/com/automq/stream/s3/network/ThrottleStrategy.java b/s3stream/src/main/java/com/automq/stream/s3/network/ThrottleStrategy.java
index 596153d596..0ff361645e 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/network/ThrottleStrategy.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/network/ThrottleStrategy.java
@@ -23,7 +23,8 @@ public enum ThrottleStrategy {
     BYPASS(0, "bypass"),
     COMPACTION(1, "compaction"),
     TAIL(2, "tail"),
-    CATCH_UP(3, "catchup");
+    CATCH_UP(3, "catchup"),
+    ICEBERG_WRITE(4, "iceberg_write");
 
     private final int priority;
     private final String name;
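End to end, the new bundles can be created before any Meter is installed: the constructor enqueues itself via `waitingSetups` and `setup()` wires the OTel callback later, while `register()`/`record()` work immediately against the in-memory state. A hedged usage sketch; the gauge name, attribute, and class are invented for illustration:

```java
import com.automq.stream.s3.metrics.Metrics;
import com.automq.stream.s3.metrics.MetricsLevel;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;

class GaugeBundleUsageSketch {
    void use() {
        // one bundle per instrument name; each register() call adds one attributed series
        Metrics.DoubleGaugeBundle bundle = Metrics.instance()
            .doubleGauge("example_ratio", "Example ratio gauge", "%");
        Metrics.DoubleGaugeBundle.DoubleGauge gauge =
            bundle.register(MetricsLevel.INFO, Attributes.of(AttributeKey.stringKey("topic"), "orders"));
        gauge.record(42.0); // last value wins; the async callback reads it at collection time
        gauge.clear();      // keep the handle but stop emitting until the next record()
        gauge.close();      // remove the series from the bundle entirely
    }
}
```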