Skip to content

Commit

Permalink
Allow build XContent directly from Writable (#73804)
Browse files Browse the repository at this point in the history
Today, writing a Writable value to XContent in Base64 format performs 
these steps: (1) create a BytesStreamOutput, (2) write Writable to that
output, (3) encode a copy of bytes from that output stream, (4) create a
string from the encoded bytes, (5) write the encoded string to XContent. 
These steps allocate/use memory 5 times than writing the encode chars
directly to the output of XContent.

This API would help reduce memory usage when storing a large response 
of an async search.

Relates #67594
  • Loading branch information
dnhatn committed Jun 6, 2021
1 parent 58c1477 commit cb11448
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 0 deletions.
Expand Up @@ -8,10 +8,12 @@

package org.elasticsearch.common.xcontent;

import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.RestApiVersion;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.FilterOutputStream;
import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -21,6 +23,7 @@
import java.nio.file.Path;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Base64;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
Expand Down Expand Up @@ -1047,6 +1050,32 @@ public XContentBuilder copyCurrentStructure(XContentParser parser) throws IOExce
return this;
}

/**
* Write the content that is written to the output stream by the {@code writer} as a string encoded in Base64 format.
* This API can be used to generate XContent directly without the intermediate results to reduce memory usage.
* Note that this method supports only JSON.
*/
public XContentBuilder directFieldAsBase64(String name, CheckedConsumer<OutputStream, IOException> writer) throws IOException {
if (contentType() != XContentType.JSON) {
assert false : "writableFieldAsBase64 supports only JSON format";
throw new UnsupportedOperationException("writableFieldAsBase64 supports only JSON format");
}
generator.writeDirectField(name, os -> {
os.write('\"');
final FilterOutputStream noClose = new FilterOutputStream(os) {
@Override
public void close() {
// We need to close the output stream that is wrapped by a Base64 encoder to flush the outstanding buffer
// of the encoder, but we must not close the underlying output stream of the XContentBuilder.
}
};
final OutputStream encodedOutput = Base64.getEncoder().wrap(noClose);
writer.accept(encodedOutput);
encodedOutput.close(); // close to flush the outstanding buffer used in the Base64 Encoder
os.write('\"');
});
return this;
}

/**
* Returns a version used for serialising a response.
Expand Down
Expand Up @@ -8,10 +8,13 @@

package org.elasticsearch.common.xcontent;

import org.elasticsearch.common.CheckedConsumer;

import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.math.BigInteger;

Expand Down Expand Up @@ -102,6 +105,12 @@ public interface XContentGenerator extends Closeable, Flushable {

void copyCurrentStructure(XContentParser parser) throws IOException;

/**
* Write a field whose value is written directly to the output stream. As the content is copied as is,
* the writer must a valid XContent value (e.g., string is properly escaped and quoted)
*/
void writeDirectField(String name, CheckedConsumer<OutputStream, IOException> writer) throws IOException;

default void copyCurrentEvent(XContentParser parser) throws IOException {
switch (parser.currentToken()) {
case START_OBJECT:
Expand Down
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.core.util.DefaultIndenter;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.core.util.JsonGeneratorDelegate;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
Expand Down Expand Up @@ -430,6 +431,15 @@ private static void copyCurrentStructure(XContentGenerator destination, XContent
}
}

@Override
public void writeDirectField(String name, CheckedConsumer<OutputStream, IOException> writer) throws IOException {
writeStartRaw(name);
flush();
writer.accept(os);
flush();
writeEndRaw();
}

@Override
public void flush() throws IOException {
generator.flush();
Expand Down
Expand Up @@ -19,16 +19,21 @@
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentGenerator;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
Expand All @@ -37,9 +42,13 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.TimeZone;

import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.instanceOf;

public class XContentBuilderTests extends ESTestCase {
public void testPrettyWithLfAtEnd() throws Exception {
Expand Down Expand Up @@ -383,4 +392,84 @@ public void testMissingEndArray() throws IOException {
assertThat(e.getMessage(), equalTo("Failed to close the XContentBuilder"));
assertThat(e.getCause().getMessage(), equalTo("Unclosed object or array found"));
}

private static class TestWritableValue {
final Map<String, Byte> values;

static TestWritableValue randomValue() {
int numKeys = randomIntBetween(0, 10);
Map<String, Byte> values = new HashMap<>();
for (int i = 0; i < numKeys; i++) {
values.put(randomAlphaOfLength(10), randomByte());
}
return new TestWritableValue(values);
}

TestWritableValue(Map<String, Byte> values) {
this.values = values;
}

TestWritableValue(InputStream in) throws IOException {
final int size = in.read();
this.values = new HashMap<>(size);
for (int i = 0; i < size; i++) {
final int keySize = in.read();
final String key = new String(in.readNBytes(keySize), StandardCharsets.ISO_8859_1);
final byte value = (byte) in.read();
values.put(key, value);
}
}

public void writeTo(OutputStream os) throws IOException {
os.write((byte) values.size());
for (Map.Entry<String, Byte> e : values.entrySet()) {
final String k = e.getKey();
os.write((byte) k.length());
os.write(k.getBytes(StandardCharsets.ISO_8859_1));
os.write(e.getValue());
}
}
}

public void testWritableValue() throws Exception {
Map<String, Object> expectedValues = new HashMap<>();
final XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
int fields = iterations(1, 10);
for (int i = 0; i < fields; i++) {
String field = "field-" + i;
if (randomBoolean()) {
final TestWritableValue value = TestWritableValue.randomValue();
builder.directFieldAsBase64(field, value::writeTo);
expectedValues.put(field, value);
} else {
Object value = randomFrom(randomInt(), randomAlphaOfLength(10));
builder.field(field, value);
expectedValues.put(field, value);
}
}
builder.endObject();
final BytesReference bytes = BytesReference.bytes(builder);
final Map<String, Object> actualValues = XContentHelper.convertToMap(bytes, true).v2();
assertThat(actualValues, aMapWithSize(fields));
for (Map.Entry<String, Object> e : expectedValues.entrySet()) {
if (e.getValue() instanceof TestWritableValue) {
final TestWritableValue expectedValue = (TestWritableValue) e.getValue();
assertThat(actualValues.get(e.getKey()), instanceOf(String.class));
final byte[] decoded = Base64.getDecoder().decode((String) actualValues.get(e.getKey()));
final TestWritableValue actualValue = new TestWritableValue(new InputStream() {
int pos = 0;

@Override
public int read() {
Objects.checkIndex(pos, decoded.length);
return decoded[pos++];
}
});
assertThat(actualValue.values, equalTo(expectedValue.values));
} else {
assertThat(actualValues, hasEntry(e.getKey(), e.getValue()));
}
}
}
}

0 comments on commit cb11448

Please sign in to comment.