Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sorted recovery write to RFiles #2117

Merged
merged 29 commits on Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8a52be2
Change sorted WALs to write to RFiles
milleruntime May 6, 2021
5d1920a
Improve writeBuffer method
milleruntime May 26, 2021
e546835
Revert timeout increase
milleruntime May 26, 2021
aad476a
Add log path to exception
milleruntime May 26, 2021
a1ba8f3
Rework RecoveryLogsIterator and SortedLogRecoveryTest for multipart logs
milleruntime May 26, 2021
b69ebee
Only check first key once, before setting scan range
milleruntime May 27, 2021
42410a5
Clean up SortedLogRecoveryTest
milleruntime May 27, 2021
84785a6
Create scanner per WAL part for better memory usage
milleruntime May 28, 2021
ef1d2c5
Make logging consistent with old recovery
milleruntime Jun 2, 2021
91fbd0b
Refactor to pass server context to provide configuration
milleruntime Jun 2, 2021
4afab5e
Update server/tserver/src/main/java/org/apache/accumulo/tserver/logge…
milleruntime Jun 7, 2021
7c0e9f2
Update server/tserver/src/main/java/org/apache/accumulo/tserver/logge…
milleruntime Jun 7, 2021
26abf73
Use KeyBuilder to store qualifier bytes so I don't need base64
milleruntime Jun 8, 2021
ec1eaae
Use dummy filename for start and end keys
milleruntime Jun 8, 2021
57b6357
Format the row using bytes
milleruntime Jun 8, 2021
e0e0201
Update server/tserver/src/main/java/org/apache/accumulo/tserver/logge…
milleruntime Jun 8, 2021
b55e14c
Fixes
milleruntime Jun 8, 2021
231397d
Updates from PR
milleruntime Jun 9, 2021
d688522
Remove setting extent in min and max keys
milleruntime Jun 9, 2021
ff53d8a
Some refactoring from PR
milleruntime Jun 16, 2021
0609f37
Update server/tserver/src/main/java/org/apache/accumulo/tserver/logge…
milleruntime Jun 16, 2021
05014a3
Update server/tserver/src/main/java/org/apache/accumulo/tserver/logge…
milleruntime Jun 16, 2021
f47faec
Update server/tserver/src/main/java/org/apache/accumulo/tserver/logge…
milleruntime Jun 16, 2021
2f40e3e
Update server/tserver/src/main/java/org/apache/accumulo/tserver/logge…
milleruntime Jun 16, 2021
7a147ae
Update server/tserver/src/main/java/org/apache/accumulo/tserver/logge…
milleruntime Jun 16, 2021
e831472
Drop colFamToFetch
milleruntime Jun 17, 2021
352af97
Update server/tserver/src/main/java/org/apache/accumulo/tserver/logge…
milleruntime Jun 17, 2021
6cd0ef8
Update server/tserver/src/main/java/org/apache/accumulo/tserver/logge…
milleruntime Jun 17, 2021
6fc3582
CR updates
milleruntime Jun 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1133,7 +1133,7 @@ public void minorCompactionStarted(CommitSession tablet, long lastUpdateSequence

public void recover(VolumeManager fs, KeyExtent extent, List<LogEntry> logEntries,
Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException {
List<Path> recoveryLogs = new ArrayList<>();
List<Path> recoveryDirs = new ArrayList<>();
List<LogEntry> sorted = new ArrayList<>(logEntries);
sorted.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp));
for (LogEntry entry : sorted) {
Expand All @@ -1148,9 +1148,9 @@ public void recover(VolumeManager fs, KeyExtent extent, List<LogEntry> logEntrie
throw new IOException(
"Unable to find recovery files for extent " + extent + " logEntry: " + entry);
}
recoveryLogs.add(recovery);
recoveryDirs.add(recovery);
}
logger.recover(fs, extent, recoveryLogs, tabletFiles, mutationReceiver);
logger.recover(getContext(), extent, recoveryDirs, tabletFiles, mutationReceiver);
}

public int createLogId() {
Expand Down
Expand Up @@ -23,16 +23,20 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.master.thrift.RecoveryStatus;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.threads.ThreadPools;
Expand All @@ -47,11 +51,12 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

public class LogSorter {

private static final Logger log = LoggerFactory.getLogger(LogSorter.class);
Expand Down Expand Up @@ -141,7 +146,7 @@ public void sort(VolumeManager fs, String name, Path srcPath, String destPath)
// Creating a 'finished' marker will cause recovery to proceed normally and the
// empty file will be correctly ignored downstream.
fs.mkdirs(new Path(destPath));
writeBuffer(destPath, Collections.emptyList(), part++);
writeBuffer(context, destPath, Collections.emptyList(), part++);
fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
return;
}
Expand All @@ -159,10 +164,10 @@ public void sort(VolumeManager fs, String name, Path srcPath, String destPath)
value.readFields(decryptingInput);
buffer.add(new Pair<>(key, value));
}
writeBuffer(destPath, buffer, part++);
writeBuffer(context, destPath, buffer, part++);
buffer.clear();
} catch (EOFException ex) {
writeBuffer(destPath, buffer, part++);
writeBuffer(context, destPath, buffer, part++);
break;
}
}
Expand All @@ -171,21 +176,6 @@ public void sort(VolumeManager fs, String name, Path srcPath, String destPath)
getSortTime());
}

/**
 * Sorts the buffered WAL entries and writes them out as one numbered MapFile part
 * under destPath.
 *
 * @param destPath destination directory for the sorted output
 * @param buffer   key/value pairs read from the unsorted write-ahead log
 * @param part     zero-based part number used to build the "part-r-%05d" file name
 */
private void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, int part)
    throws IOException {
  String partName = String.format("part-r-%05d", part);
  Path partPath = new Path(destPath, partName);
  FileSystem fileSystem = context.getVolumeManager().getFileSystemByPath(partPath);
  Path qualifiedPath = fileSystem.makeQualified(partPath);

  try (MapFile.Writer writer = new MapFile.Writer(fileSystem.getConf(), qualifiedPath,
      MapFile.Writer.keyClass(LogFileKey.class), MapFile.Writer.valueClass(LogFileValue.class))) {
    // MapFile requires entries to be appended in sorted key order
    buffer.sort(Comparator.comparing(Pair::getFirst));
    for (Pair<LogFileKey,LogFileValue> pair : buffer) {
      writer.append(pair.getFirst(), pair.getSecond());
    }
  }
}

synchronized void close() throws IOException {
// If we receive an empty or malformed-header WAL, we won't
// have input streams that need closing. Avoid the NPE.
Expand Down Expand Up @@ -224,6 +214,40 @@ public LogSorter(ServerContext context, AccumuloConfiguration conf) {
this.walBlockSize = DfsLogger.getWalBlockSize(conf);
}

@VisibleForTesting
public static void writeBuffer(ServerContext context, String destPath,
List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
String filename = String.format("part-r-%05d.rf", part);
Path path = new Path(destPath, filename);
FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);
milleruntime marked this conversation as resolved.
Show resolved Hide resolved
Path fullPath = fs.makeQualified(path);

// convert the LogFileKeys to Keys, sort and collect the mutations
Map<Key,List<Mutation>> keyListMap = new TreeMap<>();
for (Pair<LogFileKey,LogFileValue> pair : buffer) {
var logFileKey = pair.getFirst();
var logFileValue = pair.getSecond();
Key k = logFileKey.toKey();
var list = keyListMap.putIfAbsent(k, logFileValue.mutations);
if (list != null) {
var muts = new ArrayList<>(list);
muts.addAll(logFileValue.mutations);
keyListMap.put(logFileKey.toKey(), muts);
}
keith-turner marked this conversation as resolved.
Show resolved Hide resolved
}

try (var writer = FileOperations.getInstance().newWriterBuilder()
.forFile(fullPath.toString(), fs, fs.getConf(), context.getCryptoService())
.withTableConfiguration(DefaultConfiguration.getInstance()).build()) {
keith-turner marked this conversation as resolved.
Show resolved Hide resolved
writer.startDefaultLocalityGroup();
for (var entry : keyListMap.entrySet()) {
LogFileValue val = new LogFileValue();
val.mutations = entry.getValue();
writer.append(entry.getKey(), val.toValue());
}
}
}

public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool)
throws KeeperException, InterruptedException {
this.threadPool = distWorkQThreadPool;
Expand Down
Expand Up @@ -138,7 +138,7 @@ public RecoveryLogReader(VolumeManager fs, Path directory, LogFileKey start, Log
}
if (!foundFinish)
throw new IOException(
"Sort \"" + SortedLogState.FINISHED.getMarker() + "\" flag not found in " + directory);
"Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + directory);

iter = new SortCheckIterator(new RangeIterator(start, end));
}
Expand Down
Expand Up @@ -19,61 +19,82 @@
package org.apache.accumulo.tserver.log;

import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.log.SortedLogState;
import org.apache.accumulo.tserver.logger.LogEvents;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Iterators;
import com.google.common.collect.UnmodifiableIterator;

/**
* Iterates over multiple sorted recovery logs merging them into a single sorted stream.
*/
public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
public class RecoveryLogsIterator
implements Iterator<Entry<LogFileKey,LogFileValue>>, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(RecoveryLogsIterator.class);

List<CloseableIterator<Entry<LogFileKey,LogFileValue>>> iterators;
private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;
private final List<Scanner> scanners;
private final Iterator<Entry<Key,Value>> iter;

/**
 * Scans the sorted recovery files in each directory of recoveryLogDirs over the key range
 * [start,end] and exposes them as one merged, sorted stream.
 *
 * @param checkFirstKey when true, verify the first entry of each directory's logs is an OPEN
 *        event before scanning
 * @throws IOException if a directory cannot be listed or lacks the sort "finished" marker
 */
RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
    LogFileKey end, boolean checkFirstKey) throws IOException {

  List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
  scanners = new ArrayList<>();
  Range range = LogFileKey.toRange(start, end);
  var vm = context.getVolumeManager();

  try {
    for (Path logDir : recoveryLogDirs) {
      LOG.debug("Opening recovery log dir {}", logDir.getName());
      List<Path> logFiles = getFiles(vm, logDir);
      var fs = vm.getFileSystemByPath(logDir);

      // only check the first key once to prevent extra iterator creation and seeking
      if (checkFirstKey) {
        validateFirstKey(context, fs, logFiles, logDir);
      }

      for (Path log : logFiles) {
        var scanner = RFile.newScanner().from(log.toString()).withFileSystem(fs)
            .withTableProperties(context.getConfiguration()).build();

        scanner.setRange(range);
        Iterator<Entry<Key,Value>> scanIter = scanner.iterator();

        // only keep scanners that actually have data in the range; close the rest now
        if (scanIter.hasNext()) {
          LOG.debug("Write ahead log {} has data in range {} {}", log.getName(), start, end);
          iterators.add(scanIter);
          scanners.add(scanner);
        } else {
          LOG.debug("Write ahead log {} has no data in range {} {}", log.getName(), start, end);
          scanner.close();
        }
      }
    }
  } catch (RuntimeException | IOException e) {
    // don't leak scanners opened before the failure
    try {
      close();
    } catch (Exception e2) {
      e.addSuppressed(e2);
    }
    throw e;
  }
  iter = Iterators.mergeSorted(iterators, Entry.comparingByKey());
}

@Override
Expand All @@ -83,7 +104,9 @@ public boolean hasNext() {

@Override
public Entry<LogFileKey,LogFileValue> next() {
  // decode the RFile Key/Value pair back into the WAL's LogFileKey/LogFileValue form
  Entry<Key,Value> e = iter.next();
  return new AbstractMap.SimpleImmutableEntry<>(LogFileKey.fromKey(e.getKey()),
      LogFileValue.fromValue(e.getValue()));
}

@Override
Expand All @@ -93,11 +116,50 @@ public void remove() {

@Override
public void close() {
for (CloseableIterator<?> reader : iterators) {
try {
reader.close();
} catch (IOException e) {
LOG.debug("Failed to close reader", e);
scanners.forEach(ScannerBase::close);
}

/**
* Check for sorting signal files (finished/failed) and get the logs in the provided directory.
*/
private List<Path> getFiles(VolumeManager fs, Path directory) throws IOException {
boolean foundFinish = false;
List<Path> logFiles = new ArrayList<>();
for (FileStatus child : fs.listStatus(directory)) {
if (child.getPath().getName().startsWith("_"))
continue;
if (SortedLogState.isFinished(child.getPath().getName())) {
foundFinish = true;
continue;
}
if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) {
milleruntime marked this conversation as resolved.
Show resolved Hide resolved
continue;
}
FileSystem ns = fs.getFileSystemByPath(child.getPath());
milleruntime marked this conversation as resolved.
Show resolved Hide resolved
Path fullLogPath = ns.makeQualified(child.getPath());
logFiles.add(fullLogPath);
}
if (!foundFinish)
throw new IOException(
"Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + directory);
return logFiles;
}

/**
 * Check that the first entry across the given sorted WAL files is an OPEN event. Only needs to
 * run once per recovery directory.
 *
 * @throws IllegalStateException if the first entry present is not an OPEN event
 */
private void validateFirstKey(ServerContext context, FileSystem fs, List<Path> logFiles,
    Path fullLogPath) {
  String[] fileNames = logFiles.stream().map(Path::toString).toArray(String[]::new);
  try (var scanner = RFile.newScanner().from(fileNames).withFileSystem(fs)
      .withTableProperties(context.getConfiguration()).build()) {
    Iterator<Entry<Key,Value>> entries = scanner.iterator();
    // an empty log is tolerated; only a present-but-wrong first event is an error
    if (entries.hasNext()) {
      LogFileKey key = LogFileKey.fromKey(entries.next().getKey());
      if (key.event != LogEvents.OPEN) {
        throw new IllegalStateException("First log entry is not OPEN " + fullLogPath);
      }
    }
  }
}
Expand Down