diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index c1cc2b5b69..650a93abbf 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -70,6 +70,8 @@ public class MetricNames { public static final String LAKE_TIERING_TABLE_FAILURES_TOTAL = "failuresTotal"; public static final String LAKE_TIERING_TABLE_FILE_SIZE = "fileSize"; public static final String LAKE_TIERING_TABLE_RECORD_COUNT = "recordCount"; + public static final String LAKE_TIERING_TABLE_PENDING_TIME = "pendingTime"; + public static final String LAKE_TIERING_TABLE_FRESHNESS = "freshness"; // -------------------------------------------------------------------------------------------- // metrics for tablet server diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java index a3448d148b..439092b898 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java @@ -141,6 +141,9 @@ public class LakeTableTieringManager implements AutoCloseable { // table_id -> start time (ms) of the currently in-progress tiering round private final Map currentTieringStartTime; + // table_id -> time (ms) when the table entered pending queue + private final Map pendingEnterTime; + // the live tables that are tiering, // from table_id -> last heartbeat time by the tiering service private final Map liveTieringTableIds; @@ -184,6 +187,7 @@ protected LakeTableTieringManager( this.delayedTieringByTableId = new HashMap<>(); this.tableFailureCounters = new HashMap<>(); this.currentTieringStartTime = new HashMap<>(); + this.pendingEnterTime = new HashMap<>(); this.tieringMetricGroup = lakeTieringMetricGroup; registerMetrics(); } @@ -279,6 +283,17 @@ private void registerTableMetrics(long tableId, TablePath tablePath) { MetricNames.LAKE_TIERING_TABLE_TIER_DURATION, () -> inReadLock(lock, () -> getLastResultField(tableId, r -> r.tierDuration))); + // pendingTime: how long the table has been waiting in the pending queue + tableMetricGroup.gauge( + MetricNames.LAKE_TIERING_TABLE_PENDING_TIME, + () -> + inReadLock( + lock, + () -> { + long enterTime = pendingEnterTime.getOrDefault(tableId, 0L); + return enterTime > 0 ? clock.milliseconds() - enterTime : 0L; + })); + // failuresTotal: total failure count for this table Counter failuresCounter = tableMetricGroup.counter(MetricNames.LAKE_TIERING_TABLE_FAILURES_TOTAL); @@ -293,6 +308,11 @@ private void registerTableMetrics(long tableId, TablePath tablePath) { tableMetricGroup.gauge( MetricNames.LAKE_TIERING_TABLE_RECORD_COUNT, () -> inReadLock(lock, () -> getLastResultField(tableId, r -> r.recordCount))); + + // freshness: the user-configured table data freshness interval in milliseconds + tableMetricGroup.gauge( + MetricNames.LAKE_TIERING_TABLE_FRESHNESS, + () -> inReadLock(lock, () -> tableLakeFreshness.getOrDefault(tableId, -1L))); } /** @@ -316,6 +336,7 @@ public void removeLakeTable(long tableId) { tableLakeFreshness.remove(tableId); lastTieringResult.remove(tableId); currentTieringStartTime.remove(tableId); + pendingEnterTime.remove(tableId); tableFailureCounters.remove(tableId); // close and remove the metric group to unregister metrics tieringMetricGroup.removeTableLakeTieringMetricGroup(tableId); @@ -579,10 +600,12 @@ private void doHandleStateChange(long tableId, TieringState targetState) { // increase tiering epoch and initialize the heartbeat of the tiering table tableTierEpoch.computeIfPresent(tableId, (t, v) -> v + 1); pendingTieringTables.add(tableId); + pendingEnterTime.put(tableId, clock.milliseconds()); break; case Tiering: liveTieringTableIds.put(tableId, clock.milliseconds()); currentTieringStartTime.put(tableId, clock.milliseconds()); + pendingEnterTime.put(tableId, 0L); break; case Tiered: liveTieringTableIds.remove(tableId); diff --git a/website/docs/maintenance/observability/monitor-metrics.md b/website/docs/maintenance/observability/monitor-metrics.md index 330a326b2f..961d0bc8f3 100644 --- a/website/docs/maintenance/observability/monitor-metrics.md +++ b/website/docs/maintenance/observability/monitor-metrics.md @@ -294,7 +294,7 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM - coordinator + coordinator - activeCoordinatorCount The number of active CoordinatorServer (only leader) in this cluster. @@ -401,7 +401,7 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM Gauge - lakeTiering_table + lakeTiering_table tierLag Time in milliseconds since the last successful tiering operation for this table. For newly registered tables that have never completed a tiering round, the lag is measured from the time the table was registered. Gauge @@ -426,6 +426,16 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM Cumulative total record count of the lake table after the last tiering round. Returns -1 if no tiering has completed yet. Gauge + + pendingTime + How long (in milliseconds) the table has been waiting in the pending queue for tiering. Returns 0 when the table is not currently pending. + Gauge + + + freshness + The user-configured data freshness interval (in milliseconds) for this table. + Gauge +