Skip to content

Commit

Permalink
Speed up serialization of BytesRefArray (#106053)
Browse files Browse the repository at this point in the history
Currently, we are reading and writing byte by byte during the 
serialization and deserialization of a BytesRefArray. We can improve the
performance by reading/writing through the backing pages or the
underlying array instead. I will open a follow-up PR to utilize this
change in serializing BytesRefBlock in ESQL.
  • Loading branch information
dnhatn committed Mar 7, 2024
1 parent e59b67a commit 6b430ae
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 7 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/106053.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 106053
summary: Speed up serialization of `BytesRefArray`
area: ES|QL
type: enhancement
issues: []
23 changes: 23 additions & 0 deletions server/src/main/java/org/elasticsearch/common/util/BigArrays.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@

import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.core.Nullable;
Expand Down Expand Up @@ -143,6 +145,27 @@ public void fill(long fromIndex, long toIndex, byte value) {
Arrays.fill(array, (int) fromIndex, (int) toIndex, value);
}

@Override
public BytesRefIterator iterator() {
    // Backed by a single contiguous array: the iterator yields exactly one
    // BytesRef spanning the whole array, then signals exhaustion with null.
    return new BytesRefIterator() {
        private boolean exhausted;

        @Override
        public BytesRef next() {
            if (exhausted == false) {
                exhausted = true;
                return new BytesRef(array, 0, Math.toIntExact(size()));
            }
            return null;
        }
    };
}

@Override
public void fillWith(StreamInput in) throws IOException {
    // Bulk-read the full contents straight into the backing array,
    // avoiding any per-byte copying.
    final int length = Math.toIntExact(size());
    in.readBytes(array, 0, length);
}

@Override
public boolean hasArray() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@

import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;

import static org.elasticsearch.common.util.BigLongArray.writePages;
import static org.elasticsearch.common.util.PageCacheRecycler.BYTE_PAGE_SIZE;
import static org.elasticsearch.common.util.PageCacheRecycler.PAGE_SIZE_IN_BYTES;

/**
* Byte array abstraction able to support more than 2B values. This implementation slices data into fixed-sized blocks of
Expand Down Expand Up @@ -139,6 +142,30 @@ public byte[] array() {
throw new UnsupportedOperationException();
}

@Override
public BytesRefIterator iterator() {
    // Walks the backing pages in order without copying. Every page except
    // the last is full; the last page holds the remaining size % PAGE bytes.
    return new BytesRefIterator() {
        private int pageIndex = 0;

        @Override
        public BytesRef next() {
            if (pageIndex == pages.length) {
                return null;
            }
            final byte[] page = pages[pageIndex];
            final boolean last = pageIndex == pages.length - 1;
            final int length = last ? Math.toIntExact(size - (pages.length - 1L) * PAGE_SIZE_IN_BYTES) : PAGE_SIZE_IN_BYTES;
            pageIndex++;
            return new BytesRef(page, 0, length);
        }
    };
}

@Override
public void fillWith(StreamInput in) throws IOException {
    // Read page by page instead of byte by byte: all pages but the last are
    // full-sized, the final page receives only the leftover bytes.
    final int lastPage = pages.length - 1;
    for (int page = 0; page < lastPage; page++) {
        in.readBytes(pages[page], 0, PAGE_SIZE_IN_BYTES);
    }
    in.readBytes(pages[lastPage], 0, Math.toIntExact(size - lastPage * (long) PAGE_SIZE_IN_BYTES));
}

@Override
protected int numBytesPerElement() {
return 1;
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/elasticsearch/common/util/ByteArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.common.util;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;

Expand Down Expand Up @@ -51,6 +52,17 @@ static ByteArray readFrom(StreamInput in) throws IOException {
*/
void fill(long fromIndex, long toIndex, byte value);

/**
 * Fills this ByteArray with bytes from the given input stream.
 * Implementations read exactly {@code size()} bytes from {@code in};
 * read-only implementations may throw {@link UnsupportedOperationException}.
 *
 * @param in the stream to read from
 * @throws IOException if reading from the stream fails
 */
void fillWith(StreamInput in) throws IOException;

/**
 * Returns a BytesRefIterator for this ByteArray. This method allows
 * access to the internal pages of this reference without copying them.
 * The iterator returns {@code null} once all pages have been visited.
 */
BytesRefIterator iterator();

/**
* Checks if this instance is backed by a single byte array analogous to {@link ByteBuffer#hasArray()}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -64,10 +65,7 @@ public BytesRefArray(StreamInput in, BigArrays bigArrays) throws IOException {
// bytes
long sizeOfBytes = in.readVLong();
bytes = bigArrays.newByteArray(sizeOfBytes, false);

for (long i = 0; i < sizeOfBytes; ++i) {
bytes.set(i, in.readByte());
}
bytes.fillWith(in);

success = true;
} finally {
Expand Down Expand Up @@ -149,11 +147,17 @@ public void writeTo(StreamOutput out) throws IOException {
}

// bytes might be overallocated, the last bucket of startOffsets contains the real size
long sizeOfBytes = startOffsets.get(size);
final long sizeOfBytes = startOffsets.get(size);
out.writeVLong(sizeOfBytes);
for (long i = 0; i < sizeOfBytes; ++i) {
out.writeByte(bytes.get(i));
final BytesRefIterator bytesIt = bytes.iterator();
BytesRef bytesRef;
long remained = sizeOfBytes;
while (remained > 0 && (bytesRef = bytesIt.next()) != null) {
int length = Math.toIntExact(Math.min(remained, bytesRef.length));
remained -= length;
out.writeBytes(bytesRef.bytes, bytesRef.offset, length);
}
assert remained == 0 : remained;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.common.util;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
Expand Down Expand Up @@ -88,6 +89,17 @@ public byte[] array() {
throw new UnsupportedOperationException();
}

/**
 * Returns an iterator over the pages of the wrapped reference without
 * copying them.
 */
@Override
public BytesRefIterator iterator() {
    // iterating after the reference is released would expose recycled pages
    assert ref.hasReferences();
    return ref.iterator();
}

/**
 * Unsupported: this array is a read-only view over an existing
 * {@code BytesReference} and cannot be overwritten.
 */
@Override
public void fillWith(StreamInput in) {
    throw new UnsupportedOperationException("read-only ByteArray");
}

@Override
public long ramBytesUsed() {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
package org.elasticsearch.common.util;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService;
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand Down Expand Up @@ -275,6 +277,27 @@ public void testByteArrayBulkSet() {
array2.close();
}

/**
 * Round-trips random bytes through {@code fillWith} and verifies that both
 * random access ({@code get}) and the zero-copy {@code iterator()} observe
 * the same content, covering both single-page and paged implementations
 * via the randomized length.
 */
public void testByteIterator() throws Exception {
    final byte[] bytes = new byte[randomIntBetween(1, 4000000)];
    random().nextBytes(bytes);
    // try-with-resources: the original called close() only on the success
    // path, so a failing assertion leaked the breaker-tracked array
    try (ByteArray array = bigArrays.newByteArray(bytes.length, randomBoolean())) {
        array.fillWith(new ByteArrayStreamInput(bytes));
        for (int i = 0; i < bytes.length; i++) {
            assertEquals(bytes[i], array.get(i));
        }
        BytesRefIterator it = array.iterator();
        BytesRef ref;
        int offset = 0;
        while ((ref = it.next()) != null) {
            for (int i = 0; i < ref.length; i++) {
                assertEquals(bytes[offset], ref.bytes[ref.offset + i]);
                offset++;
            }
        }
        // the iterator must cover exactly the bytes that were written
        assertThat(offset, equalTo(bytes.length));
    }
}

public void testByteArrayEquals() {
final ByteArray empty1 = byteArrayWithBytes(BytesRef.EMPTY_BYTES);
final ByteArray empty2 = byteArrayWithBytes(BytesRef.EMPTY_BYTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand Down Expand Up @@ -398,6 +400,16 @@ public void fill(long fromIndex, long toIndex, byte value) {
in.fill(fromIndex, toIndex, value);
}

@Override
public BytesRefIterator iterator() {
    // pure delegation to the wrapped array
    return in.iterator();
}

@Override
public void fillWith(StreamInput streamInput) throws IOException {
    // parameter is named streamInput because "in" is the wrapped delegate field
    in.fillWith(streamInput);
}

@Override
public boolean hasArray() {
return in.hasArray();
Expand Down

0 comments on commit 6b430ae

Please sign in to comment.