Skip to content

Commit

Permalink
Compress async search responses before storing (#74766)
Browse files Browse the repository at this point in the history
Related to #67594
  • Loading branch information
dnhatn committed Jun 30, 2021
1 parent 631c313 commit 55175de
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 11 deletions.
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer;

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -582,18 +584,28 @@ boolean ensureAuthenticatedUserIsSame(Map<String, String> originHeaders, Authent
}

private void writeResponse(R response, OutputStream os) throws IOException {
os = new FilterOutputStream(os) {
@Override
public void close() {
// do not close the output
}
};
final Version minNodeVersion = clusterService.state().nodes().getMinNodeVersion();
final OutputStreamStreamOutput out = new OutputStreamStreamOutput(os);
out.setVersion(minNodeVersion);
Version.writeVersion(minNodeVersion, out);
response.writeTo(out);
Version.writeVersion(minNodeVersion, new OutputStreamStreamOutput(os));
if (minNodeVersion.onOrAfter(Version.V_8_0_0)) {
os = CompressorFactory.COMPRESSOR.threadLocalOutputStream(os);
}
try (OutputStreamStreamOutput out = new OutputStreamStreamOutput(os)) {
out.setVersion(minNodeVersion);
response.writeTo(out);
}
}

/**
* Decode the provided base-64 bytes into a {@link AsyncSearchResponse}.
*/
private R decodeResponse(CharBuffer encodedBuffer) throws IOException {
final InputStream encodedIn = Base64.getDecoder().wrap(new InputStream() {
InputStream encodedIn = Base64.getDecoder().wrap(new InputStream() {
@Override
public int read() {
if (encodedBuffer.hasRemaining()) {
Expand All @@ -603,9 +615,12 @@ public int read() {
}
}
});
final Version version = Version.readVersion(new InputStreamStreamInput(encodedIn));
assert version.onOrBefore(Version.CURRENT) : version + " >= " + Version.CURRENT;
if (version.onOrAfter(Version.V_8_0_0)) {
encodedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(encodedIn);
}
try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(encodedIn), registry)) {
final Version version = Version.readVersion(in);
assert version.onOrBefore(Version.CURRENT) : version + " >= " + Version.CURRENT;
in.setVersion(version);
return reader.read(in);
}
Expand Down
Expand Up @@ -15,7 +15,9 @@
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -39,6 +41,7 @@
import org.hamcrest.Description;
import org.junit.After;

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Base64;
Expand Down Expand Up @@ -286,8 +289,10 @@ public StoredAsyncResponse<EqlSearchResponse> getStoredRecord(String id) throws
if (doc.isExists()) {
String value = doc.getSource().get("result").toString();
try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) {
in.setVersion(Version.readVersion(in));
final Version version = Version.readVersion(buf);
final InputStream compressedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(buf);
try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(compressedIn), registry)) {
in.setVersion(version);
return new StoredAsyncResponse<>(EqlSearchResponse::new, in);
}
}
Expand Down
Expand Up @@ -14,7 +14,9 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -37,6 +39,7 @@
import org.elasticsearch.xpack.sql.proto.Protocol;
import org.junit.After;

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Base64;
Expand Down Expand Up @@ -285,8 +288,10 @@ public StoredAsyncResponse<SqlQueryResponse> getStoredRecord(String id) throws E
if (doc.isExists()) {
String value = doc.getSource().get("result").toString();
try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) {
in.setVersion(Version.readVersion(in));
final Version version = Version.readVersion(buf);
final InputStream compressedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(buf);
try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(compressedIn), registry)) {
in.setVersion(version);
return new StoredAsyncResponse<>(SqlQueryResponse::new, in);
}
}
Expand Down

0 comments on commit 55175de

Please sign in to comment.