Skip to content

Commit

Permalink
HBASE-27210 Clean up error-prone findings in hbase-endpoint (#4646)
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <zhangduo@apache.org>

Conflicts:
	hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
	hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
  • Loading branch information
apurtell committed Jul 27, 2022
1 parent ce1455c commit 15db425
Show file tree
Hide file tree
Showing 16 changed files with 129 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -692,10 +692,9 @@ public <R, S, P extends Message, Q extends Message, T extends Message> double st
public <R, S, P extends Message, Q extends Message, T extends Message> double
std(final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
double res = 0d;
double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
res = avgOfSumSq - (avg) * (avg); // variance
double res = avgOfSumSq - avg * avg; // variance
res = Math.pow(res, 0.5);
return res;
}
Expand Down Expand Up @@ -868,14 +867,6 @@ public <R, S, P extends Message, Q extends Message, T extends Message> R median(
}

byte[] getBytesFromResponse(ByteString response) {
ByteBuffer bb = response.asReadOnlyByteBuffer();
bb.rewind();
byte[] bytes;
if (bb.hasArray()) {
bytes = bb.array();
} else {
bytes = response.toByteArray();
}
return bytes;
return response.toByteArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private static void validateParameters(Scan scan, boolean canFamilyBeAbsent) thr
* @return the instance
* @throws IOException Either we couldn't instantiate the method object, or "parseFrom" failed.
*/
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "TypeParameterUnusedInFormals" })
// Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO.
public static <T extends Message> T getParsedGenericInstance(Class<?> runtimeClass, int position,
ByteString b) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
Expand All @@ -50,12 +51,12 @@
* aggregate function at a region level. {@link ColumnInterpreter} is used to interpret column
* value. This class is parameterized with the following (these are the types with which the
* {@link ColumnInterpreter} is parameterized, and for more description on these, refer to
* {@link ColumnInterpreter}):
* @param <T> Cell value data type
* @param <S> Promoted data type
* @param <P> PB message that is used to transport initializer specific bytes
* @param <Q> PB message that is used to transport Cell (&lt;T&gt;) instance
* @param <R> PB message that is used to transport Promoted (&lt;S&gt;) instance
* {@link ColumnInterpreter}):<br>
* &lt;T&gt; Cell value data type<br>
* &lt;S&gt; Promoted data type<br>
* &lt;P&gt; PB message that is used to transport initializer specific bytes<br>
* &lt;Q&gt; PB message that is used to transport Cell (&lt;T&gt;) instance<br>
* &lt;R&gt; PB message that is used to transport Promoted (&lt;S&gt;) instance<br>
*/
@InterfaceAudience.Private
public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message>
Expand Down Expand Up @@ -107,10 +108,7 @@ public void getMax(RpcController controller, AggregateRequest request,
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
IOUtils.closeQuietly(scanner);
}
}
log.info("Maximum from this region is "
Expand Down Expand Up @@ -160,10 +158,7 @@ public void getMin(RpcController controller, AggregateRequest request,
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
IOUtils.closeQuietly(scanner);
}
}
log.info("Minimum from this region is "
Expand Down Expand Up @@ -216,10 +211,7 @@ public void getSum(RpcController controller, AggregateRequest request,
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
IOUtils.closeQuietly(scanner);
}
}
log.debug("Sum from this region is " + env.getRegion().getRegionInfo().getRegionNameAsString()
Expand Down Expand Up @@ -267,10 +259,7 @@ public void getRowNum(RpcController controller, AggregateRequest request,
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
IOUtils.closeQuietly(scanner);
}
}
log.info("Row counter from this region is "
Expand Down Expand Up @@ -331,10 +320,7 @@ public void getAvg(RpcController controller, AggregateRequest request,
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
IOUtils.closeQuietly(scanner);
}
}
done.run(response);
Expand Down Expand Up @@ -397,10 +383,7 @@ public void getStd(RpcController controller, AggregateRequest request,
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
IOUtils.closeQuietly(scanner);
}
}
done.run(response);
Expand Down Expand Up @@ -462,10 +445,7 @@ public void getMedian(RpcController controller, AggregateRequest request,
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
IOUtils.closeQuietly(scanner);
}
}
done.run(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Expand Down Expand Up @@ -109,6 +108,7 @@ static Map<byte[], Response> run(final Configuration conf, final String[] args)
return run(conf, arguments.getFirst(), arguments.getSecond(), arguments.getThird());
}

@SuppressWarnings("ModifiedButNotUsed")
public static Map<byte[], Response> run(final Configuration conf, TableName tableName, Scan scan,
Path dir) throws Throwable {
FileSystem fs = dir.getFileSystem(conf);
Expand All @@ -125,7 +125,6 @@ public static Map<byte[], Response> run(final Configuration conf, TableName tabl
table.coprocessorService(ExportProtos.ExportService.class, scan.getStartRow(),
scan.getStopRow(), (ExportProtos.ExportService service) -> {
ServerRpcController controller = new ServerRpcController();
Map<byte[], ExportProtos.ExportResponse> rval = new TreeMap<>(Bytes.BYTES_COMPARATOR);
CoprocessorRpcUtils.BlockingRpcCallback<ExportProtos.ExportResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<>();
service.export(controller, request, rpcCallback);
Expand Down Expand Up @@ -190,7 +189,7 @@ private static SequenceFile.Writer.Option getOutputPath(final Configuration conf

private static List<SequenceFile.Writer.Option> getWriterOptions(final Configuration conf,
final RegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
List<SequenceFile.Writer.Option> rval = new LinkedList<>();
List<SequenceFile.Writer.Option> rval = new ArrayList<>(5);
rval.add(SequenceFile.Writer.keyClass(ImmutableBytesWritable.class));
rval.add(SequenceFile.Writer.valueClass(Result.class));
rval.add(getOutputPath(conf, info, request));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class TestAsyncAggregationClient {

private static byte[] CQ2 = Bytes.toBytes("CQ2");

private static int COUNT = 1000;
private static long COUNT = 1000;

private static AsyncConnection CONN;

Expand Down Expand Up @@ -141,7 +141,7 @@ public void testMedian() throws InterruptedException, ExecutionException {
long halfSum = COUNT * (COUNT - 1) / 4;
long median = 0L;
long sum = 0L;
for (int i = 0; i < COUNT; i++) {
for (long i = 0; i < COUNT; i++) {
sum += i;
if (sum > halfSum) {
median = i - 1;
Expand All @@ -158,7 +158,7 @@ public void testMedianWithWeight() throws InterruptedException, ExecutionExcepti
LongStream.range(0, COUNT).map(l -> l * l).reduce((l1, l2) -> l1 + l2).getAsLong() / 2;
long median = 0L;
long sum = 0L;
for (int i = 0; i < COUNT; i++) {
for (long i = 0; i < COUNT; i++) {
sum += i * i;
if (sum > halfSum) {
median = i - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,71 +155,70 @@ public void testCountController() throws Exception {
// change one of the connection properties so we get a new Connection with our configuration
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1);

Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(tableName);
byte[] row = Bytes.toBytes("row");
Put p = new Put(row);
p.addColumn(fam1, fam1, Bytes.toBytes("val0"));
table.put(p);

Integer counter = 1;
counter = verifyCount(counter);

Delete d = new Delete(row);
d.addColumn(fam1, fam1);
table.delete(d);
counter = verifyCount(counter);

Put p2 = new Put(row);
p2.addColumn(fam1, Bytes.toBytes("qual"), Bytes.toBytes("val1"));
table.batch(Lists.newArrayList(p, p2), null);
// this only goes to a single server, so we don't need to change the count here
counter = verifyCount(counter);

Append append = new Append(row);
append.addColumn(fam1, fam1, Bytes.toBytes("val2"));
table.append(append);
counter = verifyCount(counter);

// and check the major lookup calls as well
Get g = new Get(row);
table.get(g);
counter = verifyCount(counter);

ResultScanner scan = table.getScanner(fam1);
scan.next();
scan.close();
counter = verifyCount(counter + 1);

Get g2 = new Get(row);
table.get(Lists.newArrayList(g, g2));
// same server, so same as above for not changing count
counter = verifyCount(counter);

// make sure all the scanner types are covered
Scan scanInfo = new Scan(row);
// regular small
scanInfo.setSmall(true);
counter = doScan(table, scanInfo, counter);

// reversed, small
scanInfo.setReversed(true);
counter = doScan(table, scanInfo, counter);

// reversed, regular
scanInfo.setSmall(false);
counter = doScan(table, scanInfo, counter + 1);

// make sure we have no priority count
verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0);
// lets set a custom priority on a get
Get get = new Get(row);
get.setPriority(HConstants.ADMIN_QOS);
table.get(get);
verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1);

table.close();
connection.close();
try (Connection connection = ConnectionFactory.createConnection(conf)) {
try (Table table = connection.getTable(tableName)) {
byte[] row = Bytes.toBytes("row");
Put p = new Put(row);
p.addColumn(fam1, fam1, Bytes.toBytes("val0"));
table.put(p);

Integer counter = 1;
counter = verifyCount(counter);

Delete d = new Delete(row);
d.addColumn(fam1, fam1);
table.delete(d);
counter = verifyCount(counter);

Put p2 = new Put(row);
p2.addColumn(fam1, Bytes.toBytes("qual"), Bytes.toBytes("val1"));
table.batch(Lists.newArrayList(p, p2), null);
// this only goes to a single server, so we don't need to change the count here
counter = verifyCount(counter);

Append append = new Append(row);
append.addColumn(fam1, fam1, Bytes.toBytes("val2"));
table.append(append);
counter = verifyCount(counter);

// and check the major lookup calls as well
Get g = new Get(row);
table.get(g);
counter = verifyCount(counter);

ResultScanner scan = table.getScanner(fam1);
scan.next();
scan.close();
counter = verifyCount(counter + 1);

Get g2 = new Get(row);
table.get(Lists.newArrayList(g, g2));
// same server, so same as above for not changing count
counter = verifyCount(counter);

// make sure all the scanner types are covered
Scan scanInfo = new Scan(row);
// regular small
scanInfo.setSmall(true);
counter = doScan(table, scanInfo, counter);

// reversed, small
scanInfo.setReversed(true);
counter = doScan(table, scanInfo, counter);

// reversed, regular
scanInfo.setSmall(false);
doScan(table, scanInfo, counter + 1);

// make sure we have no priority count
verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0);
// lets set a custom priority on a get
Get get = new Get(row);
get.setPriority(HConstants.ADMIN_QOS);
table.get(get);
verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1);
}
}
}

int doScan(Table table, Scan scan, int expectedCount) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ public void itCreatesConnectionless() throws Throwable {
try {
client.rowCount(TABLE_NAME, new LongColumnInterpreter(), new Scan());
fail("Expected IOException");
} catch (Throwable e) {
assertTrue(e instanceof IOException);
} catch (IOException e) {
assertTrue(e.getMessage().contains("Connection not initialized"));
}

Expand Down
Loading

0 comments on commit 15db425

Please sign in to comment.