Skip to content

Commit

Permalink
HBASE-24757 : ReplicationSink should limit row count in batch mutatio…
Browse files Browse the repository at this point in the history
…n based on hbase.rpc.rows.warning.threshold

Closes #2127

Signed-off-by: stack <stack@apache.org>
  • Loading branch information
virajjasani committed Jul 24, 2020
1 parent 0b85729 commit 09e7ccd
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 41 deletions.
10 changes: 10 additions & 0 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -1583,6 +1583,16 @@ public enum OperationStatusCode {
"hbase.regionserver.slowlog.systable.enabled";
public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;

/**
* Number of rows in a batch operation above which a warning will be logged.
*/
public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";

/**
* Default value of {@link #BATCH_ROWS_THRESHOLD_NAME}
*/
public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;

private HConstants() {
// Can't be instantiated with this ctor.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

/**
* Number of rows in a batch operation above which a warning will be logged.
*/
static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
/**
* Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
*/
static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;

/*
* Whether to reject rows with size > threshold defined by
* {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
Expand Down Expand Up @@ -1265,7 +1256,8 @@ public RSRpcServices(final HRegionServer rs) throws IOException {
final Configuration conf = rs.getConfiguration();
this.ld = ld;
regionServer = rs;
rowSizeWarnThreshold = conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
rowSizeWarnThreshold = conf.getInt(
HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
rejectRowsWithSizeOverThreshold =
conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
@Override
public void startReplicationService() throws IOException {
this.replicationManager.init();
this.replicationSink = new ReplicationSink(this.conf, this.server);
this.replicationSink = new ReplicationSink(this.conf);
this.scheduleThreadPool.scheduleAtFixedRate(
new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -37,7 +38,6 @@
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
Expand All @@ -54,6 +54,7 @@
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -93,15 +94,21 @@ public class ReplicationSink {
private SourceFSConfigurationProvider provider;
private WALEntrySinkFilter walEntrySinkFilter;

/**
* Row size threshold for multi requests above which a warning is logged
*/
private final int rowSizeWarnThreshold;

/**
* Create a sink for replication
* @param conf conf object
* @param stopper boolean to tell this thread to stop
* @throws IOException thrown when HDFS goes bad or bad file name
*/
public ReplicationSink(Configuration conf, Stoppable stopper)
public ReplicationSink(Configuration conf)
throws IOException {
this.conf = HBaseConfiguration.create(conf);
rowSizeWarnThreshold = conf.getInt(
HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
decorateConf();
this.metrics = new MetricsSink();
this.walEntrySinkFilter = setupWALEntrySinkFilter();
Expand Down Expand Up @@ -211,11 +218,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
// Map of table name Vs list of pair of family and list of
// hfile paths from its namespace
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
bulkLoadsPerClusters.get(bld.getClusterIdsList());
if (bulkLoadHFileMap == null) {
bulkLoadHFileMap = new HashMap<>();
bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
}
bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
}
} else {
Expand Down Expand Up @@ -250,7 +253,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
if (!rowMap.isEmpty()) {
LOG.debug("Started replicating mutations.");
for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
batch(entry.getKey(), entry.getValue().values());
batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
}
LOG.debug("Finished replicating mutations.");
}
Expand Down Expand Up @@ -366,16 +369,8 @@ private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
*/
private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1,
K2 key2, V value) {
Map<K2,List<V>> innerMap = map.get(key1);
if (innerMap == null) {
innerMap = new HashMap<>();
map.put(key1, innerMap);
}
List<V> values = innerMap.get(key2);
if (values == null) {
values = new ArrayList<>();
innerMap.put(key2, values);
}
Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
values.add(value);
return values;
}
Expand Down Expand Up @@ -403,13 +398,24 @@ public void stopReplicationSinkServices() {
* Do the changes and handle the pool
* @param tableName table to insert into
* @param allRows list of actions
* @param batchRowSizeThreshold rowSize threshold for batch mutation
*/
private void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
throws IOException {
if (allRows.isEmpty()) {
return;
}
AsyncTable<?> table = getConnection().getTable(tableName);
List<Future<?>> futures = allRows.stream().map(table::batchAll).collect(Collectors.toList());
List<Future<?>> futures = new ArrayList<>();
for (List<Row> rows : allRows) {
List<List<Row>> batchRows;
if (rows.size() > batchRowSizeThreshold) {
batchRows = Lists.partition(rows, batchRowSizeThreshold);
} else {
batchRows = Collections.singletonList(rows);
}
futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
}
for (Future<?> future : futures) {
try {
FutureUtils.get(future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
Expand Down Expand Up @@ -80,8 +81,8 @@ public void setupTest() throws Exception {
final TableName tableName = TableName.valueOf("tableName");
TEST_UTIL = new HBaseTestingUtility();
CONF = TEST_UTIL.getConfiguration();
THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
THRESHOLD = CONF.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
CONF.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp);
TEST_UTIL.startMiniCluster();
TEST_UTIL.createTable(tableName, TEST_FAM);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.security.SecureRandom;
import java.util.ArrayList;
Expand Down Expand Up @@ -55,7 +54,7 @@
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
Expand All @@ -78,7 +77,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;

@Category({ReplicationTests.class, MediumTests.class})
@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationSink {

@ClassRule
Expand Down Expand Up @@ -127,9 +126,8 @@ public void stop(String why) {
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName());

TEST_UTIL.startMiniCluster(3);
SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
Expand Down Expand Up @@ -202,6 +200,40 @@ public void testMixedPutDelete() throws Exception {
assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
}

@Test
public void testLargeEditsPutDelete() throws Exception {
List<WALEntry> entries = new ArrayList<>();
List<Cell> cells = new ArrayList<>();
for (int i = 0; i < 5510; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
baseNamespaceDir, hfileArchiveDir);

ResultScanner resultScanner = table1.getScanner(new Scan());
int totalRows = 0;
while (resultScanner.next() != null) {
totalRows++;
}
assertEquals(5510, totalRows);

entries = new ArrayList<>();
cells = new ArrayList<>();
for (int i = 0; i < 11000; i++) {
entries.add(
createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn,
cells));
}
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
baseNamespaceDir, hfileArchiveDir);
resultScanner = table1.getScanner(new Scan());
totalRows = 0;
while (resultScanner.next() != null) {
totalRows++;
}
assertEquals(5500, totalRows);
}

/**
* Insert to 2 different tables
* @throws Exception
Expand All @@ -220,7 +252,11 @@ public void testMixedPutTables() throws Exception {
Scan scan = new Scan();
ResultScanner scanRes = table2.getScanner(scan);
for(Result res : scanRes) {
assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
assertEquals(0, Bytes.toInt(res.getRow()) % 2);
}
scanRes = table1.getScanner(scan);
for(Result res : scanRes) {
assertEquals(1, Bytes.toInt(res.getRow()) % 2);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void testWALEntryFilter() throws IOException {
IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
conf.setClass(ClusterConnectionFactory.HBASE_SERVER_CLUSTER_CONNECTION_IMPL,
DevNullAsyncClusterConnection.class, AsyncClusterConnection.class);
ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
ReplicationSink sink = new ReplicationSink(conf);
// Create some dumb walentries.
List<AdminProtos.WALEntry> entries = new ArrayList<>();
AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
Expand Down

0 comments on commit 09e7ccd

Please sign in to comment.