Skip to content

Commit

Permalink
The WAL is finished! Fixed #178
Browse files Browse the repository at this point in the history
  • Loading branch information
amccurry committed Mar 13, 2012
1 parent a47da45 commit f1355cd
Show file tree
Hide file tree
Showing 7 changed files with 345 additions and 100 deletions.
Expand Up @@ -549,7 +549,8 @@ private BlurIndex openShard(String table, String shard) throws IOException {
writer.setTimeBetweenRefreshs(TimeUnit.MILLISECONDS.toNanos(500));
writer.setNrtCachingMaxCachedMB(60);
writer.setNrtCachingMaxMergeSizeMB(5.0);
writer.setRecorder(_recorder);
writer.setWalPath(_walPath);
writer.setConfiguration(_configuration);
writer.init();
index = writer;
}
Expand Down
Expand Up @@ -33,6 +33,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.lucene.analysis.KeywordAnalyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
Expand Down Expand Up @@ -64,12 +66,15 @@ public class LocalIndexServer extends AbstractIndexServer {
private int _blockSize = 65536;
private CompressionCodec _compression = CompressedFieldDataDirectory.DEFAULT_COMPRESSION;
private ExecutorService _executorService = Executors.newCachedThreadPool();
private Path _walPath;
private Configuration _configuration = new Configuration();

public LocalIndexServer(File file) {
public LocalIndexServer(File file, Path walPath) {
_localDir = file;
_localDir.mkdirs();
_closer = new BlurIndexCloser();
_closer.init();
_walPath = walPath;
}

@Override
Expand Down Expand Up @@ -149,6 +154,8 @@ private BlurIndex openIndex(String table, String shard, Directory dir) throws Co
index.setShard(shard);
index.setSimilarity(getSimilarity(table));
index.setTable(table);
index.setWalPath(new Path(new Path(_walPath,table),shard));
index.setConfiguration(_configuration);
index.init();
return index;
}
Expand Down
Expand Up @@ -3,20 +3,16 @@
import static com.nearinfinity.blur.lucene.LuceneConstant.LUCENE_VERSION;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.NRTManager;
Expand All @@ -32,30 +28,13 @@
import com.nearinfinity.blur.log.LogFactory;
import com.nearinfinity.blur.thrift.generated.Record;
import com.nearinfinity.blur.thrift.generated.Row;
import com.nearinfinity.blur.utils.BlurConstants;
import com.nearinfinity.blur.utils.PrimeDocCache;
import com.nearinfinity.blur.utils.RowIndexWriter;

public class BlurNRTIndex extends BlurIndex {

private static final Log LOG = LogFactory.getLog(BlurNRTIndex.class);
private static final Field PRIME_DOC_FIELD = new Field(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO, Index.NOT_ANALYZED_NO_NORMS);
private static final Term ROW_ID = new Term(BlurConstants.ROW_ID);
private static final boolean APPLY_ALL_DELETES = true;
/**
 * Warmer applied to each searcher: calls {@code PrimeDocCache.getPrimeDocBitSet} on the
 * searcher's reader when it reports no sequential sub-readers, otherwise once per sub-reader.
 */
private SearcherWarmer _warmer = new SearcherWarmer() {
  @Override
  public void warm(IndexSearcher s) throws IOException {
    final IndexReader top = s.getIndexReader();
    final IndexReader[] segments = top.getSequentialSubReaders();
    if (segments == null) {
      // No sub-readers reported: hand the reader itself to the cache.
      PrimeDocCache.getPrimeDocBitSet(top);
      return;
    }
    for (int i = 0; i < segments.length; i++) {
      PrimeDocCache.getPrimeDocBitSet(segments[i]);
    }
  }
};

private NRTManager _nrtManager;
private SearcherManager _manager;
private AtomicBoolean _isClosed = new AtomicBoolean();
Expand All @@ -76,8 +55,28 @@ public void warm(IndexSearcher s) throws IOException {
private double _nrtCachingMaxCachedMB = 5.0;
private Thread _refresher;
private TransactionRecorder _recorder;
private Configuration _configuration;

/**
 * Warmer applied to each searcher: calls {@code PrimeDocCache.getPrimeDocBitSet} on the
 * searcher's reader when it reports no sequential sub-readers, otherwise once per sub-reader.
 */
private SearcherWarmer _warmer = new SearcherWarmer() {
  @Override
  public void warm(IndexSearcher s) throws IOException {
    final IndexReader top = s.getIndexReader();
    final IndexReader[] segments = top.getSequentialSubReaders();
    if (segments == null) {
      // No sub-readers reported: hand the reader itself to the cache.
      PrimeDocCache.getPrimeDocBitSet(top);
      return;
    }
    for (int i = 0; i < segments.length; i++) {
      PrimeDocCache.getPrimeDocBitSet(segments[i]);
    }
  }
};
private Path _walPath;

public void init() throws IOException {
Path walTablePath = new Path(_walPath, _table);
Path walShardPath = new Path(walTablePath, _shard);

IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, _analyzer);
conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
conf.setSimilarity(_similarity);
Expand All @@ -86,6 +85,14 @@ public void init() throws IOException {

NRTCachingDirectory cachingDirectory = new NRTCachingDirectory(_directory, _nrtCachingMaxMergeSizeMB, _nrtCachingMaxCachedMB);
_writer = new IndexWriter(cachingDirectory, conf);
_recorder = new TransactionRecorder();
_recorder.setAnalyzer(_analyzer);
_recorder.setConfiguration(_configuration);
_recorder.setWalPath(walShardPath);

_recorder.init();
_recorder.replay(_writer);

_nrtManager = new NRTManager(_writer, _executorService, _warmer, APPLY_ALL_DELETES);
_manager = _nrtManager.getSearcherManager(APPLY_ALL_DELETES);
_lastRefresh = System.nanoTime();
Expand Down Expand Up @@ -127,7 +134,7 @@ public void run() {
while (!_isClosed.get()) {
try {
LOG.info("Committing of [{0}/{1}].", _table, _shard);
_writer.commit();
_recorder.commit(_writer);
} catch (CorruptIndexException e) {
LOG.error("Error during commit of [{0}/{1}].", _table, _shard, e);
} catch (IOException e) {
Expand Down Expand Up @@ -156,27 +163,13 @@ public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOE
deleteRow(waitToBeVisible, wal, row.id);
return;
}
if (wal) {
if (_recorder == null) {
LOG.warn("No transaction recorder set.");
} else {
_recorder.replaceRow(row);
}
}
long generation = _nrtManager.updateDocuments(ROW_ID.createTerm(row.id), getDocs(row));
long generation = _recorder.replaceRow(wal, row, _nrtManager);
waitToBeVisible(waitToBeVisible, generation);
}

@Override
public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException {
if (wal) {
if (_recorder == null) {
LOG.warn("No transaction recorder set.");
} else {
_recorder.deleteRow(rowId);
}
}
long generation = _nrtManager.deleteDocuments(ROW_ID.createTerm(rowId));
long generation = _recorder.deleteRow(wal, rowId, _nrtManager);
waitToBeVisible(waitToBeVisible, generation);
}

Expand All @@ -193,6 +186,7 @@ public void close() throws IOException {
_committer.interrupt();
_refresher.interrupt();
try {
_recorder.close();
_writer.close();
_manager.close();
_nrtManager.close();
Expand Down Expand Up @@ -236,30 +230,6 @@ private void maybeReopen() throws IOException {
}
}

/**
 * Builds one Lucene document per record of the given row, in record order.
 * The document built from the first record additionally receives {@code PRIME_DOC_FIELD}.
 *
 * @param row the row whose {@code records} are converted; {@code row.id} is used as the row id
 * @return the converted documents, one per record (empty when the row has no records)
 */
private List<Document> getDocs(Row row) {
  final List<Record> records = row.records;
  final List<Document> result = new ArrayList<Document>(records.size());
  final String id = row.id;
  // A single builder is handed to every convert(...) call for this row.
  final StringBuilder scratch = new StringBuilder();
  boolean primePending = true;
  for (Record record : records) {
    final Document doc = convert(id, record, scratch);
    if (primePending) {
      // Only the first document of the row carries the prime-doc field.
      doc.add(PRIME_DOC_FIELD);
      primePending = false;
    }
    result.add(doc);
  }
  return result;
}

/**
 * Converts a single record into a Lucene document.
 * The document gets a stored, not-analyzed, norm-less row id field and record id field,
 * after which {@code RowIndexWriter.addColumns} is called with the record's family and columns.
 *
 * @param rowId   value for the {@code BlurConstants.ROW_ID} field
 * @param record  record supplying the record id, family and columns
 * @param builder builder passed through to {@code RowIndexWriter.addColumns}
 * @return the newly created document
 */
private Document convert(String rowId, Record record, StringBuilder builder) {
  final Document doc = new Document();
  final Field rowIdField =
      new Field(BlurConstants.ROW_ID, rowId, Store.YES, Index.NOT_ANALYZED_NO_NORMS);
  doc.add(rowIdField);
  final Field recordIdField =
      new Field(BlurConstants.RECORD_ID, record.recordId, Store.YES, Index.NOT_ANALYZED_NO_NORMS);
  doc.add(recordIdField);
  RowIndexWriter.addColumns(doc, _analyzer, builder, record.family, record.columns);
  return doc;
}

/**
 * Sets the analyzer held by this index.
 *
 * @param analyzer the analyzer to store in {@code _analyzer}
 */
public void setAnalyzer(BlurAnalyzer analyzer) {
  this._analyzer = analyzer;
}
Expand Down Expand Up @@ -300,7 +270,11 @@ public void setNrtCachingMaxCachedMB(int nrtCachingMaxCachedMB) {
_nrtCachingMaxCachedMB = nrtCachingMaxCachedMB;
}

public void setRecorder(TransactionRecorder recorder) {
_recorder = recorder;
public void setWalPath(Path walPath) {
_walPath = walPath;
}

/**
 * Sets the Hadoop configuration held by this index.
 *
 * @param configuration the configuration to store in {@code _configuration}
 */
public void setConfiguration(Configuration configuration) {
  this._configuration = configuration;
}
}

0 comments on commit f1355cd

Please sign in to comment.