HBASE-8868 add metric to report client shortcircuit reads.
branch-1 backport of apache#1334

Signed-off-by: stack <stack@apache.org>
Signed-off-by: Sean Busbey <busbey@apache.org>

Co-authored-by: Andrew Purtell <apurtell@apache.org>
2 people authored and Reidddddd committed Apr 22, 2020
1 parent a0a87e4 commit d7cfb0e
Showing 7 changed files with 183 additions and 8 deletions.
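For orientation before the diff: the four new gauges (totalBytesRead, localBytesRead, shortCircuitBytesRead, zeroCopyBytesRead) are registered on the RegionServer "Server" metrics source, so they surface through the usual Hadoop metrics2 sinks and JMX. The snippet below is a minimal sketch, not part of this commit, of reading them over JMX from within the RegionServer process; the MBean name follows the standard metrics2 naming convention, and the one-to-one mapping of gauge names to JMX attributes is an assumption, not something verified against this branch.

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ReadBytesMetricsProbe {
  public static void main(String[] args) throws Exception {
    // Platform MBean server: this sketch assumes it runs inside (or attaches to) the RegionServer JVM.
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    // Assumed bean name for the RegionServer "Server" metrics source.
    ObjectName name = new ObjectName("Hadoop:service=HBase,name=RegionServer,sub=Server");
    for (String attr : new String[] {
        "totalBytesRead", "localBytesRead", "shortCircuitBytesRead", "zeroCopyBytesRead" }) {
      System.out.println(attr + " = " + server.getAttribute(name, attr));
    }
  }
}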
@@ -425,6 +425,17 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
String MAJOR_COMPACTED_CELLS_SIZE_DESC =
"The total amount of data processed during major compactions, in bytes";

String TOTAL_BYTES_READ = "totalBytesRead";
String TOTAL_BYTES_READ_DESC = "The total number of bytes read from HDFS";
String LOCAL_BYTES_READ = "localBytesRead";
String LOCAL_BYTES_READ_DESC =
"The number of bytes read from the local HDFS DataNode";
String SHORTCIRCUIT_BYTES_READ = "shortCircuitBytesRead";
String SHORTCIRCUIT_BYTES_READ_DESC = "The number of bytes read through HDFS short circuit read";
String ZEROCOPY_BYTES_READ = "zeroCopyBytesRead";
String ZEROCOPY_BYTES_READ_DESC =
"The number of bytes read through HDFS zero copy";

String BLOCKED_REQUESTS_COUNT = "blockedRequestCount";
String BLOCKED_REQUESTS_COUNT_DESC = "The number of blocked requests because of memstore size is "
+ "larger than blockingMemStoreSize";
@@ -348,6 +348,26 @@ public interface MetricsRegionServerWrapper {
*/
long getMajorCompactedCellsSize();

/**
* @return Number of total bytes read from HDFS.
*/
long getTotalBytesRead();

/**
* @return Number of bytes read from the local HDFS DataNode.
*/
long getLocalBytesRead();

/**
* @return Number of bytes read locally through HDFS short circuit.
*/
long getShortCircuitBytesRead();

/**
* @return Number of bytes read locally through HDFS zero copy.
*/
long getZeroCopyBytesRead();

/**
* @return Count of requests blocked because the memstore size is larger than blockingMemStoreSize
*/
@@ -392,6 +392,18 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) {
.addGauge(Interns.info(PERCENT_FILES_LOCAL_SECONDARY_REGIONS,
PERCENT_FILES_LOCAL_SECONDARY_REGIONS_DESC),
rsWrap.getPercentFileLocalSecondaryRegions())
.addGauge(Interns.info(TOTAL_BYTES_READ,
TOTAL_BYTES_READ_DESC),
rsWrap.getTotalBytesRead())
.addGauge(Interns.info(LOCAL_BYTES_READ,
LOCAL_BYTES_READ_DESC),
rsWrap.getLocalBytesRead())
.addGauge(Interns.info(SHORTCIRCUIT_BYTES_READ,
SHORTCIRCUIT_BYTES_READ_DESC),
rsWrap.getShortCircuitBytesRead())
.addGauge(Interns.info(ZEROCOPY_BYTES_READ,
ZEROCOPY_BYTES_READ_DESC),
rsWrap.getZeroCopyBytesRead())
.addGauge(Interns.info(SPLIT_QUEUE_LENGTH, SPLIT_QUEUE_LENGTH_DESC),
rsWrap.getSplitQueueSize())
.addGauge(Interns.info(COMPACTION_QUEUE_LENGTH, COMPACTION_QUEUE_LENGTH_DESC),
@@ -29,6 +29,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

import com.google.common.annotations.VisibleForTesting;

@@ -88,6 +90,15 @@ public class FSDataInputStreamWrapper {
// errors against Hadoop pre 2.6.4 and 2.7.1 versions.
private Method unbuffer = null;

private final static ReadStatistics readStatistics = new ReadStatistics();

private static class ReadStatistics {
long totalBytesRead;
long totalLocalBytesRead;
long totalShortCircuitBytesRead;
long totalZeroCopyBytesRead;
}

public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
this(fs, null, path, false);
}
@@ -214,13 +225,61 @@ public void checksumOk() {
}
}

private void updateInputStreamStatistics(FSDataInputStream stream) {
// If the underlying file system is HDFS, update read statistics upon close.
if (stream instanceof HdfsDataInputStream) {
/**
* Because HDFS ReadStatistics is calculated per input stream, it is not
* feasible to update the aggregated number in real time. Instead, the
* metrics are updated when an input stream is closed.
*/
HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream)stream;
synchronized (readStatistics) {
readStatistics.totalBytesRead += hdfsDataInputStream.getReadStatistics().
getTotalBytesRead();
readStatistics.totalLocalBytesRead += hdfsDataInputStream.getReadStatistics().
getTotalLocalBytesRead();
readStatistics.totalShortCircuitBytesRead += hdfsDataInputStream.getReadStatistics().
getTotalShortCircuitBytesRead();
readStatistics.totalZeroCopyBytesRead += hdfsDataInputStream.getReadStatistics().
getTotalZeroCopyBytesRead();
}
}
}

public static long getTotalBytesRead() {
synchronized (readStatistics) {
return readStatistics.totalBytesRead;
}
}

public static long getLocalBytesRead() {
synchronized (readStatistics) {
return readStatistics.totalLocalBytesRead;
}
}

public static long getShortCircuitBytesRead() {
synchronized (readStatistics) {
return readStatistics.totalShortCircuitBytesRead;
}
}

public static long getZeroCopyBytesRead() {
synchronized (readStatistics) {
return readStatistics.totalZeroCopyBytesRead;
}
}

/** Close stream(s) if necessary. */
public void close() throws IOException {
public void close() {
if (!doCloseStreams) {
return;
}
updateInputStreamStatistics(this.streamNoFsChecksum);
// We do not care about close exceptions here; these streams are for reading only, so there is no data loss concern.
IOUtils.closeQuietly(streamNoFsChecksum);
updateInputStreamStatistics(stream);
IOUtils.closeQuietly(stream);
}
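As the comment in updateInputStreamStatistics notes, HDFS keeps ReadStatistics per input stream, so the wrapper folds each stream's counters into the static aggregate only when a stream is closed. The standalone sketch below, assuming a Hadoop 2.x client and an existing HDFS file passed as the first argument, reads the same per-stream counters directly; getTotalLocalBytesRead() is the per-stream counterpart of the localBytesRead gauge.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

public class ReadStatisticsExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // args[0] is assumed to be an existing HDFS file.
    try (FSDataInputStream in = fs.open(new Path(args[0]))) {
      in.read(new byte[4096]); // read a little so the counters move
      if (in instanceof HdfsDataInputStream) {
        DFSInputStream.ReadStatistics stats = ((HdfsDataInputStream) in).getReadStatistics();
        System.out.println("total=" + stats.getTotalBytesRead()
            + " local=" + stats.getTotalLocalBytesRead()
            + " shortCircuit=" + stats.getTotalShortCircuitBytesRead()
            + " zeroCopy=" + stats.getTotalZeroCopyBytesRead());
      }
    }
  }
}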

@@ -31,6 +31,7 @@
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
@@ -769,6 +770,26 @@ synchronized public void run() {
}
}

@Override
public long getTotalBytesRead() {
return FSDataInputStreamWrapper.getTotalBytesRead();
}

@Override
public long getLocalBytesRead() {
return FSDataInputStreamWrapper.getLocalBytesRead();
}

@Override
public long getShortCircuitBytesRead() {
return FSDataInputStreamWrapper.getShortCircuitBytesRead();
}

@Override
public long getZeroCopyBytesRead() {
return FSDataInputStreamWrapper.getZeroCopyBytesRead();
}

@Override
public long getBlockedRequestsCount() {
return blockedRequestsCount;
@@ -360,6 +360,26 @@ public long getMajorCompactedCellsSize() {
return 10240000;
}

@Override
public long getTotalBytesRead() {
return 0;
}

@Override
public long getLocalBytesRead() {
return 0;
}

@Override
public long getShortCircuitBytesRead() {
return 0;
}

@Override
public long getZeroCopyBytesRead() {
return 0;
}

@Override
public long getBlockedRequestsCount() {
return 0;
@@ -17,6 +17,15 @@
*/
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
@@ -32,13 +41,6 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


@Category(MediumTests.class)
public class TestRegionServerMetrics {
private static MetricsAssertHelper metricsHelper;
@@ -682,4 +684,34 @@ public void testAverageRegionSize() throws Exception {

t.close();
}

@Test
public void testReadBytes() throws Exception {
TableName tableName = TableName.valueOf("testReadBytes");
byte[] cf = Bytes.toBytes("d");
byte[] row = Bytes.toBytes("rk");
byte[] qualifier = Bytes.toBytes("qual");
byte[] val = Bytes.toBytes("Value");

Table t = TEST_UTIL.createTable(tableName, cf);
// Do a first put to be sure that the connection is established, meta is there and so on.
Put p = new Put(row);
p.addColumn(cf, qualifier, val);
t.put(p);
// Do a few gets
for (int i = 0; i < 10; i++) {
t.get(new Get(row));
}
TEST_UTIL.getHBaseAdmin().flush(tableName);
metricsRegionServer.getRegionServerWrapper().forceRecompute();

assertTrue("Total read bytes should be larger than 0",
metricsRegionServer.getRegionServerWrapper().getTotalBytesRead() > 0);
assertTrue("Total local read bytes should be larger than 0",
metricsRegionServer.getRegionServerWrapper().getLocalBytesRead() > 0);
assertEquals("Total short circuit read bytes should be equal to 0", 0,
metricsRegionServer.getRegionServerWrapper().getShortCircuitBytesRead());
assertEquals("Total zero-byte read bytes should be equal to 0", 0,
metricsRegionServer.getRegionServerWrapper().getZeroCopyBytesRead());
}
}
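The assertions above expect shortCircuitBytesRead and zeroCopyBytesRead to stay at zero because the test's mini cluster does not enable HDFS short-circuit reads. Below is a hedged sketch of the standard HDFS client settings a real deployment would need before the short-circuit gauge can climb above zero; the domain socket path is only an illustrative value and must match the DataNode's configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ShortCircuitReadConfig {
  public static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    // Standard HDFS client keys; the local DataNode must expose the same domain socket path.
    conf.setBoolean("dfs.client.read.shortcircuit", true);
    conf.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket"); // illustrative path
    return conf;
  }
}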
