Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite BytesStreamOutput on top of BigArrays/ByteArray. #5331

Merged
merged 1 commit into from Mar 4, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,38 +19,52 @@

package org.elasticsearch.common.io.stream;

import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;

import java.io.IOException;

/**
*
* A {@link StreamOutput} that uses {@link BigArrays} to acquire pages of
* bytes, which avoids frequent reallocation & copying of the internal data.
*/
public class BytesStreamOutput extends StreamOutput implements BytesStream {

public static final int DEFAULT_SIZE = 2 * 1024;

public static final int OVERSIZE_LIMIT = 256 * 1024;
/**
* Factory/manager for our ByteArray
*/
private final BigArrays bigarrays;

/**
* The buffer where data is stored.
* The internal list of pages.
*/
protected byte buf[];
private ByteArray bytes;

/**
* The number of valid bytes in the buffer.
*/
protected int count;
private int count;

/**
* Create a nonrecycling {@link BytesStreamOutput} with 1 initial page acquired.
*/
public BytesStreamOutput() {
this(DEFAULT_SIZE);
this(BigArrays.PAGE_SIZE_IN_BYTES);
}

public BytesStreamOutput(int size) {
this.buf = new byte[size];
/**
* Create a nonrecycling {@link BytesStreamOutput} with enough initial pages acquired
* to satisfy the capacity given by {@code expectedSize}.
*
* @param expectedSize the expected maximum size of the stream in bytes.
*/
public BytesStreamOutput(int expectedSize) {
bigarrays = BigArrays.NON_RECYCLING_INSTANCE;
bytes = bigarrays.newByteArray(expectedSize);
}

@Override
Expand All @@ -63,88 +77,90 @@ public long position() throws IOException {
return count;
}

@Override
public void seek(long position) throws IOException {
if (position > Integer.MAX_VALUE) {
throw new UnsupportedOperationException();
}
count = (int) position;
}

@Override
public void writeByte(byte b) throws IOException {
int newcount = count + 1;
if (newcount > buf.length) {
buf = grow(newcount);
}
buf[count] = b;
count = newcount;
}

public void skip(int length) {
int newcount = count + length;
if (newcount > buf.length) {
buf = grow(newcount);
}
count = newcount;
ensureCapacity(count+1);
bytes.set(count, b);
count++;
}

@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
// nothing to copy
if (length == 0) {
return;
}
int newcount = count + length;
if (newcount > buf.length) {
buf = grow(newcount);
}
System.arraycopy(b, offset, buf, count, length);
count = newcount;
}

private byte[] grow(int newCount) {
// try and grow faster while we are small...
if (newCount < OVERSIZE_LIMIT) {
newCount = Math.max(buf.length << 1, newCount);
// illegal args: offset and/or length exceed array size
if (b.length < (offset + length)) {
throw new IllegalArgumentException("Illegal offset " + offset + "/length " + length + " for byte[] of length " + b.length);
}
return ArrayUtil.grow(buf, newCount);
}

public void seek(int seekTo) {
count = seekTo;
// get enough pages for new size
ensureCapacity(count+length);

// bulk copy
bytes.set(count, b, offset, length);

// advance
count += length;
}

public void reset() {
count = 0;
}
// shrink list of pages
if (bytes.size() > BigArrays.PAGE_SIZE_IN_BYTES) {
bytes = bigarrays.resize(bytes, BigArrays.PAGE_SIZE_IN_BYTES);
}

public int bufferSize() {
return buf.length;
// go back to start
count = 0;
}

@Override
public void flush() throws IOException {
// no-op: written bytes already live in the backing ByteArray, there is no buffer to drain
}

@Override
public void close() throws IOException {
// nothing to do here
/**
 * Moves the write position to {@code position}, growing the backing
 * ByteArray so the new position is addressable.
 *
 * @param position the new write position, in bytes from the start of the stream
 * @throws IllegalArgumentException if {@code position} exceeds {@code Integer.MAX_VALUE},
 *         since the internal count is an {@code int}
 */
public void seek(long position) throws IOException {
if (position > Integer.MAX_VALUE) {
throw new IllegalArgumentException("position " + position + " > Integer.MAX_VALUE");
}

count = (int)position;
ensureCapacity(count);
}

/**
 * Advances the write position by {@code length} bytes, acquiring pages as needed.
 *
 * @param length number of bytes to skip forward
 */
// NOTE(review): a negative length (or int overflow of count + length) is not
// guarded here and would move the position backwards — TODO confirm callers
// never pass negative values.
public void skip(int length) {
count += length;
ensureCapacity(count);
}

@Override
public BytesReference bytes() {
return new BytesArray(buf, 0, count);
public void close() throws IOException {
// empty for now.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at ensureOpen, I would expect it to set count to -1?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kimchy explicitly wanted close() a no-op because many callers still call close() prematurely and then something else.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then let's remove or change ensureOpen, this looks inconsistent otherwise?

}

/**
* Returns the current size of the buffer.
*
* @return the value of the <code>count</code> field, which is the number of valid
* bytes in this output stream.
* @see java.io.ByteArrayOutputStream#count
*/
public int size() {
return count;
}

/**
 * Returns the written content as a {@link BytesReference}.
 *
 * @return a BytesArray wrapping the first {@code count} bytes of the stream
 */
// NOTE(review): whether BytesRef references the live pages or a copy depends on
// the ByteArray implementation's get() — confirm before mutating the stream
// while holding the returned reference.
@Override
public BytesReference bytes() {
BytesRef bref = new BytesRef();
bytes.get(0, count, bref);
return new BytesArray(bref, false);
}

/**
 * Ensures the backing ByteArray can address byte index {@code offset},
 * acquiring additional pages from {@link BigArrays} if necessary.
 * grow() may return a new instance, hence the reassignment.
 */
private void ensureCapacity(int offset) {
bytes = bigarrays.grow(bytes, offset);
}

}
16 changes: 12 additions & 4 deletions src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java
Expand Up @@ -21,7 +21,7 @@

import jsr166y.ThreadLocalRandom;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand All @@ -34,6 +34,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.index.translog.TranslogStreams;

import java.io.File;
Expand Down Expand Up @@ -340,18 +341,25 @@ public Location add(Operation operation) throws TranslogException {
TranslogStreams.writeTranslogOperation(out, operation);
out.flush();

// write size to beginning of stream
int size = out.size();
out.seek(0);
out.writeInt(size - 4);

Location location = current.add(out.bytes().array(), out.bytes().arrayOffset(), size);

// seek back to end
out.seek(size);

BytesReference ref = out.bytes();
byte[] refBytes = ref.array();
int refBytesOffset = ref.arrayOffset();
Location location = current.add(refBytes, refBytesOffset, size);
if (syncOnEachOperation) {
current.sync();
}
FsTranslogFile trans = this.trans;
if (trans != null) {
try {
location = trans.add(out.bytes().array(), out.bytes().arrayOffset(), size);
location = trans.add(refBytes, refBytesOffset, size);
} catch (ClosedChannelException e) {
// ignore
}
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand Down Expand Up @@ -221,11 +222,13 @@ public void initializeSnapshot(SnapshotId snapshotId, ImmutableList<String> indi
// TODO: Can we make it atomic?
throw new InvalidSnapshotNameException(snapshotId, "snapshot with such name already exists");
}
snapshotsBlobContainer.writeBlob(snapshotBlobName, bStream.bytes().streamInput(), bStream.bytes().length());
BytesReference bRef = bStream.bytes();
snapshotsBlobContainer.writeBlob(snapshotBlobName, bRef.streamInput(), bRef.length());
// Write Global MetaData
// TODO: Check if metadata needs to be written
bStream = writeGlobalMetaData(metaData);
snapshotsBlobContainer.writeBlob(metaDataBlobName(snapshotId), bStream.bytes().streamInput(), bStream.bytes().length());
bRef = bStream.bytes();
snapshotsBlobContainer.writeBlob(metaDataBlobName(snapshotId), bRef.streamInput(), bRef.length());
for (String index : indices) {
IndexMetaData indexMetaData = metaData.index(index);
BlobPath indexPath = basePath().add("indices").add(index);
Expand All @@ -240,7 +243,8 @@ public void initializeSnapshot(SnapshotId snapshotId, ImmutableList<String> indi
IndexMetaData.Builder.toXContent(indexMetaData, builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
builder.close();
indexMetaDataBlobContainer.writeBlob(snapshotBlobName(snapshotId), bStream.bytes().streamInput(), bStream.bytes().length());
bRef = bStream.bytes();
indexMetaDataBlobContainer.writeBlob(snapshotBlobName(snapshotId), bRef.streamInput(), bRef.length());
}
} catch (IOException ex) {
throw new SnapshotCreationException(snapshotId, ex);
Expand Down Expand Up @@ -314,7 +318,8 @@ public Snapshot finalizeSnapshot(SnapshotId snapshotId, String failure, int tota
updatedSnapshot.endTime(System.currentTimeMillis());
snapshot = updatedSnapshot.build();
BytesStreamOutput bStream = writeSnapshot(snapshot);
snapshotsBlobContainer.writeBlob(blobName, bStream.bytes().streamInput(), bStream.bytes().length());
BytesReference bRef = bStream.bytes();
snapshotsBlobContainer.writeBlob(blobName, bRef.streamInput(), bRef.length());
ImmutableList<SnapshotId> snapshotIds = snapshots();
if (!snapshotIds.contains(snapshotId)) {
snapshotIds = ImmutableList.<SnapshotId>builder().addAll(snapshotIds).add(snapshotId).build();
Expand Down Expand Up @@ -569,7 +574,8 @@ protected void writeSnapshotList(ImmutableList<SnapshotId> snapshots) throws IOE
builder.endArray();
builder.endObject();
builder.close();
snapshotsBlobContainer.writeBlob(SNAPSHOTS_FILE, bStream.bytes().streamInput(), bStream.bytes().length());
BytesReference bRef = bStream.bytes();
snapshotsBlobContainer.writeBlob(SNAPSHOTS_FILE, bRef.streamInput(), bRef.length());
}

/**
Expand Down