Skip to content

Commit

Permalink
Fix reading of legacy streams and improve handling of translog truncation
Browse files Browse the repository at this point in the history
  • Loading branch information
bleskes committed Apr 16, 2015
1 parent da284d6 commit fa27693
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 33 deletions.
14 changes: 7 additions & 7 deletions src/main/java/org/elasticsearch/index/engine/InternalEngine.java
Expand Up @@ -49,7 +49,6 @@
import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TruncatedTranslogException;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -159,8 +158,13 @@ public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogReco
}
}
int opsRecovered = recoverFromTranslog();
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog. ops recovered [{}]", opsRecovered);
// flush if we recovered something or if we have references to older translogs
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
if (opsRecovered > 0 ||
(committedTranslogId != null && translog.currentId() != committedTranslogId)
) {
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
opsRecovered, committedTranslogId, translog.currentId());
flush(true, true);
}
}
Expand All @@ -184,7 +188,6 @@ public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogReco
/**
* Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current
* translog id into lucene and returns null.
*
*/
private Long loadCommittedTranslogId(IndexWriter writer, Translog translog) throws IOException {
// commit on a just opened writer will commit even if there are no changes done to it
Expand Down Expand Up @@ -1125,9 +1128,6 @@ protected int recoverFromTranslog() throws IOException {
}
}
}
} catch (TruncatedTranslogException e) {
// file is empty or header has been half-written and should be ignored
logger.trace("ignoring truncation exception, the translog is either empty or half-written", e);
} catch (Throwable e) {
throw new EngineException(shardId, "failed to recover from translog", e);
}
Expand Down
Expand Up @@ -70,12 +70,12 @@ public Translog.Operation read(StreamInput inStream) throws IOException {
Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte());
operation = TranslogStreams.newOperationFromType(type);
operation.readFrom(in);
verifyChecksum(in);
} catch (EOFException e) {
throw new TruncatedTranslogException("reached premature end of file, translog is truncated", e);
} catch (AssertionError|Exception e) {
throw new TranslogCorruptedException("translog corruption while reading from stream", e);
}
verifyChecksum(in);
return operation;
}

Expand Down
Expand Up @@ -38,6 +38,10 @@ public class LegacyTranslogStream implements TranslogStream {

@Override
public Translog.Operation read(StreamInput in) throws IOException {
// read the opsize before an operation.
// Note that this was written & read outside of the stream when this class was used, but it makes things more consistent
// to read this here
in.readInt();
Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte());
Translog.Operation operation = TranslogStreams.newOperationFromType(type);
operation.readFrom(in);
Expand All @@ -46,6 +50,7 @@ public Translog.Operation read(StreamInput in) throws IOException {

@Override
public void write(StreamOutput out, Translog.Operation op) throws IOException {
    // Writes a legacy-format operation: type byte followed by the operation body.
    // NOTE(review): read() now consumes a per-operation size int that this method does not
    // emit, so round-tripping through this stream is asymmetric — confirm whether legacy
    // files are ever written anymore, or whether this should throw UnsupportedOperationException.
    out.writeByte(op.opType().id());
    op.writeTo(out);
}
Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -64,7 +63,7 @@ public long firstPosition() {

abstract public int estimatedTotalOperations();

public Translog.Operation readOp(Translog.Location location) {
public Translog.Operation readOp(Translog.Location location) throws IOException {
assert location.translogId == id : "read location's translog id [" + location.translogId + "] is not [" + id + "]";
ByteBuffer buffer = ByteBuffer.allocate(location.size);
return readOp(buffer, location.translogLocation, location.size);
Expand All @@ -74,7 +73,7 @@ public Translog.Operation readOp(Translog.Location location) {
reads an operation from the given position and changes it to point at the start of the next
* operation
*/
public Translog.Operation readOpAndAdvancePosition(ByteBuffer reusableBuffer, AtomicLong position) {
public Translog.Operation readOpAndAdvancePosition(ByteBuffer reusableBuffer, AtomicLong position) throws IOException {
final long pos = position.get();
int opSize = readOpSize(reusableBuffer, pos);
Translog.Operation op = readOp(reusableBuffer, pos, opSize);
Expand All @@ -101,8 +100,7 @@ protected int readOpSize(ByteBuffer reusableBuffer, long position) {
* reads an operation at the given position and returns it. The buffer length is equal to the number
of bytes read.
*/
protected Translog.Operation readOp(ByteBuffer reusableBuffer, long position, int opSize) {
try {
protected Translog.Operation readOp(ByteBuffer reusableBuffer, long position, int opSize) throws IOException {
final ByteBuffer buffer;
if (reusableBuffer.capacity() >= opSize) {
buffer = reusableBuffer;
Expand All @@ -113,10 +111,7 @@ protected Translog.Operation readOp(ByteBuffer reusableBuffer, long position, in
buffer.limit(opSize);
readBytes(buffer, position);
BytesArray bytesArray = new BytesArray(buffer.array(), 0, buffer.limit());
return TranslogStreams.readTranslogOperation(bytesArray.streamInput());
} catch (IOException e) {
throw new ElasticsearchException("unexpected exception reading from translog snapshot of " + this.channelReference.file(), e);
}
return channelReference.stream().read(bytesArray.streamInput());
}

/**
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.index.translog.Translog;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -44,12 +45,15 @@ public FsChannelSnapshot(FsChannelReader reader) {
this.positon = new AtomicLong(reader.firstPosition());
}

/** Returns the id of the translog this snapshot reads from (delegates to the underlying reader). */
public long translogId() {
return reader.translogId();
}

/**
 * Returns the underlying reader's estimate of the total number of operations
 * in this snapshot; an estimate only — the actual count may differ.
 */
public int estimatedTotalOperations() {
return reader.estimatedTotalOperations();
}

public Translog.Operation next(ByteBuffer reusableBuffer) {
public Translog.Operation next(ByteBuffer reusableBuffer) throws IOException {
if (positon.get() >= reader.sizeInBytes()) {
return null;
}
Expand Down
Expand Up @@ -414,6 +414,8 @@ public Translog.Operation read(Location location) {
}
}
return reader == null ? null : reader.readOp(location);
} catch (IOException e) {
throw new ElasticsearchException("failed to read source from translog location " + location, e);
}
}

Expand Down Expand Up @@ -456,7 +458,7 @@ private Snapshot createdSnapshot(FsChannelReader... translogs) {
for (FsChannelReader translog : translogs) {
channelSnapshots.add(translog.snapshot());
}
Snapshot snapshot = new FsTranslogSnapshot(channelSnapshots);
Snapshot snapshot = new FsTranslogSnapshot(channelSnapshots, logger);
success = true;
return snapshot;
} finally {
Expand Down
Expand Up @@ -21,7 +21,9 @@

import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TruncatedTranslogException;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -31,19 +33,18 @@
public class FsTranslogSnapshot implements Translog.Snapshot {

private final List<FsChannelSnapshot> orderedTranslogs;

private final ESLogger logger;
private final ByteBuffer cacheBuffer;

private AtomicBoolean closed = new AtomicBoolean(false);

private final int estimatedTotalOperations;

/**
* Create a snapshot over the given list of channel snapshots, reading operations from
* each snapshot in order. Total operation count is estimated from the individual snapshots.
*/
public FsTranslogSnapshot(List<FsChannelSnapshot> orderedTranslogs) {
public FsTranslogSnapshot(List<FsChannelSnapshot> orderedTranslogs, ESLogger logger) {
this.orderedTranslogs = orderedTranslogs;
this.logger = logger;
int ops = 0;
for (FsChannelSnapshot translog : orderedTranslogs) {

Expand All @@ -65,10 +66,16 @@ public int estimatedTotalOperations() {
}

@Override
public Translog.Operation next() {
public Translog.Operation next() throws IOException {
while (orderedTranslogs.isEmpty() == false) {
final FsChannelSnapshot current = orderedTranslogs.get(0);
Translog.Operation op = current.next(cacheBuffer);
Translog.Operation op = null;
try {
op = current.next(cacheBuffer);
} catch (TruncatedTranslogException e) {
// file is empty or header has been half-written and should be ignored
logger.trace("ignoring truncation exception, the translog [{}] is either empty or half-written", e, current.translogId());
}
if (op != null) {
return op;
}
Expand Down
Expand Up @@ -50,7 +50,6 @@
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.test.index.merge.NoMergePolicyProvider;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.rest.client.http.HttpRequestBuilder;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
Expand All @@ -60,13 +59,7 @@
import java.io.InputStream;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;

Expand Down Expand Up @@ -384,6 +377,7 @@ void assertDeleteByQueryWorked(String indexName, Version version) throws Excepti
// TODO: remove this once #10262 is fixed
return;
}
// these documents are supposed to be deleted by a delete by query operation in the translog
SearchRequestBuilder searchReq = client().prepareSearch(indexName).setQuery(QueryBuilders.queryStringQuery("long_sort:[10 TO 20]"));
assertEquals(0, searchReq.get().getHits().getTotalHits());
}
Expand Down

0 comments on commit fa27693

Please sign in to comment.