Skip to content

Commit

Permalink
pinpoint-apm#320 change htablePool api
Browse files Browse the repository at this point in the history
 - hbase timeout option
 - hbase thread pool option
  • Loading branch information
emeroad committed May 28, 2015
1 parent a0553e8 commit e2da730
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 49 deletions.
25 changes: 20 additions & 5 deletions collector/src/main/resources/applicationContext-hbase.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,33 @@
<props>
<prop key="hbase.zookeeper.quorum">${hbase.client.host}</prop>
<prop key="hbase.zookeeper.property.clientPort">${hbase.client.port}</prop>
<prop key="hbase.htable.threads.max">${hbase.htable.threads.max}</prop>
<prop key="hbase.ipc.client.tcpnodelay">true</prop>
<prop key="hbase.rpc.timeout">30000</prop>
<prop key="ipc.socket.timeout">10000</prop>

<!-- hbase default:true -->
<prop key="hbase.ipc.client.tcpnodelay">${hbase.ipc.client.tcpnodelay}</prop>
<!-- hbase default:60000 -->
<prop key="hbase.rpc.timeout">${hbase.rpc.timeout}</prop>
<!-- hbase default:Integer.MAX_VALUE -->
<prop key="hbase.client.operation.timeout">${hbase.client.operation.timeout}</prop>

<!-- hbase socket read timeout. default: 200000-->
<prop key="hbase.ipc.client.socket.timeout.read">${hbase.ipc.client.socket.timeout.read}</prop>
<!-- socket write timeout. hbase default: 600000-->
<prop key="hbase.ipc.client.socket.timeout.write">${hbase.ipc.client.socket.timeout.write}</prop>
</props>
</property>
<property name="deleteConnection" value="false"/>
</bean>

<bean id="connectionFactory" class="com.navercorp.pinpoint.common.hbase.PooledHTableFactory">
<constructor-arg ref="hbaseConfiguration"/>
<constructor-arg value="${hbase.client.thread.max}"/>
<constructor-arg value="${hbase.client.threadPool.queueSize}"/>
<constructor-arg value="${hbase.client.threadPool.prestart}"/>
</bean>

<bean id="hbaseTemplate" class="com.navercorp.pinpoint.common.hbase.HbaseTemplate2">
<property name="configuration" ref="hbaseConfiguration"/>
<property name="poolSize" value="${hbase.hTablePoolSize}"/>
<property name="tableFactory" ref="connectionFactory"/>
</bean>

<bean id="applicationTraceIndexDistributor" class="com.sematext.hbase.wd.RowKeyDistributorByHashPrefix">
Expand Down
21 changes: 20 additions & 1 deletion collector/src/main/resources/hbase.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
hbase.client.host=localhost
hbase.client.port=2181
hbase.htable.threads.max=4

# hbase timeout option==================================================================================
# hbase default:true
hbase.ipc.client.tcpnodelay=true
# hbase default:60000
hbase.rpc.timeout=10000
# hbase default:Integer.MAX_VALUE
hbase.client.operation.timeout=10000

# hbase socket read timeout. default: 200000
hbase.ipc.client.socket.timeout.read=20000
# socket write timeout. hbase default: 600000
hbase.ipc.client.socket.timeout.write=60000

# ==================================================================================
# hbase client thread pool option
hbase.client.thread.max=1024
hbase.client.threadPool.queueSize=5120
# prestartAllCoreThreads
hbase.client.threadPool.prestart=false
2 changes: 0 additions & 2 deletions collector/src/main/resources/pinpoint-collector.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
hbase.hTablePoolSize=1024

# tcp listen ip
collector.tcpListenIp=0.0.0.0
collector.tcpListenPort=9994
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ public class HbaseTemplate2 extends HbaseTemplate implements HbaseOperations2, I

private final Logger logger = LoggerFactory.getLogger(this.getClass());

private PooledHTableFactory pooledHTableFactory;
private int poolSize = PooledHTableFactory.DEFAULT_POOL_SIZE;

private ExecutorService executor = newCachedThreadPool();
private final ExecutorService executor = newCachedThreadPool();

public HbaseTemplate2() {
}
Expand All @@ -65,35 +62,21 @@ public ExecutorService newCachedThreadPool() {

public HbaseTemplate2(Configuration configuration) {
Assert.notNull(configuration);
setConfiguration(configuration);
}

public HbaseTemplate2(Configuration configuration, int poolSize) {
Assert.notNull(configuration);
this.poolSize = poolSize;
}

public int getPoolSize() {
return poolSize;
}

public void setPoolSize(int hTablePoolSize) {
this.poolSize = hTablePoolSize;
}

@Override
public void afterPropertiesSet() {
Configuration configuration = getConfiguration();
Assert.notNull(configuration, "configuration is required");
this.pooledHTableFactory = new PooledHTableFactory(configuration, poolSize);
this.setTableFactory(pooledHTableFactory);
Assert.notNull(getTableFactory(), "tableFactory is required");
}

@Override
public void destroy() throws Exception {
if (pooledHTableFactory != null) {
this.pooledHTableFactory.destroy();
}

logger.info("HbaseTemplate2.destroy()");
final ExecutorService executor = this.executor;
if (executor != null) {
executor.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@

import com.navercorp.pinpoint.common.util.ExecutorFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTableInterfaceFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
Expand All @@ -43,33 +40,35 @@ public class PooledHTableFactory implements HTableInterfaceFactory, DisposableBe

public static final int DEFAULT_POOL_SIZE = 256;
public static final int DEFAULT_WORKER_QUEUE_SIZE = 1024*5;
public static final boolean DEFAULT_PRESTART_THREAD_POOL = false;

private ExecutorService executor;
private HConnection connection;
private final ExecutorService executor;
private final HConnection connection;


public PooledHTableFactory(Configuration config) {
this(config, DEFAULT_POOL_SIZE, DEFAULT_WORKER_QUEUE_SIZE);
}
public PooledHTableFactory(Configuration config, int poolSize) {
this(config, poolSize, DEFAULT_WORKER_QUEUE_SIZE);
this(config, DEFAULT_POOL_SIZE, DEFAULT_WORKER_QUEUE_SIZE, DEFAULT_PRESTART_THREAD_POOL);
}

public PooledHTableFactory(Configuration config, int poolSize, int workerQueueSize) {
this.executor = getExecutorService(poolSize, workerQueueSize);
public PooledHTableFactory(Configuration config, int poolSize, int workerQueueSize, boolean prestartThreadPool) {
this.executor = createExecutorService(poolSize, workerQueueSize, prestartThreadPool);
try {
this.connection = HConnectionManager.createConnection(config, executor);
this.connection = (HConnection)ConnectionFactory.createConnection(config, executor);
} catch (IOException e) {
throw new HbaseSystemException(e);
}
}

private ExecutorService getExecutorService(int poolSize, int workQueueMaxSize) {
private ExecutorService createExecutorService(int poolSize, int workQueueMaxSize, boolean prestartThreadPool) {

logger.info("create HConnectionThreadPoolExecutor poolSize:{}, workerQueueMaxSize:{}", poolSize, workQueueMaxSize);

ThreadPoolExecutor threadPoolExecutor = ExecutorFactory.newFixedThreadPool(poolSize, workQueueMaxSize, "Pinpoint-HConnectionExecutor", true);
threadPoolExecutor.prestartAllCoreThreads();
if (prestartThreadPool) {
logger.info("prestartAllCoreThreads");
threadPoolExecutor.prestartAllCoreThreads();
}

return threadPoolExecutor;
}

Expand All @@ -93,8 +92,13 @@ public void releaseHTableInterface(HTableInterface table) throws IOException {

@Override
public void destroy() throws Exception {
logger.info("PooledHTableFactory.destroy()");
if (connection != null) {
this.connection.close();
try {
this.connection.close();
} catch (IOException ex) {
logger.warn("Connection.close() error:" + ex.getMessage(), ex);
}
}

if (this.executor != null) {
Expand Down
24 changes: 22 additions & 2 deletions web/src/main/resources/applicationContext-hbase.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,33 @@
<props>
<prop key="hbase.zookeeper.quorum">${hbase.client.host}</prop>
<prop key="hbase.zookeeper.property.clientPort">${hbase.client.port}</prop>
<prop key="hbase.htable.threads.max">${hbase.htable.threads.max}</prop>

<!-- hbase default:true -->
<prop key="hbase.ipc.client.tcpnodelay">${hbase.ipc.client.tcpnodelay}</prop>
<!-- hbase default:60000 -->
<prop key="hbase.rpc.timeout">${hbase.rpc.timeout}</prop>
<!-- hbase default:Integer.MAX_VALUE -->
<prop key="hbase.client.operation.timeout">${hbase.client.operation.timeout}</prop>

<!-- hbase socket read timeout. default: 200000-->
<prop key="hbase.ipc.client.socket.timeout.read">${hbase.ipc.client.socket.timeout.read}</prop>
<!-- socket write timeout. hbase default: 600000-->
<prop key="hbase.ipc.client.socket.timeout.write">${hbase.ipc.client.socket.timeout.write}</prop>
</props>
</property>
<property name="deleteConnection" value="false"/>
</bean>

<bean id="connectionFactory" class="com.navercorp.pinpoint.common.hbase.PooledHTableFactory">
<constructor-arg ref="hbaseConfiguration"/>
<constructor-arg value="${hbase.client.thread.max}"/>
<constructor-arg value="${hbase.client.threadPool.queueSize}"/>
<constructor-arg value="${hbase.client.threadPool.prestart}"/>
</bean>

<bean id="hbaseTemplate" class="com.navercorp.pinpoint.common.hbase.HbaseTemplate2">
<property name="configuration" ref="hbaseConfiguration"></property>
<property name="configuration" ref="hbaseConfiguration"/>
<property name="tableFactory" ref="connectionFactory"/>
</bean>

<bean class="org.apache.hadoop.util.ShutdownHookManagerProxy"/>
Expand Down
21 changes: 20 additions & 1 deletion web/src/main/resources/hbase.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
hbase.client.host=localhost
hbase.client.port=2181
hbase.htable.threads.max=4

# hbase timeout option==================================================================================
# hbase default:true
hbase.ipc.client.tcpnodelay=true
# hbase default:60000
hbase.rpc.timeout=10000
# hbase default:Integer.MAX_VALUE
hbase.client.operation.timeout=10000

# hbase socket read timeout. default: 200000
hbase.ipc.client.socket.timeout.read=20000
# socket write timeout. hbase default: 600000
hbase.ipc.client.socket.timeout.write=30000

#==================================================================================
# hbase client thread pool option
hbase.client.thread.max=1024
hbase.client.threadPool.queueSize=5120
# prestartAllCoreThreads
hbase.client.threadPool.prestart=false
8 changes: 7 additions & 1 deletion web/src/main/resources/log4j.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,14 @@
<level value="INFO"/>
<appender-ref ref="console" />
</logger>
<logger name="org.apache.hadoop" additivity="false">
<level value="INFO"/>
<appender-ref ref="console" />
</logger>


<root>
<level value="DEBUG"/>
<level value="INFO"/>
<appender-ref ref="console" />
</root>
</log4j:configuration>

0 comments on commit e2da730

Please sign in to comment.