Skip to content

Commit

Permalink
HBASE-6580 Deprecate HTablePool in favor of HConnection.getTable(...)
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1511543 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
lhofhansl committed Aug 7, 2013
1 parent 602d95c commit e4c01da
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 32 deletions.
Expand Up @@ -70,6 +70,60 @@ public interface HConnection extends Abortable, Closeable {
*/
Configuration getConfiguration();

/**
* Retrieve an HTableInterface implementation for access to a table.
* The returned HTableInterface is not thread safe, a new instance should
* be created for each using thread.
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* Note that the HConnection needs to be unmanaged
* (created with {@link HConnectionManager#createConnection(Configuration)}).
* @param tableName
* @return an HTable to use for interactions with this table
*/
public HTableInterface getTable(String tableName) throws IOException;

/**
* Retrieve an HTableInterface implementation for access to a table.
* The returned HTableInterface is not thread safe, a new instance should
* be created for each using thread.
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* Note that the HConnection needs to be unmanaged
* (created with {@link HConnectionManager#createConnection(Configuration)}).
* @param tableName
* @return an HTable to use for interactions with this table
*/
public HTableInterface getTable(byte[] tableName) throws IOException;

/**
* Retrieve an HTableInterface implementation for access to a table.
* The returned HTableInterface is not thread safe, a new instance should
* be created for each using thread.
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* Note that the HConnection needs to be unmanaged
* (created with {@link HConnectionManager#createConnection(Configuration)}).
* @param tableName
* @param pool The thread pool to use for batch operations, null to use a default pool.
* @return an HTable to use for interactions with this table
*/
public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException;

/**
* Retrieve an HTableInterface implementation for access to a table.
* The returned HTableInterface is not thread safe, a new instance should
* be created for each using thread.
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* Note that the HConnection needs to be unmanaged
* (created with {@link HConnectionManager#createConnection(Configuration)}).
* @param tableName
* @param pool The thread pool to use for batch operations, null to use a default pool.
* @return an HTable to use for interactions with this table
*/
public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException;

/** @return - true if the master server is running */
boolean isMasterRunning()
throws MasterNotRunningException, ZooKeeperConnectionException;
Expand Down
Expand Up @@ -36,6 +36,9 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -137,6 +140,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
Expand All @@ -148,8 +152,22 @@
import com.google.protobuf.ServiceException;

/**
* A non-instantiable class that manages {@link HConnection}s.
* This class has a static Map of {@link HConnection} instances keyed by
* A non-instantiable class that manages creation of {@link HConnection}s.
* <p>The simplest way to use this class is by using {@link #createConnection(Configuration)}.
* This creates a new {@link HConnection} that is managed by the caller.
* From this {@link HConnection} {@link HTableInterface} implementations are retrieved
* with {@link HConnection#getTable(byte[])}. Example:
* <pre>
* {@code
* HConnection connection = HConnectionManager.createConnection(config);
* HTableInterface table = connection.getTable("table1");
* // use the table as needed, for a single operation and a single thread
* table.close();
* connection.close();
* }
* </pre>
* <p>The following logic and API will be removed in the future:
* <p>This class has a static Map of {@link HConnection} instances keyed by
* {@link Configuration}; all invocations of {@link #getConnection(Configuration)}
* that pass the same {@link Configuration} instance will be returned the same
* {@link HConnection} instance (Adding properties to a Configuration
Expand Down Expand Up @@ -241,6 +259,7 @@ private HConnectionManager() {
* @return HConnection object for <code>conf</code>
* @throws ZooKeeperConnectionException
*/
@Deprecated
@SuppressWarnings("resource")
public static HConnection getConnection(final Configuration conf)
throws IOException {
Expand All @@ -263,18 +282,61 @@ public static HConnection getConnection(final Configuration conf)
/**
* Create a new HConnection instance using the passed <code>conf</code> instance.
* <p>Note: This bypasses the usual HConnection life cycle management done by
* {@link #getConnection(Configuration)}. Use this with caution, the caller is responsible for
* {@link #getConnection(Configuration)}. The caller is responsible for
* calling {@link HConnection#close()} on the returned connection instance.
*
* This is the recommended way to create HConnections.
* {@code
* HConnection connection = HConnectionManager.createConnection(conf);
* HTableInterface table = connection.getTable("mytable");
* table.get(...);
* ...
* table.close();
* connection.close();
* }
*
* @param conf configuration
* @return HConnection object for <code>conf</code>
* @throws ZooKeeperConnectionException
*/
public static HConnection createConnection(Configuration conf)
throws IOException {
return createConnection(conf, false);
return createConnection(conf, false, null);
}

/**
* Create a new HConnection instance using the passed <code>conf</code> instance.
* <p>Note: This bypasses the usual HConnection life cycle management done by
* {@link #getConnection(Configuration)}. The caller is responsible for
* calling {@link HConnection#close()} on the returned connection instance.
* This is the recommended way to create HConnections.
* {@code
* ExecutorService pool = ...;
* HConnection connection = HConnectionManager.createConnection(conf, pool);
* HTableInterface table = connection.getTable("mytable");
* table.get(...);
* ...
* table.close();
* connection.close();
* }
* @param conf configuration
* @param pool the thread pool to use for batch operation in HTables used via this HConnection
* @return HConnection object for <code>conf</code>
* @throws ZooKeeperConnectionException
*/
public static HConnection createConnection(Configuration conf, ExecutorService pool)
throws IOException {
return createConnection(conf, false, pool);
}

@Deprecated
static HConnection createConnection(final Configuration conf, final boolean managed)
throws IOException {
return createConnection(conf, managed, null);
}

@Deprecated
static HConnection createConnection(final Configuration conf, final boolean managed, final ExecutorService pool)
throws IOException {
String className = conf.get("hbase.client.connection.impl",
HConnectionManager.HConnectionImplementation.class.getName());
Expand All @@ -287,9 +349,9 @@ static HConnection createConnection(final Configuration conf, final boolean mana
try {
// Default HCM#HCI is not accessible; make it so before invoking.
Constructor<?> constructor =
clazz.getDeclaredConstructor(Configuration.class, boolean.class);
clazz.getDeclaredConstructor(Configuration.class, boolean.class, ExecutorService.class);
constructor.setAccessible(true);
return (HConnection) constructor.newInstance(conf, managed);
return (HConnection) constructor.newInstance(conf, managed, pool);
} catch (Exception e) {
throw new IOException(e);
}
Expand All @@ -301,6 +363,7 @@ static HConnection createConnection(final Configuration conf, final boolean mana
* then close connection to the zookeeper ensemble and let go of all associated resources.
*
* @param conf configuration whose identity is used to find {@link HConnection} instance.
* @deprecated
*/
public static void deleteConnection(Configuration conf) {
deleteConnection(new HConnectionKey(conf), false);
Expand All @@ -311,6 +374,7 @@ public static void deleteConnection(Configuration conf) {
* This will then close connection to the zookeeper ensemble and let go of all resources.
*
* @param connection
* @deprecated
*/
public static void deleteStaleConnection(HConnection connection) {
deleteConnection(connection, true);
Expand All @@ -320,6 +384,7 @@ public static void deleteStaleConnection(HConnection connection) {
* Delete information for all connections. Close or not the connection, depending on the
* staleConnection boolean and the ref count. By default, you should use it with
* staleConnection to true.
* @deprecated
*/
public static void deleteAllConnections(boolean staleConnection) {
synchronized (CONNECTION_INSTANCES) {
Expand All @@ -342,6 +407,7 @@ public static void deleteAllConnections() {
}


@Deprecated
private static void deleteConnection(HConnection connection, boolean staleConnection) {
synchronized (CONNECTION_INSTANCES) {
for (Entry<HConnectionKey, HConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
Expand All @@ -353,6 +419,7 @@ private static void deleteConnection(HConnection connection, boolean staleConnec
}
}

@Deprecated
private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
synchronized (CONNECTION_INSTANCES) {
HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
Expand Down Expand Up @@ -464,6 +531,10 @@ static class HConnectionImplementation implements HConnection, Closeable {
private final DelayedClosing delayedClosing =
DelayedClosing.createAndStart(this);

// thread executor shared by all HTableInterface instances created
// by this connection
private volatile ExecutorService batchPool = null;
private volatile boolean cleanupPool = false;

private final Configuration conf;

Expand Down Expand Up @@ -499,6 +570,10 @@ static class HConnectionImplementation implements HConnection, Closeable {
*/
Registry registry;

HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
this(conf, managed, null);
}

/**
* constructor
* @param conf Configuration object
Expand All @@ -510,8 +585,9 @@ static class HConnectionImplementation implements HConnection, Closeable {
* are shared, we have reference counting going on and will only do full cleanup when no more
* users of an HConnectionImplementation instance.
*/
HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool) throws IOException {
this(conf);
this.batchPool = pool;
this.managed = managed;
this.registry = setupRegistry();
retrieveClusterId();
Expand Down Expand Up @@ -556,6 +632,74 @@ protected HConnectionImplementation(Configuration conf) {
HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
}

@Override
public HTableInterface getTable(String tableName) throws IOException {
return getTable(Bytes.toBytes(tableName));
}

@Override
public HTableInterface getTable(byte[] tableName) throws IOException {
return getTable(tableName, getBatchPool());
}

@Override
public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
return getTable(Bytes.toBytes(tableName), pool);
}

@Override
public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
if (managed) {
throw new IOException("The connection has to be unmanaged.");
}
return new HTable(tableName, this, pool);
}

private ExecutorService getBatchPool() {
if (batchPool == null) {
// shared HTable thread executor not yet initialized
synchronized (this) {
if (batchPool == null) {
int maxThreads = conf.getInt("hbase.hconnection.threads.max",
Integer.MAX_VALUE);
if (maxThreads == 0) {
maxThreads = Runtime.getRuntime().availableProcessors();
}
long keepAliveTime = conf.getLong(
"hbase.hconnection.threads.keepalivetime", 60);
this.batchPool = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
maxThreads,
keepAliveTime,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
Threads.newDaemonThreadFactory("hbase-connection-shared-executor"));
((ThreadPoolExecutor) this.batchPool)
.allowCoreThreadTimeOut(true);
}
this.cleanupPool = true;
}
}
return this.batchPool;
}

protected ExecutorService getCurrentBatchPool() {
return batchPool;
}

private void shutdownBatchPool() {
if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
this.batchPool.shutdown();
try {
if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) {
this.batchPool.shutdownNow();
}
} catch (InterruptedException e) {
this.batchPool.shutdownNow();
}
}
}

/**
* @return The cluster registry implementation to use.
* @throws IOException
Expand Down Expand Up @@ -2267,6 +2411,7 @@ void internalClose() {
}
delayedClosing.stop("Closing connection");
closeMaster();
shutdownBatchPool();
this.closed = true;
closeZooKeeperWatcher();
this.stubs.clear();
Expand Down
Expand Up @@ -59,6 +59,26 @@ public HConnectionWrapper(final UserGroupInformation ugi,
this.ugi = ugi;
}

@Override
public HTableInterface getTable(String tableName) throws IOException {
return hconnection.getTable(tableName);
}

@Override
public HTableInterface getTable(byte[] tableName) throws IOException {
return hconnection.getTable(tableName);
}

@Override
public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
return hconnection.getTable(tableName, pool);
}

@Override
public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
return hconnection.getTable(tableName, pool);
}

@Override
public void abort(String why, Throwable e) {
hconnection.abort(why, e);
Expand Down
Expand Up @@ -59,6 +59,7 @@
* <p>
* Pool will manage its own connections to the cluster. See
* {@link HConnectionManager}.
* @deprecated Use {@link HConnection#getTable(String)} instead.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
Expand Down

0 comments on commit e4c01da

Please sign in to comment.