diff --git a/src/HBaseClient.java b/src/HBaseClient.java index 0708ed64..629eea5d 100644 --- a/src/HBaseClient.java +++ b/src/HBaseClient.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2010-2012 The Async HBase Authors. All rights reserved. + * Copyright (C) 2010-2018 The Async HBase Authors. All rights reserved. * This file is part of Async HBase. * * Redistribution and use in source and binary forms, with or without @@ -416,6 +416,9 @@ public final class HBaseClient { /** Default RPC timeout in milliseconds from the config */ private final int rpc_timeout; + + /** Whether or not we have to scan meta instead of making getClosestBeforeRow calls. */ + private final boolean scan_meta; private boolean increment_buffer_durable = false; @@ -550,6 +553,11 @@ public HBaseClient(final String quorum_spec, final String base_path, if (config.properties.containsKey("hbase.increments.durable")) { increment_buffer_durable = config.getBoolean("hbase.increments.durable"); } + if (config.hasProperty("hbase.meta.scan")) { + scan_meta = config.getBoolean("hbase.meta.scan"); + } else { + scan_meta = Boolean.parseBoolean(System.getProperty("hbase.meta.scan", "false")); + } } /** @@ -611,6 +619,11 @@ public HBaseClient(final Config config, if (config.properties.containsKey("hbase.increments.durable")) { increment_buffer_durable = config.getBoolean("hbase.increments.durable"); } + if (config.hasProperty("hbase.meta.scan")) { + scan_meta = config.getBoolean("hbase.meta.scan"); + } else { + scan_meta = Boolean.parseBoolean(System.getProperty("hbase.meta.scan", "false")); + } } /** @@ -2715,11 +2728,21 @@ private Deferred locateRegion(final HBaseRpc request, Deferred d = null; try { if (return_location) { - d = client.getClosestRowBefore(meta_region, meta_name, meta_key, INFO) + if (scan_meta) { + d = scanMeta(client, meta_region, meta_name, meta_key, INFO) .addCallback(meta_lookup_done_return_location); - } else{ - d = client.getClosestRowBefore(meta_region, meta_name, meta_key, INFO) + } else { + d = client.getClosestRowBefore(meta_region, meta_name, meta_key, INFO) + .addCallback(meta_lookup_done_return_location); + } + } else { + if (scan_meta) { + d = scanMeta(client, meta_region, meta_name, meta_key, INFO) .addCallback(meta_lookup_done); + } else { + d = client.getClosestRowBefore(meta_region, meta_name, meta_key, INFO) + .addCallback(meta_lookup_done); + } } } catch (RuntimeException e) { LOG.error("Unexpected exception while performing meta lookup", e); @@ -2771,6 +2794,75 @@ public String toString() { closest_before, return_location)); } + /** + * Later versions of HBase dropped the old + * {@link RegionClient#getClosestRowBefore(RegionInfo, byte[], byte[], byte[])} + * method and switched to performing reverse scans. This method will + * handle the scanning returning either a meta row if found or an empty + * array if the row/table was not found. + * TODO - need to see what happens with split meta. + * + * @param client The region client to open the scanner on. + * @param region The region we're scanning for. + * @param table The name of the table to scan for. + * @param row The row to scan for. + * @param family The family to scan on. + * @return A deferred resolving to an array list containing region + * info if successful or an empty array if the table/region doesn't + * exist. Or an exception if something goes pear shaped. + */ + private Deferred> scanMeta(final RegionClient client, + final RegionInfo region, + final byte[] table, + final byte[] row, + final byte[] family) { + final Scanner scanner = newScanner(table); + scanner.setReversed(true); + scanner.setMaxNumRows(1); + scanner.setStartKey(row); + scanner.setFamily(family); + scanner.setRegionName(region); + + final Deferred> deferred = + new Deferred>(); + + class ErrorCB implements Callback { + @Override + public Object call(final Exception ex) throws Exception { + scanner.close(); + deferred.callback(ex); + return null; + } + @Override + public String toString() { + return "scanMeta.ErrorCB"; + } + } + + class MetaScanCB implements Callback>> { + @Override + public Void call(final ArrayList> rows) + throws Exception { + final ArrayList row = (rows == null || rows.isEmpty()) + ? new ArrayList(0) : rows.get(0); + scanner.close(); + deferred.callback(row); + return null; + } + @Override + public String toString() { + return "scanMeta.MetaScanCB"; + } + } + + HBaseRpc open_request = scanner.getOpenRequestForReverseScan(row); + open_request.region = region; + open_request.getDeferred().addCallbackDeferring(scanner.opened_scanner) + .addCallbacks(new MetaScanCB(), new ErrorCB()); + client.sendRpc(open_request); + return deferred; + } + /** * Callback executed when a lookup in META completes * and user wants RegionLocation. diff --git a/src/Scanner.java b/src/Scanner.java index ecb4a05c..2496d432 100644 --- a/src/Scanner.java +++ b/src/Scanner.java @@ -766,7 +766,7 @@ public Deferred>> nextRows() { /** * Callback to handle response from opening a scanner */ - private final Callback>>, Object> + final Callback>>, Object> opened_scanner = new Callback>>, Object>() { public Deferred>> call(final Object arg) { @@ -806,7 +806,7 @@ public String toString() { * This returns an {@code ArrayList>} (possibly inside a * deferred one). */ - private final Callback got_next_row = + final Callback got_next_row = new Callback() { public Object call(final Object response) { ArrayList> rows = null; @@ -841,7 +841,7 @@ public String toString() { /** * Creates a new errback to handle errors while trying to get more rows. */ - private final Callback nextRowErrback() { + final Callback nextRowErrback() { return new Callback() { public Object call(final Object error) { final RegionInfo old_region = region; // Save before invalidate(). @@ -1481,7 +1481,7 @@ public String toString() { * RPC sent out to fetch the next rows from the RegionServer. */ final class GetNextRowsRequest extends HBaseRpc { - + @Override byte[] method(final byte server_version) { return (server_version >= RegionClient.SERVER_VERSION_095_OR_ABOVE diff --git a/test/TestHBaseClientLocateRegion.java b/test/TestHBaseClientLocateRegion.java index 0afb22a2..74f87a0e 100644 --- a/test/TestHBaseClientLocateRegion.java +++ b/test/TestHBaseClientLocateRegion.java @@ -7,10 +7,13 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; import java.util.ArrayList; +import org.hbase.async.Scanner.OpenScannerRequest; +import org.hbase.async.Scanner.Response; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -21,6 +24,7 @@ import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.reflect.Whitebox; +import com.google.common.collect.Lists; import com.stumbleupon.async.Deferred; @RunWith(PowerMockRunner.class) @@ -313,6 +317,139 @@ public void locateRegionLookupInZK() throws Exception { assertCounters(0, 0, 0); } + //--------------- TABLE LOOKUP IN META -------------- + + @Test + public void locateRegionInMeta() throws Exception { + clearCaches(); + + Whitebox.setInternalState(client, "has_root", false); + + when(rootclient.isAlive()).thenReturn(true); + when(rootclient.acquireMetaLookupPermit()).thenReturn(true); + when(rootclient.getClosestRowBefore(any(RegionInfo.class), any(byte[].class), + any(byte[].class), any(byte[].class))) + .thenReturn(Deferred.>fromResult(metaRow())); + + final Deferred obj = Whitebox.invokeMethod(client, "locateRegion", + get, TABLE, KEY); + assertTrue(root_deferred != obj); + final RegionClient rc = (RegionClient) obj.join(1); + assertCounters(0, 1, 0); + assertEquals(1, client2regions.size()); + assertNotNull(client2regions.get(rc)); + } + + @Test + public void locateRegionInMetaNoSuchTable() throws Exception { + clearCaches(); + + Whitebox.setInternalState(client, "has_root", false); + + when(rootclient.isAlive()).thenReturn(true); + when(rootclient.acquireMetaLookupPermit()).thenReturn(true); + when(rootclient.getClosestRowBefore(any(RegionInfo.class), any(byte[].class), + any(byte[].class), any(byte[].class))) + .thenReturn(Deferred.>fromResult(new ArrayList(0))); + + final Deferred obj = Whitebox.invokeMethod(client, "locateRegion", + get, HBaseClient.META, EMPTY_ARRAY); + assertTrue(root_deferred != obj); + try { + obj.join(1); + fail("Expected TableNotFoundException"); + } catch (TableNotFoundException e) { } + assertCounters(0, 1, 0); + assertEquals(0, client2regions.size()); + } + + @Test + public void locateRegionInMetaScan() throws Exception { + clearCaches(); + + Whitebox.setInternalState(client, "has_root", false); + Whitebox.setInternalState(client, "scan_meta", true); + + when(rootclient.isAlive()).thenReturn(true); + when(rootclient.acquireMetaLookupPermit()).thenReturn(true); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + final ArrayList> rows = Lists.newArrayList(); + rows.add(metaRow()); + ((HBaseRpc) invocation.getArguments()[0]).getDeferred() + .callback(new Response(0, rows, false, true)); + return null; + } + }).when(rootclient).sendRpc(any(OpenScannerRequest.class)); + + final Deferred obj = Whitebox.invokeMethod(client, "locateRegion", + get, TABLE, KEY); + assertTrue(root_deferred != obj); + final RegionClient rc = (RegionClient) obj.join(1); + assertCounters(0, 1, 0); + assertEquals(1, client2regions.size()); + assertNotNull(client2regions.get(rc)); + } + + @Test + public void locateRegionInMetaScanNoSuchTable() throws Exception { + clearCaches(); + + Whitebox.setInternalState(client, "has_root", false); + Whitebox.setInternalState(client, "scan_meta", true); + + when(rootclient.isAlive()).thenReturn(true); + when(rootclient.acquireMetaLookupPermit()).thenReturn(true); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + ((HBaseRpc) invocation.getArguments()[0]).getDeferred() + .callback(new Response(0, null, false, true)); + return null; + } + }).when(rootclient).sendRpc(any(OpenScannerRequest.class)); + + final Deferred obj = Whitebox.invokeMethod(client, "locateRegion", + get, TABLE, KEY); + assertTrue(root_deferred != obj); + try { + obj.join(1); + fail("Expected TableNotFoundException"); + } catch (TableNotFoundException e) { } + assertCounters(0, 1, 0); + assertEquals(0, client2regions.size()); + } + + @Test + public void locateRegionInMetaScanException() throws Exception { + clearCaches(); + + Whitebox.setInternalState(client, "has_root", false); + Whitebox.setInternalState(client, "scan_meta", true); + + when(rootclient.isAlive()).thenReturn(true); + when(rootclient.acquireMetaLookupPermit()).thenReturn(true); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + ((HBaseRpc) invocation.getArguments()[0]).getDeferred() + .callback(new NonRecoverableException("Boo!")); + return null; + } + }).when(rootclient).sendRpc(any(OpenScannerRequest.class)); + + final Deferred obj = Whitebox.invokeMethod(client, "locateRegion", + get, TABLE, KEY); + assertTrue(root_deferred != obj); + try { + obj.join(1); + fail("Expected NonRecoverableException"); + } catch (NonRecoverableException e) { } + assertCounters(0, 1, 0); + assertEquals(0, client2regions.size()); + } + // ---------- PARAMS ----------- @Test (expected = NullPointerException.class) public void locateRegionNullTable() throws Exception {