Skip to content

Commit

Permalink
First major update in the NRT index updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
amccurry committed Feb 25, 2012
1 parent 5f6047d commit 84dc476
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import com.nearinfinity.blur.manager.writer.BlurIndexCommiter;
import com.nearinfinity.blur.manager.writer.BlurIndexReader;
import com.nearinfinity.blur.manager.writer.BlurIndexRefresher;
import com.nearinfinity.blur.manager.writer.BlurIndexWriter;
import com.nearinfinity.blur.manager.writer.BlurNRTIndex;
import com.nearinfinity.blur.metrics.BlurMetrics;
import com.nearinfinity.blur.store.blockcache.BlockDirectory;
import com.nearinfinity.blur.store.blockcache.BlockDirectoryCache;
Expand Down Expand Up @@ -534,27 +534,16 @@ private BlurIndex openShard(String table, String shard) throws IOException {
reader.init();
index = reader;
} else {
// BlurRtIndex rtIndex = new BlurRtIndex();
// rtIndex.setAnalyzer(getAnalyzer(table));
// rtIndex.setDirectory(dir);
// rtIndex.setLimit(1000);
// rtIndex.init();
// index = rtIndex;
BlurIndexWriter writer = new BlurIndexWriter();
writer.setCloser(_closer);
writer.setCommiter(_commiter);
BlurNRTIndex writer = new BlurNRTIndex();
writer.setAnalyzer(getAnalyzer(table));
writer.setDirectory(dir);
writer.setRefresher(_refresher);
writer.setBlurMetrics(_blurMetrics);
writer.setExecutorService(_openerService);
writer.setShard(shard);
writer.setTable(table);
writer.setIndexDeletionPolicy(_indexDeletionPolicy);
writer.setSimilarity(getSimilarity(table));
writer.setClusterStatus(_clusterStatus);
writer.setTimeBetweenCommits(TimeUnit.SECONDS.toMillis(60));
writer.setTimeBetweenRefreshs(TimeUnit.MILLISECONDS.toNanos(10));
writer.init();
index = writer;

}
_filterCache.opening(table, shard, index);
return warmUp(index, table, shard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

public abstract class BlurIndex {

public abstract boolean replaceRow(boolean wal, Row row) throws IOException;
public abstract void replaceRow(boolean wal, Row row) throws IOException;

public abstract IndexReader getIndexReader(boolean forceRefresh) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void close() throws IOException {
}

@Override
public boolean replaceRow(boolean wal, Row row) throws IOException {
public void replaceRow(boolean wal, Row row) throws IOException {
throw new RuntimeException("Read-only shard");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,9 @@ public Void call() throws Exception {
}

@Override
public boolean replaceRow(boolean wal, Row row) throws IOException {
public void replaceRow(boolean wal, Row row) throws IOException {
synchronized (_writer) {
_rowIndexWriter.replace(wal, row);
return true;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package com.nearinfinity.blur.manager.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.NRTManager;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.SearcherWarmer;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.Version;

import com.nearinfinity.blur.analysis.BlurAnalyzer;
import com.nearinfinity.blur.log.Log;
import com.nearinfinity.blur.log.LogFactory;
import com.nearinfinity.blur.thrift.generated.Record;
import com.nearinfinity.blur.thrift.generated.Row;
import com.nearinfinity.blur.utils.BlurConstants;
import com.nearinfinity.blur.utils.RowWalIndexWriter;

public class BlurNRTIndex extends BlurIndex {

private static final Log LOG = LogFactory.getLog(BlurNRTIndex.class);
private static final Field PRIME_DOC_FIELD = new Field(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO, Index.NOT_ANALYZED_NO_NORMS);
private static final Term ROW_ID = new Term(BlurConstants.ROW_ID);
private static final boolean APPLY_ALL_DELETES = true;
private SearcherWarmer _warmer = new SearcherWarmer() {
@Override
public void warm(IndexSearcher s) throws IOException {

}
};
private NRTManager _nrtManager;
private SearcherManager _manager;
private AtomicBoolean _isClosed = new AtomicBoolean();
private IndexWriter _writer;
private Thread _committer;

// externally set
private BlurAnalyzer _analyzer;
private Directory _directory;
private ExecutorService _executorService;
private String _table;
private String _shard;
private long _timeBetweenCommits;
private long _timeBetweenRefreshs;
private long _lastRefresh;

public void init() throws IOException {
IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, _analyzer);
NRTCachingDirectory cachingDirectory = new NRTCachingDirectory(_directory, 5.0, 60);
_writer = new IndexWriter(cachingDirectory, conf);
_nrtManager = new NRTManager(_writer, _executorService, _warmer, APPLY_ALL_DELETES);
_manager = _nrtManager.getSearcherManager(APPLY_ALL_DELETES);
_lastRefresh = System.nanoTime();
startCommiter();
}

private void startCommiter() {
_committer = new Thread(new Runnable() {
@Override
public void run() {
while (!_isClosed.get()) {
try {
LOG.info("Committing of [{0}/{1}].", _table, _shard);
_writer.commit();
} catch (CorruptIndexException e) {
LOG.error("Error during commit of [{0}/{1}].", _table, _shard, e);
} catch (IOException e) {
LOG.error("Error during commit of [{0}/{1}].", _table, _shard, e);
}
try {
Thread.sleep(_timeBetweenCommits);
} catch (InterruptedException e) {
LOG.error("Unknown error with committer thread [{0}/{1}].", _table, _shard, e);
}
}
}
});
_committer.setDaemon(true);
_committer.setName("Commit Thread [" + _table + "/" + _shard + "]");
_committer.start();
}

@Override
public void replaceRow(boolean wal, Row row) throws IOException {
_nrtManager.updateDocuments(ROW_ID.createTerm(row.id), getDocs(row));
}

@Override
public void deleteRow(boolean wal, String rowId) throws IOException {
_nrtManager.deleteDocuments(ROW_ID.createTerm(rowId));
}

@Override
public IndexReader getIndexReader(boolean forceRefresh) throws IOException {
if (forceRefresh) {
_nrtManager.maybeReopen(APPLY_ALL_DELETES);
}
if (_lastRefresh + _timeBetweenRefreshs < System.nanoTime()) {
_manager.maybeReopen();
_lastRefresh = System.nanoTime();
}
IndexSearcher searcher = _manager.acquire();
return searcher.getIndexReader();
}

@Override
public void close() throws IOException {
_isClosed.set(true);
_manager.close();
_nrtManager.close();
}

@Override
public void refresh() throws IOException {
_nrtManager.maybeReopen(APPLY_ALL_DELETES);
}

@Override
public AtomicBoolean isClosed() {
return _isClosed;
}

@Override
public void optimize(int numberOfSegmentsPerShard) throws IOException {
LOG.info("Optimize is not supported");
}

private List<Document> getDocs(Row row) {
List<Record> records = row.records;
int size = records.size();
final String rowId = row.id;
final StringBuilder builder = new StringBuilder();
List<Document> docs = new ArrayList<Document>(size);
for (int i = 0; i < size; i++) {
Document document = convert(rowId, records.get(i), builder);
if (i == 0) {
document.add(PRIME_DOC_FIELD);
}
docs.add(document);
}
return docs;
}

private Document convert(String rowId, Record record, StringBuilder builder) {
Document document = new Document();
document.add(new Field(BlurConstants.ROW_ID, rowId, Store.YES, Index.NOT_ANALYZED_NO_NORMS));
document.add(new Field(BlurConstants.RECORD_ID, record.recordId, Store.YES, Index.NOT_ANALYZED_NO_NORMS));
RowWalIndexWriter.addColumns(document, _analyzer, builder, record.family, record.columns);
return document;
}

public void setAnalyzer(BlurAnalyzer analyzer) {
_analyzer = analyzer;
}

public void setDirectory(Directory directory) {
_directory = directory;
}

public void setExecutorService(ExecutorService executorService) {
_executorService = executorService;
}

public void setTable(String table) {
_table = table;
}

public void setShard(String shard) {
_shard = shard;
}

public void setTimeBetweenCommits(long timeBetweenCommits) {
_timeBetweenCommits = timeBetweenCommits;
}

public void setTimeBetweenRefreshs(long timeBetweenRefreshs) {
_timeBetweenRefreshs = timeBetweenRefreshs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,24 @@ public class PrimeDocCache {

public static final OpenBitSet EMPTY_BIT_SET = new OpenBitSet();
private static final Term PRIME_DOC_TERM = new Term(PRIME_DOC, PRIME_DOC_VALUE);
private static Map<IndexReader,OpenBitSet> primeDocMap = new ConcurrentHashMap<IndexReader, OpenBitSet>();

private static Map<Object, OpenBitSet> primeDocMap = new ConcurrentHashMap<Object, OpenBitSet>();

public static synchronized OpenBitSet getPrimeDocBitSet(IndexReader reader) throws IOException {
OpenBitSet bitSet = primeDocMap.get(reader);
Object key = reader.getCoreCacheKey();
OpenBitSet bitSet = primeDocMap.get(key);
if (bitSet == null) {
reader.addReaderFinishedListener(new ReaderFinishedListener() {
@Override
public void finished(IndexReader reader) {
primeDocMap.remove(reader);
Object key = reader.getCoreCacheKey();
LOG.debug("Current size [" + primeDocMap.size() + "] Prime Doc BitSet removing for segment [" + reader + "]");
primeDocMap.remove(key);
}
});
LOG.info("Prime Doc BitSet missing for segment [" + reader + "]");
LOG.debug("Prime Doc BitSet missing for segment [" + reader + "] current size [" + primeDocMap.size() + "]");
bitSet = new OpenBitSet(reader.maxDoc());
primeDocMap.put(reader,bitSet);
primeDocMap.put(key, bitSet);
TermDocs termDocs = reader.termDocs(PRIME_DOC_TERM);
while (termDocs.next()) {
bitSet.set(termDocs.doc());
Expand Down

0 comments on commit 84dc476

Please sign in to comment.