Skip to content
Permalink
Browse files
HBASE-26869 RSRpcServices.scan should deep clone cells when RpcCallCo… (
  • Loading branch information
comnetwork committed Mar 23, 2022
1 parent b67c16a commit ab4351a15528511f2a65f6f32f39a23a19a447fe
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 7 deletions.
@@ -24,6 +24,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -849,4 +850,18 @@ public final static int compareColumns(Cell left, byte[] right, int rfoffset, in
if (diff != 0) return diff;
return compareQualifiers(left, right, rqoffset, rqlength);
}

public static void cloneIfNecessary(ArrayList<Cell> cells) {
if (cells == null || cells.isEmpty()) {
return;
}
for (int i = 0; i < cells.size(); i++) {
Cell cell = cells.get(i);
cells.set(i, cloneIfNecessary(cell));
}
}

public static Cell cloneIfNecessary(Cell cell) {
return (cell instanceof ByteBufferExtendedCell ? KeyValueUtil.copyToNewKeyValue(cell) : cell);
}
}
@@ -78,7 +78,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
@@ -94,7 +93,6 @@
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException;
@@ -7871,8 +7869,8 @@ private List<Cell> getInternal(Get get, boolean withCoprocessor, long nonceGroup
// This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap buffers.
// See more details in HBASE-26036.
for (Cell cell : tmp) {
results.add(cell instanceof ByteBufferExtendedCell ?
KeyValueUtil.copyToNewKeyValue(cell) : cell);
results.add(
CellUtil.cloneIfNecessary(cell));
}
}

@@ -3307,7 +3307,7 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
// This is cells inside a row. Default size is 10 so if many versions or many cfs,
// then we'll resize. Resizings show in profiler. Set it higher than 10. For now
// arbitrary 32. TODO: keep record of general size of results being returned.
List<Cell> values = new ArrayList<>(32);
ArrayList<Cell> values = new ArrayList<>(32);
region.startRegionOperation(Operation.SCAN);
long before = EnvironmentEdgeManager.currentTime();
// Used to check if we've matched the row limit set on the Scan
@@ -3368,9 +3368,16 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
// reset the batch progress between nextRaw invocations since we don't want the
// batch progress from previous calls to affect future calls
scannerContext.setBatchProgress(0);
assert values.isEmpty();

// Collect values to be returned here
moreRows = scanner.nextRaw(values, scannerContext);
if (context == null) {
// When there is no RpcCallContext,copy EC to heap, then the scanner would close,
// This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
// buffers.See more details in HBASE-26036.
CellUtil.cloneIfNecessary(values);
}
numOfNextRawCalls++;

if (!values.isEmpty()) {
@@ -3727,14 +3734,25 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
if (context != null) {
context.setCallBack(rsh.shippedCallback);
} else {
// When context != null, adding back the lease will be done in callback set above.
addScannerLeaseBack(lease);
// If context is null,here we call rsh.shippedCallback directly to reuse the logic in
// rsh.shippedCallback to release the internal resources in rsh,and lease is also added
// back to regionserver's LeaseManager in rsh.shippedCallback.
runShippedCallback(rsh);
}
}
quota.close();
}
}

private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
assert rsh.shippedCallback != null;
try {
rsh.shippedCallback.run();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
}

private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
RpcCallContext context) throws IOException {
if (region.getCoprocessorHost() != null) {
@@ -0,0 +1,214 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.DeallocateRewriteByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;

@Category({ RegionServerTests.class, LargeTests.class })
public class TestRegionServerScan {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionServerScan.class);

@Rule
public TestName name = new TestName();

private static final byte[] CF = Bytes.toBytes("CF");
private static final byte[] CQ = Bytes.toBytes("CQ");
private static final byte[] VALUE = new byte[1200];

private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final Configuration conf = TEST_UTIL.getConfiguration();
private static Admin admin = null;
static final TableName tableName = TableName.valueOf("TestRegionServerScan");
static final byte[] r0 = Bytes.toBytes("row-0");
static final byte[] r1 = Bytes.toBytes("row-1");
static final byte[] r2 = Bytes.toBytes("row-2");

@BeforeClass
public static void setupBeforeClass() throws Exception {
/**
* Use {@link DeallocateRewriteByteBuffAllocator} to rewrite the bytebuffers right after
* released.
*/
conf.set(ByteBuffAllocator.BYTEBUFF_ALLOCATOR_CLASS,
DeallocateRewriteByteBuffAllocator.class.getName());
conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
conf.setInt(BlockCacheFactory.BUCKET_CACHE_WRITER_THREADS_KEY, 20);
conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 2048);
conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64);
conf.setStrings(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 60 * 1000);
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30 * 60 * 1000);

conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 60 * 1000);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10000);
conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1024 * 1024 * 1024);
TEST_UTIL.startMiniCluster(1);
admin = TEST_UTIL.getAdmin();
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}

@Test
public void testScannWhenRpcCallContextNull() throws Exception {
ResultScanner resultScanner = null;
Table table = null;
try {
table =
TEST_UTIL.createTable(tableName, new byte[][] { CF }, 1, 1024, null);
putToTable(table, r0);
putToTable(table, r1);
putToTable(table, r2);

admin.flush(table.getName());

Scan scan = new Scan();
scan.setCaching(2);
scan.withStartRow(r0, true).withStopRow(r2, true);

MyRSRpcServices.inTest = true;
resultScanner = table.getScanner(scan);
Result result = resultScanner.next();
byte[] rowKey = result.getRow();
assertTrue(Bytes.equals(r0, rowKey));

result = resultScanner.next();
rowKey = result.getRow();
assertTrue(Bytes.equals(r1, rowKey));

result = resultScanner.next();
rowKey = result.getRow();
assertTrue(Bytes.equals(r2, rowKey));
assertNull(resultScanner.next());
assertTrue(MyRSRpcServices.exceptionRef.get() == null);
} finally {
MyRSRpcServices.inTest = false;
if (resultScanner != null) {
resultScanner.close();
}
if (table != null) {
table.close();
}
}
}

private static void putToTable(Table table, byte[] rowkey) throws IOException {
Put put = new Put(rowkey);
put.addColumn(CF, CQ, VALUE);
table.put(put);
}

private static class MyRegionServer extends MiniHBaseClusterRegionServer {
public MyRegionServer(Configuration conf) throws IOException, InterruptedException {
super(conf);
}

@Override
protected RSRpcServices createRpcServices() throws IOException {
return new MyRSRpcServices(this);
}
}

private static class MyRSRpcServices extends RSRpcServices {
private static AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(null);
private static volatile boolean inTest = false;

public MyRSRpcServices(HRegionServer rs) throws IOException {
super(rs);
}

@Override
public ScanResponse scan(RpcController controller, ScanRequest request)
throws ServiceException {
try {
if (!inTest) {
return super.scan(controller, request);
}

HRegion region = null;
if (request.hasRegion()) {
region = this.getRegion(request.getRegion());
}

if (region != null
&& !tableName.equals(region.getTableDescriptor().getTableName())) {
return super.scan(controller, request);
}

ScanResponse result = null;
//Simulate RpcCallContext is null for test.
Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall();
try {
result = super.scan(controller, request);
} finally {
rpcCall.ifPresent(RpcServer::setCurrentCall);
}
return result;
} catch (Throwable e) {
exceptionRef.set(e);
throw new ServiceException(e);
}
}
}

}

0 comments on commit ab4351a

Please sign in to comment.