Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better support for partial buffer reads/writes in translog infrastructure #6576

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions core-signatures.txt
Expand Up @@ -61,3 +61,13 @@ org.apache.lucene.document.Field#<init>(java.lang.String,java.lang.String,org.ap

@defaultMessage Use XNativeFSLockFactory instead of the buggy NativeFSLockFactory see LUCENE-5738 - remove once Lucene 4.9 is released
org.apache.lucene.store.NativeFSLockFactory

@defaultMessage Use Channels.* methods to write to channels. Do not write directly.
java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
java.nio.channels.FileChannel#write(java.nio.ByteBuffer, long)
java.nio.channels.GatheringByteChannel#write(java.nio.ByteBuffer[], int, int)
java.nio.channels.GatheringByteChannel#write(java.nio.ByteBuffer[])
java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
java.nio.channels.ScatteringByteChannel#read(java.nio.ByteBuffer[])
java.nio.channels.ScatteringByteChannel#read(java.nio.ByteBuffer[], int, int)
java.nio.channels.FileChannel#read(java.nio.ByteBuffer, long)
3 changes: 3 additions & 0 deletions pom.xml
Expand Up @@ -1167,6 +1167,9 @@
<!-- start exclude for FilteredQuery -->
<exclude>org/elasticsearch/common/lucene/search/XFilteredQuery.class</exclude>
<!-- end exclude for FilteredQuery -->
<!-- start exclude for Channels utility class -->
<exclude>org/elasticsearch/common/io/Channels.class</exclude>
<!-- end exclude for Channels -->
</excludes>
<bundledSignatures>
<!-- This will automatically choose the right signatures based on 'targetVersion': -->
Expand Down
Expand Up @@ -19,6 +19,15 @@

package org.elasticsearch.common.bytes;

import com.google.common.base.Charsets;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
Expand All @@ -28,15 +37,6 @@
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;

import com.google.common.base.Charsets;

/**
*/
public class ByteBufferBytesReference implements BytesReference {
Expand Down Expand Up @@ -86,7 +86,7 @@ public void writeTo(OutputStream os) throws IOException {

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
    // Delegate to Channels so partial writes are retried until the whole
    // buffer is flushed; a bare channel.write(buffer) may write fewer bytes
    // than remaining() without any error (see PR title: partial-write support).
    // NOTE: the scraped diff showed both the old direct write and this call;
    // only the Channels call must remain, otherwise the buffer is written twice.
    Channels.writeToChannel(buffer, channel);
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/elasticsearch/common/bytes/BytesArray.java
Expand Up @@ -23,14 +23,14 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.Arrays;

Expand Down Expand Up @@ -109,7 +109,7 @@ public void writeTo(OutputStream os) throws IOException {

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
    // Write exactly [offset, offset + length()) of the backing array via the
    // Channels helper, which loops until every byte is written (plain
    // FileChannel/GatheringByteChannel writes may be partial).
    // The old ByteBuffer.wrap(...) + channel.write(...) line from the diff is
    // intentionally dropped — keeping both would double-write the bytes.
    Channels.writeToChannel(bytes, offset, length(), channel);
}

@Override
Expand Down
Expand Up @@ -20,14 +20,14 @@

import com.google.common.base.Charsets;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.transport.netty.ChannelBufferStreamInputFactory;
import org.jboss.netty.buffer.ChannelBuffer;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.WritableByteChannel;

/**
*/
Expand Down Expand Up @@ -66,7 +66,7 @@ public void writeTo(OutputStream os) throws IOException {

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
    // Stream the readable region of the netty ChannelBuffer (from its current
    // readerIndex, length() bytes) through Channels so short writes are
    // completed rather than silently truncated.
    // The removed direct buffer.getBytes(..., channel, ...) call from the diff
    // must not be retained alongside this one.
    Channels.writeToChannel(buffer, buffer.readerIndex(), length(), channel);
}

@Override
Expand Down
Expand Up @@ -21,14 +21,14 @@
import com.google.common.base.Charsets;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

/**
Expand Down Expand Up @@ -77,7 +77,7 @@ public void writeTo(OutputStream os) throws IOException {

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
    // Write the entire backing byte[] through the Channels helper, which
    // handles partial channel writes by looping until completion.
    // (This reference is not a slice — offset 0, full array length.)
    // The old channel.write(ByteBuffer.wrap(bytes)) line shown in the diff is
    // superseded and removed here.
    Channels.writeToChannel(bytes, 0, bytes.length, channel);
}

@Override
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.util.CharsRef;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
Expand All @@ -32,7 +33,6 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.Arrays;

Expand Down Expand Up @@ -95,7 +95,7 @@ public void writeTo(OutputStream os) throws IOException {

BytesRef ref = new BytesRef();
int written = 0;

// are we a slice?
if (offset != 0) {
// remaining size of page fragment at offset
Expand All @@ -122,53 +122,21 @@ public void writeTo(GatheringByteChannel channel) throws IOException {
return;
}

ByteBuffer[] buffers;
ByteBuffer currentBuffer = null;
int currentLength = length;
int currentOffset = offset;
BytesRef ref = new BytesRef();
int pos = 0;

// are we a slice?
if (offset != 0) {
// remaining size of page fragment at offset
int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE));
bytearray.get(offset, fragmentSize, ref);
currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, fragmentSize);
pos += fragmentSize;
}

// we only have a single page
if (pos == length && currentBuffer != null) {
channel.write(currentBuffer);
return;
}

// a slice > pagesize will likely require extra buffers for initial/trailing fragments
int numBuffers = countRequiredBuffers((currentBuffer != null ? 1 : 0), length - pos);

buffers = new ByteBuffer[numBuffers];
int bufferSlot = 0;

if (currentBuffer != null) {
buffers[bufferSlot] = currentBuffer;
bufferSlot++;
}

// handle remainder of pages + trailing fragment
while (pos < length) {
int remaining = length - pos;
int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining;
bytearray.get(offset + pos, bulkSize, ref);
currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, bulkSize);
buffers[bufferSlot] = currentBuffer;
bufferSlot++;
pos += bulkSize;
while (currentLength > 0) {
// try to align to the underlying pages while writing, so no new arrays will be created.
int fragmentSize = Math.min(currentLength, PAGE_SIZE - (currentOffset % PAGE_SIZE));
boolean newArray = bytearray.get(currentOffset, fragmentSize, ref);
assert !newArray : "PagedBytesReference failed to align with underlying bytearray. offset [" + currentOffset + "], size [" + fragmentSize + "]";
Channels.writeToChannel(ref.bytes, ref.offset, ref.length, channel);
currentLength -= ref.length;
currentOffset += ref.length;
}

// this would indicate that our numBuffer calculation is off by one.
assert (numBuffers == bufferSlot);

// finally write all buffers
channel.write(buffers);
assert currentLength == 0;
}

@Override
Expand Down Expand Up @@ -205,8 +173,7 @@ public BytesArray copyBytesArray() {
if (copied) {
// BigArray has materialized for us, no need to do it again
return new BytesArray(ref.bytes, ref.offset, ref.length);
}
else {
} else {
// here we need to copy the bytes even when shared
byte[] copy = Arrays.copyOfRange(ref.bytes, ref.offset, ref.offset + ref.length);
return new BytesArray(copy);
Expand All @@ -223,7 +190,7 @@ public ChannelBuffer toChannelBuffer() {
ChannelBuffer[] buffers;
ChannelBuffer currentBuffer = null;
BytesRef ref = new BytesRef();
int pos = 0;
int pos = 0;

// are we a slice?
if (offset != 0) {
Expand Down Expand Up @@ -349,10 +316,10 @@ public boolean equals(Object obj) {
}

if (!(obj instanceof PagedBytesReference)) {
return BytesReference.Helper.bytesEqual(this, (BytesReference)obj);
return BytesReference.Helper.bytesEqual(this, (BytesReference) obj);
}

PagedBytesReference other = (PagedBytesReference)obj;
PagedBytesReference other = (PagedBytesReference) obj;
if (length != other.length) {
return false;
}
Expand Down Expand Up @@ -422,7 +389,7 @@ public void readBytes(byte[] b, int bOffset, int len) throws IOException {

@Override
public int read() throws IOException {
    // Return the next byte (advancing pos) or -1 at end of stream, per the
    // InputStream contract. The diff scrape showed this identical line twice
    // (whitespace-only change); a second copy would be unreachable dead code.
    return (pos < length) ? bytearray.get(offset + pos++) : -1;
}

@Override
Expand All @@ -445,7 +412,7 @@ public int read(final byte[] b, final int bOffset, final int len) throws IOExcep

while (copiedBytes < numBytesToCopy) {
long pageFragment = PAGE_SIZE - (byteArrayOffset % PAGE_SIZE); // how much can we read until hitting N*PAGE_SIZE?
int bulkSize = (int)Math.min(pageFragment, numBytesToCopy - copiedBytes); // we cannot copy more than a page fragment
int bulkSize = (int) Math.min(pageFragment, numBytesToCopy - copiedBytes); // we cannot copy more than a page fragment
boolean copied = bytearray.get(byteArrayOffset, bulkSize, ref); // get the fragment
assert (copied == false); // we should never ever get back a materialized byte[]
System.arraycopy(ref.bytes, ref.offset, b, bOffset + copiedBytes, bulkSize); // copy fragment contents
Expand Down