Replace Lucene DataInput/DataOutput with Elasticsearch StreamInput/StreamOutput (#77118)

In a number of places we read and write binary data into byte arrays using Lucene's
DataInput and DataOutput abstractions. In Lucene 9 these abstractions change the
endianness of their readInt/writeInt methods. To avoid dealing with this format
change, this commit switches those call sites to the Elasticsearch StreamInput/StreamOutput
abstractions instead, which expose essentially the same API but preserve the existing endianness.

Relates to #73324
romseygeek committed Sep 1, 2021
1 parent 2396bad commit 5d48fdc
Showing 11 changed files with 53 additions and 137 deletions.
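
The recurring pattern in the diff below is to replace a pre-sized byte[] plus ByteArrayDataOutput with a growable BytesStreamOutput on the write side, and ByteArrayDataInput with ByteArrayStreamInput on the read side. The following is a minimal sketch of that round trip, not code from the changed files; the class and method names are illustrative, and it assumes the Elasticsearch server module is on the classpath.

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;

import java.io.IOException;

class StreamRoundTripSketch {

    // Write side: previously a fixed-size byte[] was wrapped in a ByteArrayDataOutput
    // and trimmed with new BytesRef(bytes, 0, out.getPosition()); BytesStreamOutput
    // grows as needed and produces a BytesRef via bytes().toBytesRef().
    static BytesRef encode(byte[][] values) throws IOException {
        BytesStreamOutput out = new BytesStreamOutput();
        out.writeVInt(values.length);               // number of entries
        for (byte[] value : values) {
            out.writeVInt(value.length);            // length prefix
            out.writeBytes(value, 0, value.length); // raw bytes
        }
        return out.bytes().toBytesRef();
    }

    // Read side: ByteArrayStreamInput offers the same reset/readVInt/readBytes calls
    // that ByteArrayDataInput did, so callers mostly change only the declared type.
    static byte[][] decode(BytesRef encoded) throws IOException {
        ByteArrayStreamInput in = new ByteArrayStreamInput();
        in.reset(encoded.bytes, encoded.offset, encoded.length);
        byte[][] values = new byte[in.readVInt()][];
        for (int i = 0; i < values.length; i++) {
            values[i] = new byte[in.readVInt()];
            in.readBytes(values[i], 0, values[i].length);
        }
        return values;
    }
}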
@@ -54,3 +54,4 @@ java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int)
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.ThreadFactory)
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.RejectedExecutionHandler)
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler)

@@ -19,8 +19,8 @@
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.TwoPhaseIterator;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
import org.elasticsearch.index.mapper.RangeType;

import java.io.IOException;
@@ -61,7 +61,7 @@ public Scorer scorer(LeafReaderContext context) throws IOException {

final TwoPhaseIterator iterator = new TwoPhaseIterator(values) {

ByteArrayDataInput in = new ByteArrayDataInput();
ByteArrayStreamInput in = new ByteArrayStreamInput();
BytesRef otherFrom = new BytesRef();
BytesRef otherTo = new BytesRef();

@@ -8,8 +8,8 @@

package org.elasticsearch.action.search;

import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.RAMOutputStream;
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
@@ -56,7 +56,7 @@ static String buildScrollId(AtomicArray<? extends SearchPhaseResult> searchPhase
static ParsedScrollId parseScrollId(String scrollId) {
try {
byte[] bytes = Base64.getUrlDecoder().decode(scrollId);
ByteArrayDataInput in = new ByteArrayDataInput(bytes);
ByteArrayStreamInput in = new ByteArrayStreamInput(bytes);
final boolean includeContextUUID;
final String type;
final String firstChunk = in.readString();
@@ -27,6 +27,10 @@ public ByteArrayStreamInput() {
reset(BytesRef.EMPTY_BYTES);
}

public ByteArrayStreamInput(byte[] bytes) {
reset(bytes);
}

@Override
public int read() throws IOException {
return readByte() & 0xFF;
44 changes: 0 additions & 44 deletions server/src/main/java/org/elasticsearch/common/util/ByteUtils.java
@@ -8,9 +8,6 @@

package org.elasticsearch.common.util;

import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteArrayDataOutput;


/** Utility methods to do byte-level encoding. These methods are biased towards little-endian byte order because it is the most
 * common byte order and reading several bytes at once may be optimizable in the future with the help of sun.misc.Unsafe. */
@@ -85,45 +82,4 @@ public static float readFloatLE(byte[] arr, int offset) {
return Float.intBitsToFloat(readIntLE(arr, offset));
}

/** Same as DataOutput#writeVLong but accepts negative values (written on 9 bytes). */
public static void writeVLong(ByteArrayDataOutput out, long i) {
for (int k = 0; k < 8 && (i & ~0x7FL) != 0L; ++k) {
out.writeByte((byte)((i & 0x7FL) | 0x80L));
i >>>= 7;
}
out.writeByte((byte)i);
}

/** Same as DataOutput#readVLong but can read negative values (read on 9 bytes). */
public static long readVLong(ByteArrayDataInput in) {
// unwinded because of hotspot bugs, see Lucene's impl
byte b = in.readByte();
if (b >= 0) return b;
long i = b & 0x7FL;
b = in.readByte();
i |= (b & 0x7FL) << 7;
if (b >= 0) return i;
b = in.readByte();
i |= (b & 0x7FL) << 14;
if (b >= 0) return i;
b = in.readByte();
i |= (b & 0x7FL) << 21;
if (b >= 0) return i;
b = in.readByte();
i |= (b & 0x7FL) << 28;
if (b >= 0) return i;
b = in.readByte();
i |= (b & 0x7FL) << 35;
if (b >= 0) return i;
b = in.readByte();
i |= (b & 0x7FL) << 42;
if (b >= 0) return i;
b = in.readByte();
i |= (b & 0x7FL) << 49;
if (b >= 0) return i;
b = in.readByte();
i |= (b & 0xFFL) << 56;
return i;
}

}
@@ -9,9 +9,9 @@
package org.elasticsearch.index.fielddata.plain;

import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
import org.elasticsearch.index.fielddata.LeafFieldData;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;

@@ -42,7 +42,7 @@ public SortedBinaryDocValues getBytesValues() {
return new SortedBinaryDocValues() {

int count;
final ByteArrayDataInput in = new ByteArrayDataInput();
final ByteArrayStreamInput in = new ByteArrayStreamInput();
final BytesRef scratch = new BytesRef();

@Override
@@ -12,11 +12,11 @@

import org.apache.lucene.document.StoredField;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.fielddata.IndexFieldData;
@@ -204,16 +204,15 @@ public BytesRef binaryValue() {
try {
CollectionUtils.sortAndDedup(bytesList);
int size = bytesList.size();
final byte[] bytes = new byte[totalSize + (size + 1) * 5];
ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
BytesStreamOutput out = new BytesStreamOutput(totalSize + (size + 1) * 5);
out.writeVInt(size); // write total number of values
for (int i = 0; i < size; i ++) {
final byte[] value = bytesList.get(i);
int valueLength = value.length;
out.writeVInt(valueLength);
out.writeBytes(value, 0, valueLength);
}
return new BytesRef(bytes, 0, out.getPosition());
return out.bytes().toBytesRef();
} catch (IOException e) {
throw new ElasticsearchException("Failed to get binary value", e);
}
@@ -9,11 +9,11 @@
package org.elasticsearch.index.mapper;

import org.apache.lucene.document.InetAddressPoint;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;

import java.io.IOException;
import java.net.InetAddress;
@@ -28,8 +28,7 @@ enum BinaryRangeUtil {
;

static BytesRef encodeIPRanges(Set<RangeFieldMapper.Range> ranges) throws IOException {
final byte[] encoded = new byte[5 + (16 * 2) * ranges.size()];
ByteArrayDataOutput out = new ByteArrayDataOutput(encoded);
BytesStreamOutput out = new BytesStreamOutput(5 + (16 * 2) * ranges.size());
out.writeVInt(ranges.size());
for (RangeFieldMapper.Range range : ranges) {
InetAddress fromValue = (InetAddress) range.from;
@@ -40,10 +39,10 @@ static BytesRef encodeIPRanges(Set<RangeFieldMapper.Range> ranges) throws IOExce
byte[] encodedToValue = InetAddressPoint.encode(toValue);
out.writeBytes(encodedToValue, 0, encodedToValue.length);
}
return new BytesRef(encoded, 0, out.getPosition());
return out.bytes().toBytesRef();
}

static List<RangeFieldMapper.Range> decodeIPRanges(BytesRef encodedRanges) {
static List<RangeFieldMapper.Range> decodeIPRanges(BytesRef encodedRanges) throws IOException {
return decodeRanges(encodedRanges, RangeType.IP, BinaryRangeUtil::decodeIP);
}

@@ -59,19 +58,18 @@ static BytesRef encodeLongRanges(Set<RangeFieldMapper.Range> ranges) throws IOEx
Comparator<RangeFieldMapper.Range> toComparator = Comparator.comparingLong(range -> ((Number) range.to).longValue());
sortedRanges.sort(fromComparator.thenComparing(toComparator));

final byte[] encoded = new byte[5 + (9 * 2) * sortedRanges.size()];
ByteArrayDataOutput out = new ByteArrayDataOutput(encoded);
BytesStreamOutput out = new BytesStreamOutput(5 + (9 * 2) * sortedRanges.size());
out.writeVInt(sortedRanges.size());
for (RangeFieldMapper.Range range : sortedRanges) {
byte[] encodedFrom = encodeLong(((Number) range.from).longValue());
out.writeBytes(encodedFrom, encodedFrom.length);
byte[] encodedTo = encodeLong(((Number) range.to).longValue());
out.writeBytes(encodedTo, encodedTo.length);
}
return new BytesRef(encoded, 0, out.getPosition());
return out.bytes().toBytesRef();
}

static List<RangeFieldMapper.Range> decodeLongRanges(BytesRef encodedRanges) {
static List<RangeFieldMapper.Range> decodeLongRanges(BytesRef encodedRanges) throws IOException {
return decodeRanges(encodedRanges, RangeType.LONG,
BinaryRangeUtil::decodeLong);
}
@@ -82,36 +80,35 @@ static BytesRef encodeDoubleRanges(Set<RangeFieldMapper.Range> ranges) throws IO
Comparator<RangeFieldMapper.Range> toComparator = Comparator.comparingDouble(range -> ((Number) range.to).doubleValue());
sortedRanges.sort(fromComparator.thenComparing(toComparator));

final byte[] encoded = new byte[5 + (8 * 2) * sortedRanges.size()];
ByteArrayDataOutput out = new ByteArrayDataOutput(encoded);
BytesStreamOutput out = new BytesStreamOutput(5 + (8 * 2) * sortedRanges.size());
out.writeVInt(sortedRanges.size());
for (RangeFieldMapper.Range range : sortedRanges) {
byte[] encodedFrom = encodeDouble(((Number) range.from).doubleValue());
out.writeBytes(encodedFrom, encodedFrom.length);
byte[] encodedTo = encodeDouble(((Number) range.to).doubleValue());
out.writeBytes(encodedTo, encodedTo.length);
}
return new BytesRef(encoded, 0, out.getPosition());
return out.bytes().toBytesRef();
}

static List<RangeFieldMapper.Range> decodeDoubleRanges(BytesRef encodedRanges) {
static List<RangeFieldMapper.Range> decodeDoubleRanges(BytesRef encodedRanges) throws IOException {
return decodeRanges(encodedRanges, RangeType.DOUBLE,
BinaryRangeUtil::decodeDouble);
}

static List<RangeFieldMapper.Range> decodeFloatRanges(BytesRef encodedRanges) {
static List<RangeFieldMapper.Range> decodeFloatRanges(BytesRef encodedRanges) throws IOException {
return decodeRanges(encodedRanges, RangeType.FLOAT,
BinaryRangeUtil::decodeFloat);
}

static List<RangeFieldMapper.Range> decodeRanges(BytesRef encodedRanges, RangeType rangeType,
TriFunction<byte[], Integer, Integer, Object> decodeBytes) {
TriFunction<byte[], Integer, Integer, Object> decodeBytes) throws IOException {

RangeType.LengthType lengthType = rangeType.lengthType;
ByteArrayDataInput in = new ByteArrayDataInput();
ByteArrayStreamInput in = new ByteArrayStreamInput();
in.reset(encodedRanges.bytes, encodedRanges.offset, encodedRanges.length);
int numRanges = in.readVInt();

int numRanges = in.readVInt();
List<RangeFieldMapper.Range> ranges = new ArrayList<>(numRanges);

final byte[] bytes = encodedRanges.bytes;
@@ -137,16 +134,15 @@ static BytesRef encodeFloatRanges(Set<RangeFieldMapper.Range> ranges) throws IOE
Comparator<RangeFieldMapper.Range> toComparator = Comparator.comparingDouble(range -> ((Number) range.to).floatValue());
sortedRanges.sort(fromComparator.thenComparing(toComparator));

final byte[] encoded = new byte[5 + (4 * 2) * sortedRanges.size()];
ByteArrayDataOutput out = new ByteArrayDataOutput(encoded);
BytesStreamOutput out = new BytesStreamOutput(5 + (4 * 2) * sortedRanges.size());
out.writeVInt(sortedRanges.size());
for (RangeFieldMapper.Range range : sortedRanges) {
byte[] encodedFrom = encodeFloat(((Number) range.from).floatValue());
out.writeBytes(encodedFrom, encodedFrom.length);
byte[] encodedTo = encodeFloat(((Number) range.to).floatValue());
out.writeBytes(encodedTo, encodedTo.length);
}
return new BytesRef(encoded, 0, out.getPosition());
return out.bytes().toBytesRef();
}

static byte[] encodeDouble(double number) {
@@ -220,7 +220,7 @@ public BytesRef encodeRanges(Set<RangeFieldMapper.Range> ranges) throws IOExcept
}

@Override
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) {
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) throws IOException {
return LONG.decodeRanges(bytes);
}

@@ -293,7 +293,7 @@ public BytesRef encodeRanges(Set<RangeFieldMapper.Range> ranges) throws IOExcept
}

@Override
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) {
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) throws IOException {
return BinaryRangeUtil.decodeFloatRanges(bytes);
}

@@ -364,7 +364,7 @@ public BytesRef encodeRanges(Set<RangeFieldMapper.Range> ranges) throws IOExcept
}

@Override
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) {
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) throws IOException {
return BinaryRangeUtil.decodeDoubleRanges(bytes);
}

@@ -438,7 +438,7 @@ public BytesRef encodeRanges(Set<RangeFieldMapper.Range> ranges) throws IOExcept
}

@Override
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) {
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) throws IOException {
return LONG.decodeRanges(bytes);
}

@@ -497,7 +497,7 @@ public BytesRef encodeRanges(Set<RangeFieldMapper.Range> ranges) throws IOExcept
}

@Override
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) {
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) throws IOException {
return BinaryRangeUtil.decodeLongRanges(bytes);
}

@@ -681,7 +681,7 @@ protected final Query createRangeQuery(String field, boolean hasDocValues, Objec
// No need to take into account Range#includeFrom or Range#includeTo, because from and to have already been
// rounded up via parseFrom and parseTo methods.
public abstract BytesRef encodeRanges(Set<RangeFieldMapper.Range> ranges) throws IOException;
public abstract List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes);
public abstract List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) throws IOException;

/**
* Given the Range.to or Range.from Object value from a Range instance, converts that value into a Double. Before converting, it
@@ -8,8 +8,6 @@

package org.elasticsearch.common.util;

import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
@@ -57,42 +55,4 @@ public void testDouble() throws IOException {
}
}

public void testVLong() throws IOException {
final long[] data = new long[scaledRandomIntBetween(1000, 10000)];
for (int i = 0; i < data.length; ++i) {
switch (randomInt(4)) {
case 0:
data[i] = 0;
break;
case 1:
data[i] = Long.MAX_VALUE;
break;
case 2:
data[i] = Long.MIN_VALUE;
break;
case 3:
data[i] = randomInt(1 << randomIntBetween(2,30));
break;
case 4:
data[i] = randomLong();
break;
default:
throw new AssertionError();
}
}
final byte[] encoded = new byte[ByteUtils.MAX_BYTES_VLONG * data.length];
ByteArrayDataOutput out = new ByteArrayDataOutput(encoded);
for (int i = 0; i < data.length; ++i) {
final int pos = out.getPosition();
ByteUtils.writeVLong(out, data[i]);
if (data[i] < 0) {
assertEquals(ByteUtils.MAX_BYTES_VLONG, out.getPosition() - pos);
}
}
final ByteArrayDataInput in = new ByteArrayDataInput(encoded);
for (int i = 0; i < data.length; ++i) {
assertEquals(data[i], ByteUtils.readVLong(in));
}
}

}
