Skip to content

Commit

Permalink
Adding reader settings for moving fst offheap
Browse files Browse the repository at this point in the history
  • Loading branch information
jainankitk committed Mar 8, 2019
1 parent 3b2d836 commit 906c222
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 9 deletions.
Expand Up @@ -193,7 +193,7 @@ public BlockTreeTermsReader(PostingsReaderBase postingsReader, SegmentReadState
final long indexStartFP = indexIn.readVLong();
FieldReader previous = fields.put(fieldInfo.name,
new FieldReader(this, fieldInfo, numTerms, rootCode, sumTotalTermFreq, sumDocFreq, docCount,
indexStartFP, longsSize, indexIn, minTerm, maxTerm));
indexStartFP, longsSize, indexIn, minTerm, maxTerm, state.directory.getReaderAttributes()));
if (previous != null) {
throw new CorruptIndexException("duplicate field: " + fieldInfo.name, termsIn);
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexOptions;
Expand All @@ -45,6 +46,8 @@ public final class FieldReader extends Terms implements Accountable {

// private final boolean DEBUG = BlockTreeTermsWriter.DEBUG;

public static final String FST_OFFHEAP_ATTRIBUTE = "fst.offheap";

private static final long BASE_RAM_BYTES_USED =
RamUsageEstimator.shallowSizeOfInstance(FieldReader.class)
+ 3 * RamUsageEstimator.shallowSizeOfInstance(BytesRef.class);
Expand All @@ -67,6 +70,12 @@ public final class FieldReader extends Terms implements Accountable {

// Backward-compatible constructor: delegates to the attribute-aware overload with
// null readerAttributes, so the FST on/off-heap choice falls back to the
// heuristic (MMapDirectory input and docCount != sumDocFreq, i.e. not a primary key).
FieldReader(BlockTreeTermsReader parent, FieldInfo fieldInfo, long numTerms, BytesRef rootCode, long sumTotalTermFreq, long sumDocFreq, int docCount,
long indexStartFP, int longsSize, IndexInput indexIn, BytesRef minTerm, BytesRef maxTerm) throws IOException {
this(parent, fieldInfo, numTerms, rootCode, sumTotalTermFreq, sumDocFreq, docCount, indexStartFP, longsSize, indexIn, minTerm, maxTerm, null);
}


FieldReader(BlockTreeTermsReader parent, FieldInfo fieldInfo, long numTerms, BytesRef rootCode, long sumTotalTermFreq, long sumDocFreq, int docCount,
long indexStartFP, int longsSize, IndexInput indexIn, BytesRef minTerm, BytesRef maxTerm, Map<String, String> readerAttributes) throws IOException {
assert numTerms > 0;
this.fieldInfo = fieldInfo;
//DEBUG = BlockTreeTermsReader.DEBUG && fieldInfo.name.equals("id");
Expand All @@ -88,10 +97,13 @@ public final class FieldReader extends Terms implements Accountable {

if (indexIn != null) {
final IndexInput clone = indexIn.clone();
//System.out.println("start=" + indexStartFP + " field=" + fieldInfo.name);
clone.seek(indexStartFP);
// Initialize FST offheap if index is MMapDirectory and
// docCount != sumDocFreq implying field is not primary key
if (clone instanceof ByteBufferIndexInput && this.docCount != this.sumDocFreq) {

final boolean fstOffHeap = readerAttributes != null && "true".equals(readerAttributes.get(FST_OFFHEAP_ATTRIBUTE));
// Initialize FST offheap if the fst.offheap reader attribute is set or index is
// MMapDirectory and docCount != sumDocFreq implying field is not primary key
if (fstOffHeap || (clone instanceof ByteBufferIndexInput && this.docCount != this.sumDocFreq)) {
index = new FST<>(clone, ByteSequenceOutputs.getSingleton(), new OffHeapFSTStore());
} else {
index = new FST<>(clone, ByteSequenceOutputs.getSingleton());
Expand Down
Expand Up @@ -733,6 +733,7 @@ public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
directoryOrig = d;
directory = new LockValidatingDirectoryWrapper(d, writeLock);

directoryOrig.setReaderAttributes(config.getReaderAttributes());
analyzer = config.getAnalyzer();
mergeScheduler = config.getMergeScheduler();
mergeScheduler.setInfoStream(infoStream);
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.io.PrintStream;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.lucene.analysis.Analyzer;
Expand Down Expand Up @@ -482,6 +483,15 @@ public IndexWriterConfig setCommitOnClose(boolean commitOnClose) {
return this;
}

/**
 * Sets read-time attributes that are propagated (via the {@code Directory}) to
 * segment readers when the index is opened. For example, setting
 * {@code fst.offheap=true} asks the block-tree terms reader to load its FSTs
 * off-heap.
 *
 * @param readerAttributes the attribute map; {@code null} is permitted and
 *        disables attribute-driven behavior, leaving readers on their defaults
 * @return this {@code IndexWriterConfig}, for method chaining
 */
public IndexWriterConfig setReaderAttributes(Map<String, String> readerAttributes) {
this.readerAttributes = readerAttributes;
return this;
}

/** We only allow sorting on these types */
private static final EnumSet<SortField.Type> ALLOWED_INDEX_SORT_TYPES = EnumSet.of(SortField.Type.STRING,
SortField.Type.LONG,
Expand All @@ -507,6 +517,7 @@ public IndexWriterConfig setIndexSort(Sort sort) {
/** Renders the full configuration: the inherited live settings followed by the writer-specific ones, one {@code key=value} per line. */
@Override
public String toString() {
return new StringBuilder(super.toString())
.append("readerAttributes=").append(this.readerAttributes).append('\n')
.append("writer=").append(writer.get()).append('\n')
.toString();
}
Expand Down
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.lucene.index;


import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.lucene.analysis.Analyzer;
Expand Down Expand Up @@ -50,6 +50,10 @@ public class LiveIndexWriterConfig {
* points are deleted. */
protected volatile IndexDeletionPolicy delPolicy;

// modified by IndexWriterConfig
/** Attributes used by SegmentReadState during index open. */
protected Map<String, String> readerAttributes;

/** {@link IndexCommit} that {@link IndexWriter} is
* opened on. */
protected volatile IndexCommit commit;
Expand Down Expand Up @@ -203,7 +207,11 @@ public synchronized LiveIndexWriterConfig setRAMBufferSizeMB(double ramBufferSiz
public double getRAMBufferSizeMB() {
return ramBufferSizeMB;
}


/**
 * Returns the read-time attributes that will be handed to segment readers when
 * the index is opened, or {@code null} if none were set.
 */
public Map<String, String> getReaderAttributes() {
return this.readerAttributes;
}

/**
* Determines the minimal number of documents required before the buffered
* in-memory documents are flushed as a new Segment. Large values generally
Expand Down
25 changes: 25 additions & 0 deletions lucene/core/src/java/org/apache/lucene/store/Directory.java
Expand Up @@ -23,6 +23,7 @@
import java.nio.file.NoSuchFileException;
import java.util.Collection; // for javadocs
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.lucene.util.IOUtils;
Expand All @@ -48,6 +49,12 @@
* @see FilterDirectory
*/
public abstract class Directory implements Closeable {

/**
* Reader only attributes for this directory.
*/
private Map<String, String> readerAttributes;

/**
* Returns names of all files stored in this directory.
* The output must be in sorted (UTF-16, java's {@link String#compareTo}) order.
Expand Down Expand Up @@ -210,4 +217,22 @@ protected void ensureOpen() throws AlreadyClosedException {}
public Set<String> getPendingDeletions() throws IOException {
return Collections.emptySet();
}

/**
 * Sets reader-only attributes for this directory; these are consulted by
 * codec readers (e.g. the block-tree terms reader's FST off-heap switch)
 * when segments from this directory are opened. {@code null} clears them.
 *
 * @param readerAttributes attribute map to expose to readers; may be {@code null}
 * @lucene.internal
 */
public void setReaderAttributes(Map<String, String> readerAttributes) {
this.readerAttributes = readerAttributes;
}

/**
 * Returns reader-only attributes for this directory.
 *
 * <p>Returns {@code null} until {@link #setReaderAttributes} has been called;
 * callers must handle a {@code null} result.
 * NOTE(review): returning {@code Collections.emptyMap()} instead of null would
 * be safer for callers — confirm no caller distinguishes null before changing.
 *
 * @lucene.internal
 */
public Map<String, String> getReaderAttributes() {
return this.readerAttributes;
}
}
11 changes: 8 additions & 3 deletions lucene/core/src/test/org/apache/lucene/util/fst/TestFSTs.java
Expand Up @@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
Expand All @@ -40,6 +41,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.codecs.blocktree.FieldReader;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.DirectoryReader;
Expand Down Expand Up @@ -313,7 +315,8 @@ public void testRealTerms() throws Exception {
MockAnalyzer analyzer = new MockAnalyzer(random());
analyzer.setMaxTokenLength(TestUtil.nextInt(random(), 1, IndexWriter.MAX_TERM_LENGTH));

final IndexWriterConfig conf = newIndexWriterConfig(analyzer).setMaxBufferedDocs(-1).setRAMBufferSizeMB(64);
final IndexWriterConfig conf = newIndexWriterConfig(analyzer).setMaxBufferedDocs(-1).setRAMBufferSizeMB(64)
.setReaderAttributes(new HashMap<String, String>() {{put(FieldReader.FST_OFFHEAP_ATTRIBUTE, "true");}});
final Path tempDir = createTempDir("fstlines");
final Directory dir = newFSDirectory(tempDir);
final IndexWriter writer = new IndexWriter(dir, conf);
Expand Down Expand Up @@ -847,7 +850,8 @@ public void testPrimaryKeys() throws Exception {
System.out.println("TEST: cycle=" + cycle);
}
RandomIndexWriter w = new RandomIndexWriter(random(), dir,
newIndexWriterConfig(new MockAnalyzer(random())).setOpenMode(IndexWriterConfig.OpenMode.CREATE));
newIndexWriterConfig(new MockAnalyzer(random())).setOpenMode(IndexWriterConfig.OpenMode.CREATE)
.setReaderAttributes(new HashMap<String, String>() {{put(FieldReader.FST_OFFHEAP_ATTRIBUTE, "true");}}));
Document doc = new Document();
Field idField = newStringField("id", "", Field.Store.NO);
doc.add(idField);
Expand Down Expand Up @@ -977,7 +981,8 @@ public void testRandomTermLookup() throws Exception {
Directory dir = newDirectory();

RandomIndexWriter w = new RandomIndexWriter(random(), dir,
newIndexWriterConfig(new MockAnalyzer(random())).setOpenMode(IndexWriterConfig.OpenMode.CREATE));
newIndexWriterConfig(new MockAnalyzer(random())).setOpenMode(IndexWriterConfig.OpenMode.CREATE)
.setReaderAttributes(new HashMap<String, String>() {{put(FieldReader.FST_OFFHEAP_ATTRIBUTE, "true");}}));
Document doc = new Document();
Field f = newStringField("field", "", Field.Store.NO);
doc.add(f);
Expand Down

0 comments on commit 906c222

Please sign in to comment.