Commit
Revert "Closing a ReleasableBytesStreamOutput closes the underlying B…
Browse files Browse the repository at this point in the history
…igArray (#23572)"

This reverts commit 6bfecdf.
jasontedor committed Apr 5, 2017
1 parent d31d2ca commit afd45c1
Showing 17 changed files with 88 additions and 265 deletions.
@@ -30,17 +30,13 @@
*/
public final class ReleasablePagedBytesReference extends PagedBytesReference implements Releasable {

private final Releasable releasable;

public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length,
Releasable releasable) {
public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) {
super(bigarrays, byteArray, length);
this.releasable = releasable;
}

@Override
public void close() {
Releasables.close(releasable);
Releasables.close(byteArray);
}

}
@@ -20,7 +20,6 @@
package org.elasticsearch.common.compress;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

@@ -32,9 +31,5 @@ public interface Compressor {

StreamInput streamInput(StreamInput in) throws IOException;

/**
* Creates a new stream output that compresses the contents and writes to the provided stream
* output. Closing the returned {@link StreamOutput} will close the provided stream output.
*/
StreamOutput streamOutput(StreamOutput out) throws IOException;
}
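
Not part of the commit: a rough usage sketch of the remaining streamOutput method, wrapping a plain BytesStreamOutput with the DeflateCompressor shown below. The class name and sample payload are illustrative only, and the close-propagation behavior is left to the concrete compressor once the removed Javadoc guarantee is gone.

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.DeflateCompressor;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;

public class CompressExample {
    public static void main(String[] args) throws Exception {
        Compressor compressor = new DeflateCompressor();
        BytesStreamOutput target = new BytesStreamOutput();
        // Wrap the target stream; bytes written here are deflated on the fly.
        try (StreamOutput compressed = compressor.streamOutput(target)) {
            compressed.writeString("some payload"); // illustrative payload
        }
        BytesReference deflated = target.bytes();
        System.out.println("compressed length: " + deflated.length());
    }
}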
@@ -20,6 +20,7 @@
package org.elasticsearch.common.compress;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -46,7 +47,7 @@ public class DeflateCompressor implements Compressor {
// It needs to be different from other compressors and to not be specific
// enough so that no stream starting with these bytes could be detected as
// a XContent
private static final byte[] HEADER = new byte[]{'D', 'F', 'L', '\0'};
private static final byte[] HEADER = new byte[] { 'D', 'F', 'L', '\0' };
// 3 is a good trade-off between speed and compression ratio
private static final int LEVEL = 3;
// We use buffering on the input and output of in/def-laters in order to
@@ -87,7 +88,6 @@ public StreamInput streamInput(StreamInput in) throws IOException {
decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE);
return new InputStreamStreamInput(decompressedIn) {
final AtomicBoolean closed = new AtomicBoolean(false);

public void close() throws IOException {
try {
super.close();
@@ -107,11 +107,10 @@ public StreamOutput streamOutput(StreamOutput out) throws IOException {
final boolean nowrap = true;
final Deflater deflater = new Deflater(LEVEL, nowrap);
final boolean syncFlush = true;
DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
OutputStream compressedOut = new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE);
OutputStream compressedOut = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
compressedOut = new BufferedOutputStream(compressedOut, BUFFER_SIZE);
return new OutputStreamStreamOutput(compressedOut) {
final AtomicBoolean closed = new AtomicBoolean(false);

public void close() throws IOException {
try {
super.close();
@@ -17,11 +17,11 @@
* under the License.
*/

package org.elasticsearch.common.io.stream;
package org.elasticsearch.common.io;

import org.elasticsearch.common.bytes.BytesReference;

public abstract class BytesStream extends StreamOutput {
public interface BytesStream {

public abstract BytesReference bytes();
}
BytesReference bytes();
}
@@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.io;

import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;

/**
* A bytes stream that requires its bytes to be released once no longer used.
*/
public interface ReleasableBytesStream extends BytesStream {

@Override
ReleasablePagedBytesReference bytes();

}
55 changes: 0 additions & 55 deletions core/src/main/java/org/elasticsearch/common/io/Streams.java
@@ -20,9 +20,6 @@
package org.elasticsearch.common.io;

import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.Callback;

import java.io.BufferedReader;
@@ -239,56 +236,4 @@ public static void readAllLines(InputStream input, Callback<String> callback) th
}
}
}

/**
* Wraps the given {@link BytesStream} in a {@link StreamOutput} that simply flushes when
* close is called.
*/
public static BytesStream flushOnCloseStream(BytesStream os) {
return new FlushOnCloseOutputStream(os);
}

/**
* A wrapper around a {@link BytesStream} that makes the close operation a flush. This is
* needed as sometimes a stream will be closed but the bytes that the stream holds still need
* to be used and the stream cannot be closed until the bytes have been consumed.
*/
private static class FlushOnCloseOutputStream extends BytesStream {

private final BytesStream delegate;

private FlushOnCloseOutputStream(BytesStream bytesStreamOutput) {
this.delegate = bytesStreamOutput;
}

@Override
public void writeByte(byte b) throws IOException {
delegate.writeByte(b);
}

@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
delegate.writeBytes(b, offset, length);
}

@Override
public void flush() throws IOException {
delegate.flush();
}

@Override
public void close() throws IOException {
flush();
}

@Override
public void reset() throws IOException {
delegate.reset();
}

@Override
public BytesReference bytes() {
return delegate.bytes();
}
}
}
@@ -21,6 +21,7 @@

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.PagedBytesReference;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;

@@ -30,7 +31,7 @@
* A {@link StreamOutput} that uses {@link BigArrays} to acquire pages of
* bytes, which avoids frequent reallocation &amp; copying of the internal data.
*/
public class BytesStreamOutput extends BytesStream {
public class BytesStreamOutput extends StreamOutput implements BytesStream {

protected final BigArrays bigArrays;

@@ -49,7 +50,7 @@ public BytesStreamOutput() {
/**
* Create a non recycling {@link BytesStreamOutput} with enough initial pages acquired
* to satisfy the capacity given by expected size.
*
*
* @param expectedSize the expected maximum size of the stream in bytes.
*/
public BytesStreamOutput(int expectedSize) {
@@ -128,7 +129,7 @@ public void close() {

/**
* Returns the current size of the buffer.
*
*
* @return the value of the <code>count</code> field, which is the number of valid
* bytes in this output stream.
* @see java.io.ByteArrayOutputStream#count
@@ -150,7 +151,7 @@ public long ramBytesUsed() {
return bytes.ramBytesUsed();
}

void ensureCapacity(long offset) {
private void ensureCapacity(long offset) {
if (offset > Integer.MAX_VALUE) {
throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data");
}
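
Not part of the commit: a small, self-contained sketch of the page-backed buffering described in the class comment above. The expected-size hint and the round-trip through BytesReference.streamInput() are assumptions for illustration.

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;

public class BytesStreamOutputExample {
    public static void main(String[] args) throws Exception {
        BytesStreamOutput out = new BytesStreamOutput(16); // initial capacity hint in bytes
        out.writeVInt(7);
        out.writeString("hello");
        BytesReference bytes = out.bytes();
        // Read the data back through the matching StreamInput API.
        StreamInput in = bytes.streamInput();
        System.out.println(in.readVInt() + " " + in.readString());
    }
}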
@@ -20,56 +20,29 @@
package org.elasticsearch.common.io.stream;

import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.io.ReleasableBytesStream;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;

/**
* A bytes stream output that allows providing a {@link BigArrays} instance
* expecting it to require releasing its content ({@link #bytes()}) once done.
* <p>
* Please note, closing this stream will release the bytes that are in use by any
* {@link ReleasablePagedBytesReference} returned from {@link #bytes()}, so this
* stream should only be closed after the bytes have been output or copied
* elsewhere.
* Please note, it is the responsibility of the caller to make sure the bytes
* reference does not "escape" and is released only once.
*/
public class ReleasableBytesStreamOutput extends BytesStreamOutput
implements Releasable {

private Releasable releasable;
public class ReleasableBytesStreamOutput extends BytesStreamOutput implements ReleasableBytesStream {

public ReleasableBytesStreamOutput(BigArrays bigarrays) {
this(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays);
super(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays);
}

public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) {
super(expectedSize, bigArrays);
this.releasable = Releasables.releaseOnce(this.bytes);
}

/**
* Returns a {@link Releasable} implementation of a
* {@link org.elasticsearch.common.bytes.BytesReference} that represents the current state of
* the bytes in the stream.
*/
@Override
public ReleasablePagedBytesReference bytes() {
return new ReleasablePagedBytesReference(bigArrays, bytes, count, releasable);
}

@Override
public void close() {
Releasables.close(releasable);
return new ReleasablePagedBytesReference(bigArrays, bytes, count);
}

@Override
void ensureCapacity(long offset) {
final ByteArray prevBytes = this.bytes;
super.ensureCapacity(offset);
if (prevBytes != this.bytes) {
// re-create the releasable with the new reference
releasable = Releasables.releaseOnce(this.bytes);
}
}
}
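
Not part of the commit: the restored Javadoc above puts the release responsibility on the caller, so the reference returned by bytes() must be released exactly once and closing the stream no longer frees the pages. A minimal sketch of that contract; the non-recycling BigArrays instance and the class name are assumptions for illustration.

import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.util.BigArrays;

public class ReleasableBytesExample {
    public static void main(String[] args) throws Exception {
        ReleasableBytesStreamOutput out =
                new ReleasableBytesStreamOutput(BigArrays.NON_RECYCLING_INSTANCE);
        out.writeVInt(42);
        // The caller owns the returned reference and must release it exactly once.
        try (ReleasablePagedBytesReference bytes = out.bytes()) {
            System.out.println("serialized length: " + bytes.length());
        } // underlying pages are released here, not by closing the stream
    }
}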
@@ -22,7 +22,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.text.Text;
@@ -53,7 +53,7 @@
/**
* A utility to build XContent (ie json).
*/
public final class XContentBuilder implements Releasable, Flushable {
public final class XContentBuilder implements BytesStream, Releasable, Flushable {

/**
* Create a new {@link XContentBuilder} using the given {@link XContent} content.
@@ -1041,6 +1041,7 @@ public XContentGenerator generator() {
return this.generator;
}

@Override
public BytesReference bytes() {
close();
return ((BytesStream) bos).bytes();
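
Not part of the commit: with XContentBuilder implementing BytesStream again, bytes() (which closes the builder first, per the body above) hands off the rendered content. A brief sketch; the jsonBuilder() factory call, the sample field, and utf8ToString() are assumptions for illustration.

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

public class XContentBuilderExample {
    public static void main(String[] args) throws Exception {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject().field("acknowledged", true).endObject();
        // bytes() closes the builder and returns the rendered JSON.
        BytesReference json = builder.bytes();
        System.out.println(json.utf8ToString());
    }
}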
@@ -439,7 +439,7 @@ public Location add(final Operation operation) throws IOException {
}
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} finally {
Releasables.close(out);
Releasables.close(out.bytes());
}
}

@@ -1332,7 +1332,7 @@ public static void writeOperations(StreamOutput outStream, List<Operation> toWri
bytes.writeTo(outStream);
}
} finally {
Releasables.close(out);
Releasables.close(out.bytes());
}

}
19 changes: 3 additions & 16 deletions core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java
@@ -20,14 +20,12 @@

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Set;
import java.util.function.Predicate;
@@ -99,9 +97,7 @@ public XContentBuilder newBuilder(@Nullable XContentType requestContentType, boo
excludes = filters.stream().filter(EXCLUDE_FILTER).map(f -> f.substring(1)).collect(toSet());
}

OutputStream unclosableOutputStream = Streams.flushOnCloseStream(bytesOutput());
XContentBuilder builder =
new XContentBuilder(XContentFactory.xContent(responseContentType), unclosableOutputStream, includes, excludes);
XContentBuilder builder = new XContentBuilder(XContentFactory.xContent(responseContentType), bytesOutput(), includes, excludes);
if (pretty) {
builder.prettyPrint().lfAtEnd();
}
@@ -111,9 +107,8 @@ public XContentBuilder newBuilder(@Nullable XContentType requestContentType, boo
}

/**
* A channel level bytes output that can be reused. The bytes output is lazily instantiated
* by a call to {@link #newBytesOutput()}. Once the stream is created, it gets reset on each
* call to this method.
* A channel level bytes output that can be reused. It gets reset on each call to this
* method.
*/
@Override
public final BytesStreamOutput bytesOutput() {
@@ -125,14 +120,6 @@ public final BytesStreamOutput bytesOutput() {
return bytesOut;
}

/**
* An accessor to the raw value of the channel bytes output. This method will not instantiate
* a new stream if one does not exist and this method will not reset the stream.
*/
protected final BytesStreamOutput bytesOutputOrNull() {
return bytesOut;
}

protected BytesStreamOutput newBytesOutput() {
return new BytesStreamOutput();
}