Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add translog checksums #7232

Merged
merged 1 commit into from Aug 27, 2014

Conversation

Projects
None yet
4 participants
@dakrone
Copy link
Member

dakrone commented Aug 12, 2014

Switches TranslogStreams to check a header in the file to determine the
translog format, delegating to the version-specific stream.

Version 1 of the translog format writes a 0xffff_ffff_0000_0001 header
at the beginning of the file and appends a checksum for each translog
operation written.

Also refactors much of the translog operations, such as merging
.hasNext() and .next() in FsChannelSnapshot

Relates to #6554

@dakrone dakrone added v1.4.0 labels Aug 12, 2014

@s1monw

View changes

src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java Outdated
@@ -269,7 +268,9 @@ public void recover(boolean indexShouldExists, RecoveryState recoveryState) thro
throw new IndexShardGatewayRecoveryException(shardId, "failed to recover shard", e);
} finally {
try {
fs.close();
if (stream != null) {

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 20, 2014

Contributor

maybe use IOUtils.close() handles null values

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/ChecksumHelper.java Outdated
/**
* A CRC32 checksum class that has helpers for adding strings and numeric types
*/
public class ChecksumHelper {

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 20, 2014

Contributor

I wonder if we should just subclass IndexOutput it has all these methods to turn stuff into bytes etc. and we can just delegate to /dev/null inside the subclass ;) hope this makes sense?

This comment has been minimized.

Copy link
@dakrone

dakrone Aug 21, 2014

Author Member

I think I can create a CheckedOutputStream and do this delegating to the original stream, and I think that's kind of what you meant? Either way it should clean it up and use Lucene's classes for this instead of our own.

boolean hasNext();

/**
* Returns the next operation, or null when no more operations are found

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 20, 2014

Contributor

that's great!

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/Translog.java Outdated
@@ -293,6 +305,12 @@ public long estimateSize() {
return ((id.length() + type.length()) * 2) + source.length() + 12;
}

@Override
public long checksum() {

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 20, 2014

Contributor

I wonder if we need to do this or if we can just wrap the StreamOutput on writeTo when we write the operation in TranslogStreams#writeTranslogOperation

This comment has been minimized.

Copy link
@dakrone

dakrone Aug 21, 2014

Author Member

Yea I'm going to try this with CheckedOutputStream.

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/TranslogStreamVersion1.java Outdated
*/
public class TranslogStreamVersion1 implements VersionedTranslogStream {

public static final long HEADER = 0xffff_ffff_0000_0001L;

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 20, 2014

Contributor

can we use the CodecUtil class instead of this here? We already have a standard way in Lucene to do this

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/TranslogStreams.java Outdated
}

try (InputStreamStreamInput in = new InputStreamStreamInput(new FileInputStream(translogFile))) {
long header = in.readLong();

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 20, 2014

Contributor

hmm what if the old translog had this first long too? I mean there is a potential for corruption no?

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 20, 2014

Contributor

I wonder if we should enable this only for new indices that we know are created with es 1.4

This comment has been minimized.

Copy link
@dakrone

dakrone Aug 20, 2014

Author Member

The reason I made the header 0xffff_ffff_0000_0001 was so I could ensure that no existing translog would ever start with this, since for non-checksummed translogs the very first .readByte() should never be negative (or else the translog is corrupted and will throw ElasticsearchIllegalArgumentException("No type mapped for [" + id + "]");). So I know that the old translog will never have this HEADER long as the first long value.

The prefix 0xffff_ffff is intended to be a marker meaning "this is a versioned translog stream" and the suffix _0000_0001 is the version number. The next translog version would have the header 0xffff_ffff_0000_0002.

As for enabling this only for new indices, I'm not sure what that would give us? We already create new translog files all the time (deleting the old ones) for an existing index.

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 21, 2014

Contributor

alright, this explains it. I think we can still use CodecUtil from lucene and just use the CODEC_MAGIC since its first byte has the same properties. I think we should document this though.

This comment has been minimized.

Copy link
@dakrone

dakrone Aug 21, 2014

Author Member

The first byte doesn't have the same properties, since Lucene writes 0x3FD76C17 a non-versioned translog can read an Operation.Type of 63 (which thankfully we don't have, we only go up to 4). I personally think -1 is safer but I don't think we'll get to 63 different translog operations any time soon, so the Lucene version should work as well :)

I'll make this change.

@s1monw

This comment has been minimized.

Copy link
Contributor

s1monw commented Aug 20, 2014

I left a bunch of comments. I personally think we should add the checksum calculation on the TranslogStream level somehow instead of doing it on each operation. we should have an StreamOutput impl that calculates the checksum as well as a StreamInput that makes it entirely transparent. I am also worried about the potential collision of the header so I think we should make use of the index version to detect if we can use the new version of the translog

@rmuir

View changes

src/main/java/org/elasticsearch/index/translog/ChecksumHelper.java Outdated
* A CRC32 checksum class that has helpers for adding strings and numeric types
*/
public class ChecksumHelper {
private final Checksum checksum = new CRC32();

This comment has been minimized.

Copy link
@rmuir

rmuir Aug 20, 2014

Contributor

each invocation of CRC32 is pretty slow: if we do it byte-by-byte its a native call, and we cant use vectorized bulk processing. can you try:

import org.apache.lucene.store.BufferedChecksum;
...
private final Checksum checksum = new BufferedChecksum(new CRC32());

This should improve performance significantly.

@rmuir

View changes

src/main/java/org/elasticsearch/index/translog/TranslogStreamVersion1.java Outdated
} catch (AssertionError|Exception e) {
throw new TranslogCorruptedException("translog corruption while reading from byte array", e);
}
verifyChecksum(in.readLong(), operation.checksum());

This comment has been minimized.

Copy link
@rmuir

rmuir Aug 20, 2014

Contributor

Since we've versioned the whole thing, I think we should save 4 bytes and just write the crc32 (its only a long because of checksum interface in java). You just have to take care to not mess up with sign bit:

out.writeInt((int)checksum64);
...
checksum64 = in.readInt() & 0xFFFFFFFFL;

@s1monw s1monw removed the review label Aug 21, 2014

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/TranslogStreamVersion1.java Outdated

@Override
public int writeHeader(FileChannel channel) throws IOException {
OutputStreamDataOutput out = new OutputStreamDataOutput(Channels.newOutputStream(channel));

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 21, 2014

Contributor

should we put a comment here that we don't close on purpose?

This comment has been minimized.

Copy link
@dakrone

dakrone Aug 21, 2014

Author Member

Yes good idea, always good to make the intent known, I'll do that.

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/TranslogStreams.java Outdated
} catch (CorruptIndexException e) {
// Lucene's CodecUtil throws a CorruptIndexException if there is
// no header, here we assume translog version 0
return new TranslogStreamVersion0(translogFile);

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 21, 2014

Contributor

in this case is it worth peaking at the file again and check if the first byte is valid even for version 0? maybe we should do that check first and then move to V1 and fail hard if we see a CorruptIndexExp

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/TranslogStreams.java Outdated
* @throws IOException
*/
public static VersionedTranslogStream translogStreamFor(File translogFile) throws IOException {
if (translogFile.exists() == false || translogFile.length() == 0) {

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 21, 2014

Contributor

I wonder if we should pass a boolean to this to signal if we should create the latest stream?

This comment has been minimized.

Copy link
@dakrone

dakrone Aug 22, 2014

Author Member

I'm not sure how helpful that would be? Ideally the the caller would never know there are any different versions. What do you have in mind as a use for the boolean?

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

I'd thorw an exception if the translog file does not exists and if the caller expected it etc?

@dakrone dakrone force-pushed the dakrone:feature/translog-checksums branch Aug 25, 2014

@dakrone dakrone added the review label Aug 25, 2014

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamOutput.java Outdated
* Similar to Lucene's BufferedChecksumIndexOutput, however this wraps a
* {@link StreamOutput} so anything written will update the checksum
*/
public class BufferedChecksumStreamOutput extends StreamOutput {

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

does it make sense to make these classes final?

This comment has been minimized.

Copy link
@dakrone

dakrone Aug 25, 2014

Author Member

Yes it definitely does, I'll change that.

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamOutput.java Outdated
* {@link StreamOutput} so anything written will update the checksum
*/
public class BufferedChecksumStreamOutput extends StreamOutput {
final StreamOutput out;

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

any reason why this is not private?

This comment has been minimized.

Copy link
@dakrone

dakrone Aug 25, 2014

Author Member

No reason, it should be private, I'll change this.

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/TranslogStreamVersion1.java Outdated
* Version 1 of the translog file format. Writes a header to identify the
* format, also writes checksums for each operation
*/
public class TranslogStreamVersion1 implements VersionedTranslogStream {

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

can we name those differently maybe ChecksummedTranslogStream and LegacyTranslogStream

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/TranslogStreams.java Outdated
return new TranslogStreamVersion1(translogFile);
}

InputStreamDataInput in = new InputStreamDataInput(new FileInputStream(translogFile));

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

can this be in try-with logic.... you are not closing this input stream at all

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

I wonder if we can actually pass that InputStreamDataInput directly to TranslogStreamVersionX ctors instead of opening and closing them... we can simply seek backwards no?

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/TranslogStreamVersion0.java Outdated
this.in = null;
}

TranslogStreamVersion0(File file) throws IOException {

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

it really feels wrong to pass a file here... this should be accpting a stream instead IMO I added another comment further below too where you use these ctors...

This comment has been minimized.

Copy link
@dakrone

dakrone Aug 25, 2014

Author Member

I see what you're saying, I need a way to tell whether the file exists however, so I'll try to come up with a better way to do this.

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/TranslogStreams.java Outdated
default:
throw new TranslogCorruptedException("No known translog stream version: " + version);
}
} else if (b1 == 0x00) {

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

can you make those magic numbers constants that are documented and readable? we might also use a switch here instead (style question I just like swtich better...)

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/VersionedTranslogStream.java Outdated
* A translog stream that will read and write operations in the
* version-specific format
*/
public interface VersionedTranslogStream extends Closeable {

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

can we just name this TranslogStream versions sounds like fast or smart :) or do we have an UnversionedTranslogStream?

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/VersionedTranslogStream.java Outdated
* Read the next operation from the translog file, the stream <b>must</b>
* have been created through {@link TranslogStreams#translogStreamFor(java.io.File)}
*/
public Translog.Operation readNextOperation() throws IOException;

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

can we name this read?

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/VersionedTranslogStream.java Outdated
/**
* Read the next translog operation from the input stream
*/
public Translog.Operation readTranslogOperation(StreamInput in) throws IOException;

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

can we name this read?

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/VersionedTranslogStream.java Outdated
/**
* Write the given translog operation to the output stream
*/
public void writeTranslogOperation(StreamOutput out, Translog.Operation op) throws IOException;

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

can we name this write?

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java Outdated
return false;
return TranslogStreams.readTranslogOperation(new BytesStreamInput(cacheBuffer.array(), 0, opSize, true));
} catch (IOException e) {
throw new ElasticsearchException("unexpected Exception reading from translog snapshot of " + this.raf.file(), e);

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

can we spell Exception lowercase in the error message?

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java Outdated
* the transient or current translog does not match, returns null
*/
private FsTranslogFile translogForLocation(Location location) {
if (this.trans != null && trans.id() == location.translogId) {

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

I think you can remove the this qualifier for consistency?

@s1monw

View changes

src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java Outdated
public byte[] read(Location location) {
rwl.readLock().lock();
try {
FsTranslogFile trans = this.trans;
if (trans != null && trans.id() == location.translogId) {
FsTranslogFile trans = this.translogForLocation(location);

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

remove the this?

@@ -68,7 +69,12 @@ public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
operations = Lists.newArrayListWithExpectedSize(size);
for (int i = 0; i < size; i++) {
operations.add(TranslogStreams.readTranslogOperation(in));
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

so if we read from a 1.4 node but that nodes has a 1.3 translog will it still send a V1 operation?

This comment has been minimized.

Copy link
@dakrone

dakrone Aug 26, 2014

Author Member

So even if the 1.4 node has a 1.3 translog, it can calculate and send the checksums for the operation to another 1.4 node, they just won't be in the actual file (which I think is pretty sweet!). This doesn't deal with the header check so it's okay.

public class TranslogVersionTests extends ElasticsearchTestCase {

@Test
public void testV0LegacyTranslogVersion() throws Exception {

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

can we randomize this test a bit more?

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 25, 2014

Contributor

I'd love to see a test that corrupts a random bit/byte in the file and we should detect it?

This comment has been minimized.

Copy link
@dakrone

dakrone Aug 25, 2014

Author Member

I have a test for this in AbstractSimpleTranslogTests.testTranslogChecksums that randomly corrupts a translog file on disk and makes sure it is detected.

This comment has been minimized.

Copy link
@s1monw

s1monw Aug 26, 2014

Contributor

cool stuff I didn't see that one!

@s1monw

This comment has been minimized.

Copy link
Contributor

s1monw commented Aug 25, 2014

I left some more comments, I think it's close though... I would really really like to see a test that does corrupt a translog on a node and then checks that we handle it correctly... I am not even sure what the correct behavior is but IMO we should fail the shard?

@s1monw s1monw removed the review label Aug 25, 2014

}

@Override
public int read() throws IOException {

This comment has been minimized.

Copy link
@rmuir

rmuir Aug 25, 2014

Contributor

What is the contract of this method? Usually (e.g. java InputStream) its that it returns an integer value between 0 and 255. So I think this is missing & 0xFF ?

This comment has been minimized.

Copy link
@dakrone

dakrone Aug 26, 2014

Author Member

Okay, added this, thanks!

@dakrone dakrone added the review label Aug 26, 2014

@dakrone

This comment has been minimized.

Copy link
Member Author

dakrone commented Aug 27, 2014

@s1monw added an integration test for this.

@s1monw

This comment has been minimized.

Copy link
Contributor

s1monw commented Aug 27, 2014

@dakrone this test looks awesome. Can we maybe add that it tells you if it corrupted a replica and if so we make sure that the replica is allocated on another node? if you wanna do that afterwards I am ok with pushing this as it is...

@s1monw s1monw removed the review label Aug 27, 2014

@dakrone dakrone force-pushed the dakrone:feature/translog-checksums branch 3 times, most recently to eaf3921 Aug 27, 2014

Add translog checksums
Switches TranslogStreams to check a header in the file to determine the
translog format, delegating to the version-specific stream.

Version 1 of the translog format writes a header using Lucene's
CodecUtil at the beginning of the file and appends a checksum for each
translog operation written.

Also refactors much of the translog operations, such as merging
.hasNext() and .next() in FsChannelSnapshot

Relates to #6554

@dakrone dakrone merged commit eaf3921 into elastic:master Aug 27, 2014

@clintongormley clintongormley changed the title Add translog checksums Resiliency: Add translog checksums Sep 8, 2014

@dakrone dakrone deleted the dakrone:feature/translog-checksums branch Sep 9, 2014

@clintongormley clintongormley changed the title Resiliency: Add translog checksums Add translog checksums Jun 7, 2015

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.