Skip to content

Commit 9cf7fd9

Browse files
committed
HBASE-29359 VerifyReplication needs to join on its verification tasks
1 parent c4d8b00 commit 9cf7fd9

File tree

2 files changed

+39
-41
lines changed

2 files changed

+39
-41
lines changed

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ public enum Counters {
156156
private int sleepMsBeforeReCompare;
157157
private String delimiter = "";
158158
private boolean verbose = false;
159-
private int batch = -1;
160159

161160
/**
162161
* Map method that compares every scanned row with the equivalent from a distant cluster.
@@ -178,7 +177,7 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
178177
}
179178
delimiter = conf.get(NAME + ".delimiter", "");
180179
verbose = conf.getBoolean(NAME + ".verbose", false);
181-
batch = conf.getInt(NAME + ".batch", -1);
180+
int batch = conf.getInt(NAME + ".batch", -1);
182181
final Scan scan = new Scan();
183182
if (batch > 0) {
184183
scan.setBatch(batch);
@@ -200,7 +199,7 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
200199
setRowPrefixFilter(scan, rowPrefixes);
201200
scan.setTimeRange(startTime, endTime);
202201
int versions = conf.getInt(NAME + ".versions", -1);
203-
LOG.info("Setting number of version inside map as: " + versions);
202+
LOG.info("Setting number of version inside map as: {}", versions);
204203
if (versions >= 0) {
205204
scan.readVersions(versions);
206205
}
@@ -245,9 +244,9 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
245244
String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
246245
FileSystem.setDefaultUri(peerConf, peerFSAddress);
247246
CommonFSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
248-
LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:"
249-
+ peerSnapshotTmpDir + " peer root uri:" + CommonFSUtils.getRootDir(peerConf)
250-
+ " peerFSAddress:" + peerFSAddress);
247+
LOG.info("Using peer snapshot:{} with temp dir:{} peer root uri:{} peerFSAddress:{}",
248+
peerSnapshotName, peerSnapshotTmpDir, CommonFSUtils.getRootDir(peerConf),
249+
peerFSAddress);
251250

252251
replicatedScanner = new TableSnapshotScanner(peerConf, CommonFSUtils.getRootDir(peerConf),
253252
new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
@@ -269,8 +268,8 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
269268
Result.compareResults(value, currentCompareRowInPeerTable, false);
270269
context.getCounter(Counters.GOODROWS).increment(1);
271270
if (verbose) {
272-
LOG.info(
273-
"Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter);
271+
LOG.info("Good row key: {}",
272+
delimiter + Bytes.toStringBinary(value.getRow()) + delimiter);
274273
}
275274
} catch (Exception e) {
276275
logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value,
@@ -291,7 +290,6 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
291290
}
292291
}
293292

294-
@SuppressWarnings("FutureReturnValueIgnored")
295293
private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row,
296294
Result replicatedRow) {
297295
byte[] rowKey = getRow(row, replicatedRow);
@@ -311,7 +309,11 @@ private void logFailRowAndIncreaseCounter(Context context, Counters counter, Res
311309
return;
312310
}
313311

314-
reCompareExecutor.submit(runnable);
312+
try {
313+
reCompareExecutor.submit(runnable).get();
314+
} catch (Exception e) {
315+
throw new RuntimeException(e);
316+
}
315317
}
316318

317319
@Override
@@ -389,18 +391,16 @@ protected void cleanup(Context context) {
389391

390392
private static Pair<ReplicationPeerConfig, Configuration>
391393
getPeerQuorumConfig(final Configuration conf, String peerId) throws IOException {
392-
ZKWatcher localZKW = null;
393-
try {
394-
localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
395-
@Override
396-
public void abort(String why, Throwable e) {
397-
}
394+
try (ZKWatcher localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
395+
@Override
396+
public void abort(String why, Throwable e) {
397+
}
398398

399-
@Override
400-
public boolean isAborted() {
401-
return false;
402-
}
403-
});
399+
@Override
400+
public boolean isAborted() {
401+
return false;
402+
}
403+
})) {
404404
ReplicationPeerStorage storage =
405405
ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), localZKW, conf);
406406
ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
@@ -409,10 +409,6 @@ public boolean isAborted() {
409409
} catch (ReplicationException e) {
410410
throw new IOException("An error occurred while trying to connect to the remote peer cluster",
411411
e);
412-
} finally {
413-
if (localZKW != null) {
414-
localZKW.close();
415-
}
416412
}
417413
}
418414

@@ -471,25 +467,25 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce
471467
peerConfigPair = getPeerQuorumConfig(conf, peerId);
472468
ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
473469
peerQuorumAddress = peerConfig.getClusterKey();
474-
LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: "
475-
+ peerConfig.getConfiguration());
470+
LOG.info("Peer Quorum Address: {}, Peer Configuration: {}", peerQuorumAddress,
471+
peerConfig.getConfiguration());
476472
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
477473
HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
478474
peerConfig.getConfiguration().entrySet());
479475
} else {
480476
assert this.peerQuorumAddress != null;
481477
peerQuorumAddress = this.peerQuorumAddress;
482-
LOG.info("Peer Quorum Address: " + peerQuorumAddress);
478+
LOG.info("Peer Quorum Address: {}", peerQuorumAddress);
483479
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
484480
}
485481

486482
if (peerTableName != null) {
487-
LOG.info("Peer Table Name: " + peerTableName);
483+
LOG.info("Peer Table Name: {}", peerTableName);
488484
conf.set(NAME + ".peerTableName", peerTableName);
489485
}
490486

491487
conf.setInt(NAME + ".versions", versions);
492-
LOG.info("Number of version: " + versions);
488+
LOG.info("Number of version: {}", versions);
493489

494490
conf.setInt(NAME + ".recompareTries", reCompareTries);
495491
conf.setInt(NAME + ".recompareBackoffExponent", reCompareBackoffExponent);
@@ -524,7 +520,7 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce
524520
}
525521
if (versions >= 0) {
526522
scan.readVersions(versions);
527-
LOG.info("Number of versions set to " + versions);
523+
LOG.info("Number of versions set to {}", versions);
528524
}
529525
if (families != null) {
530526
String[] fams = families.split(",");
@@ -537,8 +533,8 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce
537533

538534
if (sourceSnapshotName != null) {
539535
Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
540-
LOG.info(
541-
"Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
536+
LOG.info("Using source snapshot-{} with temp dir:{}", sourceSnapshotName,
537+
sourceSnapshotTmpDir);
542538
TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
543539
null, job, true, snapshotTempPath);
544540
restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
@@ -819,7 +815,7 @@ private boolean isPeerQuorumAddress(String cmd) {
819815
* @param errorMsg Error message. Can be null.
820816
*/
821817
private static void printUsage(final String errorMsg) {
822-
if (errorMsg != null && errorMsg.length() > 0) {
818+
if (errorMsg != null && !errorMsg.isEmpty()) {
823819
System.err.println("ERROR: " + errorMsg);
824820
}
825821
System.err.println("Usage: verifyrep [--starttime=X]"
@@ -914,7 +910,8 @@ private static void printUsage(final String errorMsg) {
914910
+ "2181:/cluster-b \\\n" + " TestTable");
915911
}
916912

917-
private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, Mapper.Context context) {
913+
private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads,
914+
Mapper<?, ?, ?, ?>.Context context) {
918915
if (maxThreads == 0) {
919916
return null;
920917
}
@@ -923,7 +920,7 @@ private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, Mapper.
923920
buildRejectedReComparePolicy(context));
924921
}
925922

926-
private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper.Context context) {
923+
private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper<?, ?, ?, ?>.Context context) {
927924
return new CallerRunsPolicy() {
928925
@Override
929926
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
@@ -938,9 +935,10 @@ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
938935
@Override
939936
public int run(String[] args) throws Exception {
940937
Configuration conf = this.getConf();
941-
Job job = createSubmittableJob(conf, args);
942-
if (job != null) {
943-
return job.waitForCompletion(true) ? 0 : 1;
938+
try (Job job = createSubmittableJob(conf, args)) {
939+
if (job != null) {
940+
return job.waitForCompletion(true) ? 0 : 1;
941+
}
944942
}
945943
return 1;
946944
}

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class VerifyReplicationRecompareRunnable implements Runnable {
3434
private static final Logger LOG =
3535
LoggerFactory.getLogger(VerifyReplicationRecompareRunnable.class);
3636

37-
private final Mapper.Context context;
37+
private final Mapper<?, ?, ?, ?>.Context context;
3838
private final VerifyReplication.Verifier.Counters originalCounter;
3939
private final String delimiter;
4040
private final byte[] row;
@@ -50,7 +50,7 @@ public class VerifyReplicationRecompareRunnable implements Runnable {
5050
private Result sourceResult;
5151
private Result replicatedResult;
5252

53-
public VerifyReplicationRecompareRunnable(Mapper.Context context, Result sourceResult,
53+
public VerifyReplicationRecompareRunnable(Mapper<?, ?, ?, ?>.Context context, Result sourceResult,
5454
Result replicatedResult, VerifyReplication.Verifier.Counters originalCounter, String delimiter,
5555
Scan tableScan, Table sourceTable, Table replicatedTable, int reCompareTries,
5656
int sleepMsBeforeReCompare, int reCompareBackoffExponent, boolean verbose) {

0 commit comments

Comments
 (0)