Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use BytesReference to write to translog files #5463

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -23,6 +23,7 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
Expand Down Expand Up @@ -83,6 +84,11 @@ public void writeTo(OutputStream os) throws IOException {
}
}

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
channel.write(buffer);
}

@Override
public byte[] toBytes() {
if (!buffer.hasRemaining()) {
Expand Down
16 changes: 11 additions & 5 deletions src/main/java/org/elasticsearch/common/bytes/BytesArray.java
Expand Up @@ -19,10 +19,7 @@

package org.elasticsearch.common.bytes;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

import com.google.common.base.Charsets;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
Expand All @@ -31,7 +28,11 @@
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

import com.google.common.base.Charsets;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.Arrays;

public class BytesArray implements BytesReference {

Expand Down Expand Up @@ -106,6 +107,11 @@ public void writeTo(OutputStream os) throws IOException {
os.write(bytes, offset, length);
}

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
channel.write(ByteBuffer.wrap(bytes, offset, length()));
}

@Override
public byte[] toBytes() {
if (offset == 0 && bytes.length == length) {
Expand Down
Expand Up @@ -24,6 +24,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.GatheringByteChannel;

/**
* A reference to bytes.
Expand Down Expand Up @@ -95,6 +96,11 @@ public static int bytesHashCode(BytesReference a) {
*/
void writeTo(OutputStream os) throws IOException;

/**
* Writes the bytes directly to the channel.
*/
void writeTo(GatheringByteChannel channel) throws IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be just WritableByteChannel instead of GatheringByteChannel?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

netty channel buffer uses GatheringByteChannel for its getBytes version, and its nice to have the option to write ByteBuffer[]. All relevant actual implementations end up implementing GatheringByteChannel so I thought it just makes sense.


/**
* Returns the bytes as a single byte array.
*/
Expand Down
Expand Up @@ -26,6 +26,8 @@

import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.WritableByteChannel;

/**
*/
Expand Down Expand Up @@ -62,6 +64,11 @@ public void writeTo(OutputStream os) throws IOException {
buffer.getBytes(buffer.readerIndex(), os, length());
}

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
buffer.getBytes(buffer.readerIndex(), channel, length());
}

@Override
public byte[] toBytes() {
return copyBytesArray().toBytes();
Expand Down
Expand Up @@ -28,6 +28,8 @@

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

/**
* A bytes array reference that caches the hash code.
Expand Down Expand Up @@ -73,6 +75,11 @@ public void writeTo(OutputStream os) throws IOException {
os.write(bytes);
}

@Override
public void writeTo(GatheringByteChannel channel) throws IOException {
channel.write(ByteBuffer.wrap(bytes));
}

@Override
public byte[] toBytes() {
return bytes;
Expand Down
Expand Up @@ -19,11 +19,13 @@

package org.elasticsearch.index.translog.fs;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand All @@ -47,6 +49,7 @@ public class BufferingFsTranslogFile implements FsTranslogFile {

private byte[] buffer;
private int bufferCount;
private WrapperOutputStream bufferOs = new WrapperOutputStream();

public BufferingFsTranslogFile(ShardId shardId, long id, RafReference raf, int bufferSize) throws IOException {
this.shardId = shardId;
Expand All @@ -69,27 +72,26 @@ public long translogSizeInBytes() {
}

@Override
public Translog.Location add(byte[] data, int from, int size) throws IOException {
public Translog.Location add(BytesReference data) throws IOException {
rwl.writeLock().lock();
try {
operationCounter++;
long position = lastPosition;
if (size >= buffer.length) {
if (data.length() >= buffer.length) {
flushBuffer();
// we use the channel to write, since on windows, writing to the RAF might not be reflected
// when reading through the channel
raf.channel().write(ByteBuffer.wrap(data, from, size));
lastWrittenPosition += size;
lastPosition += size;
return new Translog.Location(id, position, size);
data.writeTo(raf.channel());
lastWrittenPosition += data.length();
lastPosition += data.length();
return new Translog.Location(id, position, data.length());
}
if (size > buffer.length - bufferCount) {
if (data.length() > buffer.length - bufferCount) {
flushBuffer();
}
System.arraycopy(data, from, buffer, bufferCount, size);
bufferCount += size;
lastPosition += size;
return new Translog.Location(id, position, size);
data.writeTo(bufferOs);
lastPosition += data.length();
return new Translog.Location(id, position, data.length());
} finally {
rwl.writeLock().unlock();
}
Expand Down Expand Up @@ -211,4 +213,19 @@ public void updateBufferSize(int bufferSize) {
rwl.writeLock().unlock();
}
}

class WrapperOutputStream extends OutputStream {

@Override
public void write(int b) throws IOException {
buffer[bufferCount++] = (byte) b;
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
// we do safety checked when we decide to use this stream...
System.arraycopy(b, off, buffer, bufferCount, len);
bufferCount += len;
}
}
}
Expand Up @@ -349,21 +349,15 @@ public Location add(Operation operation) throws TranslogException {
// seek back to end
out.seek(size);

BytesReference ref = out.bytes();
// TODO: pass the BytesReference to the FsTranslogFile and have them optimize writing
if (!ref.hasArray()) {
ref = ref.toBytesArray();
}
byte[] refBytes = ref.array();
int refBytesOffset = ref.arrayOffset();
Location location = current.add(refBytes, refBytesOffset, size);
BytesReference bytes = out.bytes();
Location location = current.add(bytes);
if (syncOnEachOperation) {
current.sync();
}
FsTranslogFile trans = this.trans;
if (trans != null) {
try {
location = trans.add(refBytes, refBytesOffset, size);
location = trans.add(bytes);
} catch (ClosedChannelException e) {
// ignore
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.index.translog.fs;

import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
Expand Down Expand Up @@ -61,7 +62,7 @@ public static Type fromString(String type) throws ElasticsearchIllegalArgumentEx

long translogSizeInBytes();

Translog.Location add(byte[] data, int from, int size) throws IOException;
Translog.Location add(BytesReference data) throws IOException;

byte[] read(Translog.Location location) throws IOException;

Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.translog.fs;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
Expand Down Expand Up @@ -60,12 +61,12 @@ public long translogSizeInBytes() {
return lastWrittenPosition.get();
}

public Translog.Location add(byte[] data, int from, int size) throws IOException {
long position = lastPosition.getAndAdd(size);
raf.channel().write(ByteBuffer.wrap(data, from, size), position);
lastWrittenPosition.getAndAdd(size);
public Translog.Location add(BytesReference data) throws IOException {
long position = lastPosition.getAndAdd(data.length());
data.writeTo(raf.channel());
lastWrittenPosition.getAndAdd(data.length());
operationCounter.incrementAndGet();
return new Translog.Location(id, position, size);
return new Translog.Location(id, position, data.length());
}

public byte[] read(Translog.Location location) throws IOException {
Expand Down