Skip to content

Commit

Permalink
Rework RecoveryLogsIterator and SortedLogRecoveryTest for multipart logs
Browse files: browse the repository at this point in the history
  • Loading branch information
milleruntime committed May 26, 2021
1 parent 744760a commit 007d581
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 46 deletions.
Expand Up @@ -31,6 +31,7 @@
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.log.SortedLogState;
import org.apache.accumulo.tserver.logger.LogEvents;
Expand Down Expand Up @@ -58,7 +59,7 @@ public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoClo
* Iterates only over keys in the range [start,end].
*/
RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogDirs, LogFileKey start,
LogFileKey end, Text... colFamToFetch) throws IOException {
LogFileKey end, boolean checkFirstKey, LogEvents... colFamToFetch) throws IOException {

List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
scanners = new ArrayList<>();
Expand All @@ -68,23 +69,22 @@ public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoClo

for (Path logDir : recoveryLogDirs) {
LOG.debug("Opening recovery log dir {}", logDir.getName());
for (Path log : getFiles(fs, logDir)) {
var scanner = RFile.newScanner().from(log.toString())
.withFileSystem(fs.getFileSystemByPath(log)).build();
for (var cf : colFamToFetch)
scanner.fetchColumnFamily(cf);
scanner.setRange(range);
LOG.debug("Get iterator for Key Range = {} to {}", startKey, endKey);
Iterator<Entry<Key,Value>> scanIter = scanner.iterator();

if (scanIter.hasNext()) {
LOG.debug("Write ahead log {} has data in range {} {}", log.getName(), start, end);
iterators.add(scanIter);
scanners.add(scanner);
} else {
LOG.debug("Write ahead log {} has no data in range {} {}", log.getName(), start, end);
scanner.close();
}
var scanner = RFile.newScanner().from(getFiles(fs, logDir))
.withFileSystem(fs.getFileSystemByPath(logDir)).build();
for (var cf : colFamToFetch)
scanner.fetchColumnFamily(new Text(cf.name()));
scanner.setRange(range);
LOG.debug("Get iterator for Key Range = {} to {}", startKey, endKey);
PeekingIterator<Entry<Key,Value>> scanIter = new PeekingIterator<>(scanner.iterator());
if (scanIter.hasNext()) {
if (checkFirstKey)
validateFirstKey(scanIter, logDir);
LOG.debug("Write ahead log {} has data in range {} {}", logDir.getName(), start, end);
iterators.add(scanIter);
scanners.add(scanner);
} else {
LOG.debug("Write ahead log {} has no data in range {} {}", logDir.getName(), start, end);
scanner.close();
}
}
iter = Iterators.mergeSorted(iterators, Entry.comparingByKey());
Expand Down Expand Up @@ -113,9 +113,9 @@ public void close() {
/**
* Check for sorting signal files (finished/failed) and get the logs in the provided directory.
*/
private List<Path> getFiles(VolumeManager fs, Path directory) throws IOException {
private String[] getFiles(VolumeManager fs, Path directory) throws IOException {
boolean foundFinish = false;
List<Path> logFiles = new ArrayList<>();
List<String> logFiles = new ArrayList<>();
for (FileStatus child : fs.listStatus(directory)) {
if (child.getPath().getName().startsWith("_"))
continue;
Expand All @@ -128,24 +128,21 @@ private List<Path> getFiles(VolumeManager fs, Path directory) throws IOException
}
FileSystem ns = fs.getFileSystemByPath(child.getPath());
Path fullLogPath = ns.makeQualified(child.getPath());
try (var scanner = RFile.newScanner().from(fullLogPath.toString())
.withFileSystem(fs.getFileSystemByPath(fullLogPath)).build()) {
validateFirstKey(scanner.iterator(), fullLogPath);
}
logFiles.add(fullLogPath);
logFiles.add(fullLogPath.toString());
}
if (!foundFinish)
throw new IOException(
"Sort \"" + SortedLogState.FINISHED.getMarker() + "\" flag not found in " + directory);
return logFiles;
return logFiles.toArray(new String[0]);
}

/**
* Check that the first entry in the WAL is OPEN
* Check that the first entry in the WAL is OPEN. Only need to do this once.
*/
private void validateFirstKey(Iterator<Map.Entry<Key,Value>> iterator, Path fullLogPath) throws IOException {
private void validateFirstKey(PeekingIterator<Map.Entry<Key,Value>> iterator, Path fullLogPath)
throws IOException {
if (iterator.hasNext()) {
Key firstKey = iterator.next().getKey();
Key firstKey = iterator.peek().getKey();
LogFileKey key = LogFileKey.fromKey(firstKey);
if (key.event != LogEvents.OPEN) {
throw new IllegalStateException("First log entry is not OPEN " + fullLogPath);
Expand Down
Expand Up @@ -25,6 +25,7 @@
import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;

import java.io.IOException;
import java.util.AbstractMap;
Expand All @@ -50,7 +51,6 @@
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -108,9 +108,8 @@ static LogFileKey minKey(LogEvents event, int tabletId) {
private int findMaxTabletId(KeyExtent extent, List<Path> recoveryLogDirs) throws IOException {
int tabletId = -1;

try (RecoveryLogsIterator rli =
new RecoveryLogsIterator(fs, recoveryLogDirs, minKey(DEFINE_TABLET, extent),
maxKey(DEFINE_TABLET, extent), new Text(DEFINE_TABLET.name()))) {
try (var rli = new RecoveryLogsIterator(fs, recoveryLogDirs, minKey(DEFINE_TABLET, extent),
maxKey(DEFINE_TABLET, extent), true, DEFINE_TABLET, OPEN)) {

KeyExtent alternative = extent;
if (extent.isRootTablet()) {
Expand Down Expand Up @@ -210,9 +209,9 @@ private long findRecoverySeq(List<Path> recoveryLogs, Set<String> tabletFiles, i
long lastFinish = 0;
long recoverySeq = 0;

try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs,
minKey(COMPACTION_START, tabletId), maxKey(COMPACTION_START, tabletId),
new Text(COMPACTION_START.name()), new Text(COMPACTION_FINISH.name()))) {
try (RecoveryLogsIterator rli =
new RecoveryLogsIterator(fs, recoveryLogs, minKey(COMPACTION_START, tabletId),
maxKey(COMPACTION_START, tabletId), false, COMPACTION_START, COMPACTION_FINISH)) {

DeduplicatingIterator ddi = new DeduplicatingIterator(rli);

Expand Down Expand Up @@ -267,8 +266,8 @@ private void playbackMutations(List<Path> recoveryLogs, MutationReceiver mr, int

LogFileKey end = maxKey(MUTATION, tabletId);

try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs, start, end,
new Text(MUTATION.name()), new Text(MANY_MUTATIONS.name()))) {
try (RecoveryLogsIterator rli =
new RecoveryLogsIterator(fs, recoveryLogs, start, end, false, MUTATION, MANY_MUTATIONS)) {
while (rli.hasNext()) {
Entry<Key,Value> entry = rli.next();
LogFileKey logFileKey = LogFileKey.fromKey(entry.getKey());
Expand Down
Expand Up @@ -72,6 +72,7 @@
@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input")
public class SortedLogRecoveryTest {

static final int bufferSize = 5;
static final KeyExtent extent = new KeyExtent(TableId.of("table"), null, null);
static final Text cf = new Text("cf");
static final Text cq = new Text("cq");
Expand Down Expand Up @@ -156,11 +157,11 @@ public void receive(Mutation m) {
}

private List<Mutation> recover(Map<String,KeyValue[]> logs, KeyExtent extent) throws IOException {
return recover(logs, new HashSet<>(), extent);
return recover(logs, new HashSet<>(), extent, bufferSize);
}

private List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> files, KeyExtent extent)
throws IOException {
private List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> files, KeyExtent extent,
int bufferSize) throws IOException {

final String workdir = tempFolder.newFolder().getAbsolutePath();
try (var fs = VolumeManagerImpl.getLocalForTesting(workdir)) {
Expand All @@ -175,12 +176,18 @@ private List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> files, K
for (Entry<String,KeyValue[]> entry : logs.entrySet()) {
String destPath = workdir + "/" + entry.getKey();
FileSystem ns = fs.getFileSystemByPath(new Path(destPath));
// convert test object to Pairs for LogSorter
// convert test object to Pairs for LogSorter, flushing based on bufferSize
List<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
int parts = 0;
for (KeyValue pair : entry.getValue()) {
buffer.add(new Pair<>(pair.key, pair.value));
if (buffer.size() >= bufferSize) {
LogSorter.writeBuffer(context, destPath, buffer, parts++);
buffer.clear();
}
}
LogSorter.writeBuffer(context, destPath, buffer, 0);
LogSorter.writeBuffer(context, destPath, buffer, parts);

ns.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
dirs.add(new Path(destPath));
}
Expand Down Expand Up @@ -709,7 +716,7 @@ public void testNoFinish0() throws Exception {
Map<String,KeyValue[]> logs = new TreeMap<>();
logs.put("entries", entries);

List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent);
List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent, bufferSize);

assertEquals(0, mutations.size());
}
Expand All @@ -732,7 +739,7 @@ public void testNoFinish1() throws Exception {
Map<String,KeyValue[]> logs = new TreeMap<>();
logs.put("entries", entries);

List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent);
List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent, bufferSize);

assertEquals(1, mutations.size());
assertEquals(m, mutations.get(0));
Expand Down Expand Up @@ -837,7 +844,7 @@ private void runPathTest(boolean startMatches, String compactionStartFile, Strin

HashSet<String> filesSet = new HashSet<>();
filesSet.addAll(Arrays.asList(tabletFiles));
List<Mutation> mutations = recover(logs, filesSet, extent);
List<Mutation> mutations = recover(logs, filesSet, extent, bufferSize);

if (startMatches) {
assertEquals(1, mutations.size());
Expand Down

0 comments on commit 007d581

Please sign in to comment.