BP-58: Change the API for org.apache.bookkeeper.stats.Counter #3501

Merged (5 commits, Sep 27, 2022)
Changes from 1 commit
@@ -127,9 +127,16 @@ public void dec() {
}

@Override
public void add(long l) {
firstCounter.add(l);
secondCounter.add(l);
public void addCount(long l) {
firstCounter.addCount(l);
secondCounter.addCount(l);
}

@Override
public void addLatency(long eventLatency, TimeUnit unit) {
long valueMillis = unit.toMicros(eventLatency) / 1000;
firstCounter.addCount(valueMillis);
secondCounter.addCount(valueMillis);
}

@Override
@@ -60,10 +60,16 @@ public void dec() {
}

@Override
public void add(long delta) {
public void addCount(long delta) {
updateMax(val.addAndGet(delta));
}

@Override
public void addLatency(long eventLatency, TimeUnit unit) {
long valueMillis = unit.toMicros(eventLatency) / 1000;
zymap (Member) commented:

Just have a question: why not use unit.toMillis?

Contributor Author replied:

> Just have a question: why not use unit.toMillis?

It's the same; this just follows the logic of OpStatsLogger.registerSuccessfulEvent. @zymap
[screenshot of the referenced implementation attached]

shoothzj (Member) commented:

I think we can use unit.toMillis in both places.

Contributor Author replied:

> I think we can use unit.toMillis in both places.

Good idea, I will update it. @shoothzj @zymap

Contributor Author commented:

@shoothzj have a look at the new update for unit.toMillis

updateMax(val.addAndGet(valueMillis));
}

@Override
public Long get() {
return val.get();
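A note on the conversion discussed in the thread above: for non-negative latencies, unit.toMicros(latency) / 1000 and unit.toMillis(latency) yield the same truncated value, which is why the later update can switch to toMillis. A minimal standalone sketch (class name hypothetical, not part of this PR):

```java
import java.util.concurrent.TimeUnit;

// Illustration only: compares the two conversions from the review thread.
public class LatencyConversionCheck {
    public static void main(String[] args) {
        TimeUnit unit = TimeUnit.NANOSECONDS;
        long latency = 3_500_000L; // 3.5 ms expressed in nanoseconds

        long viaMicros = unit.toMicros(latency) / 1000; // 3500 us -> 3 (integer division truncates)
        long viaMillis = unit.toMillis(latency);        // 3 (toMillis also truncates)

        System.out.println(viaMicros + " == " + viaMillis); // prints "3 == 3"
    }
}
```

Both paths truncate toward zero, so the recorded counter value is the same; toMillis is simply the more direct expression.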
@@ -931,7 +931,7 @@ private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
long ledgerId = handle.getLedgerId();
long entryId = handle.addEntry(entry);

bookieStats.getWriteBytes().add(entry.readableBytes());
bookieStats.getWriteBytes().addCount(entry.readableBytes());

// journal `addEntry` should happen after the entry is added to ledger storage.
// otherwise the journal entry can potentially be rolled before the ledger is created in ledger storage.
@@ -1110,7 +1110,7 @@ public ByteBuf readEntry(long ledgerId, long entryId)
}
ByteBuf entry = handle.readEntry(entryId);
entrySize = entry.readableBytes();
bookieStats.getReadBytes().add(entrySize);
bookieStats.getReadBytes().addCount(entrySize);
success = true;
return entry;
} finally {
@@ -258,7 +258,7 @@ long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint) throws
}
}
}
memTableStats.getFlushBytesCounter().add(size);
memTableStats.getFlushBytesCounter().addCount(size);
clearSnapshot(keyValues);
}
}
@@ -139,7 +139,7 @@ public void safeRun() {
throw new IOException("Failed to complete the flushSnapshotByParallelizing",
exceptionWhileFlushingParallelly.get());
}
memTableStats.getFlushBytesCounter().add(flushedSize.get());
memTableStats.getFlushBytesCounter().addCount(flushedSize.get());
clearSnapshot(keyValues);
}
}
@@ -489,7 +489,7 @@ private void doGcEntryLogs() throws EntryLogMetadataMapException {
// We can remove this entry log file now.
LOG.info("Deleting entryLogId {} as it has no active ledgers!", entryLogId);
removeEntryLog(entryLogId);
gcStats.getReclaimedSpaceViaDeletes().add(meta.getTotalSize());
gcStats.getReclaimedSpaceViaDeletes().addCount(meta.getTotalSize());
} else if (modified) {
// update entryLogMetaMap only when the meta modified.
entryLogMetaMap.put(meta.getEntryLogId(), meta);
@@ -607,7 +607,7 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMet

long priorRemainingSize = meta.getRemainingSize();
compactEntryLog(meta);
gcStats.getReclaimedSpaceViaCompaction().add(meta.getTotalSize() - priorRemainingSize);
gcStats.getReclaimedSpaceViaCompaction().addCount(meta.getTotalSize() - priorRemainingSize);
compactedBuckets[bucketIndex]++;
});
}
@@ -346,7 +346,7 @@ public void run() {
journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
cb.writeComplete(0, ledgerId, entryId, null, ctx);
recycle();
callbackTime.add(MathUtils.elapsedNanos(startTime));
callbackTime.addLatency(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}

private final Handle<QueueEntry> recyclerHandle;
@@ -515,7 +515,7 @@ public void run() {
while (running) {
ForceWriteRequest req = null;
try {
forceWriteThreadTime.add(MathUtils.elapsedNanos(busyStartTime));
forceWriteThreadTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS);
req = forceWriteRequests.take();
busyStartTime = System.nanoTime();
// Force write the file and then notify the write completions
@@ -1081,7 +1081,7 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
}

if (numEntriesToFlush == 0) {
journalTime.add(MathUtils.elapsedNanos(busyStartTime));
journalTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS);
qe = queue.take();
dequeueStartTime = MathUtils.nowInNano();
busyStartTime = dequeueStartTime;
@@ -1230,7 +1230,7 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
qe.entry.release();
} else if (qe.entryId != BookieImpl.METAENTRY_ID_FORCE_LEDGER) {
int entrySize = qe.entry.readableBytes();
journalStats.getJournalWriteBytes().add(entrySize);
journalStats.getJournalWriteBytes().addCount(entrySize);

batchSize += (4 + entrySize);

@@ -111,7 +111,7 @@ protected void doCheckpoint(Checkpoint checkpoint) {
log.error("Exception in SyncThread", t);
dirsListener.fatalError();
} finally {
syncExecutorTime.add(MathUtils.elapsedNanos(startTime));
syncExecutorTime.addLatency(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
});
}
@@ -124,7 +124,7 @@ public Future requestFlush() {
} catch (Throwable t) {
log.error("Exception flushing ledgers ", t);
} finally {
syncExecutorTime.add(MathUtils.elapsedNanos(startTime));
syncExecutorTime.addLatency(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
});
}
@@ -212,7 +212,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
ThreadRegistry.register(dbStoragerExecutorName, 0);
// ensure the metric gets registered on start-up as this thread only executes
// when the write cache is full which may not happen or not for a long time
flushExecutorTime.add(0);
flushExecutorTime.addLatency(0, TimeUnit.NANOSECONDS);
});

ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
@@ -463,7 +463,7 @@ private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry)
} catch (IOException e) {
log.error("Error during flush", e);
} finally {
flushExecutorTime.add(MathUtils.elapsedNanos(startTime));
flushExecutorTime.addLatency(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
});
}
@@ -570,14 +570,16 @@ private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, Book
throw new NoEntryException(ledgerId, entryId);
}
} finally {
dbLedgerStorageStats.getReadFromLocationIndexTime().add(MathUtils.elapsedNanos(locationIndexStartNano));
dbLedgerStorageStats.getReadFromLocationIndexTime().addLatency(
MathUtils.elapsedNanos(locationIndexStartNano), TimeUnit.NANOSECONDS);
}

long readEntryStartNano = MathUtils.nowInNano();
try {
entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
} finally {
dbLedgerStorageStats.getReadFromEntryLogTime().add(MathUtils.elapsedNanos(readEntryStartNano));
dbLedgerStorageStats.getReadFromEntryLogTime().addLatency(
MathUtils.elapsedNanos(readEntryStartNano), TimeUnit.NANOSECONDS);
}

readCache.put(ledgerId, entryId, entry);
@@ -634,7 +636,8 @@ private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long fi
} finally {
dbLedgerStorageStats.getReadAheadBatchCountStats().registerSuccessfulValue(count);
dbLedgerStorageStats.getReadAheadBatchSizeStats().registerSuccessfulValue(size);
dbLedgerStorageStats.getReadAheadTime().add(MathUtils.elapsedNanos(readAheadStartNano));
dbLedgerStorageStats.getReadAheadTime().addLatency(
MathUtils.elapsedNanos(readAheadStartNano), TimeUnit.NANOSECONDS);
}
}

@@ -689,11 +692,13 @@ public ByteBuf getLastEntry(long ledgerId) throws IOException, BookieException {
}

long entryLocation = entryLocationIndex.getLocation(ledgerId, lastEntryId);
dbLedgerStorageStats.getReadFromLocationIndexTime().add(MathUtils.elapsedNanos(locationIndexStartNano));
dbLedgerStorageStats.getReadFromLocationIndexTime().addLatency(
MathUtils.elapsedNanos(locationIndexStartNano), TimeUnit.NANOSECONDS);

long readEntryStartNano = MathUtils.nowInNano();
ByteBuf content = entryLogger.readEntry(ledgerId, lastEntryId, entryLocation);
dbLedgerStorageStats.getReadFromEntryLogTime().add(MathUtils.elapsedNanos(readEntryStartNano));
dbLedgerStorageStats.getReadFromEntryLogTime().addLatency(
MathUtils.elapsedNanos(readEntryStartNano), TimeUnit.NANOSECONDS);
return content;
}

@@ -105,7 +105,7 @@ private void run() {
try {
List<LedgerStorage.DetectedInconsistency> errors = ledgerStorage.localConsistencyCheck(scrubRateLimiter);
if (errors.size() > 0) {
errorCounter.add(errors.size());
errorCounter.addCount(errors.size());
LOG.error("Found inconsistency during localConsistencyCheck:");
for (LedgerStorage.DetectedInconsistency error : errors) {
LOG.error("Ledger {}, entry {}: ", error.getLedgerId(), error.getEntryId(), error.getException());
@@ -16,6 +16,8 @@
*/
package org.apache.bookkeeper.stats;

import java.util.concurrent.TimeUnit;

/**
* Simple stats that require only increment and decrement
* functions on a Long. Metrics like the number of topics, persist queue size
@@ -41,7 +43,15 @@ public interface Counter {
* Add delta to the value associated with this stat.
* @param delta
*/
void add(long delta);
void addCount(long delta);

/**
* An operation succeeded with the given eventLatency. Update
* stats to reflect the same.
* @param eventLatency The event latency
* @param unit The time unit in which eventLatency is expressed
*/
void addLatency(long eventLatency, TimeUnit unit);

/**
* Get the value associated with this stat.
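A minimal usage sketch of the revised interface follows; it is illustrative only (the metric names are made up, and it assumes the NullStatsLogger singleton so it runs standalone):

```java
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;

public class CounterUsageSketch {
    public static void main(String[] args) {
        // NullStatsLogger discards values; it is used here only so the sketch runs standalone.
        Counter writeBytes = NullStatsLogger.INSTANCE.getCounter("write-bytes");
        Counter flushTime = NullStatsLogger.INSTANCE.getCounter("flush-time");

        long startNanos = System.nanoTime();
        int entrySize = 1024;

        // Plain quantities go through addCount (previously the unit-less add(long)).
        writeBytes.addCount(entrySize);

        // Elapsed time goes through addLatency, with the unit stated explicitly by the caller.
        flushTime.addLatency(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
    }
}
```

Separating addCount from addLatency makes the unit of time-valued counters explicit at the call site, which is the point of the BP-58 change.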
@@ -88,7 +88,12 @@ public void dec() {
}

@Override
public void add(long delta) {
public void addCount(long delta) {
// nop
}

@Override
public void addLatency(long eventLatency, TimeUnit unit) {
// nop
}

@@ -25,6 +25,8 @@
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;

import java.util.concurrent.TimeUnit;

/**
* A {@link StatsLogger} implemented based on <i>Codahale</i> metrics library.
*/
@@ -70,9 +72,15 @@ public void dec() {
}

@Override
public void add(long delta) {
public void addCount(long delta) {
c.inc(delta);
}

@Override
public void addLatency(long eventLatency, TimeUnit unit) {
long valueMillis = unit.toMicros(eventLatency) / 1000;
c.inc(valueMillis);
}
};
}

@@ -17,6 +17,7 @@
package org.apache.bookkeeper.stats.prometheus;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.stats.Counter;

@@ -54,10 +55,20 @@ public void dec() {
}

@Override
public void add(long delta) {
public void addCount(long delta) {
counter.add(delta);
}

/**
* When the counter is used to accumulate time, keep the conversion consistent with
* the logic of {@link DataSketchesOpStatsLogger#registerSuccessfulEvent(long, TimeUnit)}.
*/
@Override
public void addLatency(long eventLatency, TimeUnit unit) {
long valueMillis = unit.toMicros(eventLatency) / 1000;
counter.add(valueMillis);
}

@Override
public Long get() {
return counter.sum();
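As the new test later in this PR asserts, the Prometheus LongAdderCounter accumulates the truncated millisecond value. A small sketch mirroring that test (the main method and package placement are assumptions for illustration):

```java
package org.apache.bookkeeper.stats.prometheus;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class LongAdderCounterLatencySketch {
    public static void main(String[] args) {
        LongAdderCounter counter = new LongAdderCounter(Collections.emptyMap());

        // 3,000,000 ns == 3 ms; the counter accumulates the millisecond value.
        counter.addLatency(3 * 1000 * 1000L, TimeUnit.NANOSECONDS);

        System.out.println(counter.get()); // prints 3
    }
}
```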
@@ -18,6 +18,8 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.ThreadRegistry;

@@ -65,8 +67,13 @@ public void dec() {
}

@Override
public void add(long delta) {
getCounter().add(delta);
public void addCount(long delta) {
getCounter().addCount(delta);
}

@Override
public void addLatency(long eventLatency, TimeUnit unit) {
getCounter().addLatency(eventLatency, unit);
}

@Override
@@ -25,6 +25,8 @@
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -98,7 +100,16 @@ public void testCounter() {
assertEquals(1L, counter.get().longValue());
counter.dec();
assertEquals(0L, counter.get().longValue());
counter.add(3);
counter.addCount(3);
assertEquals(3L, counter.get().longValue());
}

@Test
public void testCounter2() {
LongAdderCounter counter = new LongAdderCounter(Collections.emptyMap());
long value = counter.get();
assertEquals(0L, value);
counter.addLatency(3 * 1000 * 1000L, TimeUnit.NANOSECONDS);
assertEquals(3L, counter.get().longValue());
}
