Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes #449 fix two bugs with WAL recovery #458

Merged
merged 4 commits into from May 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -3276,12 +3276,12 @@ public int compare(LogEntry e1, LogEntry e2) {
logger.recover(fs, extent, tconf, recoveryLogs, tabletFiles, mutationReceiver);
}

public int createLogId(KeyExtent tablet) {
AccumuloConfiguration acuTableConf = getTableConfiguration(tablet);
if (DurabilityImpl.fromString(acuTableConf.get(Property.TABLE_DURABILITY)) != Durability.NONE) {
return logIdGenerator.incrementAndGet();
public int createLogId() {
int logId = logIdGenerator.incrementAndGet();
if (logId < 0) {
throw new IllegalStateException("Log Id rolled");
}
return -1;
return logId;
}

public TableConfiguration getTableConfiguration(KeyExtent extent) {
Expand Down
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.tserver.log;

import java.io.IOException;
import java.util.Iterator;

public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
@Override
public void close() throws IOException;
}
Expand Up @@ -601,7 +601,7 @@ public synchronized void defineTablet(long seq, int tid, KeyExtent tablet) throw
final LogFileKey key = new LogFileKey();
key.event = DEFINE_TABLET;
key.seq = seq;
key.tid = tid;
key.tabletId = tid;
key.tablet = tablet;
try {
write(key, EMPTY);
Expand Down Expand Up @@ -662,7 +662,7 @@ public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IO
LogFileKey key = new LogFileKey();
key.event = MANY_MUTATIONS;
key.seq = tabletMutations.getSeq();
key.tid = tabletMutations.getTid();
key.tabletId = tabletMutations.getTid();
LogFileValue value = new LogFileValue();
value.mutations = tabletMutations.getMutations();
data.add(new Pair<>(key, value));
Expand All @@ -688,7 +688,7 @@ public LoggerOperation minorCompactionFinished(long seq, int tid, String fqfn,
LogFileKey key = new LogFileKey();
key.event = COMPACTION_FINISH;
key.seq = seq;
key.tid = tid;
key.tabletId = tid;
return logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), durability);
}

Expand All @@ -697,7 +697,7 @@ public LoggerOperation minorCompactionStarted(long seq, int tid, String fqfn,
LogFileKey key = new LogFileKey();
key.event = COMPACTION_START;
key.seq = seq;
key.tid = tid;
key.tabletId = tid;
key.filename = fqfn;
return logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), durability);
}
Expand Down
Expand Up @@ -18,10 +18,16 @@

import java.io.EOFException;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Objects;

import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.log.SortedLogState;
import org.apache.accumulo.tserver.logger.LogEvents;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.commons.collections.buffer.PriorityBuffer;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -32,23 +38,26 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;

/**
* Provide simple Map.Reader methods over multiple Maps.
* A class which reads sorted recovery logs produced from a single WAL.
*
* Presently only supports next() and seek() and works on all the Map directories within a
* directory. The primary purpose of this class is to merge the results of multiple Reduce jobs that
* result in Map output files.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class MultiReader {
public class RecoveryLogReader implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {

/**
* Group together the next key/value from a Reader with the Reader
*
*/
private static class Index implements Comparable<Index> {
Reader reader;
WritableComparable key;
WritableComparable<?> key;
Writable value;
boolean cached = false;

Expand All @@ -62,7 +71,7 @@ private static Object create(java.lang.Class<?> klass) {

public Index(Reader reader) {
this.reader = reader;
key = (WritableComparable) create(reader.getKeyClass());
key = (WritableComparable<?>) create(reader.getKeyClass());
value = (Writable) create(reader.getValueClass());
}

Expand Down Expand Up @@ -92,16 +101,24 @@ public int compareTo(Index o) {
return 1;
if (!o.cached)
return -1;
return key.compareTo(o.key);
@SuppressWarnings({"unchecked", "rawtypes"})
int result = ((WritableComparable) key).compareTo(o.key);
return result;
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}

private PriorityBuffer heap = new PriorityBuffer();
private Iterator<Entry<LogFileKey,LogFileValue>> iter;

public RecoveryLogReader(VolumeManager fs, Path directory) throws IOException {
this(fs, directory, null, null);
}

public MultiReader(VolumeManager fs, Path directory) throws IOException {
public RecoveryLogReader(VolumeManager fs, Path directory, LogFileKey start, LogFileKey end)
throws IOException {
boolean foundFinish = false;
for (FileStatus child : fs.listStatus(directory)) {
if (child.getPath().getName().startsWith("_"))
Expand All @@ -116,6 +133,8 @@ public MultiReader(VolumeManager fs, Path directory) throws IOException {
if (!foundFinish)
throw new IOException(
"Sort \"" + SortedLogState.FINISHED.getMarker() + "\" flag not found in " + directory);

iter = new SortCheckIterator(new RangeIterator(start, end));
}

private static void copy(Writable src, Writable dest) throws IOException {
Expand All @@ -127,7 +146,8 @@ private static void copy(Writable src, Writable dest) throws IOException {
dest.readFields(input);
}

public synchronized boolean next(WritableComparable key, Writable val) throws IOException {
@VisibleForTesting
synchronized boolean next(WritableComparable<?> key, Writable val) throws IOException {
Index elt = (Index) heap.remove();
try {
elt.cache();
Expand All @@ -144,13 +164,14 @@ public synchronized boolean next(WritableComparable key, Writable val) throws IO
return true;
}

public synchronized boolean seek(WritableComparable key) throws IOException {
@VisibleForTesting
synchronized boolean seek(WritableComparable<?> key) throws IOException {
PriorityBuffer reheap = new PriorityBuffer(heap.size());
boolean result = false;
for (Object obj : heap) {
Index index = (Index) obj;
try {
WritableComparable found = index.reader.getClosest(key, index.value, true);
WritableComparable<?> found = index.reader.getClosest(key, index.value, true);
if (found != null && found.equals(key)) {
result = true;
}
Expand All @@ -164,6 +185,7 @@ public synchronized boolean seek(WritableComparable key) throws IOException {
return result;
}

@Override
public void close() throws IOException {
IOException problem = null;
for (Object obj : heap) {
Expand All @@ -179,4 +201,122 @@ public void close() throws IOException {
heap = null;
}

/**
* Ensures source iterator provides data in sorted order
*/
@VisibleForTesting
static class SortCheckIterator implements Iterator<Entry<LogFileKey,LogFileValue>> {

private PeekingIterator<Entry<LogFileKey,LogFileValue>> source;

SortCheckIterator(Iterator<Entry<LogFileKey,LogFileValue>> source) {
this.source = Iterators.peekingIterator(source);

}

@Override
public boolean hasNext() {
return source.hasNext();
}

@Override
public Entry<LogFileKey,LogFileValue> next() {
Entry<LogFileKey,LogFileValue> next = source.next();
if (source.hasNext()) {
Preconditions.checkState(next.getKey().compareTo(source.peek().getKey()) <= 0,
"Keys not in order %s %s", next.getKey(), source.peek().getKey());
}
return next;
}

@Override
public void remove() {
throw new UnsupportedOperationException("remove");
}
}

private class RangeIterator implements Iterator<Entry<LogFileKey,LogFileValue>> {

private LogFileKey key = new LogFileKey();
private LogFileValue value = new LogFileValue();
private boolean hasNext;
private LogFileKey end;

private boolean next(LogFileKey key, LogFileValue value) throws IOException {
try {
return RecoveryLogReader.this.next(key, value);
} catch (EOFException e) {
return false;
}
}

RangeIterator(LogFileKey start, LogFileKey end) throws IOException {
this.end = end;

if (start != null) {
hasNext = next(key, value);

if (hasNext && key.event != LogEvents.OPEN) {
throw new IllegalStateException("First log entry value is not OPEN");
}

seek(start);
}

hasNext = next(key, value);

if (hasNext && start != null && key.compareTo(start) < 0) {
throw new IllegalStateException("First key is less than start " + key + " " + start);
}

if (hasNext && end != null && key.compareTo(end) > 0) {
hasNext = false;
}
}

@Override
public boolean hasNext() {
return hasNext;
}

@Override
public Entry<LogFileKey,LogFileValue> next() {
Preconditions.checkState(hasNext);
Entry<LogFileKey,LogFileValue> entry = new AbstractMap.SimpleImmutableEntry<>(key, value);

key = new LogFileKey();
value = new LogFileValue();
try {
hasNext = next(key, value);
if (hasNext && end != null && key.compareTo(end) > 0) {
hasNext = false;
}
} catch (IOException e) {
throw new IllegalStateException(e);
}

return entry;
}

@Override
public void remove() {
throw new UnsupportedOperationException("remove");
}
}

@Override
public boolean hasNext() {
return iter.hasNext();
}

@Override
public Entry<LogFileKey,LogFileValue> next() {
return iter.next();
}

@Override
public void remove() {
throw new UnsupportedOperationException("remove");
}

}