HBASE-23136 PartionedMobFileCompactor bulkloaded files shouldn't get … #699

Merged · 1 commit · Oct 10, 2019

@@ -2570,16 +2570,19 @@ public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableN
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
storeFilesSize, bulkloadSeqId, null);
storeFilesSize, bulkloadSeqId, null, true);
}

public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
Map<String, Long> storeFilesSize, long bulkloadSeqId,
List<String> clusterIds, boolean replicate) {
BulkLoadDescriptor.Builder desc =
BulkLoadDescriptor.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setEncodedRegionName(encodedRegionName)
.setBulkloadSeqNum(bulkloadSeqId)
.setReplicate(replicate);
if(clusterIds != null) {
desc.addAllClusterIds(clusterIds);
}

@@ -568,7 +568,7 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) {
return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
false, null);
false, null, true);
}

/**
@@ -585,7 +585,7 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken, boolean copyFiles,
List<String> clusterIds) {
List<String> clusterIds, boolean replicate) {
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);

@@ -626,6 +626,7 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
if (clusterIds != null) {
request.addAllClusterIds(clusterIds);
}
request.setReplicate(replicate);
return request.build();
}

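A hedged sketch of calling the extended overload directly (all inputs assumed prepared elsewhere); the legacy overload above simply delegates with replicate defaulted to true:

BulkLoadHFileRequest req = RequestConverter.buildBulkLoadHFileRequest(
    familyPaths, regionName, true,  // assignSeqNum: request a WAL sequence id
    userToken, bulkToken,
    false,                          // copyFiles
    null,                           // clusterIds: none recorded yet
    false);                         // replicate: opt this load out of replication
assert !req.getReplicate();         // the flag rides along on the RPC
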
1 change: 1 addition & 0 deletions hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -379,6 +379,7 @@ message BulkLoadHFileRequest {
optional string bulk_token = 5;
optional bool copy_file = 6 [default = false];
repeated string cluster_ids = 7;
optional bool replicate = 8 [default = true];

message FamilyPath {
required bytes family = 1;
1 change: 1 addition & 0 deletions hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -151,6 +151,7 @@ message BulkLoadDescriptor {
repeated StoreDescriptor stores = 3;
required int64 bulkload_seq_num = 4;
repeated string cluster_ids = 5;
optional bool replicate = 6 [default = true];
}

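Both new proto fields are optional with [default = true], so requests and WAL markers produced before this change keep replicating. A minimal sketch of that compatibility behavior (builder inputs assumed prepared; generated shaded-proto classes):

// A descriptor built by pre-change code never calls setReplicate(), so
// readers observe the proto default of true and still ship the marker.
WALProtos.BulkLoadDescriptor legacy = WALProtos.BulkLoadDescriptor.newBuilder()
    .setTableName(protoTableName)       // assumed pre-built proto TableName
    .setEncodedRegionName(encodedName)  // assumed ByteString
    .setBulkloadSeqNum(seqId)
    .build();                           // replicate left unset
assert legacy.getReplicate();           // defaults to true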

@@ -81,17 +81,18 @@ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[
* Defined as default here to avoid breaking callers who rely on the bulkLoad version that does
* not expect the additional clusterIds and replicate params.
* @param tableName the target table
* @param familyPaths hdfs path for the the table family dirs containg files to be loaded
* @param row row key
* @param assignSeqNum seq num for the event on WAL
* @param userToken user token
* @param bulkToken bulk load token
* @param copyFiles flag for copying the loaded hfiles
* @param familyPaths hdfs path for the table family dirs containing files to be loaded.
* @param row row key.
* @param assignSeqNum seq num for the event on WAL.
* @param userToken user token.
* @param bulkToken bulk load token.
* @param copyFiles flag for copying the loaded hfiles.
* @param clusterIds list of cluster ids where the given bulk load has already been processed.
* @param replicate whether this bulkload should be replicated to peer clusters.
*/
CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[], String>> familyPaths,
byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken, boolean copyFiles,
List<String> clusterIds);
List<String> clusterIds, boolean replicate);

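For illustration, a hedged sketch of driving the extended signature (connection, paths, and tokens assumed to be set up elsewhere); replicate=false asks sink clusters to skip the resulting bulkload marker:

CompletableFuture<Boolean> loaded = conn.bulkLoad(tableName, familyPaths, row,
    true,       // assignSeqNum: stamp the load with a WAL sequence id
    userToken,  // delegation token for the staging filesystem
    bulkToken,  // token from prepareBulkLoad()
    false,      // copyFiles: move the hfiles rather than copy them
    null,       // clusterIds: no source clusters recorded for this load
    false);     // replicate: peers should ignore this bulkload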

@@ -109,13 +109,13 @@ public CompletableFuture<String> prepareBulkLoad(TableName tableName) {
@Override
public CompletableFuture<Boolean> bulkLoad(TableName tableName,
List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
String bulkToken, boolean copyFiles, List<String> clusterIds) {
String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
return callerFactory.<Boolean> single().table(tableName).row(row)
.action((controller, loc, stub) -> ConnectionUtils
.<Void, BulkLoadHFileRequest, BulkLoadHFileResponse, Boolean> call(controller, loc, stub,
null,
(rn, nil) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths, rn, assignSeqNum,
userToken, bulkToken, copyFiles, clusterIds),
userToken, bulkToken, copyFiles, clusterIds, replicate),
(s, c, req, done) -> s.bulkLoadHFile(c, req, done), (c, resp) -> resp.getLoaded()))
.call();
}

@@ -827,7 +827,9 @@ private void bulkloadRefFile(TableName tableName, Path bulkloadDirectory, String
throws IOException {
// bulkload the ref file
try {
BulkLoadHFiles.create(conf).bulkLoad(tableName, bulkloadDirectory);
BulkLoadHFiles bulkLoader = BulkLoadHFiles.create(conf);
bulkLoader.disableReplication();
bulkLoader.bulkLoad(tableName, bulkloadDirectory);
} catch (Exception e) {
throw new IOException(e);
}

@@ -6146,7 +6146,8 @@ private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException {
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
null, true);
}

/**
@@ -6197,7 +6198,7 @@ String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile)
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
boolean assignSeqId, BulkLoadListener bulkLoadListener,
boolean copyFile, List<String> clusterIds) throws IOException {
boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6372,7 +6373,7 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
WALProtos.BulkLoadDescriptor loadDescriptor =
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
storeFiles, storeFilesSizes, seqId, clusterIds);
storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
loadDescriptor, mvcc);
} catch (IOException ioe) {

@@ -296,7 +296,7 @@ public Map<byte[], List<Path>> run() {
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
clusterIds);
clusterIds, request.getReplicate());
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}

@@ -203,18 +203,19 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
// Handle bulk load hfiles replication
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
if(bulkLoadsPerClusters == null) {
bulkLoadsPerClusters = new HashMap<>();
}
// Map of table name Vs list of pair of family and list of
// hfile paths from its namespace
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
bulkLoadsPerClusters.get(bld.getClusterIdsList());
if (bulkLoadHFileMap == null) {
bulkLoadHFileMap = new HashMap<>();
bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
if (bld.getReplicate()) {
if (bulkLoadsPerClusters == null) {
bulkLoadsPerClusters = new HashMap<>();
}
// Map of table name Vs list of pair of family and list of
// hfile paths from its namespace
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
bulkLoadsPerClusters.get(bld.getClusterIdsList());
if (bulkLoadHFileMap == null) {
bulkLoadHFileMap = new HashMap<>();
bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
}
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
}
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
} else {
// Handle wal replication
if (isNewRowOrType(previousCell, cell)) {
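Reduced to a sketch, the sink-side rule added here: a bulkload edit is queued for hfile replication only when its descriptor opts in (the helper name below is hypothetical):

// Markers written with replicate=false (e.g. MOB compaction ref files) are
// skipped; markers from older servers never set the field, default to true,
// and are queued as before.
private boolean shouldReplicateBulkLoad(BulkLoadDescriptor bld) {
  return bld.getReplicate();
}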

@@ -84,6 +84,10 @@ public Path getFilePath() {
Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Map<byte[], List<Path>> family2Files)
throws TableNotFoundException, IOException;

/**
* Disable replication for this bulkload, if bulkload replication is configured.
*/
void disableReplication();

/**
* Perform a bulk load of the given directory into the given pre-existing table.
* @param tableName the table to load into
@@ -97,4 +101,6 @@ Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir)
static BulkLoadHFiles create(Configuration conf) {
return new BulkLoadHFilesTool(conf);
}


}
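
Putting the client side together, the opt-out mirrors the PartitionedMobCompactor change above; a minimal usage sketch (table and source directory assumed to exist):

BulkLoadHFiles loader = BulkLoadHFiles.create(conf);
loader.disableReplication();      // resulting RPCs carry replicate=false
loader.bulkLoad(tableName, dir);  // otherwise a normal bulkload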

@@ -132,6 +132,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
private String bulkToken;

private List<String> clusterIds = new ArrayList<>();
private boolean replicate = true;

public BulkLoadHFilesTool(Configuration conf) {
// make a copy, just to be sure we're not overriding someone else's config
@@ -379,7 +380,8 @@ protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
.collect(Collectors.toList());
CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds), (loaded, error) -> {
fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
(loaded, error) -> {
if (error != null) {
LOG.error("Encountered unrecoverable error from region server", error);
if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
@@ -1052,4 +1054,9 @@ public static void main(String[] args) throws Exception {
int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
System.exit(ret);
}

@Override
public void disableReplication() {
this.replicate = false;
}
}

@@ -144,7 +144,7 @@ public CompletableFuture<String> prepareBulkLoad(TableName tableName) {
@Override
public CompletableFuture<Boolean> bulkLoad(TableName tableName,
List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
String bulkToken, boolean copyFiles, List<String> clusterIds) {
String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
return null;
}


@@ -24,10 +24,15 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -42,6 +47,7 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -54,14 +60,22 @@
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
import org.apache.hadoop.hbase.mob.compactions.TestPartitionedMobCompactor;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
Expand Down Expand Up @@ -137,7 +151,9 @@ private static void startThirdCluster() throws Exception {
UTIL3.startMiniCluster(NUM_SLAVES1);

TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
.setMobEnabled(true)
.setMobThreshold(4000)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();

@@ -232,6 +248,23 @@ public void testBulkLoadReplicationActiveActive() throws Exception {
assertEquals(9, BULK_LOADS_COUNT.get());
}

@Test
public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
Path path = createMobFiles(UTIL3);
ColumnFamilyDescriptor descriptor =
new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
PartitionedMobCompactor compactor = new PartitionedMobCompactor(UTIL3.getConfiguration(),
UTIL3.getTestFileSystem(), tableName, descriptor, Executors.newFixedThreadPool(1));
BULK_LOAD_LATCH = new CountDownLatch(1);
BULK_LOADS_COUNT.set(0);
compactor.compact(Arrays.asList(UTIL3.getTestFileSystem().listStatus(path)), true);
assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
Thread.sleep(400);
assertEquals(1, BULK_LOADS_COUNT.get());

}

private void assertBulkLoadConditions(byte[] row, byte[] value,
HBaseTestingUtility utility, Table...tables) throws Exception {
BULK_LOAD_LATCH = new CountDownLatch(3);
@@ -292,6 +325,36 @@ private String createHFileForFamilies(byte[] row, byte[] value,
return hFileLocation.getAbsoluteFile().getAbsolutePath();
}

private Path createMobFiles(HBaseTestingUtility util) throws IOException {
Path testDir = FSUtils.getRootDir(util.getConfiguration());
Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
MobFileName mobFileName = null;
byte[] mobFileStartRow = new byte[32];
for (byte rowKey : Bytes.toBytes("01234")) {
mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
UUID.randomUUID().toString().replaceAll("-", ""));
StoreFileWriter mobFileWriter =
new StoreFileWriter.Builder(util.getConfiguration(),
new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
.withFilePath(new Path(basePath, mobFileName.getFileName())).build();
long now = System.currentTimeMillis();
try {
for (int i = 0; i < 10; i++) {
byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
byte[] dummyData = new byte[5000];
new Random().nextBytes(dummyData);
mobFileWriter.append(
new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
}
} finally {
mobFileWriter.close();
}
}
return basePath;
}

public static class BulkReplicationTestObserver implements RegionCoprocessor {

String clusterName;

@@ -267,7 +267,8 @@ private static <T> CompletableFuture<T> failedFuture(Throwable error) {
private static AsyncClusterConnection mockAndInjectError(AsyncClusterConnection conn) {
AsyncClusterConnection errConn = spy(conn);
doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn)
.bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList());
.bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList(),
anyBoolean());
return errConn;
}
