From c0103fe57598e8ac25151bc8e7967338e014d666 Mon Sep 17 00:00:00 2001 From: emeroad Date: Tue, 8 Mar 2022 20:05:58 +0900 Subject: [PATCH] [#8691] Separate hbase connection for batch operation --- .../applicationContext-collector-hbase.xml | 22 ++++++++-- .../hbase/HBaseAsyncOperationFactory.java | 39 +++++++++--------- .../common/hbase/HBaseAsyncTemplate.java | 4 -- .../hbase/batch/AsyncTemplateWriter.java | 22 ++++++++++ .../batch/DelegateSimpleBatchWriter.java | 34 ---------------- .../batch/SimpleBatchWriterFactoryBean.java | 40 +++++++++++++++++++ .../hbase/batch/SimpleBufferWriter.java | 20 ++++++++++ 7 files changed, 121 insertions(+), 60 deletions(-) create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/AsyncTemplateWriter.java delete mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/DelegateSimpleBatchWriter.java create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/SimpleBatchWriterFactoryBean.java create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/SimpleBufferWriter.java diff --git a/collector/src/main/resources/applicationContext-collector-hbase.xml b/collector/src/main/resources/applicationContext-collector-hbase.xml index 2b815e741f824..83db63bdaaff1 100644 --- a/collector/src/main/resources/applicationContext-collector-hbase.xml +++ b/collector/src/main/resources/applicationContext-collector-hbase.xml @@ -63,14 +63,30 @@ + + + + + + + + + + + + + + + + - - + + @@ -93,7 +109,7 @@ - + diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HBaseAsyncOperationFactory.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HBaseAsyncOperationFactory.java index 4d3bdf61e85aa..463b001653393 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HBaseAsyncOperationFactory.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HBaseAsyncOperationFactory.java @@ -20,13 +20,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.HTableMultiplexer; +import org.springframework.beans.factory.FactoryBean; -import java.io.IOException; +import java.util.Objects; /** * @author Taejin Koo */ -public class HBaseAsyncOperationFactory { +public class HBaseAsyncOperationFactory implements FactoryBean { public static final String ENABLE_ASYNC_METHOD = "hbase.client.async.enable"; public static final boolean DEFAULT_ENABLE_ASYNC_METHOD = false; @@ -40,26 +41,16 @@ public class HBaseAsyncOperationFactory { public static final String ASYNC_MAX_RETRIES_IN_QUEUE = HTableMultiplexer.TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE; public static final int DEFAULT_ASYNC_RETRY_COUNT = 10000; - public static HBaseAsyncOperation create(Configuration configuration) throws IOException { - boolean enableAsyncMethod = configuration.getBoolean(ENABLE_ASYNC_METHOD, DEFAULT_ENABLE_ASYNC_METHOD); - if (!enableAsyncMethod) { - return DisabledHBaseAsyncOperation.INSTANCE; - } - - int queueSize = configuration.getInt(ASYNC_IN_QUEUE_SIZE, DEFAULT_ASYNC_IN_QUEUE_SIZE); - - if (configuration.get(ASYNC_PERIODIC_FLUSH_TIME, null) == null) { - configuration.setInt(ASYNC_PERIODIC_FLUSH_TIME, DEFAULT_ASYNC_PERIODIC_FLUSH_TIME); - } - - if (configuration.get(ASYNC_MAX_RETRIES_IN_QUEUE, null) == null) { - configuration.setInt(ASYNC_MAX_RETRIES_IN_QUEUE, DEFAULT_ASYNC_RETRY_COUNT); - } + private final Connection connection; + private final Configuration configuration; - return new HBaseAsyncTemplate(configuration, queueSize); + public HBaseAsyncOperationFactory(Connection connection, Configuration configuration) { + this.connection = Objects.requireNonNull(connection, "connection"); + this.configuration = Objects.requireNonNull(configuration, "configuration"); } - public static HBaseAsyncOperation create(Connection connection, Configuration configuration) throws IOException { + @Override + public HBaseAsyncOperation getObject() throws Exception { boolean enableAsyncMethod = configuration.getBoolean(ENABLE_ASYNC_METHOD, DEFAULT_ENABLE_ASYNC_METHOD); if (!enableAsyncMethod) { return DisabledHBaseAsyncOperation.INSTANCE; @@ -78,4 +69,14 @@ public static HBaseAsyncOperation create(Connection connection, Configuration co return new HBaseAsyncTemplate(connection, configuration, queueSize); } + + @Override + public Class getObjectType() { + return HBaseAsyncOperation.class; + } + + @Override + public boolean isSingleton() { + return FactoryBean.super.isSingleton(); + } } \ No newline at end of file diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HBaseAsyncTemplate.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HBaseAsyncTemplate.java index 2611f42ed4737..152bb75c6da30 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HBaseAsyncTemplate.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HBaseAsyncTemplate.java @@ -38,10 +38,6 @@ public class HBaseAsyncTemplate implements HBaseAsyncOperation { private final LongAdder opsCount = new LongAdder(); private final LongAdder opsRejectCount = new LongAdder(); - public HBaseAsyncTemplate(Configuration conf, int perRegionServerBufferQueueSize) throws IOException { - this.hTableMultiplexer = new HTableMultiplexer(conf, perRegionServerBufferQueueSize); - } - public HBaseAsyncTemplate(Connection connection, Configuration conf, int perRegionServerBufferQueueSize) throws IOException { this.hTableMultiplexer = new HTableMultiplexer(connection, conf, perRegionServerBufferQueueSize); } diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/AsyncTemplateWriter.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/AsyncTemplateWriter.java new file mode 100644 index 0000000000000..6690f80134f70 --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/AsyncTemplateWriter.java @@ -0,0 +1,22 @@ +package com.navercorp.pinpoint.common.hbase.batch; + +import com.navercorp.pinpoint.common.hbase.HbaseOperations2; +import com.navercorp.pinpoint.common.hbase.SimpleBatchWriter; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; + +import java.util.Objects; + +public class AsyncTemplateWriter implements SimpleBatchWriter { + private final HbaseOperations2 hbaseOperations2; + + public AsyncTemplateWriter(HbaseOperations2 hbaseOperations2) { + this.hbaseOperations2 = Objects.requireNonNull(hbaseOperations2, "hbaseOperations2"); + + } + + @Override + public boolean write(TableName tableName, Put mutation) { + return hbaseOperations2.asyncPut(tableName, mutation); + } +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/DelegateSimpleBatchWriter.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/DelegateSimpleBatchWriter.java deleted file mode 100644 index 786cdf48b6b75..0000000000000 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/DelegateSimpleBatchWriter.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.navercorp.pinpoint.common.hbase.batch; - -import com.navercorp.pinpoint.common.hbase.HbaseOperations2; -import com.navercorp.pinpoint.common.hbase.SimpleBatchWriter; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.springframework.beans.factory.annotation.Qualifier; - -import java.util.Objects; - -public class DelegateSimpleBatchWriter implements SimpleBatchWriter { - private final boolean batchWriter; - - private final HbaseBatchWriter hbaseBatchWriter; - private final HbaseOperations2 hbaseTemplate; - - public DelegateSimpleBatchWriter(BufferedMutatorConfiguration configuration, - HbaseBatchWriter hbaseBatchWriter, - @Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate) { - this.batchWriter = configuration.isBatchWriter(); - this.hbaseBatchWriter = hbaseBatchWriter; - this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate"); - } - - - @Override - public boolean write(TableName tableName, Put put) { - if (batchWriter) { - return hbaseBatchWriter.write(tableName, put); - } else { - return hbaseTemplate.asyncPut(tableName, put); - } - } -} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/SimpleBatchWriterFactoryBean.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/SimpleBatchWriterFactoryBean.java new file mode 100644 index 0000000000000..486276986cd4c --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/SimpleBatchWriterFactoryBean.java @@ -0,0 +1,40 @@ +package com.navercorp.pinpoint.common.hbase.batch; + +import com.navercorp.pinpoint.common.hbase.HbaseOperations2; +import com.navercorp.pinpoint.common.hbase.SimpleBatchWriter; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.springframework.beans.factory.FactoryBean; +import org.springframework.beans.factory.annotation.Qualifier; + +import java.util.Objects; + +public class SimpleBatchWriterFactoryBean implements FactoryBean { + + private final SimpleBatchWriter batchWriter; + + public SimpleBatchWriterFactoryBean(BufferedMutatorConfiguration configuration, + HbaseBatchWriter hbaseBatchWriter, + @Qualifier("asyncPutHbaseTemplate") HbaseOperations2 hbaseTemplate) { + if (configuration.isBatchWriter()) { + this.batchWriter = new SimpleBufferWriter(hbaseBatchWriter); + } else { + this.batchWriter = new AsyncTemplateWriter(hbaseTemplate); + } + } + + @Override + public SimpleBatchWriter getObject() throws Exception { + return batchWriter; + } + + @Override + public Class getObjectType() { + return SimpleBatchWriter.class; + } + + @Override + public boolean isSingleton() { + return true; + } +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/SimpleBufferWriter.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/SimpleBufferWriter.java new file mode 100644 index 0000000000000..19183ff47be20 --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/batch/SimpleBufferWriter.java @@ -0,0 +1,20 @@ +package com.navercorp.pinpoint.common.hbase.batch; + +import com.navercorp.pinpoint.common.hbase.SimpleBatchWriter; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; + +import java.util.Objects; + +public class SimpleBufferWriter implements SimpleBatchWriter { + private final HbaseBatchWriter batchWriter; + + public SimpleBufferWriter(HbaseBatchWriter batchWriter) { + this.batchWriter = Objects.requireNonNull(batchWriter, "batchWriter"); + } + + @Override + public boolean write(TableName tableName, Put mutation) { + return batchWriter.write(tableName, mutation); + } +}