Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,35 +94,37 @@ public class MetricNames {
public static final String FAILED_PREFIX_LOOKUP_REQUESTS_RATE =
"failedPrefixLookupRequestsPerSecond";

// --------------------------------------------------------------------------------------------
// metrics for table bucket
// --------------------------------------------------------------------------------------------

// for replica
public static final String UNDER_REPLICATED = "underReplicated";
public static final String IN_SYNC_REPLICAS = "inSyncReplicasCount";
public static final String UNDER_MIN_ISR = "underMinIsr";
public static final String AT_MIN_ISR = "atMinIsr";
public static final String ISR_EXPANDS_RATE = "isrExpandsPerSecond";
public static final String ISR_SHRINKS_RATE = "isrShrinksPerSecond";
public static final String FAILED_ISR_UPDATES_RATE = "failedIsrUpdatesPerSecond";

// for log tablet
public static final String LOG_NUM_SEGMENTS = "numSegments";
public static final String LOG_END_OFFSET = "endOffset";
public static final String LOG_SIZE = "size";
public static final String LOG_FLUSH_RATE = "flushPerSecond";
public static final String LOG_FLUSH_LATENCY_MS = "flushLatencyMs";
public static final String LOG_FLUSH_RATE = "logFlushPerSecond";
public static final String LOG_FLUSH_LATENCY_MS = "logFlushLatencyMs";

// for kv tablet
public static final String KV_LATEST_SNAPSHOT_SIZE = "latestSnapshotSize";
public static final String KV_FLUSH_RATE = "kvFlushPerSecond";
public static final String KV_FLUSH_LATENCY_MS = "kvFlushLatencyMs";
public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_DUPLICATED_RATE =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove the void registerMetrics(BucketMetricGroup bucketMetricGroup) of KvTablet, because we don't need to report bucket-level pre-write-buffer metrics.

"preWriteBufferTruncateAsDuplicatedPerSecond";
public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE =
"preWriteBufferTruncateAsErrorPerSecond";
public static final String KV_PRE_WRITE_BUFFER_FLUSH_RATE = "preWriteBufferFlushPerSecond";
public static final String KV_PRE_WRITE_BUFFER_FLUSH_LATENCY_MS =
"preWriteBufferFlushLatencyMs";

// --------------------------------------------------------------------------------------------
// metrics for table bucket
// --------------------------------------------------------------------------------------------

// for log tablet
public static final String LOG_NUM_SEGMENTS = "numSegments";
public static final String LOG_END_OFFSET = "endOffset";
public static final String LOG_SIZE = "size";

// for kv tablet
public static final String KV_LATEST_SNAPSHOT_SIZE = "latestSnapshotSize";

// --------------------------------------------------------------------------------------------
// metrics for rpc client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.fluss.server.kv.rowmerger.RowMerger;
import org.apache.fluss.server.log.LogManager;
import org.apache.fluss.server.log.LogTablet;
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
Expand Down Expand Up @@ -69,6 +70,8 @@ public final class KvManager extends TabletManagerBase {
private static final Logger LOG = LoggerFactory.getLogger(KvManager.class);
private final LogManager logManager;

private final TabletServerMetricGroup serverMetricGroup;

private final ZooKeeperClient zkClient;

private final Map<TableBucket, KvTablet> currentKvs = MapUtils.newConcurrentHashMap();
Expand All @@ -91,7 +94,8 @@ private KvManager(
Configuration conf,
ZooKeeperClient zkClient,
int recoveryThreadsPerDataDir,
LogManager logManager)
LogManager logManager,
TabletServerMetricGroup tabletServerMetricGroup)
throws IOException {
super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir);
this.logManager = logManager;
Expand All @@ -100,10 +104,14 @@ private KvManager(
this.zkClient = zkClient;
this.remoteKvDir = FlussPaths.remoteKvDir(conf);
this.remoteFileSystem = remoteKvDir.getFileSystem();
this.serverMetricGroup = tabletServerMetricGroup;
}

public static KvManager create(
Configuration conf, ZooKeeperClient zkClient, LogManager logManager)
Configuration conf,
ZooKeeperClient zkClient,
LogManager logManager,
TabletServerMetricGroup tabletServerMetricGroup)
throws IOException {
String dataDirString = conf.getString(ConfigOptions.DATA_DIR);
File dataDir = new File(dataDirString).getAbsoluteFile();
Expand All @@ -112,7 +120,8 @@ public static KvManager create(
conf,
zkClient,
conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
logManager);
logManager,
tabletServerMetricGroup);
}

public void startup() {
Expand Down Expand Up @@ -171,6 +180,7 @@ public KvTablet getOrCreateKv(
logTablet,
tabletDir,
conf,
serverMetricGroup,
arrowBufferAllocator,
memorySegmentPool,
kvFormat,
Expand Down Expand Up @@ -279,6 +289,7 @@ public KvTablet loadKv(File tabletDir) throws Exception {
logTablet,
tabletDir,
conf,
serverMetricGroup,
arrowBufferAllocator,
memorySegmentPool,
tableInfo.getTableConfig().getKvFormat(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.MeterView;
import org.apache.fluss.metrics.MetricNames;
import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.record.KvRecord;
import org.apache.fluss.record.KvRecordBatch;
Expand All @@ -55,7 +52,7 @@
import org.apache.fluss.server.kv.wal.WalBuilder;
import org.apache.fluss.server.log.LogAppendInfo;
import org.apache.fluss.server.log.LogTablet;
import org.apache.fluss.server.metrics.group.BucketMetricGroup;
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.server.utils.FatalErrorHandler;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
import org.apache.fluss.types.DataType;
Expand Down Expand Up @@ -124,6 +121,7 @@ private KvTablet(
TableBucket tableBucket,
LogTablet logTablet,
File kvTabletDir,
TabletServerMetricGroup serverMetricGroup,
RocksDBKv rocksDBKv,
long writeBatchSize,
LogFormat logFormat,
Expand All @@ -139,7 +137,7 @@ private KvTablet(
this.kvTabletDir = kvTabletDir;
this.rocksDBKv = rocksDBKv;
this.writeBatchSize = writeBatchSize;
this.kvPreWriteBuffer = new KvPreWriteBuffer(createKvBatchWriter());
this.kvPreWriteBuffer = new KvPreWriteBuffer(createKvBatchWriter(), serverMetricGroup);
this.logFormat = logFormat;
this.arrowWriterProvider = new ArrowWriterPool(arrowBufferAllocator);
this.memorySegmentPool = memorySegmentPool;
Expand All @@ -153,6 +151,7 @@ public static KvTablet create(
LogTablet logTablet,
File kvTabletDir,
Configuration serverConf,
TabletServerMetricGroup serverMetricGroup,
BufferAllocator arrowBufferAllocator,
MemorySegmentPool memorySegmentPool,
KvFormat kvFormat,
Expand All @@ -168,6 +167,7 @@ public static KvTablet create(
logTablet,
kvTabletDir,
serverConf,
serverMetricGroup,
arrowBufferAllocator,
memorySegmentPool,
kvFormat,
Expand All @@ -182,6 +182,7 @@ public static KvTablet create(
LogTablet logTablet,
File kvTabletDir,
Configuration serverConf,
TabletServerMetricGroup serverMetricGroup,
BufferAllocator arrowBufferAllocator,
MemorySegmentPool memorySegmentPool,
KvFormat kvFormat,
Expand All @@ -195,6 +196,7 @@ public static KvTablet create(
tableBucket,
logTablet,
kvTabletDir,
serverMetricGroup,
kv,
serverConf.get(ConfigOptions.KV_WRITE_BATCH_SIZE).getBytes(),
logTablet.getLogFormat(),
Expand Down Expand Up @@ -243,24 +245,6 @@ public long getFlushedLogOffset() {
return flushedLogOffset;
}

public void registerMetrics(BucketMetricGroup bucketMetricGroup) {
MetricGroup metricGroup = bucketMetricGroup.addGroup("kv");

// about pre-write buffer.
metricGroup.meter(
MetricNames.KV_PRE_WRITE_BUFFER_FLUSH_RATE,
new MeterView(kvPreWriteBuffer.getFlushCount()));
metricGroup.histogram(
MetricNames.KV_PRE_WRITE_BUFFER_FLUSH_LATENCY_MS,
kvPreWriteBuffer.getFlushLatencyHistogram());
metricGroup.meter(
MetricNames.KV_PRE_WRITE_BUFFER_TRUNCATE_AS_DUPLICATED_RATE,
new MeterView(kvPreWriteBuffer.getTruncateAsDuplicatedCount()));
metricGroup.meter(
MetricNames.KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE,
new MeterView(kvPreWriteBuffer.getTruncateAsErrorCount()));
}

/**
* Put the KvRecordBatch into the kv storage, and return the appended wal log info.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.metrics.Counter;
import org.apache.fluss.metrics.DescriptiveStatisticsHistogram;
import org.apache.fluss.metrics.Histogram;
import org.apache.fluss.metrics.SimpleCounter;
import org.apache.fluss.server.kv.KvBatchWriter;
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.utils.MurmurHashUtils;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -61,7 +60,7 @@
* <li>Buffer all the key-value pairs that are waiting for the corresponding WAL to be persisted.
* And flush these key-value pairs whose WAL has been persisted to underlying kv storage.
* <li>A temporary in-memory key-value buffer for put/get a key. Since Fluss will lookup the
* previous written data to generate CDC as WAL, it need a buffer to buffer the data been
* previous written data to generate CDC as WAL, it needs a buffer to buffer the data been
* written before but is still waiting for the WAL to be persisted before flush to underlying
* kv storage.
* </ol>
Expand Down Expand Up @@ -104,14 +103,14 @@ public class KvPreWriteBuffer implements AutoCloseable {
// the max LSN in the buffer
private long maxLogSequenceNumber = -1;

public KvPreWriteBuffer(KvBatchWriter kvBatchWriter) {
public KvPreWriteBuffer(
KvBatchWriter kvBatchWriter, TabletServerMetricGroup serverMetricGroup) {
this.kvBatchWriter = kvBatchWriter;

flushCount = new SimpleCounter();
// consider won't flush frequently, we set a small window size
flushLatencyHistogram = new DescriptiveStatisticsHistogram(5);
truncateAsDuplicatedCount = new SimpleCounter();
truncateAsErrorCount = new SimpleCounter();
flushCount = serverMetricGroup.kvFlushCount();
flushLatencyHistogram = serverMetricGroup.kvFlushLatencyHistogram();
truncateAsDuplicatedCount = serverMetricGroup.kvTruncateAsDuplicatedCount();
truncateAsErrorCount = serverMetricGroup.kvTruncateAsErrorCount();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metrics.Counter;
import org.apache.fluss.metrics.DescriptiveStatisticsHistogram;
import org.apache.fluss.metrics.Histogram;
import org.apache.fluss.metrics.SimpleCounter;
import org.apache.fluss.record.FileLogProjection;
import org.apache.fluss.record.MemoryLogRecords;
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.utils.FileUtils;
import org.apache.fluss.utils.FlussPaths;

Expand Down Expand Up @@ -90,6 +89,7 @@ public final class LocalLog {
public LocalLog(
File logTabletDir,
Configuration config,
TabletServerMetricGroup serverMetricGroup,
LogSegments segments,
long recoveryPoint,
LogOffsetMetadata nextOffsetMetadata,
Expand All @@ -105,9 +105,8 @@ public LocalLog(
this.logFormat = logFormat;

lastFlushedTime = new AtomicLong(System.currentTimeMillis());
flushCount = new SimpleCounter();
// consider won't flush frequently, we set a small window size
flushLatencyHistogram = new DescriptiveStatisticsHistogram(5);
flushCount = serverMetricGroup.logFlushCount();
flushLatencyHistogram = serverMetricGroup.logFlushLatencyHistogram();
localLogStartOffset = segments.isEmpty() ? 0L : segments.firstSegmentBaseOffset().get();
localMaxTimestamp =
segments.isEmpty() ? 0L : segments.lastSegment().get().maxTimestampSoFar();
Expand All @@ -125,14 +124,6 @@ long getRecoveryPoint() {
return recoveryPoint;
}

Histogram getFlushLatencyHistogram() {
return flushLatencyHistogram;
}

Counter getFlushCount() {
return flushCount;
}

/** The offset metadata of the next message that will be appended to the log. */
@VisibleForTesting
LogOffsetMetadata getLocalLogEndOffsetMetadata() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.TabletManagerBase;
import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.utils.FileUtils;
import org.apache.fluss.utils.FlussPaths;
Expand Down Expand Up @@ -87,6 +88,7 @@ public final class LogManager extends TabletManagerBase {
private final ZooKeeperClient zkClient;
private final Scheduler scheduler;
private final Clock clock;
private final TabletServerMetricGroup serverMetricGroup;
private final ReentrantLock logCreationOrDeletionLock = new ReentrantLock();

private final Map<TableBucket, LogTablet> currentLogs = MapUtils.newConcurrentHashMap();
Expand All @@ -100,19 +102,25 @@ private LogManager(
ZooKeeperClient zkClient,
int recoveryThreadsPerDataDir,
Scheduler scheduler,
Clock clock)
Clock clock,
TabletServerMetricGroup serverMetricGroup)
throws Exception {
super(TabletType.LOG, dataDir, conf, recoveryThreadsPerDataDir);
this.zkClient = zkClient;
this.scheduler = scheduler;
this.clock = clock;
this.serverMetricGroup = serverMetricGroup;
createAndValidateDataDir(dataDir);

initializeCheckpointMaps();
}

public static LogManager create(
Configuration conf, ZooKeeperClient zkClient, Scheduler scheduler, Clock clock)
Configuration conf,
ZooKeeperClient zkClient,
Scheduler scheduler,
Clock clock,
TabletServerMetricGroup serverMetricGroup)
throws Exception {
String dataDirString = conf.getString(ConfigOptions.DATA_DIR);
File dataDir = new File(dataDirString).getAbsoluteFile();
Expand All @@ -122,7 +130,8 @@ public static LogManager create(
zkClient,
conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
scheduler,
clock);
clock,
serverMetricGroup);
}

public void startup() {
Expand Down Expand Up @@ -246,6 +255,7 @@ public LogTablet getOrCreateLog(
tablePath,
tabletDir,
conf,
serverMetricGroup,
0L,
scheduler,
logFormat,
Expand Down Expand Up @@ -348,6 +358,7 @@ private LogTablet loadLog(
physicalTablePath,
tabletDir,
conf,
serverMetricGroup,
logRecoveryPoint,
scheduler,
tableInfo.getTableConfig().getLogFormat(),
Expand Down
Loading