Skip to content

Commit 15709dd

Browse files
Add binary doc value compression with variable doc count blocks (#137139)
Add compression for binary doc values using Zstd and blocks with a variable number of values. Block-wise LZ4 compression for binary doc values was previously added to Lucene in LUCENE-9211. This was subsequently removed in LUCENE-9378 due to query performance issues. We investigated adding the original Lucene implementation to ES in #112416 and #105301. This previous approach used a constant number of values per block (specifically 32 values). This is nice because it makes it very easy to map a given value index (e.g. docId for dense values) to the block containing it with blockId = docId / 32. Unfortunately, if values are very large we cannot reduce the number of values per block, and (de)compressing a block could cause an OOM. Also, since this is a concern, we have to keep the number of values lower than ideal. This PR instead stores a variable number of documents per block. It stores a minimum of 1 document per block and stops adding values when the size of a block exceeds a threshold. Like the previous version it stores an array of addresses for the start of each block. Additionally, it stores a parallel array with the value index at the start of each block. When looking up a given value index, if it is not in the current block, we binary search the array of value index starts to find the blockId containing the value. Then we look up the address of the block.
1 parent 59f7904 commit 15709dd

File tree

17 files changed

+1183
-116
lines changed

17 files changed

+1183
-116
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.cluster.metadata.DataStream;
2828
import org.elasticsearch.common.logging.LogConfigurator;
2929
import org.elasticsearch.index.codec.Elasticsearch92Lucene103Codec;
30+
import org.elasticsearch.index.codec.tsdb.BinaryDVCompressionMode;
3031
import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat;
3132
import org.openjdk.jmh.annotations.Benchmark;
3233
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -257,7 +258,13 @@ private static IndexWriterConfig createIndexWriterConfig(boolean optimizedMergeE
257258
);
258259
config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER);
259260
config.setMergePolicy(new LogByteSizeMergePolicy());
260-
var docValuesFormat = new ES819TSDBDocValuesFormat(4096, 512, optimizedMergeEnabled);
261+
var docValuesFormat = new ES819TSDBDocValuesFormat(
262+
4096,
263+
512,
264+
optimizedMergeEnabled,
265+
BinaryDVCompressionMode.COMPRESSED_ZSTD_LEVEL_1,
266+
true
267+
);
261268
config.setCodec(new Elasticsearch92Lucene103Codec() {
262269
@Override
263270
public DocValuesFormat getDocValuesFormatForField(String field) {

server/src/main/java/org/elasticsearch/index/codec/PerFieldFormatSupplier.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
public class PerFieldFormatSupplier {
4242

4343
private static final Set<String> INCLUDE_META_FIELDS;
44+
private static final Set<String> EXCLUDE_MAPPER_TYPES;
4445

4546
static {
4647
// TODO: should we just allow all fields to use tsdb doc values codec?
@@ -53,6 +54,7 @@ public class PerFieldFormatSupplier {
5354
// Don't the include _recovery_source_size and _recovery_source fields, since their values can be trimmed away in
5455
// RecoverySourcePruneMergePolicy, which leads to inconsistencies between merge stats and actual values.
5556
INCLUDE_META_FIELDS = Collections.unmodifiableSet(includeMetaField);
57+
EXCLUDE_MAPPER_TYPES = Set.of("geo_shape");
5658
}
5759

5860
private static final DocValuesFormat docValuesFormat = new Lucene90DocValuesFormat();
@@ -145,6 +147,10 @@ boolean useTSDBDocValuesFormat(final String field) {
145147
return false;
146148
}
147149

150+
if (excludeMapperTypes(field)) {
151+
return false;
152+
}
153+
148154
return mapperService != null
149155
&& mapperService.getIndexSettings().useTimeSeriesDocValuesFormat()
150156
&& mapperService.getIndexSettings().isES87TSDBCodecEnabled();
@@ -154,4 +160,29 @@ private boolean excludeFields(String fieldName) {
154160
return fieldName.startsWith("_") && INCLUDE_META_FIELDS.contains(fieldName) == false;
155161
}
156162

163+
private boolean excludeMapperTypes(String fieldName) {
164+
var typeName = getMapperType(fieldName);
165+
if (typeName == null) {
166+
return false;
167+
}
168+
return EXCLUDE_MAPPER_TYPES.contains(getMapperType(fieldName));
169+
}
170+
171+
private boolean isTimeSeriesModeIndex() {
172+
return mapperService != null && IndexMode.TIME_SERIES == mapperService.getIndexSettings().getMode();
173+
}
174+
175+
private boolean isLogsModeIndex() {
176+
return mapperService != null && IndexMode.LOGSDB == mapperService.getIndexSettings().getMode();
177+
}
178+
179+
String getMapperType(final String field) {
180+
if (mapperService != null) {
181+
Mapper mapper = mapperService.mappingLookup().getMapper(field);
182+
if (mapper != null) {
183+
return mapper.typeName();
184+
}
185+
}
186+
return null;
187+
}
157188
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec.tsdb;
11+
12+
import org.apache.lucene.codecs.compressing.CompressionMode;
13+
import org.elasticsearch.index.codec.zstd.ZstdCompressionMode;
14+
15+
public enum BinaryDVCompressionMode {
16+
17+
NO_COMPRESS((byte) 0, null),
18+
COMPRESSED_ZSTD_LEVEL_1((byte) 1, new ZstdCompressionMode(1));
19+
20+
public final byte code;
21+
private final CompressionMode compressionMode;
22+
23+
private static final BinaryDVCompressionMode[] values = new BinaryDVCompressionMode[values().length];
24+
static {
25+
for (BinaryDVCompressionMode mode : values()) {
26+
values[mode.code] = mode;
27+
}
28+
}
29+
30+
BinaryDVCompressionMode(byte code, CompressionMode compressionMode) {
31+
this.code = code;
32+
this.compressionMode = compressionMode;
33+
}
34+
35+
public static BinaryDVCompressionMode fromMode(byte code) {
36+
if (code < 0 || code >= values.length) {
37+
throw new IllegalStateException("unknown compression mode [" + code + "]");
38+
}
39+
return values[code];
40+
}
41+
42+
public CompressionMode compressionMode() {
43+
if (compressionMode == null) {
44+
throw new UnsupportedOperationException("BinaryDVCompressionMode [" + code + "] does not support compression");
45+
}
46+
return compressionMode;
47+
}
48+
49+
public record BlockHeader(boolean isCompressed) {
50+
static final byte IS_COMPRESSED = 0x1;
51+
52+
public static BlockHeader fromByte(byte header) {
53+
boolean isCompressed = (header & IS_COMPRESSED) != 0;
54+
return new BlockHeader(isCompressed);
55+
}
56+
57+
public byte toByte() {
58+
byte header = 0;
59+
if (isCompressed) {
60+
header |= IS_COMPRESSED;
61+
}
62+
return header;
63+
}
64+
}
65+
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec.tsdb.es819;
11+
12+
import org.apache.lucene.codecs.CodecUtil;
13+
import org.apache.lucene.store.ChecksumIndexInput;
14+
import org.apache.lucene.store.Directory;
15+
import org.apache.lucene.store.IOContext;
16+
import org.apache.lucene.store.IndexOutput;
17+
import org.apache.lucene.util.packed.DirectMonotonicWriter;
18+
import org.elasticsearch.core.IOUtils;
19+
20+
import java.io.Closeable;
21+
import java.io.IOException;
22+
23+
/**
 * Accumulates per-block metadata (block start addresses and per-block doc counts) for
 * binary doc values written in variable-sized blocks, buffering deltas in temp files
 * because the total number of blocks is unknown until the writer finishes.
 */
final class BlockMetadataAccumulator implements Closeable {

    private final DelayedOffsetAccumulator blockAddressAcc;
    private final DelayedOffsetAccumulator blockDocRangeAcc;

    BlockMetadataAccumulator(Directory dir, IOContext context, IndexOutput data, long addressesStart) throws IOException {
        boolean success = false;
        try {
            // Doc-range offsets start at 0 (value indexes); addresses start at the data-file position.
            blockDocRangeAcc = new DelayedOffsetAccumulator(dir, context, data, "block-doc-ranges", 0);
            blockAddressAcc = new DelayedOffsetAccumulator(dir, context, data, "block-addresses", addressesStart);
            success = true;
        } finally {
            if (success == false) {
                IOUtils.closeWhileHandlingException(this); // self-close because constructor caller can't
            }
        }
    }

    /**
     * Records one finished block: the number of values it holds and its encoded length in bytes.
     */
    public void addDoc(long numDocsInBlock, long blockLenInBytes) throws IOException {
        blockDocRangeAcc.addDoc(numDocsInBlock);
        blockAddressAcc.addDoc(blockLenInBytes);
    }

    /**
     * Flushes both accumulators, writing the monotonic address and doc-range tables to
     * {@code data} and their locations/lengths to {@code meta}. The meta layout written here
     * (addresses length, doc-range start, doc-range length) must match what the reader expects.
     */
    public void build(IndexOutput meta, IndexOutput data) throws IOException {
        long dataAddressesStart = data.getFilePointer();
        blockAddressAcc.build(meta, data);
        long dataDocRangeStart = data.getFilePointer();
        long addressesLength = dataDocRangeStart - dataAddressesStart;
        meta.writeLong(addressesLength);

        meta.writeLong(dataDocRangeStart);
        blockDocRangeAcc.build(meta, data);
        long docRangesLen = data.getFilePointer() - dataDocRangeStart;
        meta.writeLong(docRangesLen);
    }

    @Override
    public void close() throws IOException {
        // Best-effort close of both accumulators; also deletes their temp files.
        IOUtils.closeWhileHandlingException(blockAddressAcc, blockDocRangeAcc);
    }

    /**
     * Like OffsetsAccumulator builds offsets and stores in a DirectMonotonicWriter. But write to temp file
     * rather than directly to a DirectMonotonicWriter because the number of values is unknown.
     */
    static final class DelayedOffsetAccumulator implements Closeable {

        private final Directory dir;
        private final long startOffset;

        // Number of deltas buffered so far; the final table has numValues + 1 offsets.
        private int numValues = 0;
        private final IndexOutput tempOutput;
        private final String suffix;

        DelayedOffsetAccumulator(Directory dir, IOContext context, IndexOutput data, String suffix, long startOffset) throws IOException {
            this.dir = dir;
            this.startOffset = startOffset;
            this.suffix = suffix;

            boolean success = false;
            try {
                tempOutput = dir.createTempOutput(data.getName(), suffix, context);
                CodecUtil.writeHeader(tempOutput, ES819TSDBDocValuesFormat.META_CODEC + suffix, ES819TSDBDocValuesFormat.VERSION_CURRENT);
                success = true;
            } finally {
                if (success == false) {
                    IOUtils.closeWhileHandlingException(this); // self-close because constructor caller can't
                }
            }
        }

        /** Buffers one delta (length/count) as a vLong in the temp file. */
        void addDoc(long delta) throws IOException {
            tempOutput.writeVLong(delta);
            numValues++;
        }

        /**
         * Finishes the temp file, then replays its deltas as a running sum into a
         * {@link DirectMonotonicWriter} targeting the real meta/data outputs. Must be called
         * at most once; the temp file itself is deleted later by {@link #close()}.
         */
        void build(IndexOutput meta, IndexOutput data) throws IOException {
            CodecUtil.writeFooter(tempOutput);
            IOUtils.close(tempOutput);

            // write the offsets info to the meta file by reading from temp file
            try (ChecksumIndexInput tempInput = dir.openChecksumInput(tempOutput.getName());) {
                CodecUtil.checkHeader(
                    tempInput,
                    ES819TSDBDocValuesFormat.META_CODEC + suffix,
                    ES819TSDBDocValuesFormat.VERSION_CURRENT,
                    ES819TSDBDocValuesFormat.VERSION_CURRENT
                );
                Throwable priorE = null;
                try {
                    final DirectMonotonicWriter writer = DirectMonotonicWriter.getInstance(
                        meta,
                        data,
                        numValues + 1,
                        ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
                    );

                    // Convert deltas to absolute offsets: table[0] = startOffset, then prefix sums.
                    long offset = startOffset;
                    writer.add(offset);
                    for (int i = 0; i < numValues; ++i) {
                        offset += tempInput.readVLong();
                        writer.add(offset);
                    }
                    writer.finish();
                } catch (Throwable e) {
                    priorE = e;
                } finally {
                    // Verifies the temp file's checksum and rethrows priorE (or checksum failure).
                    CodecUtil.checkFooter(tempInput, priorE);
                }
            }
        }

        @Override
        public void close() throws IOException {
            if (tempOutput != null) {
                // Close (a no-op if build() already closed it) and always delete the temp file.
                IOUtils.close(tempOutput, () -> dir.deleteFile(tempOutput.getName()));
            }
        }
    }
}

0 commit comments

Comments
 (0)