Rewrite BytesStreamOutput on top of BigArrays/ByteArray.
Fix for #5159
Holger Hoffstätte committed Mar 4, 2014
1 parent bbc0486 commit 68cb9a8
Showing 4 changed files with 379 additions and 87 deletions.
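The change replaces the old doubling byte[] buffer with pages leased from a BigArrays allocator, so a growing stream acquires more pages instead of reallocating and copying one large array. A minimal usage sketch of the rewritten class (a hypothetical driver, assuming the post-commit Elasticsearch sources on the classpath; it uses only API that appears in the diff below):

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;

// Hypothetical driver; assumes the post-commit Elasticsearch sources on the classpath.
public class PagedStreamDemo {
    public static void main(String[] args) throws Exception {
        // The default constructor leases a single page from BigArrays.
        BytesStreamOutput out = new BytesStreamOutput();

        // A write larger than one page: the backing ByteArray grows page by
        // page instead of reallocating and copying a flat byte[].
        byte[] payload = new byte[100000];
        payload[0] = 42;
        out.writeBytes(payload, 0, payload.length);

        // bytes() materializes [0, count) into a BytesReference; read it back.
        BytesReference ref = out.bytes();
        StreamInput in = ref.streamInput();
        System.out.println(ref.length() + " bytes, first = " + in.readByte());
    }
}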
src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java

@@ -19,38 +19,52 @@

 package org.elasticsearch.common.io.stream;

-import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.BytesStream;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ByteArray;

 import java.io.IOException;

 /**
- *
+ * A {@link StreamOutput} that uses {@link BigArrays} to acquire pages of
+ * bytes, which avoids frequent reallocation & copying of the internal data.
  */
 public class BytesStreamOutput extends StreamOutput implements BytesStream {

-    public static final int DEFAULT_SIZE = 2 * 1024;
-
-    public static final int OVERSIZE_LIMIT = 256 * 1024;
+    /**
+     * Factory/manager for our ByteArray
+     */
+    private final BigArrays bigarrays;

     /**
-     * The buffer where data is stored.
+     * The internal list of pages.
      */
-    protected byte buf[];
+    private ByteArray bytes;

     /**
      * The number of valid bytes in the buffer.
      */
-    protected int count;
+    private int count;

+    /**
+     * Create a non-recycling {@link BytesStreamOutput} with 1 initial page acquired.
+     */
     public BytesStreamOutput() {
-        this(DEFAULT_SIZE);
+        this(BigArrays.PAGE_SIZE_IN_BYTES);
     }

-    public BytesStreamOutput(int size) {
-        this.buf = new byte[size];
+    /**
+     * Create a non-recycling {@link BytesStreamOutput} with enough initial pages acquired
+     * to satisfy the capacity given by {@code expectedSize}.
+     *
+     * @param expectedSize the expected maximum size of the stream in bytes.
+     */
+    public BytesStreamOutput(int expectedSize) {
+        bigarrays = BigArrays.NON_RECYCLING_INSTANCE;
+        bytes = bigarrays.newByteArray(expectedSize);
     }

     @Override
@@ -63,88 +77,90 @@ public long position() throws IOException {
         return count;
     }

     @Override
-    public void seek(long position) throws IOException {
-        if (position > Integer.MAX_VALUE) {
-            throw new UnsupportedOperationException();
-        }
-        count = (int) position;
-    }
-
-    @Override
     public void writeByte(byte b) throws IOException {
-        int newcount = count + 1;
-        if (newcount > buf.length) {
-            buf = grow(newcount);
-        }
-        buf[count] = b;
-        count = newcount;
-    }
-
-    public void skip(int length) {
-        int newcount = count + length;
-        if (newcount > buf.length) {
-            buf = grow(newcount);
-        }
-        count = newcount;
+        ensureCapacity(count + 1);
+        bytes.set(count, b);
+        count++;
     }

     @Override
     public void writeBytes(byte[] b, int offset, int length) throws IOException {
+        // nothing to copy
         if (length == 0) {
             return;
         }
-        int newcount = count + length;
-        if (newcount > buf.length) {
-            buf = grow(newcount);
-        }
-        System.arraycopy(b, offset, buf, count, length);
-        count = newcount;
-    }
-
-    private byte[] grow(int newCount) {
-        // try and grow faster while we are small...
-        if (newCount < OVERSIZE_LIMIT) {
-            newCount = Math.max(buf.length << 1, newCount);
+
+        // illegal args: offset and/or length exceed array size
+        if (b.length < (offset + length)) {
+            throw new IllegalArgumentException("Illegal offset " + offset + "/length " + length + " for byte[] of length " + b.length);
         }
-        return ArrayUtil.grow(buf, newCount);
-    }

-    public void seek(int seekTo) {
-        count = seekTo;
+        // get enough pages for new size
+        ensureCapacity(count + length);
+
+        // bulk copy
+        bytes.set(count, b, offset, length);
+
+        // advance
+        count += length;
     }

     public void reset() {
-        count = 0;
-    }
+        // shrink list of pages
+        if (bytes.size() > BigArrays.PAGE_SIZE_IN_BYTES) {
+            bytes = bigarrays.resize(bytes, BigArrays.PAGE_SIZE_IN_BYTES);
+        }

-    public int bufferSize() {
-        return buf.length;
+        // go back to start
+        count = 0;
     }

     @Override
     public void flush() throws IOException {
-        // nothing to do there
+        // nothing to do
     }

     @Override
-    public void close() throws IOException {
-        // nothing to do here
+    public void seek(long position) throws IOException {
+        if (position > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException("position " + position + " > Integer.MAX_VALUE");
+        }
+
+        count = (int) position;
+        ensureCapacity(count);
     }

+    public void skip(int length) {
+        count += length;
+        ensureCapacity(count);
+    }
+
     @Override
-    public BytesReference bytes() {
-        return new BytesArray(buf, 0, count);
+    public void close() throws IOException {
+        // empty for now.
     }

     /**
      * Returns the current size of the buffer.
      *
-     * @return the value of the <code>count</code> field, which is the number
-     * of valid bytes in this output stream.
-     *
+     * @return the value of the <code>count</code> field, which is the number of valid
+     * bytes in this output stream.
      * @see java.io.ByteArrayOutputStream#count
      */
     public int size() {
         return count;
     }

+    @Override
+    public BytesReference bytes() {
+        BytesRef bref = new BytesRef();
+        bytes.get(0, count, bref);
+        return new BytesArray(bref, false);
+    }
+
+    private void ensureCapacity(int offset) {
+        bytes = bigarrays.grow(bytes, offset);
+    }
+
 }
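Note the shape of the new implementation: every write funnels through ensureCapacity(), which delegates to BigArrays.grow(), and bytes() copies the valid range [0, count) into a BytesRef. The same plumbing can be exercised directly; this sketch (hypothetical, same classpath assumption as above) mirrors what ensureCapacity(), reset() and bytes() do:

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;

// Hypothetical sketch of the ByteArray plumbing behind the stream above.
public class ByteArrayPlumbingDemo {
    public static void main(String[] args) {
        BigArrays bigarrays = BigArrays.NON_RECYCLING_INSTANCE;

        // One page up front, as the no-arg constructor does.
        ByteArray bytes = bigarrays.newByteArray(BigArrays.PAGE_SIZE_IN_BYTES);

        // grow() is what ensureCapacity() delegates to: it adds pages as
        // needed and keeps existing content.
        bytes = bigarrays.grow(bytes, 3L * BigArrays.PAGE_SIZE_IN_BYTES);
        bytes.set(0L, (byte) 1);

        // get() fills a BytesRef with the requested range, as bytes() does
        // for [0, count).
        BytesRef bref = new BytesRef();
        bytes.get(0L, 1, bref);
        System.out.println(bref.bytes[bref.offset]);

        // resize() trims back to a single page, as reset() does.
        bytes = bigarrays.resize(bytes, BigArrays.PAGE_SIZE_IN_BYTES);
    }
}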
src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java (16 changes: 12 additions & 4 deletions)
@@ -21,7 +21,7 @@

 import jsr166y.ThreadLocalRandom;
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.index.translog.TranslogStats;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -34,6 +34,7 @@
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogException;
+import org.elasticsearch.index.translog.TranslogStats;
 import org.elasticsearch.index.translog.TranslogStreams;

 import java.io.File;
@@ -340,18 +341,25 @@ public Location add(Operation operation) throws TranslogException {
             TranslogStreams.writeTranslogOperation(out, operation);
             out.flush();

+            // write size to beginning of stream
             int size = out.size();
             out.seek(0);
             out.writeInt(size - 4);

-            Location location = current.add(out.bytes().array(), out.bytes().arrayOffset(), size);
+            // seek back to end
+            out.seek(size);
+
+            BytesReference ref = out.bytes();
+            byte[] refBytes = ref.array();
+            int refBytesOffset = ref.arrayOffset();
+            Location location = current.add(refBytes, refBytesOffset, size);
             if (syncOnEachOperation) {
                 current.sync();
             }
             FsTranslogFile trans = this.trans;
             if (trans != null) {
                 try {
-                    location = trans.add(out.bytes().array(), out.bytes().arrayOffset(), size);
+                    location = trans.add(refBytes, refBytesOffset, size);
                 } catch (ClosedChannelException e) {
                     // ignore
                 }
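The add() path above depends on the new seek(): the stream begins with a placeholder int (written earlier in add(), outside this hunk), the operation is serialized behind it, and seek(0) rewinds so the real size can be patched in; seeking back to the end matters because seek() moves count, which both size() and bytes() read. A standalone sketch of the framing pattern (hypothetical payload, same classpath assumption):

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;

// Hypothetical standalone sketch of the size-prefix framing in add() above.
public class SizePrefixDemo {
    public static void main(String[] args) throws Exception {
        BytesStreamOutput out = new BytesStreamOutput();
        out.writeInt(0);                           // placeholder for the size prefix
        out.writeBytes(new byte[]{1, 2, 3}, 0, 3); // stands in for the serialized operation
        out.flush();

        // write the real size into the placeholder at the beginning
        int size = out.size();
        out.seek(0);
        out.writeInt(size - 4);

        // seek back to the end: seek() moves count, so without this the
        // stream would report size 4 and bytes() would return only the prefix
        out.seek(size);

        BytesReference ref = out.bytes();
        System.out.println("framed " + ref.length() + " bytes");
    }
}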
src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -32,6 +32,7 @@
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -221,11 +222,13 @@ public void initializeSnapshot(SnapshotId snapshotId, ImmutableList<String> indi
                 // TODO: Can we make it atomic?
                 throw new InvalidSnapshotNameException(snapshotId, "snapshot with such name already exists");
             }
-            snapshotsBlobContainer.writeBlob(snapshotBlobName, bStream.bytes().streamInput(), bStream.bytes().length());
+            BytesReference bRef = bStream.bytes();
+            snapshotsBlobContainer.writeBlob(snapshotBlobName, bRef.streamInput(), bRef.length());
             // Write Global MetaData
             // TODO: Check if metadata needs to be written
             bStream = writeGlobalMetaData(metaData);
-            snapshotsBlobContainer.writeBlob(metaDataBlobName(snapshotId), bStream.bytes().streamInput(), bStream.bytes().length());
+            bRef = bStream.bytes();
+            snapshotsBlobContainer.writeBlob(metaDataBlobName(snapshotId), bRef.streamInput(), bRef.length());
             for (String index : indices) {
                 IndexMetaData indexMetaData = metaData.index(index);
                 BlobPath indexPath = basePath().add("indices").add(index);
@@ -240,7 +243,8 @@ public void initializeSnapshot(SnapshotId snapshotId, ImmutableList<String> indi
                 IndexMetaData.Builder.toXContent(indexMetaData, builder, ToXContent.EMPTY_PARAMS);
                 builder.endObject();
                 builder.close();
-                indexMetaDataBlobContainer.writeBlob(snapshotBlobName(snapshotId), bStream.bytes().streamInput(), bStream.bytes().length());
+                bRef = bStream.bytes();
+                indexMetaDataBlobContainer.writeBlob(snapshotBlobName(snapshotId), bRef.streamInput(), bRef.length());
             }
         } catch (IOException ex) {
             throw new SnapshotCreationException(snapshotId, ex);
@@ -314,7 +318,8 @@ public Snapshot finalizeSnapshot(SnapshotId snapshotId, String failure, int tota
             updatedSnapshot.endTime(System.currentTimeMillis());
             snapshot = updatedSnapshot.build();
             BytesStreamOutput bStream = writeSnapshot(snapshot);
-            snapshotsBlobContainer.writeBlob(blobName, bStream.bytes().streamInput(), bStream.bytes().length());
+            BytesReference bRef = bStream.bytes();
+            snapshotsBlobContainer.writeBlob(blobName, bRef.streamInput(), bRef.length());
             ImmutableList<SnapshotId> snapshotIds = snapshots();
             if (!snapshotIds.contains(snapshotId)) {
                 snapshotIds = ImmutableList.<SnapshotId>builder().addAll(snapshotIds).add(snapshotId).build();
@@ -569,7 +574,8 @@ protected void writeSnapshotList(ImmutableList<SnapshotId> snapshots) throws IOE
         builder.endArray();
         builder.endObject();
         builder.close();
-        snapshotsBlobContainer.writeBlob(SNAPSHOTS_FILE, bStream.bytes().streamInput(), bStream.bytes().length());
+        BytesReference bRef = bStream.bytes();
+        snapshotsBlobContainer.writeBlob(SNAPSHOTS_FILE, bRef.streamInput(), bRef.length());
     }

     /**
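All four writeBlob() call sites now hoist bytes() into a local BytesReference that is reused for both streamInput() and length(). With the paged stream this is the economical pattern: each bytes() call allocates a fresh BytesRef and may copy the backing pages out (see its implementation above), so calling it twice per blob write would do that materialization work twice. A minimal sketch of the pattern (hypothetical names, same classpath assumption):

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;

// Hypothetical sketch of the hoisted-bytes() pattern used by the writeBlob() call sites.
public class HoistBytesDemo {
    public static void main(String[] args) throws Exception {
        BytesStreamOutput bStream = new BytesStreamOutput();
        bStream.writeBytes(new byte[]{7, 8, 9}, 0, 3);

        // Materialize once, then reuse the reference for both the blob
        // stream and the blob length, instead of calling bytes() twice.
        BytesReference bRef = bStream.bytes();
        System.out.println(bRef.length());
        System.out.println(bRef.streamInput().read());
    }
}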
