HBASE-22939 SpaceQuotas - Bulkload from different hdfs failed when sp… #4748

Merged 1 commit on Aug 31, 2022
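Summary of the change: with space quotas enabled, a bulk load whose HFiles are staged on a different HDFS than the RegionServer's own filesystem failed, because the quota size check was always handed the RegionServer's local FileSystem and could not find the source files. The server-side fix below resolves the filesystem from the bulk-load file paths instead. A rough client-side sketch of the scenario, with hypothetical cluster URIs (the table name is borrowed from the test below):

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

public class CrossHdfsBulkLoadSketch {
  public static void main(String[] args) throws Exception {
    // Client configuration pointing at cluster A, where HBase runs with quotas enabled.
    Configuration conf = HBaseConfiguration.create();
    // Staging directory on cluster B (a different HDFS), one subdirectory per column family.
    FileSystem otherFs = FileSystem.get(URI.create("hdfs://cluster-b:8020"), conf);
    Path staging = otherFs.makeQualified(new Path("/staging/bulkload"));
    // Before this fix the RegionServer-side size check looked these files up on cluster A's
    // filesystem and failed; with the fix it resolves cluster B's filesystem from the paths.
    BulkLoadHFiles.create(conf).bulkLoad(TableName.valueOf("t1_syncup"), staging);
  }
}
```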
File: TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+ import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -64,6 +65,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
protected void customizeClusterConf(Configuration conf) {
conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
+ conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
conf.set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName());
String classes = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
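The added QuotaUtil.QUOTA_CONF_KEY line switches hbase.quota.enabled on for the mini cluster, so the bulk loads in this test actually go through the space-quota size check. For context, a hedged sketch of how a table space quota would be defined so that the check has something to enforce; this is standard quota API usage, not part of this patch, and the table name and limit are illustrative:

```java
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.quotas.QuotaSettingsFactory;
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;

final class SpaceQuotaSetupSketch {
  // Limit the table to 1 GiB and reject further inserts (including bulk loads) once exceeded.
  static void limitTableSpace(Admin admin) throws Exception {
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
      TableName.valueOf("t1_syncup"), 1024L * 1024 * 1024, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);
  }
}
```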
@@ -82,19 +84,20 @@ public void testSyncUpTool() throws Exception {
setupReplication();

/**
- * Prepare 16 random hfile ranges required for creating hfiles
+ * Prepare 24 random hfile ranges required for creating hfiles
*/
- Set<String> randomHFileRanges = new HashSet<>(16);
- for (int i = 0; i < 16; i++) {
+ Set<String> randomHFileRanges = new HashSet<>(24);
+ for (int i = 0; i < 24; i++) {
randomHFileRanges.add(HBaseTestingUtility.getRandomUUID().toString());
}
List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
Collections.sort(randomHFileRangeList);
Iterator<String> randomHFileRangeListIterator = randomHFileRangeList.iterator();

/**
- * at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep t2_syncup: Load 200 rows
- * into cf1, and 3 rows into norep verify correctly replicated to slave
+ * at Master: t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3 rows
+ * into norep t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1, and 3
+ * rows into norep verify correctly replicated to slave
*/
loadAndReplicateHFiles(true, randomHFileRangeListIterator);

@@ -175,23 +178,35 @@ private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
Iterator<String> randomHFileRangeListIterator) throws Exception {
LOG.debug("loadAndReplicateHFiles");

- // Load 100 + 3 hfiles to t1_syncup.
+ // Load 50 + 50 + 3 hfiles to t1_syncup.
byte[][][] hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 100);
loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 50);

+ hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source,
+ hfileRanges, 50);

hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
hfileRanges, 3);

- // Load 200 + 3 hfiles to t2_syncup.
+ // Load 100 + 100 + 3 hfiles to t2_syncup.
hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 100);

+ hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
- loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 200);
+ loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source,
+ hfileRanges, 100);

hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
@@ -229,6 +244,26 @@ private void loadAndValidateHFileReplication(String testName, byte[] row, byte[]
loader.bulkLoad(tableName, dir);
}

+ private void loadFromOtherHDFSAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
+ Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
+ Path dir = UTIL2.getDataTestDirOnTestFS(testName);
+ FileSystem fs = UTIL2.getTestFileSystem();
+ dir = dir.makeQualified(fs);
+ Path familyDir = new Path(dir, Bytes.toString(fam));
+
+ int hfileIdx = 0;
+ for (byte[][] range : hfileRanges) {
+ byte[] from = range[0];
+ byte[] to = range[1];
+ HFileTestUtil.createHFile(UTIL2.getConfiguration(), fs,
+ new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
+ }
+
+ final TableName tableName = source.getName();
+ BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
+ loader.bulkLoad(tableName, dir);
+ }
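The new helper stages the HFiles on UTIL2's filesystem but builds the loader from UTIL1's configuration, so the RegionServer receives fully qualified paths that point at the other mini cluster. A minimal sketch of what qualifying the staging directory does, with hypothetical URIs in place of the mini clusters:

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class QualifiedPathSketch {
  public static void main(String[] args) throws Exception {
    // Stand-in for UTIL2's test filesystem; in the test this is the second mini cluster.
    FileSystem otherFs = FileSystem.get(URI.create("hdfs://cluster-b:8020"), new Configuration());
    Path qualified = otherFs.makeQualified(new Path("/user/test/HFileReplication_1"));
    // Prints hdfs://cluster-b:8020/user/test/HFileReplication_1, so the receiving
    // RegionServer sees a path whose scheme and authority belong to the other cluster.
    System.out.println(qualified);
  }
}
```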

private void wait(Table target, int expectedCount, String msg)
throws IOException, InterruptedException {
for (int i = 0; i < NB_RETRIES; i++) {
File: RSRpcServices.java
@@ -47,6 +47,7 @@
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.CacheEvictionStats;
@@ -2416,7 +2417,7 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
filePaths.add(familyPath.getPath());
}
// Check if the batch of files exceeds the current quota
- sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
+ sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
}
}
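This one-line change is the core of the server-side fix: computeBulkLoadSize must be given a FileSystem that can actually see the staged files, whereas it previously always received the RegionServer's own filesystem. Conceptually the check just sums the lengths of the staged HFiles on that filesystem; a minimal sketch of the idea, not the actual enforcement implementation:

```java
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class BulkLoadSizeSketch {
  static long computeBulkLoadSize(FileSystem fs, List<String> filePaths) throws IOException {
    long total = 0;
    for (String filePath : filePaths) {
      // Throws FileNotFoundException when fs cannot see the path, which is exactly what
      // happened when the HFiles lived on a different HDFS than the RegionServer's default FS.
      total += fs.getFileStatus(new Path(filePath)).getLen();
    }
    return total;
  }
}
```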

@@ -2521,6 +2522,15 @@ public CoprocessorServiceResponse execService(final RpcController controller,
}
}

+ private FileSystem getFileSystem(List<String> filePaths) throws IOException {
+ if (filePaths.isEmpty()) {
+ // local hdfs
+ return regionServer.getFileSystem();
+ }
+ // source hdfs
+ return new Path(filePaths.get(0)).getFileSystem(regionServer.getConfiguration());
+ }

private com.google.protobuf.Message execServiceOnRegion(HRegion region,
final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
// ignore the passed in controller (from the serialized call)