Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better support for partial buffer reads/writes in translog infrastructure #6576

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,6 +19,15 @@

package org.elasticsearch.common.bytes;

import com.google.common.base.Charsets;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
Expand All @@ -28,15 +37,6 @@
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;

import com.google.common.base.Charsets;

/**
*/
public class ByteBufferBytesReference implements BytesReference {
Expand Down Expand Up @@ -86,7 +86,7 @@ public void writeTo(OutputStream os) throws IOException {

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
    // Channels.writeToChannel loops until the whole buffer has been accepted;
    // a bare channel.write(buffer) may return after a partial write and
    // silently drop the remainder.
    Channels.writeToChannel(buffer, channel);
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/elasticsearch/common/bytes/BytesArray.java
Expand Up @@ -23,14 +23,14 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.Arrays;

Expand Down Expand Up @@ -109,7 +109,7 @@ public void writeTo(OutputStream os) throws IOException {

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
    // Write the backing array slice via Channels so that partial channel
    // writes are retried until all length() bytes have been flushed.
    Channels.writeToChannel(bytes, offset, length(), channel);
}

@Override
Expand Down
Expand Up @@ -20,14 +20,14 @@

import com.google.common.base.Charsets;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.transport.netty.ChannelBufferStreamInputFactory;
import org.jboss.netty.buffer.ChannelBuffer;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.WritableByteChannel;

/**
*/
Expand Down Expand Up @@ -66,7 +66,7 @@ public void writeTo(OutputStream os) throws IOException {

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
    // Start at the buffer's current reader index; Channels loops until all
    // length() bytes have been accepted by the channel (a single
    // ChannelBuffer.getBytes call may perform only a partial write).
    Channels.writeToChannel(buffer, buffer.readerIndex(), length(), channel);
}

@Override
Expand Down
Expand Up @@ -21,14 +21,14 @@
import com.google.common.base.Charsets;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

/**
Expand Down Expand Up @@ -77,7 +77,7 @@ public void writeTo(OutputStream os) throws IOException {

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
    // Write the entire backing array; Channels retries on short writes so no
    // bytes are lost when the channel accepts only part of the data.
    Channels.writeToChannel(bytes, 0, bytes.length, channel);
}

@Override
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.util.CharsRef;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
Expand All @@ -32,7 +33,6 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.Arrays;

Expand Down Expand Up @@ -95,7 +95,7 @@ public void writeTo(OutputStream os) throws IOException {

BytesRef ref = new BytesRef();
int written = 0;

// are we a slice?
if (offset != 0) {
// remaining size of page fragment at offset
Expand All @@ -122,53 +122,21 @@ public void writeTo(GatheringByteChannel channel) throws IOException {
return;
}

ByteBuffer[] buffers;
ByteBuffer currentBuffer = null;
int currentLength = length;
int currentOffset = offset;
BytesRef ref = new BytesRef();
int pos = 0;

// are we a slice?
if (offset != 0) {
// remaining size of page fragment at offset
int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE));
bytearray.get(offset, fragmentSize, ref);
currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, fragmentSize);
pos += fragmentSize;
}

// we only have a single page
if (pos == length && currentBuffer != null) {
channel.write(currentBuffer);
return;
}

// a slice > pagesize will likely require extra buffers for initial/trailing fragments
int numBuffers = countRequiredBuffers((currentBuffer != null ? 1 : 0), length - pos);

buffers = new ByteBuffer[numBuffers];
int bufferSlot = 0;

if (currentBuffer != null) {
buffers[bufferSlot] = currentBuffer;
bufferSlot++;
}

// handle remainder of pages + trailing fragment
while (pos < length) {
int remaining = length - pos;
int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining;
bytearray.get(offset + pos, bulkSize, ref);
currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, bulkSize);
buffers[bufferSlot] = currentBuffer;
bufferSlot++;
pos += bulkSize;
while (currentLength > 0) {
// try to align to the underlying pages while writing, so no new arrays will be created.
int fragmentSize = Math.min(currentLength, PAGE_SIZE - (offset % PAGE_SIZE));
boolean newArray = bytearray.get(currentOffset, fragmentSize, ref);
assert !newArray : "PagedBytesReference failed to align with underlying. offset [" + currentOffset + "], size [" + fragmentSize + "]";
Channels.writeToChannel(ref.bytes, ref.offset, ref.length, channel);
currentLength -= ref.length;
currentOffset += ref.length;
}

// this would indicate that our numBuffer calculation is off by one.
assert (numBuffers == bufferSlot);

// finally write all buffers
channel.write(buffers);
assert currentLength == 0;
}

@Override
Expand Down Expand Up @@ -205,8 +173,7 @@ public BytesArray copyBytesArray() {
if (copied) {
// BigArray has materialized for us, no need to do it again
return new BytesArray(ref.bytes, ref.offset, ref.length);
}
else {
} else {
// here we need to copy the bytes even when shared
byte[] copy = Arrays.copyOfRange(ref.bytes, ref.offset, ref.offset + ref.length);
return new BytesArray(copy);
Expand All @@ -223,7 +190,7 @@ public ChannelBuffer toChannelBuffer() {
ChannelBuffer[] buffers;
ChannelBuffer currentBuffer = null;
BytesRef ref = new BytesRef();
int pos = 0;
int pos = 0;

// are we a slice?
if (offset != 0) {
Expand Down Expand Up @@ -349,10 +316,10 @@ public boolean equals(Object obj) {
}

if (!(obj instanceof PagedBytesReference)) {
return BytesReference.Helper.bytesEqual(this, (BytesReference)obj);
return BytesReference.Helper.bytesEqual(this, (BytesReference) obj);
}

PagedBytesReference other = (PagedBytesReference)obj;
PagedBytesReference other = (PagedBytesReference) obj;
if (length != other.length) {
return false;
}
Expand Down Expand Up @@ -422,7 +389,7 @@ public void readBytes(byte[] b, int bOffset, int len) throws IOException {

@Override
public int read() throws IOException {
return (pos < length) ? bytearray.get(offset + pos++) : -1;
return (pos < length) ? bytearray.get(offset + pos++) : -1;
}

@Override
Expand All @@ -445,7 +412,7 @@ public int read(final byte[] b, final int bOffset, final int len) throws IOExcep

while (copiedBytes < numBytesToCopy) {
long pageFragment = PAGE_SIZE - (byteArrayOffset % PAGE_SIZE); // how much can we read until hitting N*PAGE_SIZE?
int bulkSize = (int)Math.min(pageFragment, numBytesToCopy - copiedBytes); // we cannot copy more than a page fragment
int bulkSize = (int) Math.min(pageFragment, numBytesToCopy - copiedBytes); // we cannot copy more than a page fragment
boolean copied = bytearray.get(byteArrayOffset, bulkSize, ref); // get the fragment
assert (copied == false); // we should never ever get back a materialized byte[]
System.arraycopy(ref.bytes, ref.offset, b, bOffset + copiedBytes, bulkSize); // copy fragment contents
Expand Down
106 changes: 106 additions & 0 deletions src/main/java/org/elasticsearch/common/io/Channels.java
@@ -0,0 +1,106 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.io;

import org.jboss.netty.buffer.ChannelBuffer;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.WritableByteChannel;

public abstract class Channels {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a real unittest for this class? I mean it's tested well but I'd love to have a dedicated test for that really hammers it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make this class final with a default private ctor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final I get. Wondering why the default private ctor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah - I get it :) final to prevent extending, private ctor to replace the abstract. Coming up.

/**
* The maximum chunk size for reads in bytes
*/
private static final int READ_CHUNK_SIZE = 16384;
/**
* The maximum chunk size for writes in bytes
*/
private static final int WRITE_CHUNK_SIZE = 8192;

/**
* read <i>length</i> bytes from <i>position</i> of a file channel
*/
public static byte[] readFromFileChannel(FileChannel channel, long position, int length) throws IOException {
byte[] res = new byte[length];
readFromFileChannel(channel, position, res, 0, length);
return res;

}

/**
* read <i>length</i> bytes from <i>position</i> of a file channel
*
* @param channel channel to read from
* @param channelPosition position to read from
* @param dest destination byte array to put data in
* @param destOffset offset in dest to read into
* @param length number of bytes to read
*/
public static void readFromFileChannel(FileChannel channel, long channelPosition, byte[] dest, int destOffset, int length) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(dest, destOffset, length);
while (length > 0) {
final int toRead = Math.min(READ_CHUNK_SIZE, length);
buffer.limit(buffer.position() + toRead);
assert buffer.remaining() == toRead;
final int i = channel.read(buffer, channelPosition);
if (i < 0) { // be defensive here, even though we checked before hand, something could have changed
throw new EOFException("read past EOF. pos [" + channelPosition + "] chunkLen: [" + toRead + "] end: [" + channel.size() + "]");
}
assert i > 0 : "FileChannel.read with non zero-length bb.remaining() must always read at least one byte (FileChannel is in blocking mode, see spec of ReadableByteChannel)";
channelPosition += i;
length -= i;
}
assert length == 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you debug this you really wanna know the length.... maybe add a msg to it?

}

public static void writeToChannel(ChannelBuffer source, int sourceIndex, int length, GatheringByteChannel channel) throws IOException {
while (length > 0) {
int written = source.getBytes(sourceIndex, channel, length);
sourceIndex += written;
length -= written;
}
assert length == 0;
}

public static void writeToChannel(byte[] source, int offset, int length, WritableByteChannel channel) throws IOException {
int toWrite = Math.min(length, WRITE_CHUNK_SIZE);
ByteBuffer buffer = ByteBuffer.wrap(source, offset, toWrite);
int written = channel.write(buffer);
length -= written;
while (length > 0) {
toWrite = Math.min(length, WRITE_CHUNK_SIZE);
buffer.limit(buffer.position() + toWrite);
written = channel.write(buffer);
length -= written;
}
assert length == 0;
}

public static void writeToChannel(ByteBuffer byteBuffer, WritableByteChannel channel) throws IOException {
do {
channel.write(byteBuffer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure, but I think we need to chunk this write as well, or the JVM could allocate a big threadlocal buffer that hangs around...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. If the ByteBuffer is not a direct buffer, we should chunk. Maybe check if it is direct and chunk only then.

}
while (byteBuffer.position() != byteBuffer.limit());
}
}