Skip to content

Commit

Permalink
HBASE-21761 Align the methods in RegionLocator and AsyncTableRegionLocator
Browse files Browse the repository at this point in the history

Signed-off-by: Guanghao Zhang <zghao@apache.org>
  • Loading branch information
Apache9 committed Jan 24, 2019
1 parent d4085d1 commit 416b70f
Show file tree
Hide file tree
Showing 8 changed files with 361 additions and 254 deletions.
Expand Up @@ -149,7 +149,7 @@ public void close() {


@Override @Override
public AsyncTableRegionLocator getRegionLocator(TableName tableName) { public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
return new AsyncTableRegionLocatorImpl(tableName, locator); return new AsyncTableRegionLocatorImpl(tableName, this);
} }


// we will override this method for testing retry caller, so do not remove this method. // we will override this method for testing retry caller, so do not remove this method.
Expand Down
Expand Up @@ -17,10 +17,13 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;


import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;


/** /**
Expand Down Expand Up @@ -56,7 +59,7 @@ default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row) {
* @param reload true to reload information or false to use cached information * @param reload true to reload information or false to use cached information
*/ */
default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) { default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
return getRegionLocation(row, RegionReplicaUtil.DEFAULT_REPLICA_ID, reload); return getRegionLocation(row, RegionInfo.DEFAULT_REPLICA_ID, reload);
} }


/** /**
Expand All @@ -82,9 +85,78 @@ default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int rep
*/ */
CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, boolean reload); CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, boolean reload);


/**
* Find all the replicas for the region on which the given row is being served, using cached
* location information where available.
* <p>
* Equivalent to calling {@link #getRegionLocations(byte[], boolean)} with {@code reload} set to
* {@code false}.
* @param row Row to find.
* @return a future that completes with the locations for all the replicas of the row. The
*         signature declares no checked exception, so lookup failures are presumably reported
*         through the returned future rather than thrown (NOTE(review): confirm against the
*         implementation).
*/
default CompletableFuture<List<HRegionLocation>> getRegionLocations(byte[] row) {
return getRegionLocations(row, false);
}

/**
* Find all the replicas for the region on which the given row is being served.
* @param row Row to find.
* @param reload true to reload information or false to use cached information
* @return a future that completes with the locations for all the replicas of the row. The
*         signature declares no checked exception, so lookup failures are presumably reported
*         through the returned future rather than thrown (NOTE(review): confirm against the
*         implementation).
*/
CompletableFuture<List<HRegionLocation>> getRegionLocations(byte[] row, boolean reload);

/** /**
* Retrieves all of the regions associated with this table. * Retrieves all of the regions associated with this table.
* <p/>
* Usually we will go to meta table directly in this method so there is no {@code reload}
* parameter.
* <p/>
* Notice that the location for region replicas other than the default replica are also returned.
* @return a {@link List} of all regions associated with this table. * @return a {@link List} of all regions associated with this table.
*/ */
CompletableFuture<List<HRegionLocation>> getAllRegionLocations(); CompletableFuture<List<HRegionLocation>> getAllRegionLocations();

/**
 * Collects the starting row key of every region in the currently open table.
 * <p>
 * The keys are derived from {@link #getStartEndKeys()} by keeping the first element of each
 * pair. This is mainly useful for the MapReduce integration.
 * @return a future holding the list of region starting row keys
 * @throws IOException if a remote or network exception occurs
 */
default CompletableFuture<List<byte[]>> getStartKeys() throws IOException {
  CompletableFuture<List<Pair<byte[], byte[]>>> keyPairsFuture = getStartEndKeys();
  return keyPairsFuture.thenApply(keyPairs -> {
    return keyPairs.stream().map(keyPair -> keyPair.getFirst()).collect(Collectors.toList());
  });
}

/**
 * Collects the ending row key of every region in the currently open table.
 * <p>
 * The keys are derived from {@link #getStartEndKeys()} by keeping the second element of each
 * pair. This is mainly useful for the MapReduce integration.
 * @return a future holding the list of region ending row keys
 * @throws IOException if a remote or network exception occurs
 */
default CompletableFuture<List<byte[]>> getEndKeys() throws IOException {
  CompletableFuture<List<Pair<byte[], byte[]>>> keyPairsFuture = getStartEndKeys();
  return keyPairsFuture.thenApply(keyPairs -> {
    return keyPairs.stream().map(keyPair -> keyPair.getSecond()).collect(Collectors.toList());
  });
}

/**
 * Collects the starting and ending row keys of every region in the currently open table.
 * <p>
 * The result is built from {@link #getAllRegionLocations()}; only locations whose region is the
 * default replica contribute an entry. This is mainly useful for the MapReduce integration.
 * @return a future holding one (start key, end key) pair per region
 * @throws IOException if a remote or network exception occurs
 */
default CompletableFuture<List<Pair<byte[], byte[]>>> getStartEndKeys() throws IOException {
  return getAllRegionLocations().thenApply(locations -> locations.stream()
    .map(location -> location.getRegion())
    .filter(region -> RegionReplicaUtil.isDefaultReplica(region))
    .map(region -> Pair.newPair(region.getStartKey(), region.getEndKey()))
    .collect(Collectors.toList()));
}

/**
* Clear all the entries in the region location cache.
* <p>
* This may cause performance issues, so use it with caution.
*/
void clearRegionLocationCache();
} }
Expand Up @@ -17,20 +17,15 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;


import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;


import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/** /**
* The implementation of AsyncRegionLocator. * The implementation of AsyncRegionLocator.
*/ */
Expand All @@ -39,11 +34,11 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {


private final TableName tableName; private final TableName tableName;


private final AsyncRegionLocator locator; private final AsyncConnectionImpl conn;


public AsyncTableRegionLocatorImpl(TableName tableName, AsyncRegionLocator locator) { public AsyncTableRegionLocatorImpl(TableName tableName, AsyncConnectionImpl conn) {
this.tableName = tableName; this.tableName = tableName;
this.locator = locator; this.conn = conn;
} }


@Override @Override
Expand All @@ -54,67 +49,29 @@ public TableName getName() {
@Override @Override
public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId,
boolean reload) { boolean reload) {
return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload, return conn.getLocator().getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT,
-1L); reload, -1L);
} }


// this is used to prevent stack overflow if there are thousands of regions for the table. If the @Override
// location is in cache, the CompletableFuture will be completed immediately inside the same public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
// thread, and then in the action we will call locate again, also in the same thread. If all the if (TableName.isMetaTableName(tableName)) {
// locations are in cache, and we do not use whenCompleteAsync to break the tie, the stack will be return conn.registry.getMetaRegionLocation()
// very very deep and cause stack overflow. .thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
@VisibleForTesting
static final ThreadLocal<MutableInt> STACK_DEPTH = new ThreadLocal<MutableInt>() {

@Override
protected MutableInt initialValue() {
return new MutableInt(0);
} }
}; return AsyncMetaTableAccessor.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME),

Optional.of(tableName));
@VisibleForTesting }
static final int MAX_STACK_DEPTH = 16;


private void locate(CompletableFuture<List<HRegionLocation>> future, @Override
ConcurrentLinkedQueue<HRegionLocation> result, byte[] row) { public CompletableFuture<List<HRegionLocation>> getRegionLocations(byte[] row, boolean reload) {
BiConsumer<HRegionLocation, Throwable> listener = (loc, error) -> { return conn.getLocator()
if (error != null) { .getRegionLocations(tableName, row, RegionLocateType.CURRENT, reload, -1L)
future.completeExceptionally(error); .thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
return;
}
result.add(loc);
if (ConnectionUtils.isEmptyStartRow(loc.getRegion().getStartKey())) {
future.complete(result.stream()
.sorted((l1, l2) -> RegionInfo.COMPARATOR.compare(l1.getRegion(), l2.getRegion()))
.collect(Collectors.toList()));
} else {
locate(future, result, loc.getRegion().getStartKey());
}
};
MutableInt depth = STACK_DEPTH.get();
boolean async = depth.incrementAndGet() >= MAX_STACK_DEPTH;
try {
CompletableFuture<HRegionLocation> f =
locator.getRegionLocation(tableName, row, RegionLocateType.BEFORE, -1L);
if (async) {
FutureUtils.addListenerAsync(f, listener);
} else {
FutureUtils.addListener(f, listener);
}
} finally {
if (depth.decrementAndGet() == 0) {
STACK_DEPTH.remove();
}
}
} }


@Override @Override
public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() { public void clearRegionLocationCache() {
ConcurrentLinkedQueue<HRegionLocation> result = new ConcurrentLinkedQueue<>(); conn.getLocator().clearCache(tableName);
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
// start from end to start, as when locating we will do reverse scan, so we will prefetch the
// location of the regions before the current one.
locate(future, result, HConstants.EMPTY_END_ROW);
return future;
} }
} }
Expand Up @@ -108,6 +108,9 @@ default List<HRegionLocation> getRegionLocations(byte[] row) throws IOException
/** /**
* Retrieves all of the regions associated with this table. * Retrieves all of the regions associated with this table.
* <p/> * <p/>
* Usually we will go to meta table directly in this method so there is no {@code reload}
* parameter.
* <p/>
* Notice that the location for region replicas other than the default replica are also returned. * Notice that the location for region replicas other than the default replica are also returned.
* @return a {@link List} of all regions associated with this table. * @return a {@link List} of all regions associated with this table.
* @throws IOException if a remote or network exception occurs * @throws IOException if a remote or network exception occurs
Expand Down
Expand Up @@ -17,12 +17,18 @@
*/ */
package org.apache.hadoop.hbase.util; package org.apache.hadoop.hbase.util;


import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

/** /**
* Helper class for processing futures. * Helper class for processing futures.
*/ */
Expand Down Expand Up @@ -59,21 +65,18 @@ public static <T> void addListener(CompletableFuture<T> future,
} }


/** /**
* Almost the same with the {@link #addListener(CompletableFuture, BiConsumer)} method above, the * A helper class for getting the result of a Future, and convert the error to an
* difference is that in this method we will call * {@link IOException}.
* {@link CompletableFuture#whenCompleteAsync(BiConsumer)} instead of
* {@link CompletableFuture#whenComplete(BiConsumer)}.
* @see #addListener(CompletableFuture, BiConsumer)
*/ */
@SuppressWarnings("FutureReturnValueIgnored") public static <T> T get(Future<T> future) throws IOException {
public static <T> void addListenerAsync(CompletableFuture<T> future, try {
BiConsumer<? super T, ? super Throwable> action) { return future.get();
future.whenCompleteAsync((resp, error) -> { } catch (InterruptedException e) {
try { throw (IOException) new InterruptedIOException().initCause(e);
action.accept(resp, error); } catch (ExecutionException e) {
} catch (Throwable t) { Throwable cause = e.getCause();
LOG.error("Unexpected error caught when processing CompletableFuture", t); Throwables.propagateIfPossible(cause, IOException.class);
} throw new IOException(cause);
}); }
} }
} }

0 comments on commit 416b70f

Please sign in to comment.