diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java index 86020c0fd27..26cb3349575 100644 --- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java @@ -127,9 +127,16 @@ public void dec() { } @Override - public void add(long l) { - firstCounter.add(l); - secondCounter.add(l); + public void addCount(long l) { + firstCounter.addCount(l); + secondCounter.addCount(l); + } + + @Override + public void addLatency(long eventLatency, TimeUnit unit) { + long valueMillis = unit.toMillis(eventLatency); + firstCounter.addCount(valueMillis); + secondCounter.addCount(valueMillis); } @Override diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java index c2a06d6f414..d3299b077e8 100644 --- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java +++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java @@ -60,10 +60,16 @@ public void dec() { } @Override - public void add(long delta) { + public void addCount(long delta) { updateMax(val.addAndGet(delta)); } + @Override + public void addLatency(long eventLatency, TimeUnit unit) { + long valueMillis = unit.toMillis(eventLatency); + updateMax(val.addAndGet(valueMillis)); + } + @Override public Long get() { return val.get(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 404743a7617..df11852235f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -931,7 +931,7 @@ private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, long ledgerId = handle.getLedgerId(); long entryId = handle.addEntry(entry); - bookieStats.getWriteBytes().add(entry.readableBytes()); + bookieStats.getWriteBytes().addCount(entry.readableBytes()); // journal `addEntry` should happen after the entry is added to ledger storage. // otherwise the journal entry can potentially be rolled before the ledger is created in ledger storage. @@ -1110,7 +1110,7 @@ public ByteBuf readEntry(long ledgerId, long entryId) } ByteBuf entry = handle.readEntry(entryId); entrySize = entry.readableBytes(); - bookieStats.getReadBytes().add(entrySize); + bookieStats.getReadBytes().addCount(entrySize); success = true; return entry; } finally { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java index b868e466578..332b152a423 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java @@ -258,7 +258,7 @@ long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint) throws } } } - memTableStats.getFlushBytesCounter().add(size); + memTableStats.getFlushBytesCounter().addCount(size); clearSnapshot(keyValues); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java index d092f2064ce..42a7f19a657 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java @@ -139,7 +139,7 @@ public void safeRun() { throw new IOException("Failed to complete the flushSnapshotByParallelizing", exceptionWhileFlushingParallelly.get()); } - memTableStats.getFlushBytesCounter().add(flushedSize.get()); + memTableStats.getFlushBytesCounter().addCount(flushedSize.get()); clearSnapshot(keyValues); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index c89acf9a052..e7549aaddb5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -489,7 +489,7 @@ private void doGcEntryLogs() throws EntryLogMetadataMapException { // We can remove this entry log file now. LOG.info("Deleting entryLogId {} as it has no active ledgers!", entryLogId); removeEntryLog(entryLogId); - gcStats.getReclaimedSpaceViaDeletes().add(meta.getTotalSize()); + gcStats.getReclaimedSpaceViaDeletes().addCount(meta.getTotalSize()); } else if (modified) { // update entryLogMetaMap only when the meta modified. entryLogMetaMap.put(meta.getEntryLogId(), meta); @@ -607,7 +607,7 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMet long priorRemainingSize = meta.getRemainingSize(); compactEntryLog(meta); - gcStats.getReclaimedSpaceViaCompaction().add(meta.getTotalSize() - priorRemainingSize); + gcStats.getReclaimedSpaceViaCompaction().addCount(meta.getTotalSize() - priorRemainingSize); compactedBuckets[bucketIndex]++; }); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 5b60eae2e8a..69a0ac85663 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -346,7 +346,7 @@ public void run() { journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS); cb.writeComplete(0, ledgerId, entryId, null, ctx); recycle(); - callbackTime.add(MathUtils.elapsedNanos(startTime)); + callbackTime.addLatency(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } private final Handle recyclerHandle; @@ -515,7 +515,7 @@ public void run() { while (running) { ForceWriteRequest req = null; try { - forceWriteThreadTime.add(MathUtils.elapsedNanos(busyStartTime)); + forceWriteThreadTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS); req = forceWriteRequests.take(); busyStartTime = System.nanoTime(); // Force write the file and then notify the write completions @@ -1081,7 +1081,7 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(), } if (numEntriesToFlush == 0) { - journalTime.add(MathUtils.elapsedNanos(busyStartTime)); + journalTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS); qe = queue.take(); dequeueStartTime = MathUtils.nowInNano(); busyStartTime = dequeueStartTime; @@ -1230,7 +1230,7 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(), qe.entry.release(); } else if (qe.entryId != BookieImpl.METAENTRY_ID_FORCE_LEDGER) { int entrySize = qe.entry.readableBytes(); - journalStats.getJournalWriteBytes().add(entrySize); + journalStats.getJournalWriteBytes().addCount(entrySize); batchSize += (4 + entrySize); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java index b2fe868492d..4ea6dd27e48 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java @@ -111,7 +111,7 @@ protected void doCheckpoint(Checkpoint checkpoint) { log.error("Exception in SyncThread", t); dirsListener.fatalError(); } finally { - syncExecutorTime.add(MathUtils.elapsedNanos(startTime)); + syncExecutorTime.addLatency(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } }); } @@ -124,7 +124,7 @@ public Future requestFlush() { } catch (Throwable t) { log.error("Exception flushing ledgers ", t); } finally { - syncExecutorTime.add(MathUtils.elapsedNanos(startTime)); + syncExecutorTime.addLatency(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } }); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 0c5f328d2d2..f450906de56 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -212,7 +212,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le ThreadRegistry.register(dbStoragerExecutorName, 0); // ensure the metric gets registered on start-up as this thread only executes // when the write cache is full which may not happen or not for a long time - flushExecutorTime.add(0); + flushExecutorTime.addLatency(0, TimeUnit.NANOSECONDS); }); ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener()); @@ -463,7 +463,7 @@ private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry) } catch (IOException e) { log.error("Error during flush", e); } finally { - flushExecutorTime.add(MathUtils.elapsedNanos(startTime)); + flushExecutorTime.addLatency(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } }); } @@ -570,14 +570,16 @@ private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, Book throw new NoEntryException(ledgerId, entryId); } } finally { - dbLedgerStorageStats.getReadFromLocationIndexTime().add(MathUtils.elapsedNanos(locationIndexStartNano)); + dbLedgerStorageStats.getReadFromLocationIndexTime().addLatency( + MathUtils.elapsedNanos(locationIndexStartNano), TimeUnit.NANOSECONDS); } long readEntryStartNano = MathUtils.nowInNano(); try { entry = entryLogger.readEntry(ledgerId, entryId, entryLocation); } finally { - dbLedgerStorageStats.getReadFromEntryLogTime().add(MathUtils.elapsedNanos(readEntryStartNano)); + dbLedgerStorageStats.getReadFromEntryLogTime().addLatency( + MathUtils.elapsedNanos(readEntryStartNano), TimeUnit.NANOSECONDS); } readCache.put(ledgerId, entryId, entry); @@ -634,7 +636,8 @@ private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long fi } finally { dbLedgerStorageStats.getReadAheadBatchCountStats().registerSuccessfulValue(count); dbLedgerStorageStats.getReadAheadBatchSizeStats().registerSuccessfulValue(size); - dbLedgerStorageStats.getReadAheadTime().add(MathUtils.elapsedNanos(readAheadStartNano)); + dbLedgerStorageStats.getReadAheadTime().addLatency( + MathUtils.elapsedNanos(readAheadStartNano), TimeUnit.NANOSECONDS); } } @@ -689,11 +692,13 @@ public ByteBuf getLastEntry(long ledgerId) throws IOException, BookieException { } long entryLocation = entryLocationIndex.getLocation(ledgerId, lastEntryId); - dbLedgerStorageStats.getReadFromLocationIndexTime().add(MathUtils.elapsedNanos(locationIndexStartNano)); + dbLedgerStorageStats.getReadFromLocationIndexTime().addLatency( + MathUtils.elapsedNanos(locationIndexStartNano), TimeUnit.NANOSECONDS); long readEntryStartNano = MathUtils.nowInNano(); ByteBuf content = entryLogger.readEntry(ledgerId, lastEntryId, entryLocation); - dbLedgerStorageStats.getReadFromEntryLogTime().add(MathUtils.elapsedNanos(readEntryStartNano)); + dbLedgerStorageStats.getReadFromEntryLogTime().addLatency( + MathUtils.elapsedNanos(readEntryStartNano), TimeUnit.NANOSECONDS); return content; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/ScrubberService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/ScrubberService.java index 4c027a61401..68ebc9fa916 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/ScrubberService.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/ScrubberService.java @@ -105,7 +105,7 @@ private void run() { try { List errors = ledgerStorage.localConsistencyCheck(scrubRateLimiter); if (errors.size() > 0) { - errorCounter.add(errors.size()); + errorCounter.addCount(errors.size()); LOG.error("Found inconsistency during localConsistencyCheck:"); for (LedgerStorage.DetectedInconsistency error : errors) { LOG.error("Ledger {}, entry {}: ", error.getLedgerId(), error.getEntryId(), error.getException()); diff --git a/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/Counter.java b/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/Counter.java index 7160d7ccc13..8f70f44e697 100644 --- a/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/Counter.java +++ b/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/Counter.java @@ -16,6 +16,8 @@ */ package org.apache.bookkeeper.stats; +import java.util.concurrent.TimeUnit; + /** * Simple stats that require only increment and decrement * functions on a Long. Metrics like the number of topics, persist queue size @@ -41,7 +43,15 @@ public interface Counter { * Add delta to the value associated with this stat. * @param delta */ - void add(long delta); + void addCount(long delta); + + /** + * An operation succeeded with the given eventLatency. Update + * stats to reflect the same + * @param eventLatency The event latency + * @param unit + */ + void addLatency(long eventLatency, TimeUnit unit); /** * Get the value associated with this stat. diff --git a/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java b/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java index ba6be364d39..a130b76b674 100644 --- a/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java +++ b/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java @@ -88,7 +88,12 @@ public void dec() { } @Override - public void add(long delta) { + public void addCount(long delta) { + // nop + } + + @Override + public void addLatency(long eventLatency, TimeUnit unit) { // nop } diff --git a/stats/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java b/stats/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java index ade182d1c7f..4851362efd2 100644 --- a/stats/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java +++ b/stats/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java @@ -20,6 +20,7 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -70,9 +71,15 @@ public void dec() { } @Override - public void add(long delta) { + public void addCount(long delta) { c.inc(delta); } + + @Override + public void addLatency(long eventLatency, TimeUnit unit) { + long valueMillis = unit.toMillis(eventLatency); + c.inc(valueMillis); + } }; } diff --git a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java index 9e4f28c54e3..687931ef5d9 100644 --- a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java +++ b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java @@ -17,6 +17,7 @@ package org.apache.bookkeeper.stats.prometheus; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.stats.Counter; @@ -54,10 +55,20 @@ public void dec() { } @Override - public void add(long delta) { + public void addCount(long delta) { counter.add(delta); } + /** + * When counter is used to count time. + * consistent with the {@link DataSketchesOpStatsLogger#registerSuccessfulEvent(long, TimeUnit)} 's logic + * */ + @Override + public void addLatency(long eventLatency, TimeUnit unit) { + long valueMillis = unit.toMillis(eventLatency); + counter.add(valueMillis); + } + @Override public Long get() { return counter.sum(); diff --git a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedLongAdderCounter.java b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedLongAdderCounter.java index 5b0110da2b1..ad717e55a59 100644 --- a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedLongAdderCounter.java +++ b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedLongAdderCounter.java @@ -18,6 +18,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.ThreadRegistry; @@ -65,8 +66,13 @@ public void dec() { } @Override - public void add(long delta) { - getCounter().add(delta); + public void addCount(long delta) { + getCounter().addCount(delta); + } + + @Override + public void addLatency(long eventLatency, TimeUnit unit) { + getCounter().addLatency(eventLatency, unit); } @Override diff --git a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java index 167b79295f2..93084fa053e 100644 --- a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java +++ b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.StatsLogger; @@ -98,7 +99,16 @@ public void testCounter() { assertEquals(1L, counter.get().longValue()); counter.dec(); assertEquals(0L, counter.get().longValue()); - counter.add(3); + counter.addCount(3); + assertEquals(3L, counter.get().longValue()); + } + + @Test + public void testCounter2() { + LongAdderCounter counter = new LongAdderCounter(Collections.emptyMap()); + long value = counter.get(); + assertEquals(0L, value); + counter.addLatency(3 * 1000 * 1000L, TimeUnit.NANOSECONDS); assertEquals(3L, counter.get().longValue()); } diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java index 5c9d035d481..55bf6dd5088 100644 --- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java +++ b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java @@ -127,9 +127,16 @@ public void dec() { } @Override - public void add(long l) { - firstCounter.add(l); - secondCounter.add(l); + public void addCount(long l) { + firstCounter.addCount(l); + secondCounter.addCount(l); + } + + @Override + public void addLatency(long eventLatency, TimeUnit unit) { + long valueMillis = unit.toMillis(eventLatency); + firstCounter.addCount(valueMillis); + secondCounter.addCount(valueMillis); } @Override diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java index 780107ed84a..819a43c15a1 100644 --- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java +++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java @@ -367,7 +367,7 @@ public void onSuccess(BKLogSegmentWriter writer) { FutureUtils.complete(rollingFuture, writer); } rollingFuture = null; - pendingRequestDispatch.add(pendingRequests.size()); + pendingRequestDispatch.addCount(pendingRequests.size()); pendingRequests = null; } } catch (IOException ioe) { @@ -395,7 +395,7 @@ void errorOutPendingRequests(Throwable cause, boolean errorOutWriter) { rollingFuture = null; } - pendingRequestDispatch.add(pendingRequestsSnapshot.size()); + pendingRequestDispatch.addCount(pendingRequestsSnapshot.size()); // After erroring out the writer above, no more requests // will be enqueued to pendingRequests