Skip to content

Commit

Permalink
Better support for partial buffer reads/writes in translog infrastruc…
Browse files Browse the repository at this point in the history
…ture

Some IO APIs can return after writing & reading only a part of the requested data. On these rare occasions, we should call the methods again to read/write the rest of the data. This has caused rare translog corruption while writing huge documents on Windows.

Notable parts of the commit:
- A new Channels class with utility methods for reading and writing to channels
- Writing or reading to channels is added to the forbidden API list
- Added locking to SimpleFsTranslogFile
- Removed FileChannelInputStream which was not used

Closes elastic#6441, elastic#6576
  • Loading branch information
bleskes committed Jul 2, 2014
1 parent 2805b1f commit a784884
Show file tree
Hide file tree
Showing 19 changed files with 855 additions and 271 deletions.
10 changes: 10 additions & 0 deletions core-signatures.txt
Expand Up @@ -61,3 +61,13 @@ org.apache.lucene.document.Field#<init>(java.lang.String,java.lang.String,org.ap

@defaultMessage Use XNativeFSLockFactory instead of the buggy NativeFSLockFactory see LUCENE-5738 - remove once Lucene 4.9 is released
org.apache.lucene.store.NativeFSLockFactory

@defaultMessage Use Channels.* methods to write to channels. Do not write directly.
java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
java.nio.channels.FileChannel#write(java.nio.ByteBuffer, long)
java.nio.channels.GatheringByteChannel#write(java.nio.ByteBuffer[], int, int)
java.nio.channels.GatheringByteChannel#write(java.nio.ByteBuffer[])
java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
java.nio.channels.ScatteringByteChannel#read(java.nio.ByteBuffer[])
java.nio.channels.ScatteringByteChannel#read(java.nio.ByteBuffer[], int, int)
java.nio.channels.FileChannel#read(java.nio.ByteBuffer, long)
3 changes: 3 additions & 0 deletions pom.xml
Expand Up @@ -1091,6 +1091,9 @@
<!-- start exclude for FilteredQuery -->
<exclude>org/elasticsearch/common/lucene/search/XFilteredQuery.class</exclude>
<!-- end exclude for FilteredQuery -->
<!-- start exclude for Channels utility class -->
<exclude>org/elasticsearch/common/io/Channels.class</exclude>
<!-- end exclude for Channels -->
</excludes>
<bundledSignatures>
<!-- This will automatically choose the right signatures based on 'targetVersion': -->
Expand Down
Expand Up @@ -19,6 +19,15 @@

package org.elasticsearch.common.bytes;

import com.google.common.base.Charsets;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
Expand All @@ -28,15 +37,6 @@
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;

import com.google.common.base.Charsets;

/**
*/
public class ByteBufferBytesReference implements BytesReference {
Expand Down Expand Up @@ -86,7 +86,7 @@ public void writeTo(OutputStream os) throws IOException {

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
channel.write(buffer);
Channels.writeToChannel(buffer, channel);
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/elasticsearch/common/bytes/BytesArray.java
Expand Up @@ -23,14 +23,14 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.Arrays;

Expand Down Expand Up @@ -109,7 +109,7 @@ public void writeTo(OutputStream os) throws IOException {

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
channel.write(ByteBuffer.wrap(bytes, offset, length()));
Channels.writeToChannel(bytes, offset, length(), channel);
}

@Override
Expand Down
Expand Up @@ -20,14 +20,14 @@

import com.google.common.base.Charsets;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.transport.netty.ChannelBufferStreamInputFactory;
import org.jboss.netty.buffer.ChannelBuffer;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.WritableByteChannel;

/**
*/
Expand Down Expand Up @@ -66,7 +66,7 @@ public void writeTo(OutputStream os) throws IOException {

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
buffer.getBytes(buffer.readerIndex(), channel, length());
Channels.writeToChannel(buffer, buffer.readerIndex(), length(), channel);
}

@Override
Expand Down
Expand Up @@ -21,14 +21,14 @@
import com.google.common.base.Charsets;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

/**
Expand Down Expand Up @@ -77,7 +77,7 @@ public void writeTo(OutputStream os) throws IOException {

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
channel.write(ByteBuffer.wrap(bytes));
Channels.writeToChannel(bytes, 0, bytes.length, channel);
}

@Override
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.util.CharsRef;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
Expand All @@ -32,7 +33,6 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.Arrays;

Expand Down Expand Up @@ -95,7 +95,7 @@ public void writeTo(OutputStream os) throws IOException {

BytesRef ref = new BytesRef();
int written = 0;

// are we a slice?
if (offset != 0) {
// remaining size of page fragment at offset
Expand All @@ -122,53 +122,21 @@ public void writeTo(GatheringByteChannel channel) throws IOException {
return;
}

ByteBuffer[] buffers;
ByteBuffer currentBuffer = null;
int currentLength = length;
int currentOffset = offset;
BytesRef ref = new BytesRef();
int pos = 0;

// are we a slice?
if (offset != 0) {
// remaining size of page fragment at offset
int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE));
bytearray.get(offset, fragmentSize, ref);
currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, fragmentSize);
pos += fragmentSize;
}

// we only have a single page
if (pos == length && currentBuffer != null) {
channel.write(currentBuffer);
return;
}

// a slice > pagesize will likely require extra buffers for initial/trailing fragments
int numBuffers = countRequiredBuffers((currentBuffer != null ? 1 : 0), length - pos);

buffers = new ByteBuffer[numBuffers];
int bufferSlot = 0;

if (currentBuffer != null) {
buffers[bufferSlot] = currentBuffer;
bufferSlot++;
}

// handle remainder of pages + trailing fragment
while (pos < length) {
int remaining = length - pos;
int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining;
bytearray.get(offset + pos, bulkSize, ref);
currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, bulkSize);
buffers[bufferSlot] = currentBuffer;
bufferSlot++;
pos += bulkSize;
while (currentLength > 0) {
// try to align to the underlying pages while writing, so no new arrays will be created.
int fragmentSize = Math.min(currentLength, PAGE_SIZE - (currentOffset % PAGE_SIZE));
boolean newArray = bytearray.get(currentOffset, fragmentSize, ref);
assert !newArray : "PagedBytesReference failed to align with underlying bytearray. offset [" + currentOffset + "], size [" + fragmentSize + "]";
Channels.writeToChannel(ref.bytes, ref.offset, ref.length, channel);
currentLength -= ref.length;
currentOffset += ref.length;
}

// this would indicate that our numBuffer calculation is off by one.
assert (numBuffers == bufferSlot);

// finally write all buffers
channel.write(buffers);
assert currentLength == 0;
}

@Override
Expand Down Expand Up @@ -205,8 +173,7 @@ public BytesArray copyBytesArray() {
if (copied) {
// BigArray has materialized for us, no need to do it again
return new BytesArray(ref.bytes, ref.offset, ref.length);
}
else {
} else {
// here we need to copy the bytes even when shared
byte[] copy = Arrays.copyOfRange(ref.bytes, ref.offset, ref.offset + ref.length);
return new BytesArray(copy);
Expand All @@ -223,7 +190,7 @@ public ChannelBuffer toChannelBuffer() {
ChannelBuffer[] buffers;
ChannelBuffer currentBuffer = null;
BytesRef ref = new BytesRef();
int pos = 0;
int pos = 0;

// are we a slice?
if (offset != 0) {
Expand Down Expand Up @@ -349,10 +316,10 @@ public boolean equals(Object obj) {
}

if (!(obj instanceof PagedBytesReference)) {
return BytesReference.Helper.bytesEqual(this, (BytesReference)obj);
return BytesReference.Helper.bytesEqual(this, (BytesReference) obj);
}

PagedBytesReference other = (PagedBytesReference)obj;
PagedBytesReference other = (PagedBytesReference) obj;
if (length != other.length) {
return false;
}
Expand Down Expand Up @@ -422,7 +389,7 @@ public void readBytes(byte[] b, int bOffset, int len) throws IOException {

@Override
public int read() throws IOException {
return (pos < length) ? bytearray.get(offset + pos++) : -1;
return (pos < length) ? bytearray.get(offset + pos++) : -1;
}

@Override
Expand All @@ -445,7 +412,7 @@ public int read(final byte[] b, final int bOffset, final int len) throws IOExcep

while (copiedBytes < numBytesToCopy) {
long pageFragment = PAGE_SIZE - (byteArrayOffset % PAGE_SIZE); // how much can we read until hitting N*PAGE_SIZE?
int bulkSize = (int)Math.min(pageFragment, numBytesToCopy - copiedBytes); // we cannot copy more than a page fragment
int bulkSize = (int) Math.min(pageFragment, numBytesToCopy - copiedBytes); // we cannot copy more than a page fragment
boolean copied = bytearray.get(byteArrayOffset, bulkSize, ref); // get the fragment
assert (copied == false); // we should never ever get back a materialized byte[]
System.arraycopy(ref.bytes, ref.offset, b, bOffset + copiedBytes, bulkSize); // copy fragment contents
Expand Down

0 comments on commit a784884

Please sign in to comment.