Skip to content

Commit

Permalink
[pinpoint-apm#8691] Separate hbase connection for batch operation
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Mar 8, 2022
1 parent 77e606c commit c0103fe
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,30 @@
<constructor-arg type="java.util.concurrent.ExecutorService" ref="hbaseThreadPool"/>
</bean>

<bean id="batchHbaseThreadPool" class="com.navercorp.pinpoint.common.server.util.PinpointThreadPoolExecutorFactoryBean">
<property name="corePoolSize" value="${hbase.client.thread.max}"/>
<property name="maxPoolSize" value="${hbase.client.thread.max}"/>
<property name="queueCapacity" value="${hbase.client.threadPool.queueSize}"/>
<property name="threadNamePrefix" value="Pinpoint-Batch-HConnectionExecutor-"/>
<property name="daemon" value="true"/>
<property name="waitForTasksToCompleteOnShutdown" value="true"/>
<property name="awaitTerminationSeconds" value="10"/>
<property name="preStartAllCoreThreads" value="true"/>
</bean>

<bean id="batchConnectionFactory" class="com.navercorp.pinpoint.common.hbase.ConnectionFactoryBean">
<constructor-arg type="org.apache.hadoop.conf.Configuration" ref="hbaseConfiguration"/>
<constructor-arg type="java.util.concurrent.ExecutorService" ref="batchHbaseThreadPool"/>
</bean>

<bean id="hbaseTableFactory" class="com.navercorp.pinpoint.common.hbase.HbaseTableFactory">
<constructor-arg ref="connectionFactory"/>
</bean>

<bean class="org.apache.hadoop.util.ShutdownHookManagerProxy"/>

<bean id="asyncOperation" class="com.navercorp.pinpoint.common.hbase.HBaseAsyncOperationFactory" factory-method="create">
<constructor-arg type="org.apache.hadoop.hbase.client.Connection" ref="connectionFactory"/>
<bean id="asyncOperation" class="com.navercorp.pinpoint.common.hbase.HBaseAsyncOperationFactory">
<constructor-arg type="org.apache.hadoop.hbase.client.Connection" ref="batchConnectionFactory"/>
<constructor-arg type="org.apache.hadoop.conf.Configuration" ref="hbaseConfiguration"/>
</bean>

Expand All @@ -93,7 +109,7 @@
<constructor-arg ref="connectionFactory"/>
</bean>

<bean id="simpleBatchWriter" class="com.navercorp.pinpoint.common.hbase.batch.DelegateSimpleBatchWriter"/>
<bean id="simpleBatchWriter" class="com.navercorp.pinpoint.common.hbase.batch.SimpleBatchWriterFactoryBean"/>

<bean id="hbaseAdminFactory" class="com.navercorp.pinpoint.common.hbase.HbaseAdminFactory">
<constructor-arg ref="connectionFactory"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTableMultiplexer;
import org.springframework.beans.factory.FactoryBean;

import java.io.IOException;
import java.util.Objects;

/**
* @author Taejin Koo
*/
public class HBaseAsyncOperationFactory {
public class HBaseAsyncOperationFactory implements FactoryBean<HBaseAsyncOperation> {

public static final String ENABLE_ASYNC_METHOD = "hbase.client.async.enable";
public static final boolean DEFAULT_ENABLE_ASYNC_METHOD = false;
Expand All @@ -40,26 +41,16 @@ public class HBaseAsyncOperationFactory {
public static final String ASYNC_MAX_RETRIES_IN_QUEUE = HTableMultiplexer.TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE;
public static final int DEFAULT_ASYNC_RETRY_COUNT = 10000;

public static HBaseAsyncOperation create(Configuration configuration) throws IOException {
boolean enableAsyncMethod = configuration.getBoolean(ENABLE_ASYNC_METHOD, DEFAULT_ENABLE_ASYNC_METHOD);
if (!enableAsyncMethod) {
return DisabledHBaseAsyncOperation.INSTANCE;
}

int queueSize = configuration.getInt(ASYNC_IN_QUEUE_SIZE, DEFAULT_ASYNC_IN_QUEUE_SIZE);

if (configuration.get(ASYNC_PERIODIC_FLUSH_TIME, null) == null) {
configuration.setInt(ASYNC_PERIODIC_FLUSH_TIME, DEFAULT_ASYNC_PERIODIC_FLUSH_TIME);
}

if (configuration.get(ASYNC_MAX_RETRIES_IN_QUEUE, null) == null) {
configuration.setInt(ASYNC_MAX_RETRIES_IN_QUEUE, DEFAULT_ASYNC_RETRY_COUNT);
}
private final Connection connection;
private final Configuration configuration;

return new HBaseAsyncTemplate(configuration, queueSize);
public HBaseAsyncOperationFactory(Connection connection, Configuration configuration) {
this.connection = Objects.requireNonNull(connection, "connection");
this.configuration = Objects.requireNonNull(configuration, "configuration");
}

public static HBaseAsyncOperation create(Connection connection, Configuration configuration) throws IOException {
@Override
public HBaseAsyncOperation getObject() throws Exception {
boolean enableAsyncMethod = configuration.getBoolean(ENABLE_ASYNC_METHOD, DEFAULT_ENABLE_ASYNC_METHOD);
if (!enableAsyncMethod) {
return DisabledHBaseAsyncOperation.INSTANCE;
Expand All @@ -78,4 +69,14 @@ public static HBaseAsyncOperation create(Connection connection, Configuration co
return new HBaseAsyncTemplate(connection, configuration, queueSize);
}


@Override
public Class<HBaseAsyncOperation> getObjectType() {
return HBaseAsyncOperation.class;
}

@Override
public boolean isSingleton() {
return FactoryBean.super.isSingleton();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ public class HBaseAsyncTemplate implements HBaseAsyncOperation {
private final LongAdder opsCount = new LongAdder();
private final LongAdder opsRejectCount = new LongAdder();

public HBaseAsyncTemplate(Configuration conf, int perRegionServerBufferQueueSize) throws IOException {
this.hTableMultiplexer = new HTableMultiplexer(conf, perRegionServerBufferQueueSize);
}

public HBaseAsyncTemplate(Connection connection, Configuration conf, int perRegionServerBufferQueueSize) throws IOException {
this.hTableMultiplexer = new HTableMultiplexer(connection, conf, perRegionServerBufferQueueSize);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.navercorp.pinpoint.common.hbase.batch;

import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.hbase.SimpleBatchWriter;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;

import java.util.Objects;

public class AsyncTemplateWriter implements SimpleBatchWriter {
private final HbaseOperations2 hbaseOperations2;

public AsyncTemplateWriter(HbaseOperations2 hbaseOperations2) {
this.hbaseOperations2 = Objects.requireNonNull(hbaseOperations2, "hbaseOperations2");

}

@Override
public boolean write(TableName tableName, Put mutation) {
return hbaseOperations2.asyncPut(tableName, mutation);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.navercorp.pinpoint.common.hbase.batch;

import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.hbase.SimpleBatchWriter;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Qualifier;

import java.util.Objects;

public class SimpleBatchWriterFactoryBean implements FactoryBean<SimpleBatchWriter> {

private final SimpleBatchWriter batchWriter;

public SimpleBatchWriterFactoryBean(BufferedMutatorConfiguration configuration,
HbaseBatchWriter hbaseBatchWriter,
@Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate) {
if (configuration.isBatchWriter()) {
this.batchWriter = new SimpleBufferWriter(hbaseBatchWriter);
} else {
this.batchWriter = new AsyncTemplateWriter(hbaseTemplate);
}
}

@Override
public SimpleBatchWriter getObject() throws Exception {
return batchWriter;
}

@Override
public Class<?> getObjectType() {
return SimpleBatchWriter.class;
}

@Override
public boolean isSingleton() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.navercorp.pinpoint.common.hbase.batch;

import com.navercorp.pinpoint.common.hbase.SimpleBatchWriter;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;

import java.util.Objects;

public class SimpleBufferWriter implements SimpleBatchWriter {
private final HbaseBatchWriter batchWriter;

public SimpleBufferWriter(HbaseBatchWriter batchWriter) {
this.batchWriter = Objects.requireNonNull(batchWriter, "batchWriter");
}

@Override
public boolean write(TableName tableName, Put mutation) {
return batchWriter.write(tableName, mutation);
}
}

0 comments on commit c0103fe

Please sign in to comment.