Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.StringUtils;
Expand Down Expand Up @@ -115,7 +116,6 @@
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotVersion;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Flaky;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -968,12 +968,17 @@ public void testGetSSTDiffListWithoutDB(String description,
* Does actual DB write, flush, compaction.
*/
@Test
@Flaky("HDDS-15209")
void testDifferWithDB() throws Exception {
writeKeysAndCheckpointing();
readRocksDBInstance(ACTIVE_DB_DIR_NAME, activeRocksDB, null,
rocksDBCheckpointDiffer);

// Wait until compaction listeners have finished updating the DAG; otherwise
// diffAllSnapshots can NPE when falling back to snapshot metadata for a file
// no longer present in the latest snapshot (HDDS-15209).
GenericTestUtils.waitFor(() -> rocksDBCheckpointDiffer.getInflightCompactions().isEmpty(), 1000,
10000);

if (LOG.isDebugEnabled()) {
printAllSnapshots();
}
Expand All @@ -984,20 +989,23 @@ void testDifferWithDB() throws Exception {

diffAllSnapshots(rocksDBCheckpointDiffer);

// Confirm correct links created
// SST backup hard-links use whatever file numbers RocksDB assigns for compaction inputs.
try (Stream<Path> sstPathStream = Files.list(sstBackUpDir.toPath())) {
List<String> expectedLinks = sstPathStream.map(Path::getFileName)
.map(Object::toString).sorted().collect(Collectors.toList());
assertEquals(expectedLinks, asList(
"000017.sst", "000019.sst", "000021.sst", "000023.sst",
"000024.sst", "000026.sst", "000029.sst"));
List<String> backupFiles =
sstPathStream.map(p -> p.getFileName().toString()).sorted().collect(Collectors.toList());
assertEquals(7, backupFiles.size(), "Unexpected compaction SST backup count");
ConcurrentMap<String, CompactionNode> compactionNodes =
rocksDBCheckpointDiffer.getCompactionNodeMap();
for (String name : backupFiles) {
assertTrue(name.endsWith(SST_FILE_EXTENSION), () -> "Not an SST: " + name);
assertTrue(compactionNodes.containsKey(FilenameUtils.getBaseName(name)),
() -> "Backup " + name + " should match a tracked compaction SST");
}
}
rocksDBCheckpointDiffer.getForwardCompactionDAG().nodes().stream().forEach(compactionNode -> {
Assertions.assertNotNull(compactionNode.getStartKey());
Assertions.assertNotNull(compactionNode.getEndKey());
});
GenericTestUtils.waitFor(() -> rocksDBCheckpointDiffer.getInflightCompactions().isEmpty(), 1000,
10000);
if (LOG.isDebugEnabled()) {
rocksDBCheckpointDiffer.dumpCompactionNodeTable();
}
Expand All @@ -1009,33 +1017,60 @@ private static List<ColumnFamilyDescriptor> getColumnFamilyDescriptors() {
.map(ColumnFamilyDescriptor::new).collect(Collectors.toList());
}

/**
* Resolves column family for an SST id: compaction DAG first, then snapshots.
*
* @return (true, cf) when the SST is known for this run (cf may be null);
* (false, ignored) when the id does not appear (e.g. different SST numbering than the hard-coded list).
*/
private Pair<Boolean, String> resolveColumnFamilyForDiffFile(RocksDBCheckpointDiffer differ, String diffFile,
DifferSnapshotInfo srcSnap, DifferSnapshotInfo destSnap) {
CompactionNode node = differ.getCompactionNodeMap().get(diffFile);
if (node != null) {
return Pair.of(true, node.getColumnFamily());
}
SstFileInfo meta = srcSnap.getSstFile(0, diffFile);
if (meta == null) {
meta = destSnap.getSstFile(0, diffFile);
}
if (meta == null) {
for (DifferSnapshotInfo s : snapshots) {
meta = s.getSstFile(0, diffFile);
if (meta != null) {
break;
}
}
}
if (meta == null) {
return Pair.of(false, null);
}
return Pair.of(true, meta.getColumnFamily());
}

/**
* Test SST differ.
*/
void diffAllSnapshots(RocksDBCheckpointDiffer differ)
throws IOException {
final DifferSnapshotInfo src = snapshots.get(snapshots.size() - 1);

// Hard-coded expected output.
// The results are deterministic. Retrieved from a successful run.
final List<List<String>> expectedDifferResult = asList(
asList("000023", "000029", "000026", "000019", "000021", "000031"),
asList("000023", "000029", "000026", "000021", "000031"),
asList("000023", "000029", "000026", "000031"),
asList("000029", "000026", "000031"),
asList("000029", "000031"),
Collections.singletonList("000031"),
Collections.emptyList()
);
assertEquals(snapshots.size(), expectedDifferResult.size());

int index = 0;
List<String> expectedDiffFiles = new ArrayList<>();
for (DifferSnapshotInfo snap : snapshots) {
// Returns a list of SST files to be fed into RocksCheckpointDiffer Dag.
List<String> tablesToTrack = new ArrayList<>(COLUMN_FAMILIES_TO_TRACK_IN_DAG);
// Add some invalid index.
tablesToTrack.add("compactionLogTable");
Set<String> fullTableToLookUp = new HashSet<>(tablesToTrack);
List<String> baselineDiffFileNames =
differ.getSSTDiffList(
new DifferSnapshotVersion(src, 0, fullTableToLookUp),
new DifferSnapshotVersion(snap, 0, fullTableToLookUp),
null, fullTableToLookUp, true)
.orElse(Collections.emptyList())
.stream()
.map(SstFileInfo::getFileName)
.collect(Collectors.toList());

Set<String> tableToLookUp = new HashSet<>();
for (int i = 0; i < Math.pow(2, tablesToTrack.size()); i++) {
tableToLookUp.clear();
Expand All @@ -1046,13 +1081,13 @@ void diffAllSnapshots(RocksDBCheckpointDiffer differ)
tableToLookUp.add(tablesToTrack.get(firstSetBitIndex));
mask &= mask - 1;
}
for (String diffFile : expectedDifferResult.get(index)) {
String columnFamily;
if (rocksDBCheckpointDiffer.getCompactionNodeMap().containsKey(diffFile)) {
columnFamily = rocksDBCheckpointDiffer.getCompactionNodeMap().get(diffFile).getColumnFamily();
} else {
columnFamily = src.getSstFile(0, diffFile).getColumnFamily();
for (String diffFile : baselineDiffFileNames) {
Pair<Boolean, String> resolved =
resolveColumnFamilyForDiffFile(differ, diffFile, src, snap);
if (!resolved.getLeft()) {
continue;
}
String columnFamily = resolved.getRight();
if (columnFamily == null || tableToLookUp.contains(columnFamily)) {
expectedDiffFiles.add(diffFile);
}
Expand All @@ -1064,11 +1099,12 @@ void diffAllSnapshots(RocksDBCheckpointDiffer differ)
LOG.info("SST diff list from '{}' to '{}': {} tables: {}",
src.getDbPath(0), snap.getDbPath(0), sstDiffList, tableToLookUp);

assertEquals(expectedDiffFiles, sstDiffList.stream().map(SstFileInfo::getFileName)
.collect(Collectors.toList()));
List<String> actualNames =
sstDiffList.stream().map(SstFileInfo::getFileName).collect(Collectors.toList());
expectedDiffFiles.sort(Comparator.naturalOrder());
actualNames.sort(Comparator.naturalOrder());
assertEquals(expectedDiffFiles, actualNames);
}

++index;
}
}

Expand Down