diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index 3aa01ab9526a..271f0acf2370 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -24,7 +24,10 @@ public interface MetricsReplicationSourceSource { public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp"; public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches"; + @Deprecated + /** @deprecated Use SOURCE_SHIPPED_BYTES instead */ public static final String SOURCE_SHIPPED_KBS = "source.shippedKBs"; + public static final String SOURCE_SHIPPED_BYTES = "source.shippedBytes"; public static final String SOURCE_SHIPPED_OPS = "source.shippedOps"; public static final String SOURCE_LOG_READ_IN_BYTES = "source.logReadInBytes"; @@ -41,7 +44,7 @@ public interface MetricsReplicationSourceSource { void incrLogEditsFiltered(long size); void incrBatchesShipped(int batches); void incrOpsShipped(long ops); - void incrShippedKBs(long size); + void incrShippedBytes(long size); void incrLogReadInBytes(long size); void incrLogReadInEdits(long size); void clear(); diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index 2526f32c7bb8..476d2f76a5e3 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -30,6 +30,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS private final MutableFastCounter logEditsFilteredCounter; private final MutableFastCounter shippedBatchesCounter; private final MutableFastCounter shippedOpsCounter; + private final MutableFastCounter shippedBytesCounter; + @Deprecated private final MutableFastCounter shippedKBsCounter; private final MutableFastCounter logReadInBytesCounter; private final MutableFastCounter shippedHFilesCounter; @@ -48,6 +50,8 @@ public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L); + shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L); + logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L); logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L); @@ -88,8 +92,25 @@ public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { shippedOpsCounter.incr(ops); } - @Override public void incrShippedKBs(long size) { - shippedKBsCounter.incr(size); + @Override public void incrShippedBytes(long size) { + shippedBytesCounter.incr(size); + // obtained value maybe smaller than 1024. We should make sure that KB count + // eventually picks up even from multiple smaller updates. + incrementKBsCounter(shippedBytesCounter, shippedKBsCounter); + } + + static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCounter kbsCounter) { + // Following code should be thread-safe. + long delta = 0; + while(true) { + long bytes = bytesCounter.value(); + delta = (bytes / 1024) - kbsCounter.value(); + if (delta > 0) { + kbsCounter.incr(delta); + } else { + break; + } + } } @Override public void incrLogReadInBytes(long size) { diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 03e311671794..835e81c207d5 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -30,7 +30,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final String logEditsFilteredKey; private final String shippedBatchesKey; private final String shippedOpsKey; + @Deprecated private final String shippedKBsKey; + private final String shippedBytesKey; private final String logReadInBytesKey; private final String shippedHFilesKey; private final String sizeOfHFileRefsQueueKey; @@ -42,6 +44,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final MutableFastCounter shippedBatchesCounter; private final MutableFastCounter shippedOpsCounter; private final MutableFastCounter shippedKBsCounter; + private final MutableFastCounter shippedBytesCounter; private final MutableFastCounter logReadInBytesCounter; private final MutableFastCounter shippedHFilesCounter; private final MutableGaugeLong sizeOfHFileRefsQueueGauge; @@ -65,6 +68,9 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri shippedKBsKey = "source." + this.id + ".shippedKBs"; shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L); + shippedBytesKey = "source." + this.id + ".shippedBytes"; + shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L); + logReadInBytesKey = "source." + this.id + ".logReadInBytes"; logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L); @@ -109,8 +115,10 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri shippedOpsCounter.incr(ops); } - @Override public void incrShippedKBs(long size) { - shippedKBsCounter.incr(size); + @Override public void incrShippedBytes(long size) { + shippedBytesCounter.incr(size); + MetricsReplicationGlobalSourceSource + .incrementKBsCounter(shippedBytesCounter, shippedKBsCounter); } @Override public void incrLogReadInBytes(long size) { @@ -125,6 +133,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri rms.removeMetric(shippedBatchesKey); rms.removeMetric(shippedOpsKey); rms.removeMetric(shippedKBsKey); + rms.removeMetric(shippedBytesKey); rms.removeMetric(logReadInBytesKey); rms.removeMetric(logReadInEditsKey); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 4a044bfb9707..b07f1d1b4ad2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -136,15 +136,15 @@ public void incrLogEditsFiltered() { * * @param batchSize the size of the batch that was shipped to sinks. */ - public void shipBatch(long batchSize, int sizeInKB) { + public void shipBatch(long batchSize, int sizeInBytes) { singleSourceSource.incrBatchesShipped(1); globalSourceSource.incrBatchesShipped(1); singleSourceSource.incrOpsShipped(batchSize); globalSourceSource.incrOpsShipped(batchSize); - singleSourceSource.incrShippedKBs(sizeInKB); - globalSourceSource.incrShippedKBs(sizeInKB); + singleSourceSource.incrShippedBytes(sizeInBytes); + globalSourceSource.incrShippedBytes(sizeInBytes); } /** @@ -153,8 +153,8 @@ public void shipBatch(long batchSize, int sizeInKB) { * @param batchSize the size of the batch that was shipped to sinks. * @param hfiles total number of hfiles shipped to sinks. */ - public void shipBatch(long batchSize, int sizeInKB, long hfiles) { - shipBatch(batchSize, sizeInKB); + public void shipBatch(long batchSize, int sizeInBytes, long hfiles) { + shipBatch(batchSize, sizeInBytes); singleSourceSource.incrHFilesShipped(hfiles); globalSourceSource.incrHFilesShipped(hfiles); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 0f956c874be1..a25c80b70e45 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -1060,7 +1060,7 @@ protected void shipEdits(boolean currentWALisBeingWrittenTo, List ent totalReplicatedEdits.addAndGet(entries.size()); totalReplicatedOperations.addAndGet(currentNbOperations); // FIXME check relationship between wal group and overall - metrics.shipBatch(currentNbOperations, currentSize / 1024, currentNbHFiles); + metrics.shipBatch(currentNbOperations, currentSize, currentNbHFiles); metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId); if (LOG.isTraceEnabled()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/protobuf/generated/DummyRegionServerEndpointProtos.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/protobuf/generated/DummyRegionServerEndpointProtos.java index 2ad3c59bdd22..a011b30eebc1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/protobuf/generated/DummyRegionServerEndpointProtos.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/protobuf/generated/DummyRegionServerEndpointProtos.java @@ -1,5 +1,5 @@ // Generated by the protocol buffer compiler. DO NOT EDIT! -// source: hbase-server/src/test/protobuf/DummyRegionServerEndpoint.proto +// source: DummyRegionServerEndpoint.proto package org.apache.hadoop.hbase.coprocessor.protobuf.generated; @@ -1185,16 +1185,15 @@ public org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerE descriptor; static { java.lang.String[] descriptorData = { - "\n>hbase-server/src/test/protobuf/DummyRe" + - "gionServerEndpoint.proto\022\rhbase.test.pb\"" + - "\016\n\014DummyRequest\"\036\n\rDummyResponse\022\r\n\005valu" + - "e\030\001 \002(\t2\237\001\n\014DummyService\022F\n\tdummyCall\022\033." + - "hbase.test.pb.DummyRequest\032\034.hbase.test." + - "pb.DummyResponse\022G\n\ndummyThrow\022\033.hbase.t" + - "est.pb.DummyRequest\032\034.hbase.test.pb.Dumm" + - "yResponseB_\n6org.apache.hadoop.hbase.cop" + - "rocessor.protobuf.generatedB\037DummyRegion" + - "ServerEndpointProtos\210\001\001\240\001\001" + "\n\037DummyRegionServerEndpoint.proto\022\rhbase" + + ".test.pb\"\016\n\014DummyRequest\"\036\n\rDummyRespons" + + "e\022\r\n\005value\030\001 \002(\t2\237\001\n\014DummyService\022F\n\tdum" + + "myCall\022\033.hbase.test.pb.DummyRequest\032\034.hb" + + "ase.test.pb.DummyResponse\022G\n\ndummyThrow\022" + + "\033.hbase.test.pb.DummyRequest\032\034.hbase.tes" + + "t.pb.DummyResponseB_\n6org.apache.hadoop." + + "hbase.coprocessor.protobuf.generatedB\037Du" + + "mmyRegionServerEndpointProtos\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {