Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.TickerType;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

Expand Down Expand Up @@ -75,6 +76,7 @@ public class RocksDBDAO {
private final String rocksDBBasePath;
private final transient ConcurrentHashMap<String, CustomSerializer<?>> columnFamilySerializers;
private transient WriteOptions defaultWriteOptions;
private transient Statistics statistics;
private final boolean disableWALForWrites;
@Getter
private long totalBytesWritten;
Expand Down Expand Up @@ -113,9 +115,10 @@ private void init() {
managedDescriptorMap = new ConcurrentHashMap<>();

// If already present, loads the existing column-family handles

final DBOptions dbOptions = new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true)
.setWalDir(rocksDBBasePath).setStatsDumpPeriodSec(300).setStatistics(new Statistics());
.setWalDir(rocksDBBasePath).setStatsDumpPeriodSec(300);
this.statistics = new Statistics();
dbOptions.setStatistics(statistics);
dbOptions.setLogger(new org.rocksdb.Logger(dbOptions) {
@Override
protected void log(InfoLogLevel infoLogLevel, String logMsg) {
Expand Down Expand Up @@ -465,6 +468,20 @@ public void addColumnFamily(String columnFamilyName) {
});
}

/**
 * Reads a numeric RocksDB property, aggregated over every column family of
 * the underlying database instance.
 *
 * @param property the RocksDB property name (e.g. {@code rocksdb.total-sst-files-size})
 * @return the aggregated property value, or {@code 0} when this DAO has been closed
 * @throws RocksDBException if the underlying database fails to read the property
 */
public synchronized long getLongProperty(String property) throws RocksDBException {
  if (closed) {
    // after close() the handle is gone; report zero instead of failing metric polls
    return 0L;
  }
  return getRocksDB().getAggregatedLongProperty(property);
}

/**
 * Reads the cumulative count of the given statistics ticker.
 *
 * @param tickerType the ticker to read
 * @return the current ticker count, or {@code 0} when this DAO has been closed
 *         or statistics collection is unavailable
 */
public synchronized long getTickerCount(TickerType tickerType) {
  if (closed || statistics == null) {
    // statistics object is released on close(); report zero instead of failing
    return 0L;
  }
  return statistics.getTickerCount(tickerType);
}

/**
* Note : Does not delete from underlying DB. Just closes the handle.
*
Expand Down Expand Up @@ -499,6 +516,10 @@ public synchronized void close() {
defaultWriteOptions.close();
}
getRocksDB().close();
if (statistics != null) {
statistics.close();
statistics = null;
}
try {
FileIOUtils.deleteDirectory(new File(rocksDBBasePath));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.metrics;

import org.apache.hudi.sink.partitioner.index.RocksDBIndexBackend;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.rocksdb.TickerType;

/**
* Metrics for RocksDB-backed index bootstrap state in flink bucket assign.
*/
public class FlinkRocksDBIndexMetrics extends HoodieFlinkMetrics {
private static final String TOTAL_SST_FILES_SIZE_PROPERTY = "rocksdb.total-sst-files-size";
private static final String LIVE_SST_FILES_SIZE_PROPERTY = "rocksdb.live-sst-files-size";
private static final String BLOCK_CACHE_CAPACITY_PROPERTY = "rocksdb.block-cache-capacity";
private static final String BLOCK_CACHE_USAGE_PROPERTY = "rocksdb.block-cache-usage";
private static final String ACTIVE_MEMTABLE_SIZE_PROPERTY = "rocksdb.cur-size-active-mem-table";
private static final String ALL_MEMTABLES_SIZE_PROPERTY = "rocksdb.cur-size-all-mem-tables";
private static final String IMMUTABLE_MEMTABLE_COUNT_PROPERTY = "rocksdb.num-immutable-mem-table";

public static final String ROCKSDB_DISK_TOTAL_SST_FILES_SIZE = "rocksdb.disk.total_sst_files_size";
public static final String ROCKSDB_DISK_LIVE_SST_FILES_SIZE = "rocksdb.disk.live_sst_files_size";
public static final String ROCKSDB_BLOCK_CACHE_CAPACITY = "rocksdb.block_cache.capacity";
public static final String ROCKSDB_BLOCK_CACHE_USAGE = "rocksdb.block_cache.usage";
public static final String ROCKSDB_BLOCK_CACHE_HIT_RATIO = "rocksdb.block_cache.hit_ratio";
public static final String ROCKSDB_BLOCK_CACHE_DATA_HIT_RATIO = "rocksdb.block_cache.data_hit_ratio";
public static final String ROCKSDB_BLOCK_CACHE_INDEX_HIT_RATIO = "rocksdb.block_cache.index_hit_ratio";
public static final String ROCKSDB_BLOCK_CACHE_FILTER_HIT_RATIO = "rocksdb.block_cache.filter_hit_ratio";

public static final String ROCKSDB_MEMTABLE_ACTIVE_SIZE = "rocksdb.memtable.active_size";
public static final String ROCKSDB_MEMTABLE_ALL_SIZE = "rocksdb.memtable.all_size";
public static final String ROCKSDB_MEMTABLE_IMMUTABLE_COUNT = "rocksdb.memtable.immutable_count";
public static final String ROCKSDB_MEMTABLE_HIT_RATIO = "rocksdb.memtable.hit_ratio";

private final RocksDBIndexBackend rocksDBIndexBackend;

/**
 * Creates the metrics holder.
 *
 * @param metricGroup         flink metric group on which the gauges will be registered
 * @param rocksDBIndexBackend backend from which all metric values are read
 */
public FlinkRocksDBIndexMetrics(MetricGroup metricGroup, RocksDBIndexBackend rocksDBIndexBackend) {
  super(metricGroup);
  this.rocksDBIndexBackend = rocksDBIndexBackend;
}

/**
 * Registers all RocksDB gauges on the metric group. Once registered, the
 * gauges are polled by the flink metric reporter; values are fetched lazily
 * from the backend on each poll.
 */
@Override
public void registerMetrics() {
  // disk metrics: on-disk SST file footprint
  metricGroup.gauge(ROCKSDB_DISK_TOTAL_SST_FILES_SIZE, (Gauge<Long>) this::getTotalSstFilesSize);
  metricGroup.gauge(ROCKSDB_DISK_LIVE_SST_FILES_SIZE, (Gauge<Long>) this::getLiveSstFilesSize);

  // block cache metrics: capacity/usage plus hit ratios per block kind
  metricGroup.gauge(ROCKSDB_BLOCK_CACHE_CAPACITY, (Gauge<Long>) this::getBlockCacheCapacity);
  metricGroup.gauge(ROCKSDB_BLOCK_CACHE_USAGE, (Gauge<Long>) this::getBlockCacheUsage);
  metricGroup.gauge(ROCKSDB_BLOCK_CACHE_HIT_RATIO, (Gauge<Double>) this::getBlockCacheHitRatio);
  metricGroup.gauge(ROCKSDB_BLOCK_CACHE_DATA_HIT_RATIO, (Gauge<Double>) this::getBlockCacheDataHitRatio);
  metricGroup.gauge(ROCKSDB_BLOCK_CACHE_INDEX_HIT_RATIO, (Gauge<Double>) this::getBlockCacheIndexHitRatio);
  metricGroup.gauge(ROCKSDB_BLOCK_CACHE_FILTER_HIT_RATIO, (Gauge<Double>) this::getBlockCacheFilterHitRatio);

  // mem-table metrics: in-memory write buffer sizes and hit ratio
  metricGroup.gauge(ROCKSDB_MEMTABLE_ACTIVE_SIZE, (Gauge<Long>) this::getActiveMemTableSize);
  metricGroup.gauge(ROCKSDB_MEMTABLE_ALL_SIZE, (Gauge<Long>) this::getAllMemTablesSize);
  metricGroup.gauge(ROCKSDB_MEMTABLE_IMMUTABLE_COUNT, (Gauge<Long>) this::getImmutableMemTableCount);
  metricGroup.gauge(ROCKSDB_MEMTABLE_HIT_RATIO, (Gauge<Double>) this::getMemTableHitRatio);
}

private long getTotalSstFilesSize() {
return rocksDBIndexBackend.getLongMetric(TOTAL_SST_FILES_SIZE_PROPERTY);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these metrics on RocksDB do they belong to index_cache column family or does it include all the column families?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These metrics are not limited to the index_cache column family. The long properties are read via getAggregatedLongProperty(), and the ticker/statistics values are maintained at the RocksDB instance level. But in practice that still reflects this index backend, since the RocksDB instance is dedicated to it.

}

/** @return size in bytes of live SST files, as reported by the RocksDB property. */
private long getLiveSstFilesSize() {
  return rocksDBIndexBackend.getLongMetric(LIVE_SST_FILES_SIZE_PROPERTY);
}

/** @return configured block cache capacity in bytes. */
private long getBlockCacheCapacity() {
  return rocksDBIndexBackend.getLongMetric(BLOCK_CACHE_CAPACITY_PROPERTY);
}

/** @return current block cache memory usage in bytes. */
private long getBlockCacheUsage() {
  return rocksDBIndexBackend.getLongMetric(BLOCK_CACHE_USAGE_PROPERTY);
}

/** @return overall block cache hit ratio, computed from cumulative hit/miss tickers. */
private double getBlockCacheHitRatio() {
  return rocksDBIndexBackend.getRatioMetric(TickerType.BLOCK_CACHE_HIT, TickerType.BLOCK_CACHE_MISS);
}

/** @return block cache hit ratio for data blocks only. */
private double getBlockCacheDataHitRatio() {
  return rocksDBIndexBackend.getRatioMetric(TickerType.BLOCK_CACHE_DATA_HIT, TickerType.BLOCK_CACHE_DATA_MISS);
}

/** @return block cache hit ratio for index blocks only. */
private double getBlockCacheIndexHitRatio() {
  return rocksDBIndexBackend.getRatioMetric(TickerType.BLOCK_CACHE_INDEX_HIT, TickerType.BLOCK_CACHE_INDEX_MISS);
}

/** @return block cache hit ratio for filter blocks only. */
private double getBlockCacheFilterHitRatio() {
  return rocksDBIndexBackend.getRatioMetric(TickerType.BLOCK_CACHE_FILTER_HIT, TickerType.BLOCK_CACHE_FILTER_MISS);
}

/** @return approximate size in bytes of the active mem-table. */
private long getActiveMemTableSize() {
  return rocksDBIndexBackend.getLongMetric(ACTIVE_MEMTABLE_SIZE_PROPERTY);
}

/** @return approximate size in bytes of all mem-tables (active and immutable). */
private long getAllMemTablesSize() {
  return rocksDBIndexBackend.getLongMetric(ALL_MEMTABLES_SIZE_PROPERTY);
}

/** @return number of immutable mem-tables not yet flushed. */
private long getImmutableMemTableCount() {
  return rocksDBIndexBackend.getLongMetric(IMMUTABLE_MEMTABLE_COUNT_PROPERTY);
}

/** @return mem-table hit ratio, computed from cumulative hit/miss tickers. */
private double getMemTableHitRatio() {
  return rocksDBIndexBackend.getRatioMetric(TickerType.MEMTABLE_HIT, TickerType.MEMTABLE_MISS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public void open(Configuration parameters) throws Exception {
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
this.indexBackend = IndexBackendFactory.create(conf, context, getRuntimeContext());
this.indexBackend.registerMetrics(getRuntimeContext().getMetricGroup());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we are emitting the metrics only at the initialization. Can we emit these metrics as part of every checkpoint and reset the hit and miss counters?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are registered as Flink gauges, so they are not emitted only once during initializeState(). After registration, the reporter keeps polling the current values periodically.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.sink.event.Correspondent;

import org.apache.flink.metrics.MetricGroup;

import java.io.Closeable;
import java.io.IOException;

Expand Down Expand Up @@ -64,4 +66,13 @@ default void onCheckpoint(long checkpointId) {
/**
 * Callback invoked after the given checkpoint completes. No-op by default.
 *
 * @param correspondent         the correspondent used by the backend
 * @param completedCheckpointId id of the completed checkpoint
 */
default void onCheckpointComplete(Correspondent correspondent, long completedCheckpointId) {
  // do nothing.
}

/**
 * Registers backend-specific metrics on the given metric group.
 * The default implementation registers nothing.
 *
 * @param metricGroup flink metric group to attach gauges to
 */
default void registerMetrics(MetricGroup metricGroup) {
  // do nothing.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,25 @@
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.serialization.CustomSerializer;
import org.apache.hudi.common.util.collection.RocksDBDAO;
import org.apache.hudi.metrics.FlinkRocksDBIndexMetrics;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.metrics.MetricGroup;
import org.rocksdb.RocksDBException;
import org.rocksdb.TickerType;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

/**
* An implementation of {@link IndexBackend} based on RocksDB.
*/
@Slf4j
public class RocksDBIndexBackend implements IndexBackend {
private static final String COLUMN_FAMILY = "index_cache";

private final RocksDBDAO rocksDBDAO;
private transient FlinkRocksDBIndexMetrics rocksDBIndexMetrics;

public RocksDBIndexBackend(String rocksDbBasePath) {
// Register custom serializer for HoodieRecordGlobalLocation to minimize storage overhead
Expand All @@ -52,6 +60,36 @@ public void update(String recordKey, HoodieRecordGlobalLocation recordGlobalLoca
this.rocksDBDAO.put(COLUMN_FAMILY, recordKey, recordGlobalLocation);
}

/**
 * Registers RocksDB gauges for this backend on the given metric group.
 * Idempotent: calls after the first successful registration are no-ops.
 *
 * @param metricGroup flink metric group to attach gauges to
 */
@Override
public void registerMetrics(MetricGroup metricGroup) {
  if (rocksDBIndexMetrics != null) {
    // already registered for this backend instance
    return;
  }
  this.rocksDBIndexMetrics = new FlinkRocksDBIndexMetrics(metricGroup, this);
  this.rocksDBIndexMetrics.registerMetrics();
}

public long getLongMetric(String property) {
try {
return this.rocksDBDAO.getLongProperty(property);
} catch (RocksDBException | RuntimeException e) {
log.debug("Failed to read RocksDB metric property {}", property, e);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 The two getTickerCount calls are not atomic — the lock is released between them. On shutdown, this could produce a brief 100% hit ratio (e.g., first call returns a real hit count, then close() runs, second call returns 0 for misses). Is this acceptable for your use case, or would it be worth adding a single synchronized method on RocksDBDAO that fetches both tickers atomically?

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Copy link
Copy Markdown
Collaborator Author

@cshuo cshuo Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's acceptable for metrics, since it only happens when the job is being shutdown.

return 0L;
}
}

public double getRatioMetric(TickerType hitTicker, TickerType missTicker) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 The hit ratios here are computed from cumulative tickers since DB open, so they reflect the all-time ratio rather than a recent window. For long-running Flink jobs, this means the metric becomes very stable over time and won't surface recent changes in cache behavior. Have you considered computing a delta-based ratio (comparing ticker values between consecutive gauge polls) to make the metric more responsive?

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

try {
long hits = this.rocksDBDAO.getTickerCount(hitTicker);
long misses = this.rocksDBDAO.getTickerCount(missTicker);
long total = hits + misses;
return total == 0 ? 0D : (double) hits / total;
} catch (RuntimeException e) {
log.debug("Failed to read RocksDB ticker metrics {} and {}", hitTicker, missTicker, e);
return 0D;
}
}

@Override
public void close() throws IOException {
this.rocksDBDAO.close();
Expand Down
Loading
Loading