From 55774b5520c5c93a1216dd1444a2a7c0de5e1849 Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Tue, 14 Apr 2026 17:15:06 +0800 Subject: [PATCH 1/3] [lake/tiering] add tiering table pending time and freshness metrics --- .../org/apache/fluss/metrics/MetricNames.java | 2 ++ .../coordinator/LakeTableTieringManager.java | 23 +++++++++++++++++++ .../observability/monitor-metrics.md | 14 +++++++++-- 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index c1cc2b5b69..650a93abbf 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -70,6 +70,8 @@ public class MetricNames { public static final String LAKE_TIERING_TABLE_FAILURES_TOTAL = "failuresTotal"; public static final String LAKE_TIERING_TABLE_FILE_SIZE = "fileSize"; public static final String LAKE_TIERING_TABLE_RECORD_COUNT = "recordCount"; + public static final String LAKE_TIERING_TABLE_PENDING_TIME = "pendingTime"; + public static final String LAKE_TIERING_TABLE_FRESHNESS = "freshness"; // -------------------------------------------------------------------------------------------- // metrics for tablet server diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java index a3448d148b..439092b898 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java @@ -141,6 +141,9 @@ public class LakeTableTieringManager implements AutoCloseable { // table_id -> start time (ms) of the currently in-progress tiering round private final Map currentTieringStartTime; + // table_id -> time (ms) when the 
table entered the pending queue + private final Map pendingEnterTime; + // the live tables that are tiering, // from table_id -> last heartbeat time by the tiering service private final Map liveTieringTableIds; @@ -184,6 +187,7 @@ protected LakeTableTieringManager( this.delayedTieringByTableId = new HashMap<>(); this.tableFailureCounters = new HashMap<>(); this.currentTieringStartTime = new HashMap<>(); + this.pendingEnterTime = new HashMap<>(); this.tieringMetricGroup = lakeTieringMetricGroup; registerMetrics(); } @@ -279,6 +283,17 @@ private void registerTableMetrics(long tableId, TablePath tablePath) { MetricNames.LAKE_TIERING_TABLE_TIER_DURATION, () -> inReadLock(lock, () -> getLastResultField(tableId, r -> r.tierDuration))); + // pendingTime: how long the table has been waiting in the pending queue + tableMetricGroup.gauge( + MetricNames.LAKE_TIERING_TABLE_PENDING_TIME, + () -> + inReadLock( + lock, + () -> { + long enterTime = pendingEnterTime.getOrDefault(tableId, 0L); + return enterTime > 0 ? 
clock.milliseconds() - enterTime : 0L; + })); + // failuresTotal: total failure count for this table Counter failuresCounter = tableMetricGroup.counter(MetricNames.LAKE_TIERING_TABLE_FAILURES_TOTAL); @@ -293,6 +308,11 @@ private void registerTableMetrics(long tableId, TablePath tablePath) { tableMetricGroup.gauge( MetricNames.LAKE_TIERING_TABLE_RECORD_COUNT, () -> inReadLock(lock, () -> getLastResultField(tableId, r -> r.recordCount))); + + // freshness: the user-configured table data freshness interval in milliseconds + tableMetricGroup.gauge( + MetricNames.LAKE_TIERING_TABLE_FRESHNESS, + () -> inReadLock(lock, () -> tableLakeFreshness.getOrDefault(tableId, -1L))); } /** @@ -316,6 +336,7 @@ public void removeLakeTable(long tableId) { tableLakeFreshness.remove(tableId); lastTieringResult.remove(tableId); currentTieringStartTime.remove(tableId); + pendingEnterTime.remove(tableId); tableFailureCounters.remove(tableId); // close and remove the metric group to unregister metrics tieringMetricGroup.removeTableLakeTieringMetricGroup(tableId); @@ -579,10 +600,12 @@ private void doHandleStateChange(long tableId, TieringState targetState) { // increase tiering epoch and initialize the heartbeat of the tiering table tableTierEpoch.computeIfPresent(tableId, (t, v) -> v + 1); pendingTieringTables.add(tableId); + pendingEnterTime.put(tableId, clock.milliseconds()); break; case Tiering: liveTieringTableIds.put(tableId, clock.milliseconds()); currentTieringStartTime.put(tableId, clock.milliseconds()); + pendingEnterTime.put(tableId, 0L); break; case Tiered: liveTieringTableIds.remove(tableId); diff --git a/website/docs/maintenance/observability/monitor-metrics.md b/website/docs/maintenance/observability/monitor-metrics.md index 330a326b2f..961d0bc8f3 100644 --- a/website/docs/maintenance/observability/monitor-metrics.md +++ b/website/docs/maintenance/observability/monitor-metrics.md @@ -294,7 +294,7 @@ Some metrics might not be exposed when using other JVM implementations (e.g. 
IBM - coordinator + coordinator - activeCoordinatorCount The number of active CoordinatorServer (only leader) in this cluster. @@ -401,7 +401,7 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM Gauge - lakeTiering_table + lakeTiering_table tierLag Time in milliseconds since the last successful tiering operation for this table. For newly registered tables that have never completed a tiering round, the lag is measured from the time the table was registered. Gauge @@ -426,6 +426,16 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM Cumulative total record count of the lake table after the last tiering round. Returns -1 if no tiering has completed yet. Gauge + + pendingTime + How long (in milliseconds) the table has been waiting in the pending queue for tiering. Returns 0 when the table is not currently pending. + Gauge + + + freshness + The user-configured data freshness interval (in milliseconds) for this table. Returns -1 if no freshness value is registered for this table. + Gauge + + From bfee6d0b1f0f7c3ec5b5c910e0b6a40b211e8b86 Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Tue, 14 Apr 2026 17:30:31 +0800 Subject: [PATCH 2/3] rename pendingRecords metric for clarity --- .../src/main/java/org/apache/fluss/metrics/MetricNames.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index 650a93abbf..738188237e 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -224,13 +224,11 @@ public class MetricNames { // metrics for table bucket // -------------------------------------------------------------------------------------------- - // for tablet - public static final String LAKE_PENDING_RECORDS = "pendingRecords"; - // for log tablet public static final String LOG_NUM_SEGMENTS = "numSegments"; public 
static final String LOG_END_OFFSET = "endOffset"; public static final String REMOTE_LOG_SIZE = "size"; + public static final String LOG_LAKE_PENDING_RECORDS = "pendingRecords"; public static final String LOG_LAKE_TIMESTAMP_LAG = "timestampLag"; // for logic storage From 431bd9a539b40d070f6ecaa72abed8f90cad5816 Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Wed, 15 Apr 2026 10:34:38 +0800 Subject: [PATCH 3/3] rebase --- .../src/main/java/org/apache/fluss/metrics/MetricNames.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index 738188237e..650a93abbf 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -224,11 +224,13 @@ public class MetricNames { // metrics for table bucket // -------------------------------------------------------------------------------------------- + // for tablet + public static final String LAKE_PENDING_RECORDS = "pendingRecords"; + // for log tablet public static final String LOG_NUM_SEGMENTS = "numSegments"; public static final String LOG_END_OFFSET = "endOffset"; public static final String REMOTE_LOG_SIZE = "size"; - public static final String LOG_LAKE_PENDING_RECORDS = "pendingRecords"; public static final String LOG_LAKE_TIMESTAMP_LAG = "timestampLag"; // for logic storage