Skip to content

Commit

Permalink
HBASE-26629 Add expiration for long time vacant scanners in Thrift2 (#…
Browse files Browse the repository at this point in the history
…3984)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
YutSean authored and Apache9 committed Jan 2, 2022
1 parent cd87e10 commit a478d02
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 15 deletions.
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.thrift2;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
Expand Down Expand Up @@ -50,9 +52,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -93,6 +94,9 @@
import org.apache.hadoop.hbase.thrift2.generated.TTableName;
import org.apache.hadoop.hbase.thrift2.generated.TThriftServerType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.RemovalListener;
import org.apache.thrift.TException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand All @@ -110,9 +114,8 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);

// nextScannerId and scannerMap are used to manage scanner state
// TODO: Cleanup thread for Scanners, Scanner id wrap
private final AtomicInteger nextScannerId = new AtomicInteger(0);
private final Map<Integer, ResultScanner> scannerMap = new ConcurrentHashMap<>();
private final Cache<Integer, ResultScanner> scannerMap;

private static final IOException ioe
= new DoNotRetryIOException("Thrift Server is in Read-only mode.");
Expand Down Expand Up @@ -156,7 +159,14 @@ public int hashCode() {
public ThriftHBaseServiceHandler(final Configuration conf,
final UserProvider userProvider) throws IOException {
super(conf, userProvider);
long cacheTimeout = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
isReadOnly = conf.getBoolean(THRIFT_READONLY_ENABLED, THRIFT_READONLY_ENABLED_DEFAULT);
scannerMap = CacheBuilder.newBuilder()
.expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS)
.removalListener((RemovalListener<Integer, ResultScanner>) removalNotification ->
removalNotification.getValue().close())
.build();
}

@Override
Expand Down Expand Up @@ -208,16 +218,15 @@ private int addScanner(ResultScanner scanner) {
* @return a Scanner, or null if the Id is invalid
*/
private ResultScanner getScanner(int id) {
return scannerMap.get(id);
return scannerMap.getIfPresent(id);
}

/**
* Removes the scanner associated with the specified ID from the internal HashMap.
* @param id of the Scanner to remove
* @return the removed Scanner, or <code>null</code> if the Id is invalid
*/
protected ResultScanner removeScanner(int id) {
return scannerMap.remove(id);
protected void removeScanner(int id) {
scannerMap.invalidate(id);
}

@Override
Expand Down Expand Up @@ -461,18 +470,15 @@ public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows
return results;
}



@Override
public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
LOG.debug("scannerClose: id=" + scannerId);
ResultScanner scanner = getScanner(scannerId);
if (scanner == null) {
String message = "scanner ID is invalid";
LOG.warn(message);
TIllegalArgument ex = new TIllegalArgument();
ex.setMessage("Invalid scanner Id");
throw ex;
LOG.warn("scanner ID: " + scannerId + "is invalid");
// While the scanner could be already expired,
// we should not throw exception here. Just log and return.
return;
}
scanner.close();
removeScanner(scannerId);
Expand Down
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.thrift2;

import static java.nio.ByteBuffer.wrap;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.thrift.HBaseServiceHandler.CLEANUP_INTERVAL;
import static org.apache.hadoop.hbase.thrift.HBaseServiceHandler.MAX_IDLETIME;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
Expand Down Expand Up @@ -1021,6 +1023,31 @@ public void testSmallScan() throws Exception {
}
}

@Test
public void testExpiredScanner() throws Exception {
Configuration conf = UTIL.getConfiguration();
conf.setLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1000);
ThriftHBaseServiceHandler handler =
new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf));

TScan scan = new TScan();
ByteBuffer table = wrap(tableAname);

int scannerId = handler.openScanner(table, scan);
handler.getScannerRows(scannerId, 1);
Thread.sleep(1000);

try {
handler.getScannerRows(scannerId, 1);
fail("The scanner should be expired and have an TIllegalArgument exception here.");
} catch (TIllegalArgument e) {
assertEquals("Invalid scanner Id", e.getMessage());
} finally {
conf.setLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
}
}

@Test
public void testPutTTL() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
Expand Down

0 comments on commit a478d02

Please sign in to comment.