Skip to content
This repository was archived by the owner on Oct 16, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,14 @@ public void run() {
writer.write(new LogRecord(requestMillis, data)).whenComplete(new FutureEventListener<DLSN>() {
@Override
public void onSuccess(DLSN value) {
requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);
requestStat.registerSuccessfulEvent(
System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS);
}

@Override
public void onFailure(Throwable cause) {
requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis);
requestStat.registerFailedEvent(
System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS);
LOG.error("Failed to publish, rescue it : ", cause);
scheduleRescue(streamIdx, writer, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,14 @@ public void processRecord(final LogRecordWithDLSN record) {
long e2eLatency = curTimeMillis - msg.getPublishTime();
long deliveryLatency = curTimeMillis - record.getTransactionId();
if (e2eLatency >= 0) {
e2eStat.registerSuccessfulEvent(e2eLatency);
e2eStat.registerSuccessfulEvent(e2eLatency, TimeUnit.MILLISECONDS);
} else {
negativeE2EStat.registerSuccessfulEvent(-e2eLatency);
negativeE2EStat.registerSuccessfulEvent(-e2eLatency, TimeUnit.MILLISECONDS);
}
if (deliveryLatency >= 0) {
deliveryStat.registerSuccessfulEvent(deliveryLatency);
deliveryStat.registerSuccessfulEvent(deliveryLatency, TimeUnit.MILLISECONDS);
} else {
negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency);
negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency, TimeUnit.MILLISECONDS);
}

prevDLSN = record.getDlsn();
Expand Down Expand Up @@ -200,12 +200,14 @@ public void run() {
new FutureEventListener<Boolean>() {
@Override
public void onSuccess(Boolean value) {
truncationStat.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
truncationStat.registerSuccessfulEvent(
stopwatch.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
}

@Override
public void onFailure(Throwable cause) {
truncationStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
truncationStat.registerFailedEvent(
stopwatch.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
LOG.error("Failed to truncate stream {} to {} : ",
new Object[]{streamName, dlsnToTruncate, cause});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,12 @@ public void onFailure(Throwable cause) {
@Override
public void run() {
if (null != dlsn) {
requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);
requestStat.registerSuccessfulEvent(
System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS);
} else {
LOG.error("Failed to publish to {} : ", streamName, cause);
requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis);
requestStat.registerFailedEvent(
System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS);
exceptionsLogger.getCounter(cause.getClass().getName()).inc();
if (cause instanceof DLException) {
DLException dle = (DLException) cause;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,12 @@ protected void benchmark(Namespace namespace, String logName, StatsLogger statsL
reader = FutureUtils.result(dlm.openAsyncLogReader(lastDLSN));
}
long elapsedMs = stopwatch.elapsed(TimeUnit.MICROSECONDS);
openReaderStats.registerSuccessfulEvent(elapsedMs);
openReaderStats.registerSuccessfulEvent(elapsedMs, TimeUnit.MICROSECONDS);
logger.info("It took {} ms to position the reader to transaction id = {}, dlsn = {}",
lastTxId, lastDLSN);
} catch (Exception ioe) {
openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
openReaderStats.registerFailedEvent(
stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
logger.warn("Failed to create reader for stream {} reading from tx id = {}, dlsn = {}.",
new Object[] { streamName, lastTxId, lastDLSN });
}
Expand All @@ -133,7 +134,7 @@ protected void benchmark(Namespace namespace, String logName, StatsLogger statsL
stopwatch.start();
records = FutureUtils.result(reader.readBulk(batchSize));
long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
blockingReadStats.registerSuccessfulEvent(elapsedMicros);
blockingReadStats.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
if (!records.isEmpty()) {
readCounter.add(records.size());
LogRecordWithDLSN lastRecord = records.get(records.size() - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,11 @@ protected void benchmark(Namespace namespace, String streamName, StatsLogger sta
try {
reader = dlm.getInputStream(lastTxId);
long elapsedMs = stopwatch.elapsed(TimeUnit.MICROSECONDS);
openReaderStats.registerSuccessfulEvent(elapsedMs);
openReaderStats.registerSuccessfulEvent(elapsedMs, TimeUnit.MICROSECONDS);
logger.info("It took {} ms to position the reader to transaction id {}", lastTxId);
} catch (IOException ioe) {
openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
openReaderStats.registerFailedEvent(
stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
logger.warn("Failed to create reader for stream {} reading from {}.", streamName, lastTxId);
}
if (null == reader) {
Expand Down Expand Up @@ -129,11 +130,11 @@ record = reader.readNext(nonBlocking);
if (null != record) {
long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
if (nonBlocking) {
nonBlockingReadStats.registerSuccessfulEvent(elapsedMicros);
nonBlockingReadStats.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
} else {
numCatchupBytes += record.getPayload().length;
++numCatchupReads;
blockingReadStats.registerSuccessfulEvent(elapsedMicros);
blockingReadStats.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
}
lastTxId = record.getTransactionId();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.CachingStatsLogger;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
Expand Down Expand Up @@ -63,16 +64,29 @@ public OpStatsLogger getOpStatsLogger(final String statName) {
final OpStatsLogger firstLogger = first.getOpStatsLogger(statName);
final OpStatsLogger secondLogger = second.getOpStatsLogger(statName);
return new OpStatsLogger() {

@Override
public void registerFailedEvent(long l, TimeUnit timeUnit) {
firstLogger.registerFailedEvent(l, timeUnit);
secondLogger.registerFailedEvent(l, timeUnit);
}

@Override
public void registerSuccessfulEvent(long l, TimeUnit timeUnit) {
firstLogger.registerSuccessfulEvent(l, timeUnit);
secondLogger.registerSuccessfulEvent(l, timeUnit);
}

@Override
public void registerFailedEvent(long l) {
firstLogger.registerFailedEvent(l);
secondLogger.registerFailedEvent(l);
public void registerSuccessfulValue(long l) {
firstLogger.registerSuccessfulValue(l);
secondLogger.registerSuccessfulValue(l);
}

@Override
public void registerSuccessfulEvent(long l) {
firstLogger.registerSuccessfulEvent(l);
secondLogger.registerSuccessfulEvent(l);
public void registerFailedValue(long l) {
firstLogger.registerFailedValue(l);
secondLogger.registerFailedValue(l);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ public OpStatsListener(OpStatsLogger opStatsLogger) {

@Override
public void onSuccess(T value) {
opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}

@Override
public void onFailure(Throwable cause) {
opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ public void testStatsSuccess() throws Exception {
Stopwatch.createStarted());
underlyFuture.complete(1234L);
FutureUtils.result(statsFuture);
verify(statsLogger, times(1)).registerSuccessfulEvent(anyLong());
verify(statsLogger, times(1))
.registerSuccessfulEvent(anyLong(), eq(TimeUnit.MICROSECONDS));
}

@Test
Expand All @@ -325,7 +326,8 @@ public void testStatsFailure() throws Exception {
Stopwatch.createStarted());
underlyFuture.completeExceptionally(new TestException());
FutureUtils.result(FutureUtils.ignore(statsFuture));
verify(statsLogger, times(1)).registerFailedEvent(anyLong());
verify(statsLogger, times(1))
.registerFailedEvent(anyLong(), eq(TimeUnit.MICROSECONDS));
}

@Test
Expand Down
7 changes: 7 additions & 0 deletions distributedlog-core-twitter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,11 @@
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>twitter-repo</id>
<name>Twitter Maven Repo</name>
<url>http://maven.twttr.com</url>
</repository>
</repositories>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ public OpStatsListener(OpStatsLogger opStatsLogger) {

@Override
public void onSuccess(T value) {
opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
opStatsLogger.registerSuccessfulEvent(
stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}

@Override
public void onFailure(Throwable cause) {
opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
opStatsLogger.registerFailedEvent(
stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,9 @@
*/
package org.apache.bookkeeper.client;

import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
Expand All @@ -36,6 +29,12 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Reader used for DL tools to read entries
Expand Down Expand Up @@ -93,17 +92,21 @@ public void readEntriesFromAllBookies(final LedgerHandle lh, long eid,
final Set<ReadResult<InputStream>> readResults = new HashSet<ReadResult<InputStream>>();
ReadEntryCallback readEntryCallback = new ReadEntryCallback() {
@Override
public void readEntryComplete(int rc, long lid, long eid, ChannelBuffer buffer, Object ctx) {
public void readEntryComplete(int rc, long lid, long eid, ByteBuf buffer, Object ctx) {
BookieSocketAddress bookieAddress = (BookieSocketAddress) ctx;
ReadResult<InputStream> rr;
if (BKException.Code.OK != rc) {
rr = new ReadResult<InputStream>(eid, rc, null, bookieAddress.getSocketAddress());
} else {
ByteBuf content;
try {
ChannelBufferInputStream is = lh.macManager.verifyDigestAndReturnData(eid, buffer);
rr = new ReadResult<InputStream>(eid, BKException.Code.OK, is, bookieAddress.getSocketAddress());
content = lh.macManager.verifyDigestAndReturnData(eid, buffer);
ByteBuf toRet = Unpooled.copiedBuffer(content);
rr = new ReadResult<InputStream>(eid, BKException.Code.OK, new ByteBufInputStream(toRet), bookieAddress.getSocketAddress());
} catch (BKException.BKDigestMatchException e) {
rr = new ReadResult<InputStream>(eid, BKException.Code.DigestMatchException, null, bookieAddress.getSocketAddress());
} finally {
buffer.release();
}
}
readResults.add(rr);
Expand Down Expand Up @@ -184,7 +187,7 @@ public void readLacs(final LedgerHandle lh, long eid,
final Set<ReadResult<Long>> readResults = new HashSet<ReadResult<Long>>();
ReadEntryCallback readEntryCallback = new ReadEntryCallback() {
@Override
public void readEntryComplete(int rc, long lid, long eid, ChannelBuffer buffer, Object ctx) {
public void readEntryComplete(int rc, long lid, long eid, ByteBuf buffer, Object ctx) {
InetSocketAddress bookieAddress = (InetSocketAddress) ctx;
ReadResult<Long> rr;
if (BKException.Code.OK != rc) {
Expand Down
Loading