Skip to content

Commit

Permalink
Add a config flag (via file or JVM flag) called 'hbase.meta.scan' tha…
Browse files Browse the repository at this point in the history
…t should

be set when HBase 2.x is used. This will turn on reverse scanning for meta
information as opposed to the old way of calling getClosestRowBefore().
As part of this, some callbacks in the Scanner class are now package private
so we can re-use them in HBaseClient.java.
Fixes #150 Thanks @saintstack

TODO - need to see how this behaves with split meta.
TODO - I'd still like to auto-detect HBase 2.0 somehow.

Signed-off-by: Chris Larsen <clarsen@yahoo-inc.com>
  • Loading branch information
manolama committed Mar 18, 2018
1 parent 4247a7e commit 89513a5
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 8 deletions.
100 changes: 96 additions & 4 deletions src/HBaseClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2010-2012 The Async HBase Authors. All rights reserved.
* Copyright (C) 2010-2018 The Async HBase Authors. All rights reserved.
* This file is part of Async HBase.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -416,6 +416,9 @@ public final class HBaseClient {

/** Default RPC timeout in milliseconds from the config */
private final int rpc_timeout;

/** Whether or not we have to scan meta instead of making getClosestBeforeRow calls. */
private final boolean scan_meta;

private boolean increment_buffer_durable = false;

Expand Down Expand Up @@ -550,6 +553,11 @@ public HBaseClient(final String quorum_spec, final String base_path,
if (config.properties.containsKey("hbase.increments.durable")) {
increment_buffer_durable = config.getBoolean("hbase.increments.durable");
}
if (config.hasProperty("hbase.meta.scan")) {
scan_meta = config.getBoolean("hbase.meta.scan");
} else {
scan_meta = Boolean.parseBoolean(System.getProperty("hbase.meta.scan", "false"));
}
}

/**
Expand Down Expand Up @@ -611,6 +619,11 @@ public HBaseClient(final Config config,
if (config.properties.containsKey("hbase.increments.durable")) {
increment_buffer_durable = config.getBoolean("hbase.increments.durable");
}
if (config.hasProperty("hbase.meta.scan")) {
scan_meta = config.getBoolean("hbase.meta.scan");
} else {
scan_meta = Boolean.parseBoolean(System.getProperty("hbase.meta.scan", "false"));
}
}

/**
Expand Down Expand Up @@ -2715,11 +2728,21 @@ private Deferred<Object> locateRegion(final HBaseRpc request,
Deferred<Object> d = null;
try {
if (return_location) {
d = client.getClosestRowBefore(meta_region, meta_name, meta_key, INFO)
if (scan_meta) {
d = scanMeta(client, meta_region, meta_name, meta_key, INFO)
.addCallback(meta_lookup_done_return_location);
} else{
d = client.getClosestRowBefore(meta_region, meta_name, meta_key, INFO)
} else {
d = client.getClosestRowBefore(meta_region, meta_name, meta_key, INFO)
.addCallback(meta_lookup_done_return_location);
}
} else {
if (scan_meta) {
d = scanMeta(client, meta_region, meta_name, meta_key, INFO)
.addCallback(meta_lookup_done);
} else {
d = client.getClosestRowBefore(meta_region, meta_name, meta_key, INFO)
.addCallback(meta_lookup_done);
}
}
} catch (RuntimeException e) {
LOG.error("Unexpected exception while performing meta lookup", e);
Expand Down Expand Up @@ -2771,6 +2794,75 @@ public String toString() {
closest_before, return_location));
}

/**
* Later versions of HBase dropped the old
* {@link RegionClient#getClosestRowBefore(RegionInfo, byte[], byte[], byte[])}
* method and switched to performing reverse scans. This method will
* handle the scanning returning either a meta row if found or an empty
* array if the row/table was not found.
* TODO - need to see what happens with split meta.
*
* @param client The region client to open the scanner on.
* @param region The region we're scanning for.
* @param table The name of the table to scan for.
* @param row The row to scan for.
* @param family The family to scan on.
* @return A deferred resolving to an array list containing region
* info if successful or an empty array if the table/region doesn't
* exist. Or an exception if something goes pear shaped.
*/
private Deferred<ArrayList<KeyValue>> scanMeta(final RegionClient client,
final RegionInfo region,
final byte[] table,
final byte[] row,
final byte[] family) {
final Scanner scanner = newScanner(table);
scanner.setReversed(true);
scanner.setMaxNumRows(1);
scanner.setStartKey(row);
scanner.setFamily(family);
scanner.setRegionName(region);

final Deferred<ArrayList<KeyValue>> deferred =
new Deferred<ArrayList<KeyValue>>();

class ErrorCB implements Callback<Object, Exception> {
@Override
public Object call(final Exception ex) throws Exception {
scanner.close();
deferred.callback(ex);
return null;
}
@Override
public String toString() {
return "scanMeta.ErrorCB";
}
}

class MetaScanCB implements Callback<Void, ArrayList<ArrayList<KeyValue>>> {
@Override
public Void call(final ArrayList<ArrayList<KeyValue>> rows)
throws Exception {
final ArrayList<KeyValue> row = (rows == null || rows.isEmpty())
? new ArrayList<KeyValue>(0) : rows.get(0);
scanner.close();
deferred.callback(row);
return null;
}
@Override
public String toString() {
return "scanMeta.MetaScanCB";
}
}

HBaseRpc open_request = scanner.getOpenRequestForReverseScan(row);
open_request.region = region;
open_request.getDeferred().addCallbackDeferring(scanner.opened_scanner)
.addCallbacks(new MetaScanCB(), new ErrorCB());
client.sendRpc(open_request);
return deferred;
}

/**
* Callback executed when a lookup in META completes
* and user wants RegionLocation.
Expand Down
8 changes: 4 additions & 4 deletions src/Scanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ public Deferred<ArrayList<ArrayList<KeyValue>>> nextRows() {
/**
* Callback to handle response from opening a scanner
*/
private final Callback<Deferred<ArrayList<ArrayList<KeyValue>>>, Object>
final Callback<Deferred<ArrayList<ArrayList<KeyValue>>>, Object>
opened_scanner =
new Callback<Deferred<ArrayList<ArrayList<KeyValue>>>, Object>() {
public Deferred<ArrayList<ArrayList<KeyValue>>> call(final Object arg) {
Expand Down Expand Up @@ -806,7 +806,7 @@ public String toString() {
* This returns an {@code ArrayList<ArrayList<KeyValue>>} (possibly inside a
* deferred one).
*/
private final Callback<Object, Object> got_next_row =
final Callback<Object, Object> got_next_row =
new Callback<Object, Object>() {
public Object call(final Object response) {
ArrayList<ArrayList<KeyValue>> rows = null;
Expand Down Expand Up @@ -841,7 +841,7 @@ public String toString() {
/**
* Creates a new errback to handle errors while trying to get more rows.
*/
private final Callback<Object, Object> nextRowErrback() {
final Callback<Object, Object> nextRowErrback() {
return new Callback<Object, Object>() {
public Object call(final Object error) {
final RegionInfo old_region = region; // Save before invalidate().
Expand Down Expand Up @@ -1481,7 +1481,7 @@ public String toString() {
* RPC sent out to fetch the next rows from the RegionServer.
*/
final class GetNextRowsRequest extends HBaseRpc {

@Override
byte[] method(final byte server_version) {
return (server_version >= RegionClient.SERVER_VERSION_095_OR_ABOVE
Expand Down
137 changes: 137 additions & 0 deletions test/TestHBaseClientLocateRegion.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;

import java.util.ArrayList;

import org.hbase.async.Scanner.OpenScannerRequest;
import org.hbase.async.Scanner.Response;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -21,6 +24,7 @@
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

import com.google.common.collect.Lists;
import com.stumbleupon.async.Deferred;

@RunWith(PowerMockRunner.class)
Expand Down Expand Up @@ -313,6 +317,139 @@ public void locateRegionLookupInZK() throws Exception {
assertCounters(0, 0, 0);
}

//--------------- TABLE LOOKUP IN META --------------

@Test
public void locateRegionInMeta() throws Exception {
clearCaches();

Whitebox.setInternalState(client, "has_root", false);

when(rootclient.isAlive()).thenReturn(true);
when(rootclient.acquireMetaLookupPermit()).thenReturn(true);
when(rootclient.getClosestRowBefore(any(RegionInfo.class), any(byte[].class),
any(byte[].class), any(byte[].class)))
.thenReturn(Deferred.<ArrayList<KeyValue>>fromResult(metaRow()));

final Deferred<Object> obj = Whitebox.invokeMethod(client, "locateRegion",
get, TABLE, KEY);
assertTrue(root_deferred != obj);
final RegionClient rc = (RegionClient) obj.join(1);
assertCounters(0, 1, 0);
assertEquals(1, client2regions.size());
assertNotNull(client2regions.get(rc));
}

@Test
public void locateRegionInMetaNoSuchTable() throws Exception {
clearCaches();

Whitebox.setInternalState(client, "has_root", false);

when(rootclient.isAlive()).thenReturn(true);
when(rootclient.acquireMetaLookupPermit()).thenReturn(true);
when(rootclient.getClosestRowBefore(any(RegionInfo.class), any(byte[].class),
any(byte[].class), any(byte[].class)))
.thenReturn(Deferred.<ArrayList<KeyValue>>fromResult(new ArrayList<KeyValue>(0)));

final Deferred<Object> obj = Whitebox.invokeMethod(client, "locateRegion",
get, HBaseClient.META, EMPTY_ARRAY);
assertTrue(root_deferred != obj);
try {
obj.join(1);
fail("Expected TableNotFoundException");
} catch (TableNotFoundException e) { }
assertCounters(0, 1, 0);
assertEquals(0, client2regions.size());
}

@Test
public void locateRegionInMetaScan() throws Exception {
clearCaches();

Whitebox.setInternalState(client, "has_root", false);
Whitebox.setInternalState(client, "scan_meta", true);

when(rootclient.isAlive()).thenReturn(true);
when(rootclient.acquireMetaLookupPermit()).thenReturn(true);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
final ArrayList<ArrayList<KeyValue>> rows = Lists.newArrayList();
rows.add(metaRow());
((HBaseRpc) invocation.getArguments()[0]).getDeferred()
.callback(new Response(0, rows, false, true));
return null;
}
}).when(rootclient).sendRpc(any(OpenScannerRequest.class));

final Deferred<Object> obj = Whitebox.invokeMethod(client, "locateRegion",
get, TABLE, KEY);
assertTrue(root_deferred != obj);
final RegionClient rc = (RegionClient) obj.join(1);
assertCounters(0, 1, 0);
assertEquals(1, client2regions.size());
assertNotNull(client2regions.get(rc));
}

@Test
public void locateRegionInMetaScanNoSuchTable() throws Exception {
clearCaches();

Whitebox.setInternalState(client, "has_root", false);
Whitebox.setInternalState(client, "scan_meta", true);

when(rootclient.isAlive()).thenReturn(true);
when(rootclient.acquireMetaLookupPermit()).thenReturn(true);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
((HBaseRpc) invocation.getArguments()[0]).getDeferred()
.callback(new Response(0, null, false, true));
return null;
}
}).when(rootclient).sendRpc(any(OpenScannerRequest.class));

final Deferred<Object> obj = Whitebox.invokeMethod(client, "locateRegion",
get, TABLE, KEY);
assertTrue(root_deferred != obj);
try {
obj.join(1);
fail("Expected TableNotFoundException");
} catch (TableNotFoundException e) { }
assertCounters(0, 1, 0);
assertEquals(0, client2regions.size());
}

@Test
public void locateRegionInMetaScanException() throws Exception {
clearCaches();

Whitebox.setInternalState(client, "has_root", false);
Whitebox.setInternalState(client, "scan_meta", true);

when(rootclient.isAlive()).thenReturn(true);
when(rootclient.acquireMetaLookupPermit()).thenReturn(true);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
((HBaseRpc) invocation.getArguments()[0]).getDeferred()
.callback(new NonRecoverableException("Boo!"));
return null;
}
}).when(rootclient).sendRpc(any(OpenScannerRequest.class));

final Deferred<Object> obj = Whitebox.invokeMethod(client, "locateRegion",
get, TABLE, KEY);
assertTrue(root_deferred != obj);
try {
obj.join(1);
fail("Expected NonRecoverableException");
} catch (NonRecoverableException e) { }
assertCounters(0, 1, 0);
assertEquals(0, client2regions.size());
}

// ---------- PARAMS -----------
@Test (expected = NullPointerException.class)
public void locateRegionNullTable() throws Exception {
Expand Down

0 comments on commit 89513a5

Please sign in to comment.