Skip to content

Commit

Permalink
HBASE-18845 TestReplicationSmallTests fails after HBASE-14004
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Sep 29, 2017
1 parent afce850 commit 239e687
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 61 deletions.
Expand Up @@ -26,7 +26,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
Expand All @@ -39,13 +38,13 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
Expand All @@ -57,10 +56,14 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
Expand All @@ -73,8 +76,8 @@
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -162,7 +165,7 @@ public void testDeleteTypes() throws Exception {
htable1.put(put);

Get get = new Get(row);
get.setMaxVersions();
get.readAllVersions();
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
Expand All @@ -184,7 +187,7 @@ public void testDeleteTypes() throws Exception {
htable1.delete(d);

get = new Get(row);
get.setMaxVersions();
get.readAllVersions();
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
Expand Down Expand Up @@ -327,7 +330,7 @@ private void loadData(String prefix, byte[] row) throws IOException {
public void testDisableEnable() throws Exception {

// Test disabling replication
admin.disablePeer(PEER_ID);
hbaseAdmin.disableReplicationPeer(PEER_ID);

byte[] rowkey = Bytes.toBytes("disable enable");
Put put = new Put(rowkey);
Expand All @@ -346,7 +349,7 @@ public void testDisableEnable() throws Exception {
}

// Test enable replication
admin.enablePeer(PEER_ID);
hbaseAdmin.enableReplicationPeer(PEER_ID);

for (int i = 0; i < NB_RETRIES; i++) {
Result res = htable2.get(get);
Expand All @@ -370,7 +373,7 @@ public void testDisableEnable() throws Exception {
@Test(timeout=300000)
public void testAddAndRemoveClusters() throws Exception {
LOG.info("testAddAndRemoveClusters");
admin.removePeer(PEER_ID);
hbaseAdmin.removeReplicationPeer(PEER_ID);
Thread.sleep(SLEEP_TIME);
byte[] rowKey = Bytes.toBytes("Won't be replicated");
Put put = new Put(rowKey);
Expand All @@ -392,7 +395,7 @@ public void testAddAndRemoveClusters() throws Exception {
}
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
admin.addPeer(PEER_ID, rpc, null);
hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
Thread.sleep(SLEEP_TIME);
rowKey = Bytes.toBytes("do rep");
put = new Put(rowKey);
Expand Down Expand Up @@ -525,13 +528,11 @@ public void testVerifyRepJobWithRawOptions() throws Exception {
Table lHtable2 = null;

try {
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(familyname);
fam.setMaxVersions(100);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
table.addFamily(fam);
ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
.setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(fam).build();
scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (HColumnDescriptor f : table.getColumnFamilies()) {
for (ColumnFamilyDescriptor f : table.getColumnFamilies()) {
scopes.put(f.getName(), f.getScope());
}

Expand Down Expand Up @@ -631,7 +632,7 @@ public void testHBase14905() throws Exception {
htable1.put(put);

Scan scan = new Scan();
scan.setMaxVersions(100);
scan.readVersions(100);
ResultScanner scanner1 = htable1.getScanner(scan);
Result[] res1 = scanner1.next(1);
scanner1.close();
Expand All @@ -641,7 +642,7 @@ public void testHBase14905() throws Exception {

for (int i = 0; i < NB_RETRIES; i++) {
scan = new Scan();
scan.setMaxVersions(100);
scan.readVersions(100);
scanner1 = htable2.getScanner(scan);
res1 = scanner1.next(1);
scanner1.close();
Expand All @@ -668,7 +669,7 @@ public void testHBase14905() throws Exception {
htable2.put(put);

scan = new Scan();
scan.setMaxVersions(100);
scan.readVersions(100);
scanner1 = htable2.getScanner(scan);
res1 = scanner1.next(NB_ROWS_IN_BATCH);
scanner1.close();
Expand All @@ -695,7 +696,7 @@ public void testVersionMismatchHBase14905() throws Exception {
htable1.put(put);

Scan scan = new Scan();
scan.setMaxVersions(100);
scan.readVersions(100);
ResultScanner scanner1 = htable1.getScanner(scan);
Result[] res1 = scanner1.next(1);
scanner1.close();
Expand All @@ -705,7 +706,7 @@ public void testVersionMismatchHBase14905() throws Exception {

for (int i = 0; i < NB_RETRIES; i++) {
scan = new Scan();
scan.setMaxVersions(100);
scan.readVersions(100);
scanner1 = htable2.getScanner(scan);
res1 = scanner1.next(1);
scanner1.close();
Expand All @@ -728,13 +729,13 @@ public void testVersionMismatchHBase14905() throws Exception {

try {
// Disabling replication and modifying the particular version of the cell to validate the feature.
admin.disablePeer(PEER_ID);
hbaseAdmin.disableReplicationPeer(PEER_ID);
Put put2 = new Put(Bytes.toBytes("r1"));
put2.addColumn(famName, qualifierName, ts +2, Bytes.toBytes("v99"));
htable2.put(put2);

scan = new Scan();
scan.setMaxVersions(100);
scan.readVersions(100);
scanner1 = htable2.getScanner(scan);
res1 = scanner1.next(NB_ROWS_IN_BATCH);
scanner1.close();
Expand All @@ -745,7 +746,7 @@ public void testVersionMismatchHBase14905() throws Exception {
runVerifyReplication(args, 0, 1);
}
finally {
admin.enablePeer(PEER_ID);
hbaseAdmin.enableReplicationPeer(PEER_ID);
}
}

Expand Down Expand Up @@ -786,21 +787,20 @@ public void testVerifyListReplicatedTable() throws Exception {

// Create Tables
for (int i = 0; i < numOfTables; i++) {
HTableDescriptor ht = new HTableDescriptor(TableName.valueOf(tName + i));
HColumnDescriptor cfd = new HColumnDescriptor(colFam);
cfd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
ht.addFamily(cfd);
hadmin.createTable(ht);
hadmin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(tName + i))
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(colFam))
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.build());
}

// verify the result
List<HashMap<String, String>> replicationColFams = admin.listReplicated();
List<TableCFs> replicationColFams = hbaseAdmin.listReplicatedTableCFs();
int[] match = new int[numOfTables]; // array of 3 with init value of zero

for (int i = 0; i < replicationColFams.size(); i++) {
HashMap<String, String> replicationEntry = replicationColFams.get(i);
String tn = replicationEntry.get(ReplicationAdmin.TNAME);
if ((tn.startsWith(tName)) && replicationEntry.get(ReplicationAdmin.CFNAME).equals(colFam)) {
TableCFs replicationEntry = replicationColFams.get(i);
String tn = replicationEntry.getTable().getNameAsString();
if (tn.startsWith(tName) && replicationEntry.getColumnFamilyMap().containsKey(colFam)) {
int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit
match[m]++; // should only increase once
}
Expand Down Expand Up @@ -831,7 +831,7 @@ public void testReplicationInReplay() throws Exception {
HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0);
RegionInfo hri = region.getRegionInfo();
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (byte[] fam : htable1.getTableDescriptor().getFamiliesKeys()) {
for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) {
scopes.put(fam, 1);
}
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
Expand Down Expand Up @@ -918,14 +918,14 @@ public void testVerifyReplicationWithSnapshotSupport() throws Exception {
Path rootDir = FSUtils.getRootDir(conf1);
FileSystem fs = rootDir.getFileSystem(conf1);
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName,
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
new String(famName), sourceSnapshotName, rootDir, fs, true);

// Take target snapshot
Path peerRootDir = FSUtils.getRootDir(conf2);
FileSystem peerFs = peerRootDir.getFileSystem(conf2);
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName,
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
new String(famName), peerSnapshotName, peerRootDir, peerFs, true);

String peerFSAddress = peerFs.getUri().toString();
Expand Down Expand Up @@ -963,11 +963,11 @@ public void testVerifyReplicationWithSnapshotSupport() throws Exception {
htable2.delete(delete);

sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName,
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
new String(famName), sourceSnapshotName, rootDir, fs, true);

peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName,
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
new String(famName), peerSnapshotName, peerRootDir, peerFs, true);

args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
Expand Down Expand Up @@ -1006,27 +1006,23 @@ public void testEmptyWALRecovery() throws Exception {
emptyWalPaths.add(emptyWalPath);
}

// inject our empty wal into the replication queue
// inject our empty wal into the replication queue, and then roll the original wal, which
// enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
// determine if the file being replicated currently is still opened for write, so just inject a
// new wal to the replication queue does not mean the previous file is closed.
for (int i = 0; i < numRs; i++) {
Replication replicationService =
(Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
Replication replicationService = (Replication) hrs.getReplicationSourceService();
replicationService.preLogRoll(null, emptyWalPaths.get(i));
replicationService.postLogRoll(null, emptyWalPaths.get(i));
}

// wait for ReplicationSource to start reading from our empty wal
waitForLogAdvance(numRs, emptyWalPaths, false);

// roll the original wal, which enqueues a new wal behind our empty wal
for (int i = 0; i < numRs; i++) {
RegionInfo regionInfo =
utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
WAL wal = hrs.getWAL(regionInfo);
wal.rollWriter(true);
}

// ReplicationSource should advance past the empty wal, or else the test will fail
waitForLogAdvance(numRs, emptyWalPaths, true);
waitForLogAdvance(numRs);

// we're now writing to the new wal
// if everything works, the source should've stopped reading from the empty wal, and start
Expand All @@ -1035,26 +1031,25 @@ public void testEmptyWALRecovery() throws Exception {
}

/**
* Waits for the ReplicationSource to start reading from the given paths
* Waits until there is only one log(the current writing one) in the replication queue
* @param numRs number of regionservers
* @param emptyWalPaths path for each regionserver
* @param invert if true, waits until ReplicationSource is NOT reading from the given paths
*/
private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths,
final boolean invert) throws Exception {
private void waitForLogAdvance(int numRs) throws Exception {
Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
for (int i = 0; i < numRs; i++) {
HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
RegionInfo regionInfo =
utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = hrs.getWAL(regionInfo);
Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
Replication replicationService = (Replication) utility1.getHBaseCluster()
.getRegionServer(i).getReplicationSourceService();
for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
.getSources()) {
ReplicationSource source = (ReplicationSource) rsi;
if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) {
return false;
}
if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) {
if (!currentFile.equals(source.getCurrentPath())) {
return false;
}
}
Expand Down
Expand Up @@ -63,7 +63,7 @@ public class TestReplicationBase {
protected static ZooKeeperWatcher zkw2;

protected static ReplicationAdmin admin;
private static Admin hbaseAdmin;
protected static Admin hbaseAdmin;

protected static Table htable1;
protected static Table htable2;
Expand Down

0 comments on commit 239e687

Please sign in to comment.