Skip to content

Commit

Permalink
Suggested updates to apache#458 pull request
Browse files Browse the repository at this point in the history
  • Loading branch information
ctubbsii committed May 2, 2018
1 parent c403dff commit 1f9af30
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@

import java.io.EOFException;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Objects;

import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.log.SortedLogState;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.commons.collections.buffer.PriorityBuffer;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -32,23 +37,23 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import com.google.common.base.Preconditions;

/**
* Provide simple Map.Reader methods over multiple Maps.
* A class which reads sorted recovery logs produced from a single WAL.
*
* Presently only supports next() and seek() and works on all the Map directories within a
* directory. The primary purpose of this class is to merge the results of multiple Reduce jobs that
* result in Map output files.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class MultiReader {
public class RecoveryLogReader {

/**
* Group together the next key/value from a Reader with the Reader
*
*/
private static class Index implements Comparable<Index> {
Reader reader;
WritableComparable key;
WritableComparable<?> key;
Writable value;
boolean cached = false;

Expand All @@ -62,7 +67,7 @@ private static Object create(java.lang.Class<?> klass) {

public Index(Reader reader) {
this.reader = reader;
key = (WritableComparable) create(reader.getKeyClass());
key = (WritableComparable<?>) create(reader.getKeyClass());
value = (Writable) create(reader.getValueClass());
}

Expand Down Expand Up @@ -92,7 +97,9 @@ public int compareTo(Index o) {
return 1;
if (!o.cached)
return -1;
return key.compareTo(o.key);
@SuppressWarnings({"unchecked", "rawtypes"})
int result = ((WritableComparable) key).compareTo(o.key);
return result;
} catch (IOException ex) {
throw new RuntimeException(ex);
}
Expand All @@ -101,7 +108,7 @@ public int compareTo(Index o) {

private PriorityBuffer heap = new PriorityBuffer();

public MultiReader(VolumeManager fs, Path directory) throws IOException {
public RecoveryLogReader(VolumeManager fs, Path directory) throws IOException {
boolean foundFinish = false;
for (FileStatus child : fs.listStatus(directory)) {
if (child.getPath().getName().startsWith("_"))
Expand All @@ -127,7 +134,7 @@ private static void copy(Writable src, Writable dest) throws IOException {
dest.readFields(input);
}

public synchronized boolean next(WritableComparable key, Writable val) throws IOException {
public synchronized boolean next(WritableComparable<?> key, Writable val) throws IOException {
Index elt = (Index) heap.remove();
try {
elt.cache();
Expand All @@ -144,13 +151,13 @@ public synchronized boolean next(WritableComparable key, Writable val) throws IO
return true;
}

public synchronized boolean seek(WritableComparable key) throws IOException {
synchronized boolean seek(WritableComparable<?> key) throws IOException {
PriorityBuffer reheap = new PriorityBuffer(heap.size());
boolean result = false;
for (Object obj : heap) {
Index index = (Index) obj;
try {
WritableComparable found = index.reader.getClosest(key, index.value, true);
WritableComparable<?> found = index.reader.getClosest(key, index.value, true);
if (found != null && found.equals(key)) {
result = true;
}
Expand All @@ -164,7 +171,7 @@ public synchronized boolean seek(WritableComparable key) throws IOException {
return result;
}

public void close() throws IOException {
void close() throws IOException {
IOException problem = null;
for (Object obj : heap) {
Index index = (Index) obj;
Expand All @@ -179,4 +186,66 @@ public void close() throws IOException {
heap = null;
}

volatile boolean returnedIterator = false;

// TODO make this primary entry into this class, and remove volatile boolean and make rest private
/**
 * Returns the one and only iterator over this reader's entries whose keys lie in the inclusive
 * range [start, end].
 *
 * <p>
 * The method is synchronized so that checking and setting {@code returnedIterator} happens
 * atomically; a volatile flag alone only guarantees visibility and would let two concurrent
 * callers both pass the check.
 *
 * @param start
 *          lower bound; the reader is positioned at this key before iteration begins
 * @param end
 *          upper bound; iteration stops at the first key greater than this one
 * @return an iterator over the entries within the requested range
 * @throws IOException
 *           if positioning the reader or reading the first entry fails
 * @throws IllegalStateException
 *           if an iterator was already requested from this reader
 */
synchronized Iterator<Entry<LogFileKey,LogFileValue>> getIterator(LogFileKey start,
    LogFileKey end) throws IOException {
  Preconditions.checkState(!returnedIterator, "Each reader can have only one iterator");
  // Flag is raised before construction: the constructor moves the reader, so a failed attempt
  // must not be silently retried on a reader in an unknown position.
  returnedIterator = true;
  return new RecoveryLogReaderIterator(this, start, end);
}

/**
 * Iterator over the entries of a single {@link RecoveryLogReader}, restricted to keys in the
 * inclusive range [start, end].
 *
 * <p>
 * The iterator always reads one entry ahead: {@code key}/{@code value} hold the entry that the
 * next call to {@link #next()} will return, and {@code hasNext} records whether that look-ahead
 * entry exists and is still within the end bound.
 */
private static class RecoveryLogReaderIterator
    implements Iterator<Entry<LogFileKey,LogFileValue>> {

  private final RecoveryLogReader reader;
  private final LogFileKey end;
  private LogFileKey key = new LogFileKey();
  private LogFileValue value = new LogFileValue();
  private boolean hasNext;

  /**
   * Positions the reader at {@code start} and reads the first entry.
   *
   * @param reader
   *          source of sorted log entries
   * @param start
   *          lower bound of the iteration
   * @param end
   *          upper bound of the iteration (inclusive)
   * @throws IOException
   *           if seeking or reading the first entry fails
   * @throws IllegalStateException
   *           if the first entry read sorts before {@code start}
   */
  RecoveryLogReaderIterator(RecoveryLogReader reader, LogFileKey start, LogFileKey end)
      throws IOException {
    this.reader = reader;
    this.end = end;

    reader.seek(start);

    hasNext = reader.next(key, value);

    // The start check deliberately runs before the end check, so a key that is below start is
    // always reported even if it would also be rejected by the end bound.
    if (hasNext && key.compareTo(start) < 0) {
      throw new IllegalStateException("First key is less than start " + key + " " + start);
    }

    hasNext = hasNext && key.compareTo(end) <= 0;
  }

  @Override
  public boolean hasNext() {
    return hasNext;
  }

  /**
   * Returns the look-ahead entry and reads the following one.
   *
   * @return the next entry within the range
   * @throws java.util.NoSuchElementException
   *           if the iteration has no more elements, as required by the {@link Iterator}
   *           contract
   * @throws IllegalStateException
   *           if reading the following entry fails with an {@link IOException}
   */
  @Override
  public Entry<LogFileKey,LogFileValue> next() {
    if (!hasNext) {
      // Fully qualified so that no additional import is needed in this file.
      throw new java.util.NoSuchElementException();
    }
    Entry<LogFileKey,LogFileValue> entry = new AbstractMap.SimpleImmutableEntry<>(key, value);

    // Allocate fresh holders so the entry handed to the caller keeps its own key and value
    // objects instead of having them reused for the look-ahead read.
    key = new LogFileKey();
    value = new LogFileValue();
    try {
      hasNext = reader.next(key, value) && key.compareTo(end) <= 0;
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }

    return entry;
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,85 +19,41 @@
import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;

import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.tserver.logger.LogEvents;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.hadoop.fs.Path;
import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.UnmodifiableIterator;

/**
* Iterates over multiple recovery logs merging them into a single sorted stream.
* Iterates over multiple sorted recovery logs merging them into a single sorted stream.
*/
public class RecoveryLogsIterator
implements Iterator<Entry<LogFileKey,LogFileValue>>, AutoCloseable {

private List<MultiReader> readers;
private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;

private static class MultiReaderIterator implements Iterator<Entry<LogFileKey,LogFileValue>> {

private MultiReader reader;
private LogFileKey key = new LogFileKey();
private LogFileValue value = new LogFileValue();
private boolean hasNext;
private LogFileKey end;

MultiReaderIterator(MultiReader reader, LogFileKey start, LogFileKey end) throws IOException {
this.reader = reader;
this.end = end;

reader.seek(start);

hasNext = reader.next(key, value);

if (hasNext && key.compareTo(start) < 0) {
throw new IllegalStateException("First key is less than start " + key + " " + start);
}

if (hasNext && key.compareTo(end) > 0) {
hasNext = false;
}
}
private static final Logger LOG = LoggerFactory.getLogger(RecoveryLogsIterator.class);

@Override
public boolean hasNext() {
return hasNext;
}

@Override
public Entry<LogFileKey,LogFileValue> next() {
Preconditions.checkState(hasNext);
Entry<LogFileKey,LogFileValue> entry = new AbstractMap.SimpleImmutableEntry<>(key, value);

key = new LogFileKey();
value = new LogFileValue();
try {
hasNext = reader.next(key, value);
if (hasNext && key.compareTo(end) > 0) {
hasNext = false;
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}

return entry;
}
}
private List<RecoveryLogReader> readers;
private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;

/**
* Ensures source iterator provides data in sorted order
*/
// TODO add unit test and move to RecoveryLogReader
@VisibleForTesting
static class SortCheckIterator implements Iterator<Entry<LogFileKey,LogFileValue>> {

private PeekingIterator<Entry<LogFileKey,LogFileValue>> source;
Expand Down Expand Up @@ -125,67 +81,29 @@ public Entry<LogFileKey,LogFileValue> next() {
}
}

private MultiReader open(VolumeManager fs, Path log) throws IOException {
MultiReader reader = new MultiReader(fs, log);
// TODO get rid of this (push down into iterator in RecoveryLogReader)
private RecoveryLogReader open(VolumeManager fs, Path log) throws IOException {
RecoveryLogReader reader = new RecoveryLogReader(fs, log);
LogFileKey key = new LogFileKey();
LogFileValue value = new LogFileValue();
if (!reader.next(key, value)) {
reader.close();
return null;
}
if (key.event != OPEN) {
reader.close();
throw new RuntimeException("First log entry value is not OPEN");
RuntimeException e = new IllegalStateException(
"First log entry value is not OPEN (" + log + ")");
try {
reader.close();
} catch (Exception e2) {
e.addSuppressed(e2);
}
throw e;
}

return reader;
}

/**
 * Creates a key for the given event with {@code tid} and {@code seq} at their maximum values.
 *
 * @param event
 *          the event stored in the key
 * @return a new key with {@code tid == Integer.MAX_VALUE} and {@code seq == Long.MAX_VALUE}
 */
static LogFileKey maxKey(LogEvents event) {
  final LogFileKey upper = new LogFileKey();
  upper.seq = Long.MAX_VALUE;
  upper.tid = Integer.MAX_VALUE;
  upper.event = event;
  return upper;
}

/**
 * Creates a key for the given event and tid with {@code seq} at its maximum value.
 *
 * @param event
 *          the event stored in the key
 * @param tid
 *          the tid stored in the key, replacing the maximum tid
 * @return a new key built by {@link #maxKey(LogEvents)} with its {@code tid} overwritten
 */
static LogFileKey maxKey(LogEvents event, int tid) {
  final LogFileKey upper = maxKey(event);
  upper.tid = tid;
  return upper;
}

/**
 * Creates a key for the given event with {@code tid} and {@code seq} both set to zero.
 *
 * @param event
 *          the event stored in the key
 * @return a new key with {@code tid == 0} and {@code seq == 0}
 */
static LogFileKey minKey(LogEvents event) {
  final LogFileKey lower = new LogFileKey();
  lower.seq = 0;
  lower.tid = 0;
  lower.event = event;
  return lower;
}

/**
 * Creates a key for the given event and tid with {@code seq} set to zero.
 *
 * @param event
 *          the event stored in the key
 * @param tid
 *          the tid stored in the key, replacing the zero tid
 * @return a new key built by {@link #minKey(LogEvents)} with its {@code tid} overwritten
 */
static LogFileKey minKey(LogEvents event, int tid) {
  final LogFileKey lower = minKey(event);
  lower.tid = tid;
  return lower;
}

/**
* Iterates only over keys with the specified event (some events are equivalent for sorting) and
* tid type.
*
* Delegates to the range constructor using minKey(event, tid) as the start key and
* maxKey(event, tid) as the end key.
*
* @param fs
*          volume manager passed through to the range constructor
* @param recoveryLogPaths
*          paths of the recovery logs to iterate over
* @param event
*          event used to build the start and end keys
* @param tid
*          tid used to build the start and end keys
* @throws IOException
*           if the range constructor throws it
*/
RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, LogEvents event, int tid)
throws IOException {
this(fs, recoveryLogPaths, minKey(event, tid), maxKey(event, tid));
}

/**
* Iterates only over keys with the specified event (some events are equivalent for sorting).
*
* Delegates to the range constructor using minKey(event) as the start key and maxKey(event) as
* the end key, so the tid is not restricted beyond what those keys span.
*
* @param fs
*          volume manager passed through to the range constructor
* @param recoveryLogPaths
*          paths of the recovery logs to iterate over
* @param event
*          event used to build the start and end keys
* @throws IOException
*           if the range constructor throws it
*/
RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, LogEvents event)
throws IOException {
this(fs, recoveryLogPaths, minKey(event), maxKey(event));
}

/**
* Iterates only over keys between [start,end].
*/
Expand All @@ -197,11 +115,10 @@ static LogFileKey minKey(LogEvents event, int tid) {

try {
for (Path log : recoveryLogPaths) {
MultiReader reader = open(fs, log);
RecoveryLogReader reader = open(fs, log);
if (reader != null) {
readers.add(reader);
iterators.add(
new SortCheckIterator(log.getName(), new MultiReaderIterator(reader, start, end)));
iterators.add(new SortCheckIterator(log.getName(), reader.getIterator(start, end)));
}
}

Expand All @@ -213,7 +130,11 @@ public int compare(Entry<LogFileKey,LogFileValue> o1, Entry<LogFileKey,LogFileVa
});

} catch (RuntimeException | IOException e) {
close();
try {
close();
} catch (Exception e2) {
e.addSuppressed(e2);
}
throw e;
}
}
Expand All @@ -230,11 +151,11 @@ public Entry<LogFileKey,LogFileValue> next() {

@Override
public void close() {
for (MultiReader reader : readers) {
for (RecoveryLogReader reader : readers) {
try {
reader.close();
} catch (IOException e) {
Log.debug("Failed to close reader", e);
LOG.debug("Failed to close reader", e);
}
}
}
Expand Down
Loading

0 comments on commit 1f9af30

Please sign in to comment.