Skip to content

Commit

Permalink
HBASE-16221 Have ThriftScanner update the ConnectionCache's last used…
Browse files Browse the repository at this point in the history
… time overtime getScannerRow() to keep the connection alive for long lived scanners

Signed-off-by: Elliott Clark <eclark@apache.org>
  • Loading branch information
Joseph Hwang authored and elliottneilclark committed Jul 23, 2016
1 parent cc766df commit 03fe257
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,20 @@ ConnectionInfo getCurrentConnection() throws IOException {
return connInfo;
}

/**
* Updates the access time for the current connection. Used to keep Connections alive for
* long-lived scanners.
* @return whether we successfully updated the last access time
*/
public boolean updateConnectionAccessTime() {
String userName = getEffectiveUser();
ConnectionInfo connInfo = connections.get(userName);
if (connInfo != null) {
return connInfo.updateAccessTime();
}
return false;
}

class ConnectionInfo {
final Connection connection;
final String userName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,8 @@ public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError,
ex.setMessage("Invalid scanner Id");
throw ex;
}

try {
connectionCache.updateConnectionAccessTime();
return resultsFromHBase(scanner.next(numRows));
} catch (IOException e) {
throw getTIOError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,57 @@ public void testScan() throws Exception {
}
}

/**
* Tests keeping a HBase scanner alive for long periods of time. Each call to getScannerRow()
* should reset the ConnectionCache timeout for the scanner's connection
* @throws Exception
*/
@Test
public void testLongLivedScan() throws Exception {
int numTrials = 6;
int trialPause = 1000;
int cleanUpInterval = 100;
Configuration conf = new Configuration(UTIL.getConfiguration());
// Set the ConnectionCache timeout to trigger halfway through the trials
conf.setInt(ThriftHBaseServiceHandler.MAX_IDLETIME, (numTrials / 2) * trialPause);
conf.setInt(ThriftHBaseServiceHandler.CLEANUP_INTERVAL, cleanUpInterval);
ThriftHBaseServiceHandler handler = new ThriftHBaseServiceHandler(conf,
UserProvider.instantiate(conf));

ByteBuffer table = wrap(tableAname);
// insert data
TColumnValue columnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
wrap(valueAname));
List<TColumnValue> columnValues = new ArrayList<TColumnValue>();
columnValues.add(columnValue);
for (int i = 0; i < numTrials; i++) {
TPut put = new TPut(wrap(("testScan" + i).getBytes()), columnValues);
handler.put(table, put);
}

// create scan instance
TScan scan = new TScan();
List<TColumn> columns = new ArrayList<TColumn>();
TColumn column = new TColumn();
column.setFamily(familyAname);
column.setQualifier(qualifierAname);
columns.add(column);
scan.setColumns(columns);
scan.setStartRow("testScan".getBytes());
scan.setStopRow("testScan\uffff".getBytes());
// Prevent the scanner from caching results
scan.setCaching(1);

// get scanner and rows
int scanId = handler.openScanner(table, scan);
for (int i = 0; i < numTrials; i++) {
// Make sure that the Scanner doesn't throw an exception after the ConnectionCache timeout
List<TResult> results = handler.getScannerRows(scanId, 1);
assertArrayEquals(("testScan" + i).getBytes(), results.get(0).getRow());
Thread.sleep(trialPause);
}
}

@Test
public void testReverseScan() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
Expand Down

0 comments on commit 03fe257

Please sign in to comment.