Revert "HBASE-5980 Scanner responses from RS should include metrics o…
Browse files Browse the repository at this point in the history
…n rows/KVs filtered"

Committed by mistake. Backing out till sure it's good.

This reverts commit dc72dad.
saintstack committed May 20, 2015
1 parent 7f2b33d commit e2e9ea5
Showing 17 changed files with 193 additions and 1,158 deletions.
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java

@@ -21,8 +21,6 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -204,9 +202,7 @@ protected void checkIfRegionServerIsRemote() {
setHeartbeatMessage(false);
try {
incRPCcallsMetrics();
request =
RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
this.scanMetrics != null);
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
ScanResponse response = null;
controller = controllerFactory.newController();
controller.setPriority(getTableName());
@@ -236,7 +232,6 @@ protected void checkIfRegionServerIsRemote() {
+ rows + " rows from scanner=" + scannerId);
}
}
updateServerSideMetrics(response);
// moreResults is only used for the case where a filter exhausts all elements
if (response.hasMoreResults() && !response.getMoreResults()) {
scannerId = -1L;
@@ -346,29 +341,14 @@ protected void updateResultsMetrics(Result[] rrs) {
}
}

/**
* Use the scan metrics returned by the server to add to the identically named counters in the
* client side metrics. If a counter does not exist with the same name as the server side metric,
* the attempt to increase the counter will fail.
* @param response
*/
private void updateServerSideMetrics(ScanResponse response) {
if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return;

Map<String, Long> serverMetrics = ResponseConverter.getScanMetrics(response);
for (Entry<String, Long> entry : serverMetrics.entrySet()) {
this.scanMetrics.addToCounter(entry.getKey(), entry.getValue());
}
}

private void close() {
if (this.scannerId == -1L) {
return;
}
try {
incRPCcallsMetrics();
ScanRequest request =
RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
RequestConverter.buildScanRequest(this.scannerId, 0, true);
try {
getStub().scan(null, request);
} catch (ServiceException se) {
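Context for the ScannerCallable changes above: with the server-side tracking reverted, scan metrics are collected on the client only. A minimal sketch of the era's client-side collection pattern follows; it assumes the 1.x attribute-based API (Scan.SCAN_ATTRIBUTES_METRICS_DATA, ProtobufUtil.toScanMetrics), and the table name and configuration are placeholders, not values from this commit.

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;

public class ClientScanMetricsSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("testTable"))) { // placeholder table
      Scan scan = new Scan();
      scan.setScanMetricsEnabled(true); // collection is client-side only after this revert
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result r : scanner) {
          // Drain the scanner; ClientScanner updates the ScanMetrics counters as it goes.
        }
      }
      // The accumulated metrics come back as a serialized attribute on the Scan.
      ScanMetrics metrics =
          ProtobufUtil.toScanMetrics(scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA));
      Map<String, Long> snapshot = metrics.getMetricsMap();
      System.out.println(snapshot);
    }
  }
}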
hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java

@@ -18,32 +18,41 @@

package org.apache.hadoop.hbase.client.metrics;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;

import com.google.common.collect.ImmutableMap;


/**
* Provides metrics related to scan operations (both server side and client side metrics).
* <p>
* Provides client-side metrics related to scan operations.
* The data can be passed to mapreduce framework or other systems.
* We use atomic longs so that one thread can increment,
* while another atomically resets to zero after the values are reported
* to hadoop's counters.
* <p>
*
* Some of these metrics are general for any client operation such as put
* However, there is no need for this. So they are defined under scan operation
* for now.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ScanMetrics extends ServerSideScanMetrics {
public class ScanMetrics {

/**
* Hash to hold the String -> Atomic Long mappings.
*/
private final Map<String, AtomicLong> counters = new HashMap<String, AtomicLong>();

// AtomicLongs to hold the metrics values. These are all updated through ClientScanner and
// ScannerCallable. They are atomic longs so that atomic getAndSet can be used to reset the
// values after progress is passed to hadoop's counters.


/**
* number of RPC calls
*/
@@ -94,4 +103,36 @@ public class ScanMetrics extends ServerSideScanMetrics {
*/
public ScanMetrics() {
}

private AtomicLong createCounter(String counterName) {
AtomicLong c = new AtomicLong(0);
counters.put(counterName, c);
return c;
}

public void setCounter(String counterName, long value) {
AtomicLong c = this.counters.get(counterName);
if (c != null) {
c.set(value);
}
}

/**
* Get all of the values since the last time this function was called.
*
* Calling this function will reset all AtomicLongs in the instance back to 0.
*
* @return A Map of String -> Long for metrics
*/
public Map<String, Long> getMetricsMap() {
//Create a builder
ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
//For every entry add the value and reset the AtomicLong back to zero
for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
builder.put(e.getKey(), e.getValue().getAndSet(0));
}
//Build the immutable map so that people can't mess around with it.
return builder.build();
}

}
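The get-and-reset contract described in the class comment is easy to miss; a small sketch of it in isolation (this assumes the public field name countOfRPCcalls from the HBase source, whose "number of RPC calls" javadoc appears truncated above):

import java.util.Map;

import org.apache.hadoop.hbase.client.metrics.ScanMetrics;

public class GetAndResetSketch {
  public static void main(String[] args) {
    ScanMetrics metrics = new ScanMetrics();
    metrics.countOfRPCcalls.incrementAndGet();
    metrics.countOfRPCcalls.incrementAndGet();

    // First call snapshots every counter and getAndSet(0)s it.
    Map<String, Long> first = metrics.getMetricsMap();
    // A second call therefore reports only zeros.
    Map<String, Long> second = metrics.getMetricsMap();
    System.out.println(first + " then " + second);
  }
}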

hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java

This file was deleted.
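The deleted file's body is not shown in this extract. From what the revert moves back into ScanMetrics (createCounter, setCounter, getMetricsMap) plus the addToCounter call in the reverted updateServerSideMetrics, the base class must have had roughly the following shape. This is a hypothetical reconstruction for orientation, not the actual HBase source:

package org.apache.hadoop.hbase.client.metrics;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.collect.ImmutableMap;

// Hypothetical reconstruction; method bodies mirror the ScanMetrics code above.
public class ServerSideScanMetrics {

  private final Map<String, AtomicLong> counters = new HashMap<String, AtomicLong>();

  protected AtomicLong createCounter(String counterName) {
    AtomicLong c = new AtomicLong(0);
    counters.put(counterName, c);
    return c;
  }

  public void setCounter(String counterName, long value) {
    AtomicLong c = counters.get(counterName);
    if (c != null) {
      c.set(value);
    }
  }

  // Per the reverted javadoc in ScannerCallable, adding to a counter that was never
  // created is expected to fail (here, with a NullPointerException).
  public void addToCounter(String counterName, long delta) {
    counters.get(counterName).addAndGet(delta);
  }

  public Map<String, Long> getMetricsMap() {
    ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
    for (Map.Entry<String, AtomicLong> e : counters.entrySet()) {
      builder.put(e.getKey(), e.getValue().getAndSet(0));
    }
    return builder.build();
  }
}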

hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java

@@ -46,6 +46,7 @@
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -60,7 +61,6 @@
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
@@ -478,8 +478,9 @@ private static RegionAction.Builder getRegionActionBuilderWithRegion(
* @return a scan request
* @throws IOException
*/
public static ScanRequest buildScanRequest(final byte[] regionName, final Scan scan,
final int numberOfRows, final boolean closeScanner) throws IOException {
public static ScanRequest buildScanRequest(final byte[] regionName,
final Scan scan, final int numberOfRows,
final boolean closeScanner) throws IOException {
ScanRequest.Builder builder = ScanRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
@@ -489,7 +490,6 @@ public static ScanRequest buildScanRequest(final byte[] regionName, final Scan s
builder.setScan(ProtobufUtil.toScan(scan));
builder.setClientHandlesPartials(true);
builder.setClientHandlesHeartbeats(true);
builder.setTrackScanMetrics(scan != null && scan.isScanMetricsEnabled());
return builder.build();
}

@@ -501,15 +501,14 @@ public static ScanRequest buildScanRequest(final byte[] regionName, final Scan s
* @param closeScanner
* @return a scan request
*/
public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
final boolean closeScanner, final boolean trackMetrics) {
public static ScanRequest buildScanRequest(final long scannerId,
final int numberOfRows, final boolean closeScanner) {
ScanRequest.Builder builder = ScanRequest.newBuilder();
builder.setNumberOfRows(numberOfRows);
builder.setCloseScanner(closeScanner);
builder.setScannerId(scannerId);
builder.setClientHandlesPartials(true);
builder.setClientHandlesHeartbeats(true);
builder.setTrackScanMetrics(trackMetrics);
return builder.build();
}

@@ -523,15 +522,14 @@ public static ScanRequest buildScanRequest(final long scannerId, final int numbe
* @return a scan request
*/
public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
final boolean closeScanner, final long nextCallSeq, final boolean trackMetrics) {
final boolean closeScanner, final long nextCallSeq) {
ScanRequest.Builder builder = ScanRequest.newBuilder();
builder.setNumberOfRows(numberOfRows);
builder.setCloseScanner(closeScanner);
builder.setScannerId(scannerId);
builder.setNextCallSeq(nextCallSeq);
builder.setClientHandlesPartials(true);
builder.setClientHandlesHeartbeats(true);
builder.setTrackScanMetrics(trackMetrics);
return builder.build();
}

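For reference, the three post-revert overloads shown above compose as follows; a fragment tying them together (the parameters are stand-ins, not values from this commit):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;

public class ScanRequestShapes {
  static void buildAll(byte[] regionName, Scan scan, long scannerId, int caching,
      long nextCallSeq) throws IOException {
    // Open a scanner against a region.
    ScanRequest open = RequestConverter.buildScanRequest(regionName, scan, caching, false);
    // Fetch the next batch; nextCallSeq lets the server spot replayed RPCs.
    ScanRequest next = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
    // Close the scanner: zero rows requested, closeScanner = true.
    ScanRequest close = RequestConverter.buildScanRequest(scannerId, 0, true);
  }
}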
hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java

@@ -20,9 +20,7 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -50,8 +48,6 @@
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
@@ -399,26 +395,4 @@ public static Result[] getResults(CellScanner cellScanner, ScanResponse response
}
return results;
}

public static Map<String, Long> getScanMetrics(ScanResponse response) {
Map<String, Long> metricMap = new HashMap<String, Long>();
if (response == null || !response.hasScanMetrics() || response.getScanMetrics() == null) {
return metricMap;
}

ScanMetrics metrics = response.getScanMetrics();
int numberOfMetrics = metrics.getMetricsCount();
for (int i = 0; i < numberOfMetrics; i++) {
NameInt64Pair metricPair = metrics.getMetrics(i);
if (metricPair != null) {
String name = metricPair.getName();
Long value = metricPair.getValue();
if (name != null && value != null) {
metricMap.put(name, value);
}
}
}

return metricMap;
}
}
