Skip to content

Commit

Permalink
Create scanner per WAL part for better memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
milleruntime committed Jun 2, 2021
1 parent 8b5b8cc commit 475d1ad
Showing 1 changed file with 25 additions and 21 deletions.
Expand Up @@ -56,7 +56,7 @@ public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoClo
/**
* Scans the files in each recoveryLogDir over the range [start,end].
*/
RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogDirs, LogFileKey start,
RecoveryLogsIterator(VolumeManager vm, List<Path> recoveryLogDirs, LogFileKey start,
LogFileKey end, boolean checkFirstKey, LogEvents... colFamToFetch) throws IOException {

List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
Expand All @@ -67,26 +67,30 @@ public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoClo

for (Path logDir : recoveryLogDirs) {
LOG.debug("Opening recovery log dir {}", logDir.getName());
var scanner = RFile.newScanner().from(getFiles(fs, logDir))
.withFileSystem(fs.getFileSystemByPath(logDir)).build();
String[] logFiles = getFiles(vm, logDir);
var fs = vm.getFileSystemByPath(logDir);

// only check the first key once to prevent extra iterator creation and seeking
if (checkFirstKey)
validateFirstKey(scanner.iterator(), logDir);

for (var cf : colFamToFetch)
scanner.fetchColumnFamily(new Text(cf.name()));
scanner.setRange(range);
LOG.debug("Get iterator for Key Range = {} to {}", startKey, endKey);
var scanIter = scanner.iterator();

if (scanIter.hasNext()) {
LOG.debug("Write ahead log {} has data in range {} {}", logDir.getName(), start, end);
iterators.add(scanIter);
scanners.add(scanner);
} else {
LOG.debug("Write ahead log {} has no data in range {} {}", logDir.getName(), start, end);
scanner.close();
if (checkFirstKey) {
validateFirstKey(RFile.newScanner().from(logFiles).withFileSystem(fs).build(), logDir);
}

for (String log : logFiles) {
var scanner = RFile.newScanner().from(log).withFileSystem(fs).build();
for (var cf : colFamToFetch)
scanner.fetchColumnFamily(new Text(cf.name()));
scanner.setRange(range);
LOG.debug("Get iterator for Key Range = {} to {}", startKey, endKey);
Iterator<Entry<Key,Value>> scanIter = scanner.iterator();

if (scanIter.hasNext()) {
LOG.debug("Write ahead log {} has data in range {} {}", log, start, end);
iterators.add(scanIter);
scanners.add(scanner);
} else {
LOG.debug("Write ahead log {} has no data in range {} {}", log, start, end);
scanner.close();
}
}
}
iter = Iterators.mergeSorted(iterators, Entry.comparingByKey());
Expand Down Expand Up @@ -141,8 +145,8 @@ private String[] getFiles(VolumeManager fs, Path directory) throws IOException {
/**
* Check that the first entry in the WAL is OPEN. Only need to do this once.
*/
private void validateFirstKey(Iterator<Entry<Key,Value>> iterator, Path fullLogPath)
throws IOException {
private void validateFirstKey(Scanner scanner, Path fullLogPath) throws IOException {
Iterator<Entry<Key,Value>> iterator = scanner.iterator();
if (iterator.hasNext()) {
Key firstKey = iterator.next().getKey();
LogFileKey key = LogFileKey.fromKey(firstKey);
Expand Down

0 comments on commit 475d1ad

Please sign in to comment.