Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -501,124 +501,127 @@ public void testRollbackException() throws Exception {
followersInfo.add(Pair.of(followerEnvironment, new NodeInfo(nodeName, nodeHostPort, nodeDir)));
}

Pair<BDBEnvironment, NodeInfo> masterPair = findMaster(followersInfo);
String beginDbName = String.valueOf(0L);
Database masterDb = masterPair.first.openDatabase(beginDbName);
DatabaseEntry key = new DatabaseEntry(randomBytes());
DatabaseEntry value = new DatabaseEntry(randomBytes());
Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, key, value));
Assertions.assertEquals(1, masterEnvironment.getDatabaseNames().size());
LOG.info("master is {} | {}", masterPair.second.name, masterPair.second.dir);

for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
if (entryPair.second.dir.equals(masterPair.second.dir)) {
LOG.info("skip {}", entryPair.second.name);
return;
}
try {
Pair<BDBEnvironment, NodeInfo> masterPair = findMaster(followersInfo);
String beginDbName = String.valueOf(0L);
Database masterDb = masterPair.first.openDatabase(beginDbName);
DatabaseEntry key = new DatabaseEntry(randomBytes());
DatabaseEntry value = new DatabaseEntry(randomBytes());
Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, key, value));
Assertions.assertEquals(1, masterEnvironment.getDatabaseNames().size());
LOG.info("master is {} | {}", masterPair.second.name, masterPair.second.dir);

Assertions.assertEquals(1, entryPair.first.getDatabaseNames().size());
Database followerDb = entryPair.first.openDatabase(beginDbName);
DatabaseEntry readValue = new DatabaseEntry();
Assertions.assertEquals(OperationStatus.SUCCESS, followerDb.get(null, key, readValue, LockMode.DEFAULT));
Assertions.assertEquals(new String(value.getData()), new String(readValue.getData()));
followerDb.close();
}
for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
if (entryPair.second.dir.equals(masterPair.second.dir)) {
LOG.info("skip {}", entryPair.second.name);
return;
}

masterDb.close();
masterEnvironment.getEpochDB().close();
Assertions.assertEquals(1, entryPair.first.getDatabaseNames().size());
Database followerDb = entryPair.first.openDatabase(beginDbName);
DatabaseEntry readValue = new DatabaseEntry();
Assertions.assertEquals(OperationStatus.SUCCESS, followerDb.get(null, key, readValue, LockMode.DEFAULT));
Assertions.assertEquals(new String(value.getData()), new String(readValue.getData()));
followerDb.close();
}

followersInfo.stream().forEach(entryPair -> {
entryPair.first.close();
LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir);
});
masterDb.close();
masterEnvironment.getEpochDB().close();

// all follower closed
for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
String followerCopyDir = entryPair.second.dir + "_copy";
LOG.info("Copy from {} to {}", entryPair.second.dir, followerCopyDir);
FileUtils.copyDirectory(new File(entryPair.second.dir), new File(followerCopyDir));
}
followersInfo.stream().forEach(entryPair -> {
entryPair.first.close();
LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir);
});

followersInfo.stream().forEach(entryPair -> {
entryPair.first.openReplicatedEnvironment(new File(entryPair.second.dir));
LOG.info("open {} | {}", entryPair.second.name, entryPair.second.dir);
});
// all follower closed
for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
String followerCopyDir = entryPair.second.dir + "_copy";
LOG.info("Copy from {} to {}", entryPair.second.dir, followerCopyDir);
FileUtils.copyDirectory(new File(entryPair.second.dir), new File(followerCopyDir));
}

masterPair = findMaster(followersInfo);
followersInfo.stream().forEach(entryPair -> {
entryPair.first.openReplicatedEnvironment(new File(entryPair.second.dir));
LOG.info("open {} | {}", entryPair.second.name, entryPair.second.dir);
});

masterDb = masterPair.first.openDatabase(String.valueOf(1L));
for (int i = 0; i < 2 * Config.txn_rollback_limit + 10; i++) {
// for (int i = 0; i < 10; i++) {
OperationStatus status = masterDb.put(null, new DatabaseEntry(randomBytes()), new DatabaseEntry(randomBytes()));
Assertions.assertEquals(OperationStatus.SUCCESS, status);
}
Assertions.assertEquals(2, masterPair.first.getDatabaseNames().size());
Assertions.assertEquals(0, masterPair.first.getDatabaseNames().get(0));
Assertions.assertEquals(1, masterPair.first.getDatabaseNames().get(1));
masterPair = findMaster(followersInfo);

followersInfo.stream().forEach(entryPair -> {
entryPair.first.close();
LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir);
});

// Restore follower's (not new master) bdbje dir
for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
if (entryPair.second.dir.equals(masterDir)) {
String masterCopyDir = entryPair.second.dir + "_copy";
FileUtils.deleteDirectory(new File(masterCopyDir));
continue;
masterDb = masterPair.first.openDatabase(String.valueOf(1L));
for (int i = 0; i < 2 * Config.txn_rollback_limit + 10; i++) {
// for (int i = 0; i < 10; i++) {
OperationStatus status = masterDb.put(null, new DatabaseEntry(randomBytes()), new DatabaseEntry(randomBytes()));
Assertions.assertEquals(OperationStatus.SUCCESS, status);
}
LOG.info("Delete followerDir {} ", entryPair.second.dir);
FileUtils.deleteDirectory(new File(entryPair.second.dir));
// FileUtils.moveDirectory(new File(entryPair.second.dir), new File(entryPair.second.dir + "_copy2"));
String followerCopyDir = entryPair.second.dir + "_copy";
LOG.info("Move {} to {}", followerCopyDir, entryPair.second.dir);
FileUtils.moveDirectory(new File(followerCopyDir), new File(entryPair.second.dir));
}
Assertions.assertEquals(2, masterPair.first.getDatabaseNames().size());
Assertions.assertEquals(0, masterPair.first.getDatabaseNames().get(0));
Assertions.assertEquals(1, masterPair.first.getDatabaseNames().get(1));

Thread.sleep(1000);
for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
if (entryPair.second.dir.equals(masterPair.second.dir)) {
LOG.info("skip open {} | {}", entryPair.second.name, entryPair.second.dir);
continue;
followersInfo.stream().forEach(entryPair -> {
entryPair.first.close();
LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir);
});

// Restore follower's (not new master) bdbje dir
for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
if (entryPair.second.dir.equals(masterDir)) {
String masterCopyDir = entryPair.second.dir + "_copy";
FileUtils.deleteDirectory(new File(masterCopyDir));
continue;
}
LOG.info("Delete followerDir {} ", entryPair.second.dir);
FileUtils.deleteDirectory(new File(entryPair.second.dir));
// FileUtils.moveDirectory(new File(entryPair.second.dir), new File(entryPair.second.dir + "_copy2"));
String followerCopyDir = entryPair.second.dir + "_copy";
LOG.info("Move {} to {}", followerCopyDir, entryPair.second.dir);
FileUtils.moveDirectory(new File(followerCopyDir), new File(entryPair.second.dir));
}
entryPair.first.openReplicatedEnvironment(new File(entryPair.second.dir));
LOG.info("open {} | {}", entryPair.second.name, entryPair.second.dir);
}

BDBEnvironment newMasterEnvironment = null;
boolean found = false;
for (int i = 0; i < 300; i++) {
Thread.sleep(1000);
for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
if (entryPair.second.dir.equals(masterPair.second.dir)) {
LOG.info("skip open {} | {}", entryPair.second.name, entryPair.second.dir);
continue;
}
entryPair.first.openReplicatedEnvironment(new File(entryPair.second.dir));
LOG.info("open {} | {}", entryPair.second.name, entryPair.second.dir);
}

LOG.info("name:{} state:{} dir:{}", entryPair.first.getReplicatedEnvironment().getNodeName(),
entryPair.first.getReplicatedEnvironment().getState(),
entryPair.second.dir);
if (entryPair.first.getReplicatedEnvironment().getState().equals(ReplicatedEnvironment.State.MASTER)) {
newMasterEnvironment = entryPair.first;
found = true;
BDBEnvironment newMasterEnvironment = null;
boolean found = false;
for (int i = 0; i < 300; i++) {
for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
if (entryPair.second.dir.equals(masterPair.second.dir)) {
continue;
}

LOG.info("name:{} state:{} dir:{}", entryPair.first.getReplicatedEnvironment().getNodeName(),
entryPair.first.getReplicatedEnvironment().getState(),
entryPair.second.dir);
if (entryPair.first.getReplicatedEnvironment().getState().equals(ReplicatedEnvironment.State.MASTER)) {
newMasterEnvironment = entryPair.first;
found = true;
break;
}
}
if (found) {
break;
}
Thread.sleep(1000);
}
if (found) {
break;
}
Thread.sleep(1000);
Assertions.assertNotNull(newMasterEnvironment);

masterDb = newMasterEnvironment.openDatabase(beginDbName);
Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, new DatabaseEntry(randomBytes()), new DatabaseEntry(randomBytes())));
Assertions.assertEquals(1, newMasterEnvironment.getDatabaseNames().size());
// // old master
masterEnvironment.openReplicatedEnvironment(new File(masterDir));
} finally {
followersInfo.stream().forEach(entryPair -> {
entryPair.first.close();
LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir);
});
}
Assertions.assertNotNull(newMasterEnvironment);

masterDb = newMasterEnvironment.openDatabase(beginDbName);
Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, new DatabaseEntry(randomBytes()), new DatabaseEntry(randomBytes())));
Assertions.assertEquals(1, newMasterEnvironment.getDatabaseNames().size());
// // old master
masterEnvironment.openReplicatedEnvironment(new File(masterDir));
followersInfo.stream().forEach(entryPair -> {
entryPair.first.close();
LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir);
});
LOG.info("end");
}

Expand Down Expand Up @@ -670,73 +673,80 @@ public void testReadTxnIsNotMatched() throws Exception {
followersInfo.add(Pair.of(followerEnvironment, new NodeInfo(nodeName, nodeHostPort, nodeDir)));
}

Pair<BDBEnvironment, NodeInfo> masterPair = findMaster(followersInfo);
String beginDbName = String.valueOf(0L);
Database masterDb = masterPair.first.openDatabase(beginDbName);
DatabaseEntry key = new DatabaseEntry(randomBytes());
DatabaseEntry value = new DatabaseEntry(randomBytes());
Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, key, value));
Assertions.assertEquals(1, masterEnvironment.getDatabaseNames().size());
LOG.info("master is {} | {}", masterPair.second.name, masterPair.second.dir);
try {
Pair<BDBEnvironment, NodeInfo> masterPair = findMaster(followersInfo);
String beginDbName = String.valueOf(0L);
Database masterDb = masterPair.first.openDatabase(beginDbName);
DatabaseEntry key = new DatabaseEntry(randomBytes());
DatabaseEntry value = new DatabaseEntry(randomBytes());
Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, key, value));
Assertions.assertEquals(1, masterEnvironment.getDatabaseNames().size());
LOG.info("master is {} | {}", masterPair.second.name, masterPair.second.dir);

for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
if (entryPair.second.dir.equals(masterPair.second.dir)) {
LOG.info("skip {}", entryPair.second.name);
continue;
}

Assertions.assertEquals(1, entryPair.first.getDatabaseNames().size());
Database followerDb = entryPair.first.openDatabase(beginDbName);
DatabaseEntry readValue = new DatabaseEntry();
Assertions.assertEquals(OperationStatus.SUCCESS, followerDb.get(null, key, readValue, LockMode.DEFAULT));
Assertions.assertEquals(new String(value.getData()), new String(readValue.getData()));
}
for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
if (entryPair.second.dir.equals(masterPair.second.dir)) {
LOG.info("skip {}", entryPair.second.name);
continue;
}

Field envImplField = ReplicatedEnvironment.class.getDeclaredField("repEnvironmentImpl");
envImplField.setAccessible(true);
RepImpl impl = (RepImpl) envImplField.get(masterPair.first.getReplicatedEnvironment());
Assertions.assertNotNull(impl);

new Expectations(impl) {{
// Below method will replicate log item to followers.
impl.registerVLSN(withNotNull());
// Below method will wait until the logs are replicated.
impl.postLogCommitHook(withNotNull(), withNotNull());
result = new InsufficientAcksException("mocked");
}};

long count = masterDb.count();
final Database oldMasterDb = masterDb;
Assertions.assertThrows(InsufficientAcksException.class, () -> {
// Since this key is not replicated to any replicas, it should not be read.
DatabaseEntry k = new DatabaseEntry(new byte[]{1, 2, 3});
DatabaseEntry v = new DatabaseEntry(new byte[]{4, 5, 6});
oldMasterDb.put(null, k, v);
});
Assertions.assertEquals(1, entryPair.first.getDatabaseNames().size());
Database followerDb = entryPair.first.openDatabase(beginDbName);
DatabaseEntry readValue = new DatabaseEntry();
Assertions.assertEquals(OperationStatus.SUCCESS, followerDb.get(null, key, readValue, LockMode.DEFAULT));
Assertions.assertEquals(new String(value.getData()), new String(readValue.getData()));
}

LOG.info("close old master {} | {}", masterPair.second.name, masterPair.second.dir);
masterDb.close();
masterEnvironment.getEpochDB().close();
masterEnvironment.close();
Field envImplField = ReplicatedEnvironment.class.getDeclaredField("repEnvironmentImpl");
envImplField.setAccessible(true);
RepImpl impl = (RepImpl) envImplField.get(masterPair.first.getReplicatedEnvironment());
Assertions.assertNotNull(impl);

new Expectations(impl) {{
// Below method will replicate log item to followers.
impl.registerVLSN(withNotNull());
// Below method will wait until the logs are replicated.
impl.postLogCommitHook(withNotNull(), withNotNull());
result = new InsufficientAcksException("mocked");
}};

long count = masterDb.count();
final Database oldMasterDb = masterDb;
Assertions.assertThrows(InsufficientAcksException.class, () -> {
// Since this key is not replicated to any replicas, it should not be read.
DatabaseEntry k = new DatabaseEntry(new byte[]{1, 2, 3});
DatabaseEntry v = new DatabaseEntry(new byte[]{4, 5, 6});
oldMasterDb.put(null, k, v);
});

LOG.info("close old master {} | {}", masterPair.second.name, masterPair.second.dir);
masterDb.close();
masterEnvironment.getEpochDB().close();
masterEnvironment.close();

for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
if (entryPair.second.dir.equals(masterPair.second.dir)) {
LOG.info("skip {}", entryPair.second.name);
continue;
for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
if (entryPair.second.dir.equals(masterPair.second.dir)) {
LOG.info("skip {}", entryPair.second.name);
continue;
}
LOG.info("close follower {} | {}", entryPair.second.name, entryPair.second.dir);
entryPair.first.close();
}
LOG.info("close follower {} | {}", entryPair.second.name, entryPair.second.dir);
entryPair.first.close();
}

masterPair.first.openReplicatedEnvironment(new File(masterPair.second.dir));
masterDb = masterPair.first.openDatabase(beginDbName);
LOG.info("open {} | {}", masterPair.second.name, masterPair.second.dir);
masterPair.first.openReplicatedEnvironment(new File(masterPair.second.dir));
masterDb = masterPair.first.openDatabase(beginDbName);
LOG.info("open {} | {}", masterPair.second.name, masterPair.second.dir);

// The local commit txn is readable!!!
Assertions.assertEquals(count + 1, masterDb.count());
// The local commit txn is readable!!!
Assertions.assertEquals(count + 1, masterDb.count());

key = new DatabaseEntry(new byte[]{1, 2, 3});
DatabaseEntry readValue = new DatabaseEntry();
Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.get(null, key, readValue, LockMode.DEFAULT));
key = new DatabaseEntry(new byte[]{1, 2, 3});
DatabaseEntry readValue = new DatabaseEntry();
Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.get(null, key, readValue, LockMode.DEFAULT));
} finally {
followersInfo.stream().forEach(entryPair -> {
entryPair.first.close();
LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir);
});
}
}
}
Loading