LUCENE-9827: avoid wasteful recompression for small segments #28

Merged: 3 commits, Apr 6, 2021
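
This change reworks the dirty-chunk bookkeeping in the Lucene90 compressing stored fields and term vectors codecs: each writer now records the total number of chunks (numChunks) in the meta stream next to numDirtyChunks and numDirtyDocs, numDirtyDocs now counts the documents actually contained in force-flushed (incomplete) chunks instead of an estimate of the docs that were "missing" from them, and the tooDirty() safety check now requires that the dirty docs could fill a whole chunk and that more than 1% of the chunks are dirty before a segment's data is recompressed during a merge. The term vectors format also gains an explicit maxDocsPerChunk parameter (128 for Lucene90TermVectorsFormat) in place of the hard-coded MAX_DOCUMENTS_PER_CHUNK constant.

Below is a minimal, self-contained sketch of the new tooDirty() predicate so the thresholds are easy to check by hand. It is not code from this PR: the class name TooDirtySketch and all the numbers are made up for illustration, and the predicate simply mirrors the one added in the diff.

    // A standalone sketch of the new tooDirty() decision, with made-up numbers.
    // It mirrors the predicate added in this diff: a segment is only recompressed on merge
    // when its dirty docs could fill a whole chunk AND more than 1% of its chunks are dirty.
    public class TooDirtySketch {

      static boolean tooDirty(long numDirtyDocs, long numDirtyChunks, long numChunks, int maxDocsPerChunk) {
        return numDirtyDocs > maxDocsPerChunk && numDirtyChunks * 100 > numChunks;
      }

      public static void main(String[] args) {
        final int maxDocsPerChunk = 128; // the value this PR passes to Lucene90TermVectorsFormat

        // Small segment flushed once: a single (dirty) chunk holding 50 docs.
        // 50 <= 128, so the segment is not "too dirty" and its compressed blocks can be reused.
        System.out.println(tooDirty(50, 1, 1, maxDocsPerChunk)); // false

        // Large segment with many forced flushes: 400 dirty chunks out of 10_000, holding 20_000 docs.
        // 20_000 > 128 and 400 * 100 > 10_000, so it is recompressed during the merge.
        System.out.println(tooDirty(20_000, 400, 10_000, maxDocsPerChunk)); // true
      }
    }
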
Lucene90TermVectorsFormat.java
@@ -154,6 +154,6 @@ public final class Lucene90TermVectorsFormat extends Lucene90CompressingTermVect

/** Sole constructor. */
public Lucene90TermVectorsFormat() {
super("Lucene90TermVectorsData", "", CompressionMode.FAST, 1 << 12, 10);
super("Lucene90TermVectorsData", "", CompressionMode.FAST, 1 << 12, 128, 10);
}
}
Lucene90CompressingStoredFieldsReader.java
@@ -89,8 +89,9 @@ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsRea
private final int numDocs;
private final boolean merging;
private final BlockState state;
+ private final long numChunks; // number of written blocks
private final long numDirtyChunks; // number of incomplete compressed blocks written
- private final long numDirtyDocs; // cumulative number of missing docs in incomplete chunks
+ private final long numDirtyDocs; // cumulative number of docs in incomplete chunks
private boolean closed;

// used by clone
@@ -106,6 +107,7 @@ private Lucene90CompressingStoredFieldsReader(
this.compressionMode = reader.compressionMode;
this.decompressor = reader.decompressor.clone();
this.numDocs = reader.numDocs;
+ this.numChunks = reader.numChunks;
this.numDirtyChunks = reader.numDirtyChunks;
this.numDirtyDocs = reader.numDirtyDocs;
this.merging = merging;
@@ -177,6 +179,7 @@ public Lucene90CompressingStoredFieldsReader(
this.maxPointer = maxPointer;
this.indexReader = indexReader;

+ numChunks = metaIn.readVLong();
numDirtyChunks = metaIn.readVLong();
numDirtyDocs = metaIn.readVLong();

@@ -720,6 +723,15 @@ long getNumDirtyChunks() {
return numDirtyChunks;
}

+ long getNumChunks() {
+ if (version != VERSION_CURRENT) {
+ throw new IllegalStateException(
+ "getNumChunks should only ever get called when the reader is on the current version");
+ }
+ assert numChunks >= 0;
+ return numChunks;
+ }

int getNumDocs() {
return numDocs;
}
Lucene90CompressingStoredFieldsWriter.java
@@ -94,6 +94,7 @@ public final class Lucene90CompressingStoredFieldsWriter extends StoredFieldsWri
private int docBase; // doc ID at the beginning of the chunk
private int numBufferedDocs; // docBase + numBufferedDocs == current doc ID

+ private long numChunks;
private long numDirtyChunks; // number of incomplete compressed blocks written
private long numDirtyDocs; // cumulative number of missing docs in incomplete chunks

@@ -249,6 +250,7 @@ private boolean triggerFlush() {
}

private void flush() throws IOException {
+ numChunks++;
indexWriter.writeIndex(numBufferedDocs, fieldsStream.getFilePointer());

// transform end offsets into lengths
@@ -489,10 +491,7 @@ static void writeTLong(DataOutput out, long l) throws IOException {
public void finish(FieldInfos fis, int numDocs) throws IOException {
if (numBufferedDocs > 0) {
numDirtyChunks++; // incomplete: we had to force this flush
- final long expectedChunkDocs =
- Math.min(
- maxDocsPerChunk, (long) ((double) chunkSize / bufferedDocs.size() * numBufferedDocs));
- numDirtyDocs += expectedChunkDocs - numBufferedDocs;
+ numDirtyDocs += numBufferedDocs;
flush();
} else {
assert bufferedDocs.size() == 0;
@@ -502,6 +501,7 @@ public void finish(FieldInfos fis, int numDocs) throws IOException {
"Wrote " + docBase + " docs, finish called with numDocs=" + numDocs);
}
indexWriter.finish(numDocs, fieldsStream.getFilePointer(), metaStream);
+ metaStream.writeVLong(numChunks);
metaStream.writeVLong(numDirtyChunks);
metaStream.writeVLong(numDirtyDocs);
CodecUtil.writeFooter(metaStream);
@@ -615,8 +615,9 @@ public int merge(MergeState mergeState) throws IOException {

// flush any pending chunks
if (numBufferedDocs > 0) {
- flush();
numDirtyChunks++; // incomplete: we had to force this flush
+ numDirtyDocs += numBufferedDocs;
+ flush();
}

// iterate over each chunk. we use the stored fields index to find chunk boundaries,
@@ -709,10 +710,10 @@ public int merge(MergeState mergeState) throws IOException {
* ratio can degrade. This is a safety switch.
*/
boolean tooDirty(Lucene90CompressingStoredFieldsReader candidate) {
- // more than 1% dirty, or more than hard limit of 1024 dirty chunks
- return candidate.getNumDirtyChunks() > 1024
- || (candidate.getNumDirtyChunks() > 1
- && candidate.getNumDirtyDocs() * 100 > candidate.getNumDocs());
+ // A segment is considered dirty only if it has enough dirty docs to make a full block
+ // AND more than 1% blocks are dirty.
+ return candidate.getNumDirtyDocs() > maxDocsPerChunk
+ && candidate.getNumDirtyChunks() * 100 > candidate.getNumChunks();
}

private static class CompressingStoredFieldsMergeSub extends DocIDMerger.Sub {
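The finish() change in the stored fields writer above replaces the old estimate of "missing" docs in a force-flushed chunk with the actual number of buffered docs (the term vectors writer gets the same treatment further down). A small sketch with made-up numbers makes the two formulas concrete; the class name DirtyDocsSketch, chunkSize, the buffered byte count, and the doc count are all hypothetical:

    // Made-up numbers: a forced flush of 10 buffered docs occupying 1_000 bytes,
    // with chunkSize = 4_096 bytes and maxDocsPerChunk = 128. Not Lucene code.
    public class DirtyDocsSketch {
      public static void main(String[] args) {
        long chunkSize = 4_096, bufferedBytes = 1_000, numBufferedDocs = 10, maxDocsPerChunk = 128;

        // Old bookkeeping: estimate how many docs a "full" chunk would have held,
        // then record the shortfall as dirty docs.
        long expectedChunkDocs =
            Math.min(maxDocsPerChunk, (long) ((double) chunkSize / bufferedBytes * numBufferedDocs));
        long oldDirtyDocs = expectedChunkDocs - numBufferedDocs;
        System.out.println(oldDirtyDocs); // 40 - 10 = 30 "missing" docs

        // New bookkeeping: simply count the docs that actually sit in the incomplete chunk.
        long newDirtyDocs = numBufferedDocs;
        System.out.println(newDirtyDocs); // 10
      }
    }
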
Lucene90CompressingTermVectorsFormat.java
@@ -41,6 +41,7 @@ public class Lucene90CompressingTermVectorsFormat extends TermVectorsFormat {
private final CompressionMode compressionMode;
private final int chunkSize;
private final int blockSize;
+ private final int maxDocsPerChunk;

/**
* Create a new {@link Lucene90CompressingTermVectorsFormat}.
@@ -63,6 +64,7 @@ public class Lucene90CompressingTermVectorsFormat extends TermVectorsFormat {
* @param segmentSuffix a suffix to append to files created by this format
* @param compressionMode the {@link CompressionMode} to use
* @param chunkSize the minimum number of bytes of a single chunk of stored documents
+ * @param maxDocsPerChunk the maximum number of documents in a single chunk
* @param blockSize the number of chunks to store in an index block.
* @see CompressionMode
*/
@@ -71,6 +73,7 @@ public Lucene90CompressingTermVectorsFormat(
String segmentSuffix,
CompressionMode compressionMode,
int chunkSize,
+ int maxDocsPerChunk,
int blockSize) {
this.formatName = formatName;
this.segmentSuffix = segmentSuffix;
@@ -79,6 +82,7 @@ public Lucene90CompressingTermVectorsFormat(
throw new IllegalArgumentException("chunkSize must be >= 1");
}
this.chunkSize = chunkSize;
+ this.maxDocsPerChunk = maxDocsPerChunk;
if (blockSize < 1) {
throw new IllegalArgumentException("blockSize must be >= 1");
}
@@ -104,6 +108,7 @@ public final TermVectorsWriter vectorsWriter(
formatName,
compressionMode,
chunkSize,
+ maxDocsPerChunk,
blockSize);
}

@@ -114,6 +119,8 @@ public String toString() {
+ compressionMode
+ ", chunkSize="
+ chunkSize
+ ", maxDocsPerChunk="
+ maxDocsPerChunk
+ ", blockSize="
+ blockSize
+ ")";
Lucene90CompressingTermVectorsReader.java
@@ -85,8 +85,9 @@ public final class Lucene90CompressingTermVectorsReader extends TermVectorsReade
private final int numDocs;
private boolean closed;
private final BlockPackedReaderIterator reader;
+ private final long numChunks; // number of written blocks
private final long numDirtyChunks; // number of incomplete compressed blocks written
- private final long numDirtyDocs; // cumulative number of missing docs in incomplete chunks
+ private final long numDirtyDocs; // cumulative number of docs in incomplete chunks
private final long maxPointer; // end of the data section

// used by clone
@@ -102,6 +103,7 @@ private Lucene90CompressingTermVectorsReader(Lucene90CompressingTermVectorsReade
this.reader =
new BlockPackedReaderIterator(vectorsStream, packedIntsVersion, PACKED_BLOCK_SIZE, 0);
this.version = reader.version;
+ this.numChunks = reader.numChunks;
this.numDirtyChunks = reader.numDirtyChunks;
this.numDirtyDocs = reader.numDirtyDocs;
this.maxPointer = reader.maxPointer;
@@ -169,6 +171,7 @@ public Lucene90CompressingTermVectorsReader(
this.indexReader = fieldsIndexReader;
this.maxPointer = fieldsIndexReader.getMaxPointer();

+ numChunks = metaIn.readVLong();
numDirtyChunks = metaIn.readVLong();
numDirtyDocs = metaIn.readVLong();

@@ -242,6 +245,15 @@ long getNumDirtyChunks() {
return numDirtyChunks;
}

+ long getNumChunks() {
+ if (version != VERSION_CURRENT) {
+ throw new IllegalStateException(
+ "getNumChunks should only ever get called when the reader is on the current version");
+ }
+ assert numChunks >= 0;
+ return numChunks;
+ }

int getNumDocs() {
return numDocs;
}
Lucene90CompressingTermVectorsWriter.java
@@ -60,9 +60,6 @@
*/
public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWriter {

- // hard limit on the maximum number of documents per chunk
- static final int MAX_DOCUMENTS_PER_CHUNK = 128;
-
static final String VECTORS_EXTENSION = "tvd";
static final String VECTORS_INDEX_EXTENSION = "tvx";
static final String VECTORS_META_EXTENSION = "tvm";
@@ -87,8 +84,9 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWrite
private final Compressor compressor;
private final int chunkSize;

+ private long numChunks; // number of chunks
private long numDirtyChunks; // number of incomplete compressed blocks written
- private long numDirtyDocs; // cumulative number of missing docs in incomplete chunks
+ private long numDirtyDocs; // cumulative number of docs in incomplete chunks

/** a pending doc */
private class DocData {
@@ -224,6 +222,7 @@ void addPosition(int position, int startOffset, int length, int payloadLength) {
private final ByteBuffersDataOutput termSuffixes; // buffered term suffixes
private final ByteBuffersDataOutput payloadBytes; // buffered term payloads
private final BlockPackedWriter writer;
+ private final int maxDocsPerChunk; // hard limit on number of docs per chunk

/** Sole constructor. */
Lucene90CompressingTermVectorsWriter(
@@ -234,13 +233,15 @@ void addPosition(int position, int startOffset, int length, int payloadLength) {
String formatName,
CompressionMode compressionMode,
int chunkSize,
+ int maxDocsPerChunk,
int blockShift)
throws IOException {
assert directory != null;
this.segment = si.name;
this.compressionMode = compressionMode;
this.compressor = compressionMode.newCompressor();
this.chunkSize = chunkSize;
+ this.maxDocsPerChunk = maxDocsPerChunk;

numDocs = 0;
pendingDocs = new ArrayDeque<>();
@@ -373,10 +374,11 @@ public void addPosition(int position, int startOffset, int endOffset, BytesRef p
}

private boolean triggerFlush() {
- return termSuffixes.size() >= chunkSize || pendingDocs.size() >= MAX_DOCUMENTS_PER_CHUNK;
+ return termSuffixes.size() >= chunkSize || pendingDocs.size() >= maxDocsPerChunk;
}

private void flush() throws IOException {
+ numChunks++;
final int chunkDocs = pendingDocs.size();
assert chunkDocs > 0 : chunkDocs;

@@ -712,18 +714,15 @@ private void flushPayloadLengths() throws IOException {
public void finish(FieldInfos fis, int numDocs) throws IOException {
if (!pendingDocs.isEmpty()) {
numDirtyChunks++; // incomplete: we had to force this flush
- final long expectedChunkDocs =
- Math.min(
- MAX_DOCUMENTS_PER_CHUNK,
- (long) ((double) chunkSize / termSuffixes.size() * pendingDocs.size()));
- numDirtyDocs += expectedChunkDocs - pendingDocs.size();
+ numDirtyDocs += pendingDocs.size();
flush();
}
if (numDocs != this.numDocs) {
throw new RuntimeException(
"Wrote " + this.numDocs + " docs, finish called with numDocs=" + numDocs);
}
indexWriter.finish(numDocs, vectorsStream.getFilePointer(), metaStream);
+ metaStream.writeVLong(numChunks);
metaStream.writeVLong(numDirtyChunks);
metaStream.writeVLong(numDirtyDocs);
CodecUtil.writeFooter(metaStream);
@@ -845,8 +844,9 @@ public int merge(MergeState mergeState) throws IOException {

// flush any pending chunks
if (!pendingDocs.isEmpty()) {
- flush();
numDirtyChunks++; // incomplete: we had to force this flush
+ numDirtyDocs += pendingDocs.size();
+ flush();
}

// iterate over each chunk. we use the vectors index to find chunk boundaries,
@@ -937,10 +937,10 @@ public int merge(MergeState mergeState) throws IOException {
* ratio can degrade. This is a safety switch.
*/
boolean tooDirty(Lucene90CompressingTermVectorsReader candidate) {
- // more than 1% dirty, or more than hard limit of 1024 dirty chunks
- return candidate.getNumDirtyChunks() > 1024
- || (candidate.getNumDirtyChunks() > 1
- && candidate.getNumDirtyDocs() * 100 > candidate.getNumDocs());
+ // A segment is considered dirty only if it has enough dirty docs to make a full block
+ // AND more than 1% blocks are dirty.
+ return candidate.getNumDirtyDocs() > maxDocsPerChunk
+ && candidate.getNumDirtyChunks() * 100 > candidate.getNumChunks();
}

@Override
SortingTermVectorsConsumer.java
@@ -38,7 +38,7 @@ final class SortingTermVectorsConsumer extends TermVectorsConsumer {

private static final TermVectorsFormat TEMP_TERM_VECTORS_FORMAT =
new Lucene90CompressingTermVectorsFormat(
"TempTermVectors", "", SortingStoredFieldsConsumer.NO_COMPRESSION, 8 * 1024, 10);
"TempTermVectors", "", SortingStoredFieldsConsumer.NO_COMPRESSION, 8 * 1024, 128, 10);
TrackingTmpOutputDirectoryWrapper tmpDirectory;

SortingTermVectorsConsumer(
CompressingCodec.java
@@ -120,7 +120,7 @@ public CompressingCodec(
name, segmentSuffix, compressionMode, chunkSize, maxDocsPerChunk, blockShift);
this.termVectorsFormat =
new Lucene90CompressingTermVectorsFormat(
- name, segmentSuffix, compressionMode, chunkSize, blockShift);
+ name, segmentSuffix, compressionMode, chunkSize, maxDocsPerChunk, blockShift);
}

/** Creates a compressing codec with an empty segment suffix */
@@ -283,7 +283,7 @@ public void testChunkCleanup() throws IOException {

// we have to enforce certain things like maxDocsPerChunk to cause dirty chunks to be created
// by this test.
- iwConf.setCodec(CompressingCodec.randomInstance(random(), 4 * 1024, 100, false, 8));
+ iwConf.setCodec(CompressingCodec.randomInstance(random(), 4 * 1024, 4, false, 8));
IndexWriter iw = new IndexWriter(dir, iwConf);
DirectoryReader ir = DirectoryReader.open(iw);
for (int i = 0; i < 5; i++) {
@@ -84,7 +84,7 @@ public void testChunkCleanup() throws IOException {

// we have to enforce certain things like maxDocsPerChunk to cause dirty chunks to be created
// by this test.
- iwConf.setCodec(CompressingCodec.randomInstance(random(), 4 * 1024, 100, false, 8));
+ iwConf.setCodec(CompressingCodec.randomInstance(random(), 4 * 1024, 4, false, 8));
IndexWriter iw = new IndexWriter(dir, iwConf);
DirectoryReader ir = DirectoryReader.open(iw);
for (int i = 0; i < 5; i++) {