Skip to content

Commit

Permalink
Add translog checksums
Browse files Browse the repository at this point in the history
Switches TranslogStreams to check a header in the file to determine the
translog format, delegating to the version-specific stream.

Version 1 of the translog format writes a header using Lucene's
CodecUtil at the beginning of the file and appends a checksum for each
translog operation written.

Also refactors much of the translog operations, such as merging
.hasNext() and .next() in FsChannelSnapshot

Relates to #6554
  • Loading branch information
dakrone committed Aug 27, 2014
1 parent b745b01 commit eaf3921
Show file tree
Hide file tree
Showing 29 changed files with 1,127 additions and 124 deletions.
Expand Up @@ -72,7 +72,6 @@
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -348,14 +347,13 @@ public GetResult get(Get get) throws EngineException {
if (!get.loadSource()) {
return new GetResult(true, versionValue.version(), null);
}
byte[] data = translog.read(versionValue.translogLocation());
if (data != null) {
try {
Translog.Source source = TranslogStreams.readSource(data);
try {
Translog.Source source = translog.readSource(versionValue.translogLocation());
if (source != null) {
return new GetResult(true, versionValue.version(), source);
} catch (IOException e) {
// switched on us, read it from the reader
}
} catch (IOException e) {
// switched on us, read it from the reader
}
}
}
Expand Down
Expand Up @@ -23,11 +23,11 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -42,6 +42,7 @@
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStream;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.indices.recovery.RecoveryState;
Expand All @@ -50,7 +51,6 @@

import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
Expand All @@ -63,6 +63,8 @@
*/
public class LocalIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway {

private static final int RECOVERY_TRANSLOG_RENAME_RETRIES = 3;

private final ThreadPool threadPool;
private final MappingUpdatedAction mappingUpdatedAction;
private final IndexService indexService;
Expand Down Expand Up @@ -198,7 +200,7 @@ public void recover(boolean indexShouldExists, RecoveryState recoveryState) thro
if (!tmpRecoveringFile.exists()) {
File tmpTranslogFile = new File(translogLocation, translogName);
if (tmpTranslogFile.exists()) {
for (int i = 0; i < 3; i++) {
for (int i = 0; i < RECOVERY_TRANSLOG_RENAME_RETRIES; i++) {
if (tmpTranslogFile.renameTo(tmpRecoveringFile)) {
recoveringTranslogFile = tmpRecoveringFile;
break;
Expand Down Expand Up @@ -228,17 +230,15 @@ public void recover(boolean indexShouldExists, RecoveryState recoveryState) thro

recoveryState.getTranslog().startTime(System.currentTimeMillis());
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
FileInputStream fs = null;
TranslogStream stream = null;

final Set<String> typesToUpdate = Sets.newHashSet();
try {
fs = new FileInputStream(recoveringTranslogFile);
InputStreamStreamInput si = new InputStreamStreamInput(fs);
stream = TranslogStreams.translogStreamFor(recoveringTranslogFile);
while (true) {
Translog.Operation operation;
try {
int opSize = si.readInt();
operation = TranslogStreams.readTranslogOperation(si);
operation = stream.read();
} catch (EOFException e) {
// ignore, not properly written the last op
break;
Expand Down Expand Up @@ -269,7 +269,7 @@ public void recover(boolean indexShouldExists, RecoveryState recoveryState) thro
throw new IndexShardGatewayRecoveryException(shardId, "failed to recover shard", e);
} finally {
try {
fs.close();
IOUtils.close(stream);
} catch (IOException e) {
// ignore
}
Expand Down
@@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.translog;

import org.apache.lucene.store.BufferedChecksum;
import org.elasticsearch.common.io.stream.StreamInput;

import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

/**
* Similar to Lucene's BufferedChecksumIndexInput, however this wraps a
* {@link StreamInput} so anything read will update the checksum
*/
public final class BufferedChecksumStreamInput extends StreamInput {
private final StreamInput in;
private final Checksum digest;

public BufferedChecksumStreamInput(StreamInput in) {
this.in = in;
this.digest = new BufferedChecksum(new CRC32());
}

public long getChecksum() {
return this.digest.getValue();
}

@Override
public byte readByte() throws IOException {
final byte b = in.readByte();
digest.update(b);
return b;
}

@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
in.readBytes(b, offset, len);
digest.update(b, offset, len);
}

@Override
public void reset() throws IOException {
in.reset();
digest.reset();
}

@Override
public int read() throws IOException {
return readByte() & 0xFF;
}

@Override
public void close() throws IOException {
in.close();
}
}
@@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.translog;

import org.apache.lucene.store.BufferedChecksum;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

/**
* Similar to Lucene's BufferedChecksumIndexOutput, however this wraps a
* {@link StreamOutput} so anything written will update the checksum
*/
public final class BufferedChecksumStreamOutput extends StreamOutput {
private final StreamOutput out;
private final Checksum digest;

public BufferedChecksumStreamOutput(StreamOutput out) {
this.out = out;
this.digest = new BufferedChecksum(new CRC32());
}

public long getChecksum() {
return this.digest.getValue();
}

@Override
public void writeByte(byte b) throws IOException {
out.writeByte(b);
digest.update(b);
}

@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
out.writeBytes(b, offset, length);
digest.update(b, offset, length);
}

@Override
public void flush() throws IOException {
out.flush();
}

@Override
public void close() throws IOException {
out.close();
}

@Override
public void reset() throws IOException {
out.reset();
digest.reset();
}
}

0 comments on commit eaf3921

Please sign in to comment.