Skip to content

Commit

Permalink
HBASE-16302 age of last shipped op and age of last applied op should …
Browse files Browse the repository at this point in the history
…be histograms

Signed-off-by: Ashish Singhi <ashishsinghi@apache.org>
  • Loading branch information
ashu210890 authored and ashishsinghi committed Nov 29, 2016
1 parent 346e904 commit 7bcbac9
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 13 deletions.
Expand Up @@ -20,11 +20,12 @@


import org.apache.hadoop.metrics2.lib.MutableFastCounter; import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableHistogram;


public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{
private final MetricsReplicationSourceImpl rms; private final MetricsReplicationSourceImpl rms;


private final MutableGaugeLong ageOfLastShippedOpGauge; private final MutableHistogram ageOfLastShippedOpHist;
private final MutableGaugeLong sizeOfLogQueueGauge; private final MutableGaugeLong sizeOfLogQueueGauge;
private final MutableFastCounter logReadInEditsCounter; private final MutableFastCounter logReadInEditsCounter;
private final MutableFastCounter logEditsFilteredCounter; private final MutableFastCounter logEditsFilteredCounter;
Expand All @@ -47,7 +48,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
this.rms = rms; this.rms = rms;


ageOfLastShippedOpGauge = rms.getMetricsRegistry().getGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, 0L); ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);


sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L); sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L);


Expand Down Expand Up @@ -80,7 +81,7 @@ public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
} }


@Override public void setLastShippedAge(long age) { @Override public void setLastShippedAge(long age) {
ageOfLastShippedOpGauge.set(age); ageOfLastShippedOpHist.add(age);
} }


@Override public void incrSizeOfLogQueue(int size) { @Override public void incrSizeOfLogQueue(int size) {
Expand Down Expand Up @@ -137,7 +138,7 @@ static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCoun


@Override @Override
public long getLastShippedAge() { public long getLastShippedAge() {
return ageOfLastShippedOpGauge.value(); return ageOfLastShippedOpHist.getMax();
} }


@Override public void incrHFilesShipped(long hfiles) { @Override public void incrHFilesShipped(long hfiles) {
Expand Down
Expand Up @@ -20,23 +20,24 @@


import org.apache.hadoop.metrics2.lib.MutableFastCounter; import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableHistogram;


public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkSource { public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkSource {


private final MutableGaugeLong ageGauge; private final MutableHistogram ageHist;
private final MutableFastCounter batchesCounter; private final MutableFastCounter batchesCounter;
private final MutableFastCounter opsCounter; private final MutableFastCounter opsCounter;
private final MutableFastCounter hfilesCounter; private final MutableFastCounter hfilesCounter;


public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) { public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) {
ageGauge = rms.getMetricsRegistry().getGauge(SINK_AGE_OF_LAST_APPLIED_OP, 0L); ageHist = rms.getMetricsRegistry().getHistogram(SINK_AGE_OF_LAST_APPLIED_OP);
batchesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_BATCHES, 0L); batchesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_BATCHES, 0L);
opsCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_OPS, 0L); opsCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_OPS, 0L);
hfilesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_HFILES, 0L); hfilesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_HFILES, 0L);
} }


@Override public void setLastAppliedOpAge(long age) { @Override public void setLastAppliedOpAge(long age) {
ageGauge.set(age); ageHist.add(age);
} }


@Override public void incrAppliedBatches(long batches) { @Override public void incrAppliedBatches(long batches) {
Expand All @@ -49,7 +50,7 @@ public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) {


@Override @Override
public long getLastAppliedOpAge() { public long getLastAppliedOpAge() {
return ageGauge.value(); return ageHist.getMax();
} }


@Override @Override
Expand Down
Expand Up @@ -19,6 +19,7 @@


import org.apache.hadoop.metrics2.lib.MutableFastCounter; import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableHistogram;


public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSourceSource { public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSourceSource {


Expand All @@ -39,7 +40,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String shippedHFilesKey; private final String shippedHFilesKey;
private final String sizeOfHFileRefsQueueKey; private final String sizeOfHFileRefsQueueKey;


private final MutableGaugeLong ageOfLastShippedOpGauge; private final MutableHistogram ageOfLastShippedOpHist;
private final MutableGaugeLong sizeOfLogQueueGauge; private final MutableGaugeLong sizeOfLogQueueGauge;
private final MutableFastCounter logReadInEditsCounter; private final MutableFastCounter logReadInEditsCounter;
private final MutableFastCounter logEditsFilteredCounter; private final MutableFastCounter logEditsFilteredCounter;
Expand Down Expand Up @@ -72,7 +73,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri
this.keyPrefix = "source." + this.id + "."; this.keyPrefix = "source." + this.id + ".";


ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp"; ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
ageOfLastShippedOpGauge = rms.getMetricsRegistry().getGauge(ageOfLastShippedOpKey, 0L); ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(ageOfLastShippedOpKey);


sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue"; sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue";
sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfLogQueueKey, 0L); sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfLogQueueKey, 0L);
Expand Down Expand Up @@ -127,7 +128,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri
} }


@Override public void setLastShippedAge(long age) { @Override public void setLastShippedAge(long age) {
ageOfLastShippedOpGauge.set(age); ageOfLastShippedOpHist.add(age);
} }


@Override public void incrSizeOfLogQueue(int size) { @Override public void incrSizeOfLogQueue(int size) {
Expand Down Expand Up @@ -193,7 +194,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri


@Override @Override
public long getLastShippedAge() { public long getLastShippedAge() {
return ageOfLastShippedOpGauge.value(); return ageOfLastShippedOpHist.getMax();
} }


@Override @Override
Expand Down
Expand Up @@ -75,6 +75,10 @@ public void add(final long val) {
histogram.add(val, 1); histogram.add(val, 1);
} }


public long getMax() {
return histogram.getMax();
}

@Override @Override
public synchronized void snapshot(MetricsRecordBuilder metricsRecordBuilder, boolean all) { public synchronized void snapshot(MetricsRecordBuilder metricsRecordBuilder, boolean all) {
// Get a reference to the old histogram. // Get a reference to the old histogram.
Expand Down
Expand Up @@ -82,7 +82,7 @@ public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSourc
public void setAgeOfLastShippedOp(long timestamp, String walGroup) { public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
long age = EnvironmentEdgeManager.currentTime() - timestamp; long age = EnvironmentEdgeManager.currentTime() - timestamp;
singleSourceSource.setLastShippedAge(age); singleSourceSource.setLastShippedAge(age);
globalSourceSource.setLastShippedAge(Math.max(age, globalSourceSource.getLastShippedAge())); globalSourceSource.setLastShippedAge(age);
this.lastTimeStamps.put(walGroup, timestamp); this.lastTimeStamps.put(walGroup, timestamp);
} }


Expand Down

0 comments on commit 7bcbac9

Please sign in to comment.