Skip to content

Commit

Permalink
HBASE-15740 Replication source.shippedKBs metric is undercounting bec…
Browse files Browse the repository at this point in the history
…ause it is in KB
  • Loading branch information
enis committed May 9, 2016
1 parent 541d1da commit b75b226
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 22 deletions.
Expand Up @@ -24,7 +24,10 @@ public interface MetricsReplicationSourceSource {
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp"; public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches"; public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";


@Deprecated
/** @deprecated Use SOURCE_SHIPPED_BYTES instead */
public static final String SOURCE_SHIPPED_KBS = "source.shippedKBs"; public static final String SOURCE_SHIPPED_KBS = "source.shippedKBs";
public static final String SOURCE_SHIPPED_BYTES = "source.shippedBytes";
public static final String SOURCE_SHIPPED_OPS = "source.shippedOps"; public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";


public static final String SOURCE_LOG_READ_IN_BYTES = "source.logReadInBytes"; public static final String SOURCE_LOG_READ_IN_BYTES = "source.logReadInBytes";
Expand All @@ -41,7 +44,7 @@ public interface MetricsReplicationSourceSource {
void incrLogEditsFiltered(long size); void incrLogEditsFiltered(long size);
void incrBatchesShipped(int batches); void incrBatchesShipped(int batches);
void incrOpsShipped(long ops); void incrOpsShipped(long ops);
void incrShippedKBs(long size); void incrShippedBytes(long size);
void incrLogReadInBytes(long size); void incrLogReadInBytes(long size);
void incrLogReadInEdits(long size); void incrLogReadInEdits(long size);
void clear(); void clear();
Expand Down
Expand Up @@ -30,6 +30,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
private final MutableFastCounter logEditsFilteredCounter; private final MutableFastCounter logEditsFilteredCounter;
private final MutableFastCounter shippedBatchesCounter; private final MutableFastCounter shippedBatchesCounter;
private final MutableFastCounter shippedOpsCounter; private final MutableFastCounter shippedOpsCounter;
private final MutableFastCounter shippedBytesCounter;
@Deprecated
private final MutableFastCounter shippedKBsCounter; private final MutableFastCounter shippedKBsCounter;
private final MutableFastCounter logReadInBytesCounter; private final MutableFastCounter logReadInBytesCounter;
private final MutableFastCounter shippedHFilesCounter; private final MutableFastCounter shippedHFilesCounter;
Expand All @@ -48,6 +50,8 @@ public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {


shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L); shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L);


shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);

logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L); logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L);


logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L); logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
Expand Down Expand Up @@ -88,8 +92,25 @@ public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
shippedOpsCounter.incr(ops); shippedOpsCounter.incr(ops);
} }


@Override public void incrShippedKBs(long size) { @Override public void incrShippedBytes(long size) {
shippedKBsCounter.incr(size); shippedBytesCounter.incr(size);
// obtained value maybe smaller than 1024. We should make sure that KB count
// eventually picks up even from multiple smaller updates.
incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
}

static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCounter kbsCounter) {
// Following code should be thread-safe.
long delta = 0;
while(true) {
long bytes = bytesCounter.value();
delta = (bytes / 1024) - kbsCounter.value();
if (delta > 0) {
kbsCounter.incr(delta);
} else {
break;
}
}
} }


@Override public void incrLogReadInBytes(long size) { @Override public void incrLogReadInBytes(long size) {
Expand Down
Expand Up @@ -30,7 +30,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logEditsFilteredKey; private final String logEditsFilteredKey;
private final String shippedBatchesKey; private final String shippedBatchesKey;
private final String shippedOpsKey; private final String shippedOpsKey;
@Deprecated
private final String shippedKBsKey; private final String shippedKBsKey;
private final String shippedBytesKey;
private final String logReadInBytesKey; private final String logReadInBytesKey;
private final String shippedHFilesKey; private final String shippedHFilesKey;
private final String sizeOfHFileRefsQueueKey; private final String sizeOfHFileRefsQueueKey;
Expand All @@ -42,6 +44,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableFastCounter shippedBatchesCounter; private final MutableFastCounter shippedBatchesCounter;
private final MutableFastCounter shippedOpsCounter; private final MutableFastCounter shippedOpsCounter;
private final MutableFastCounter shippedKBsCounter; private final MutableFastCounter shippedKBsCounter;
private final MutableFastCounter shippedBytesCounter;
private final MutableFastCounter logReadInBytesCounter; private final MutableFastCounter logReadInBytesCounter;
private final MutableFastCounter shippedHFilesCounter; private final MutableFastCounter shippedHFilesCounter;
private final MutableGaugeLong sizeOfHFileRefsQueueGauge; private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
Expand All @@ -65,6 +68,9 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri
shippedKBsKey = "source." + this.id + ".shippedKBs"; shippedKBsKey = "source." + this.id + ".shippedKBs";
shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L); shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L);


shippedBytesKey = "source." + this.id + ".shippedBytes";
shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);

logReadInBytesKey = "source." + this.id + ".logReadInBytes"; logReadInBytesKey = "source." + this.id + ".logReadInBytes";
logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L); logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L);


Expand Down Expand Up @@ -109,8 +115,10 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri
shippedOpsCounter.incr(ops); shippedOpsCounter.incr(ops);
} }


@Override public void incrShippedKBs(long size) { @Override public void incrShippedBytes(long size) {
shippedKBsCounter.incr(size); shippedBytesCounter.incr(size);
MetricsReplicationGlobalSourceSource
.incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
} }


@Override public void incrLogReadInBytes(long size) { @Override public void incrLogReadInBytes(long size) {
Expand All @@ -125,6 +133,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri
rms.removeMetric(shippedBatchesKey); rms.removeMetric(shippedBatchesKey);
rms.removeMetric(shippedOpsKey); rms.removeMetric(shippedOpsKey);
rms.removeMetric(shippedKBsKey); rms.removeMetric(shippedKBsKey);
rms.removeMetric(shippedBytesKey);


rms.removeMetric(logReadInBytesKey); rms.removeMetric(logReadInBytesKey);
rms.removeMetric(logReadInEditsKey); rms.removeMetric(logReadInEditsKey);
Expand Down
Expand Up @@ -136,15 +136,15 @@ public void incrLogEditsFiltered() {
* *
* @param batchSize the size of the batch that was shipped to sinks. * @param batchSize the size of the batch that was shipped to sinks.
*/ */
public void shipBatch(long batchSize, int sizeInKB) { public void shipBatch(long batchSize, int sizeInBytes) {
singleSourceSource.incrBatchesShipped(1); singleSourceSource.incrBatchesShipped(1);
globalSourceSource.incrBatchesShipped(1); globalSourceSource.incrBatchesShipped(1);


singleSourceSource.incrOpsShipped(batchSize); singleSourceSource.incrOpsShipped(batchSize);
globalSourceSource.incrOpsShipped(batchSize); globalSourceSource.incrOpsShipped(batchSize);


singleSourceSource.incrShippedKBs(sizeInKB); singleSourceSource.incrShippedBytes(sizeInBytes);
globalSourceSource.incrShippedKBs(sizeInKB); globalSourceSource.incrShippedBytes(sizeInBytes);
} }


/** /**
Expand All @@ -153,8 +153,8 @@ public void shipBatch(long batchSize, int sizeInKB) {
* @param batchSize the size of the batch that was shipped to sinks. * @param batchSize the size of the batch that was shipped to sinks.
* @param hfiles total number of hfiles shipped to sinks. * @param hfiles total number of hfiles shipped to sinks.
*/ */
public void shipBatch(long batchSize, int sizeInKB, long hfiles) { public void shipBatch(long batchSize, int sizeInBytes, long hfiles) {
shipBatch(batchSize, sizeInKB); shipBatch(batchSize, sizeInBytes);
singleSourceSource.incrHFilesShipped(hfiles); singleSourceSource.incrHFilesShipped(hfiles);
globalSourceSource.incrHFilesShipped(hfiles); globalSourceSource.incrHFilesShipped(hfiles);
} }
Expand Down
Expand Up @@ -1060,7 +1060,7 @@ protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> ent
totalReplicatedEdits.addAndGet(entries.size()); totalReplicatedEdits.addAndGet(entries.size());
totalReplicatedOperations.addAndGet(currentNbOperations); totalReplicatedOperations.addAndGet(currentNbOperations);
// FIXME check relationship between wal group and overall // FIXME check relationship between wal group and overall
metrics.shipBatch(currentNbOperations, currentSize / 1024, currentNbHFiles); metrics.shipBatch(currentNbOperations, currentSize, currentNbHFiles);
metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(), metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
walGroupId); walGroupId);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b75b226

Please sign in to comment.