From f1355cd57fe5a37a508db25cd149e556da74856d Mon Sep 17 00:00:00 2001 From: Aaron McCurry Date: Tue, 13 Mar 2012 18:25:25 -0400 Subject: [PATCH] The WAL is finished! Fixed #178 --- .../indexserver/DistributedIndexServer.java | 3 +- .../manager/indexserver/LocalIndexServer.java | 9 +- .../blur/manager/writer/BlurNRTIndex.java | 108 ++++----- .../manager/writer/TransactionRecorder.java | 223 ++++++++++++++++-- .../blur/thrift/ThriftBlurShardServer.java | 6 +- .../blur/manager/IndexManagerTest.java | 10 +- .../writer/TransactionRecorderTest.java | 86 +++++++ 7 files changed, 345 insertions(+), 100 deletions(-) create mode 100644 src/blur-core/src/test/java/com/nearinfinity/blur/manager/writer/TransactionRecorderTest.java diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/DistributedIndexServer.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/DistributedIndexServer.java index 23aed5a1..a3850a0c 100644 --- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/DistributedIndexServer.java +++ b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/DistributedIndexServer.java @@ -549,7 +549,8 @@ private BlurIndex openShard(String table, String shard) throws IOException { writer.setTimeBetweenRefreshs(TimeUnit.MILLISECONDS.toNanos(500)); writer.setNrtCachingMaxCachedMB(60); writer.setNrtCachingMaxMergeSizeMB(5.0); - writer.setRecorder(_recorder); + writer.setWalPath(_walPath); + writer.setConfiguration(_configuration); writer.init(); index = writer; } diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/LocalIndexServer.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/LocalIndexServer.java index 407e7631..0582f5a4 100644 --- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/LocalIndexServer.java +++ b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/LocalIndexServer.java @@ -33,6 +33,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.lucene.analysis.KeywordAnalyzer; import org.apache.lucene.analysis.standard.StandardAnalyzer; @@ -64,12 +66,15 @@ public class LocalIndexServer extends AbstractIndexServer { private int _blockSize = 65536; private CompressionCodec _compression = CompressedFieldDataDirectory.DEFAULT_COMPRESSION; private ExecutorService _executorService = Executors.newCachedThreadPool(); + private Path _walPath; + private Configuration _configuration = new Configuration(); - public LocalIndexServer(File file) { + public LocalIndexServer(File file, Path walPath) { _localDir = file; _localDir.mkdirs(); _closer = new BlurIndexCloser(); _closer.init(); + _walPath = walPath; } @Override @@ -149,6 +154,8 @@ private BlurIndex openIndex(String table, String shard, Directory dir) throws Co index.setShard(shard); index.setSimilarity(getSimilarity(table)); index.setTable(table); + index.setWalPath(new Path(new Path(_walPath,table),shard)); + index.setConfiguration(_configuration); index.init(); return index; } diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurNRTIndex.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurNRTIndex.java index 06b2968a..ef0b4176 100644 --- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurNRTIndex.java +++ b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurNRTIndex.java @@ -3,20 +3,16 @@ import static com.nearinfinity.blur.lucene.LuceneConstant.LUCENE_VERSION; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; -import org.apache.lucene.document.Field.Index; -import org.apache.lucene.document.Field.Store; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.Term; import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.NRTManager; @@ -32,30 +28,13 @@ import com.nearinfinity.blur.log.LogFactory; import com.nearinfinity.blur.thrift.generated.Record; import com.nearinfinity.blur.thrift.generated.Row; -import com.nearinfinity.blur.utils.BlurConstants; import com.nearinfinity.blur.utils.PrimeDocCache; -import com.nearinfinity.blur.utils.RowIndexWriter; public class BlurNRTIndex extends BlurIndex { private static final Log LOG = LogFactory.getLog(BlurNRTIndex.class); - private static final Field PRIME_DOC_FIELD = new Field(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO, Index.NOT_ANALYZED_NO_NORMS); - private static final Term ROW_ID = new Term(BlurConstants.ROW_ID); private static final boolean APPLY_ALL_DELETES = true; - private SearcherWarmer _warmer = new SearcherWarmer() { - @Override - public void warm(IndexSearcher s) throws IOException { - IndexReader indexReader = s.getIndexReader(); - IndexReader[] subReaders = indexReader.getSequentialSubReaders(); - if (subReaders == null) { - PrimeDocCache.getPrimeDocBitSet(indexReader); - } else { - for (IndexReader reader : subReaders) { - PrimeDocCache.getPrimeDocBitSet(reader); - } - } - } - }; + private NRTManager _nrtManager; private SearcherManager _manager; private AtomicBoolean _isClosed = new AtomicBoolean(); @@ -76,8 +55,28 @@ public void warm(IndexSearcher s) throws IOException { private double _nrtCachingMaxCachedMB = 5.0; private Thread _refresher; private TransactionRecorder _recorder; + private Configuration _configuration; + + private SearcherWarmer _warmer = new SearcherWarmer() { + @Override + public void warm(IndexSearcher s) throws IOException { + IndexReader indexReader = s.getIndexReader(); + IndexReader[] subReaders = indexReader.getSequentialSubReaders(); + if (subReaders == null) { + PrimeDocCache.getPrimeDocBitSet(indexReader); + } else { + for (IndexReader reader : subReaders) { + PrimeDocCache.getPrimeDocBitSet(reader); + } + } + } + }; + private Path _walPath; public void init() throws IOException { + Path walTablePath = new Path(_walPath, _table); + Path walShardPath = new Path(walTablePath, _shard); + IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, _analyzer); conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5)); conf.setSimilarity(_similarity); @@ -86,6 +85,14 @@ public void init() throws IOException { NRTCachingDirectory cachingDirectory = new NRTCachingDirectory(_directory, _nrtCachingMaxMergeSizeMB, _nrtCachingMaxCachedMB); _writer = new IndexWriter(cachingDirectory, conf); + _recorder = new TransactionRecorder(); + _recorder.setAnalyzer(_analyzer); + _recorder.setConfiguration(_configuration); + _recorder.setWalPath(walShardPath); + + _recorder.init(); + _recorder.replay(_writer); + _nrtManager = new NRTManager(_writer, _executorService, _warmer, APPLY_ALL_DELETES); _manager = _nrtManager.getSearcherManager(APPLY_ALL_DELETES); _lastRefresh = System.nanoTime(); @@ -127,7 +134,7 @@ public void run() { while (!_isClosed.get()) { try { LOG.info("Committing of [{0}/{1}].", _table, _shard); - _writer.commit(); + _recorder.commit(_writer); } catch (CorruptIndexException e) { LOG.error("Error during commit of [{0}/{1}].", _table, _shard, e); } catch (IOException e) { @@ -156,27 +163,13 @@ public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOE deleteRow(waitToBeVisible, wal, row.id); return; } - if (wal) { - if (_recorder == null) { - LOG.warn("No transaction recorder set."); - } else { - _recorder.replaceRow(row); - } - } - long generation = _nrtManager.updateDocuments(ROW_ID.createTerm(row.id), getDocs(row)); + long generation = _recorder.replaceRow(wal, row, _nrtManager); waitToBeVisible(waitToBeVisible, generation); } @Override public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException { - if (wal) { - if (_recorder == null) { - LOG.warn("No transaction recorder set."); - } else { - _recorder.deleteRow(rowId); - } - } - long generation = _nrtManager.deleteDocuments(ROW_ID.createTerm(rowId)); + long generation = _recorder.deleteRow(wal, rowId, _nrtManager); waitToBeVisible(waitToBeVisible, generation); } @@ -193,6 +186,7 @@ public void close() throws IOException { _committer.interrupt(); _refresher.interrupt(); try { + _recorder.close(); _writer.close(); _manager.close(); _nrtManager.close(); @@ -236,30 +230,6 @@ private void maybeReopen() throws IOException { } } - private List getDocs(Row row) { - List records = row.records; - int size = records.size(); - final String rowId = row.id; - final StringBuilder builder = new StringBuilder(); - List docs = new ArrayList(size); - for (int i = 0; i < size; i++) { - Document document = convert(rowId, records.get(i), builder); - if (i == 0) { - document.add(PRIME_DOC_FIELD); - } - docs.add(document); - } - return docs; - } - - private Document convert(String rowId, Record record, StringBuilder builder) { - Document document = new Document(); - document.add(new Field(BlurConstants.ROW_ID, rowId, Store.YES, Index.NOT_ANALYZED_NO_NORMS)); - document.add(new Field(BlurConstants.RECORD_ID, record.recordId, Store.YES, Index.NOT_ANALYZED_NO_NORMS)); - RowIndexWriter.addColumns(document, _analyzer, builder, record.family, record.columns); - return document; - } - public void setAnalyzer(BlurAnalyzer analyzer) { _analyzer = analyzer; } @@ -300,7 +270,11 @@ public void setNrtCachingMaxCachedMB(int nrtCachingMaxCachedMB) { _nrtCachingMaxCachedMB = nrtCachingMaxCachedMB; } - public void setRecorder(TransactionRecorder recorder) { - _recorder = recorder; + public void setWalPath(Path walPath) { + _walPath = walPath; + } + + public void setConfiguration(Configuration configuration) { + _configuration = configuration; } } diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/TransactionRecorder.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/TransactionRecorder.java index 2e65fbf1..c29bcb6a 100644 --- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/TransactionRecorder.java +++ b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/TransactionRecorder.java @@ -1,22 +1,41 @@ package com.nearinfinity.blur.manager.writer; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.record.Utils; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.Field.Index; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.NRTManager; +import com.nearinfinity.blur.analysis.BlurAnalyzer; +import com.nearinfinity.blur.index.IndexWriter; import com.nearinfinity.blur.log.Log; import com.nearinfinity.blur.log.LogFactory; import com.nearinfinity.blur.thrift.generated.Column; import com.nearinfinity.blur.thrift.generated.Record; import com.nearinfinity.blur.thrift.generated.Row; +import com.nearinfinity.blur.utils.BlurConstants; +import com.nearinfinity.blur.utils.RowIndexWriter; public class TransactionRecorder { @@ -45,31 +64,104 @@ public static TYPE lookup(byte b) { } private static final Log LOG = LogFactory.getLog(TransactionRecorder.class); - + private static final Field PRIME_DOC_FIELD = new Field(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO, Index.NOT_ANALYZED_NO_NORMS); + private static final Term ROW_ID = new Term(BlurConstants.ROW_ID); private AtomicBoolean running = new AtomicBoolean(true); private Path walPath; private Configuration configuration; private FileSystem fileSystem; - private FSDataOutputStream outputStream; + private AtomicReference outputStream = new AtomicReference(); private AtomicLong lastSync = new AtomicLong(); private long timeBetweenSyncs = TimeUnit.MILLISECONDS.toNanos(10); + private BlurAnalyzer analyzer; public void init() throws IOException { fileSystem = walPath.getFileSystem(configuration); + } + + public void open() throws IOException { if (fileSystem.exists(walPath)) { - outputStream = fileSystem.append(walPath); + throw new IOException("WAL path [" + walPath + "] still exists, replay must have not worked."); } else { - outputStream = fileSystem.create(walPath); + outputStream.set(fileSystem.create(walPath)); + } + if (outputStream == null) { + throw new RuntimeException(); } lastSync.set(System.nanoTime()); } - public void close() { - running.set(true); + public void replay(IndexWriter writer) throws IOException { + if (fileSystem.exists(walPath)) { + FSDataInputStream inputStream = fileSystem.open(walPath); + replay(writer, inputStream); + inputStream.close(); + commit(writer); + } else { + open(); + } + } + + private void replay(IndexWriter writer, DataInputStream inputStream) throws CorruptIndexException, IOException { + long updateCount = 0; + long deleteCount = 0; + byte[] buffer; + while ((buffer = readBuffer(inputStream)) != null) { + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(buffer)); + TYPE lookup = TYPE.lookup(dataInputStream.readByte()); + switch (lookup) { + case ROW: + Row row = readRow(dataInputStream); + writer.updateDocuments(ROW_ID.createTerm(row.id), getDocs(row)); + updateCount++; + continue; + case DELETE: + String deleteRowId = readString(dataInputStream); + writer.deleteDocuments(ROW_ID.createTerm(deleteRowId)); + deleteCount++; + continue; + default: + LOG.error("Unknown type [{0}]", lookup); + throw new IOException("Unknown type [" + lookup + "]"); + } + } + LOG.info("Rows reclaimed form the WAL [{0}]", updateCount); + LOG.info("Deletes reclaimed form the WAL [{0}]", deleteCount); } - private void writeRow(Row row) throws IOException { - outputStream.writeByte(TYPE.ROW.value()); + private byte[] readBuffer(DataInputStream inputStream) { + try { + int length = inputStream.readInt(); + byte[] buffer = new byte[length]; + inputStream.readFully(buffer); + return buffer; + } catch (IOException e) { + if (e instanceof EOFException) { + return null; + } + e.printStackTrace(); + } + return null; + } + + private void rollLog() throws IOException { + LOG.info("Rolling WAL path [" + walPath + "]"); + FSDataOutputStream os = outputStream.get(); + if (os != null) { + os.close(); + } + fileSystem.delete(walPath, false); + open(); + } + + public void close() throws IOException { + synchronized (running) { + running.set(false); + } + outputStream.get().close(); + } + + private static void writeRow(DataOutputStream outputStream, Row row) throws IOException { writeString(outputStream, row.id); List records = row.records; int size = records.size(); @@ -80,7 +172,17 @@ private void writeRow(Row row) throws IOException { } } - private static void writeRecord(FSDataOutputStream outputStream, Record record) throws IOException { + private static Row readRow(DataInputStream inputStream) throws IOException { + Row row = new Row(); + row.id = readString(inputStream); + int size = inputStream.readInt(); + for (int i = 0; i < size; i++) { + row.addToRecords(readRecord(inputStream)); + } + return row; + } + + private static void writeRecord(DataOutputStream outputStream, Record record) throws IOException { writeString(outputStream, record.recordId); writeString(outputStream, record.family); List columns = record.columns; @@ -91,41 +193,85 @@ private static void writeRecord(FSDataOutputStream outputStream, Record record) } } - private static void writeColumn(FSDataOutputStream outputStream, Column column) throws IOException { + private static Record readRecord(DataInputStream inputStream) throws IOException { + Record record = new Record(); + record.recordId = readString(inputStream); + record.family = readString(inputStream); + int size = inputStream.readInt(); + for (int i = 0; i < size; i++) { + record.addToColumns(readColumn(inputStream)); + } + return record; + } + + private static void writeColumn(DataOutputStream outputStream, Column column) throws IOException { writeString(outputStream, column.name); writeString(outputStream, column.value); } - private void writeDelete(String deleteRowId) throws IOException { - outputStream.writeByte(TYPE.DELETE.value()); + private static Column readColumn(DataInputStream inputStream) throws IOException { + Column column = new Column(); + column.name = readString(inputStream); + column.value = readString(inputStream); + return column; + } + + private static void writeDelete(DataOutputStream outputStream, String deleteRowId) throws IOException { writeString(outputStream, deleteRowId); } - private static void writeString(FSDataOutputStream outputStream, String s) throws IOException { + private static void writeString(DataOutputStream outputStream, String s) throws IOException { byte[] bs = s.getBytes(); Utils.writeVInt(outputStream, bs.length); outputStream.write(bs); } - private void sync() throws IOException { + private static String readString(DataInputStream inputStream) throws IOException { + int length = Utils.readVInt(inputStream); + byte[] buffer = new byte[length]; + inputStream.readFully(buffer); + return new String(buffer); + } + + private void sync(byte[] bs) throws IOException { + if (bs == null || outputStream == null) { + throw new RuntimeException("bs [" + bs + "] outputStream [" + outputStream + "]"); + } + FSDataOutputStream os = outputStream.get(); + os.writeInt(bs.length); + os.write(bs); long now = System.nanoTime(); if (lastSync.get() + timeBetweenSyncs < now) { - outputStream.sync(); + os.sync(); lastSync.set(now); } } - public void replaceRow(Row row) throws IOException { + public long replaceRow(boolean wal, Row row, NRTManager nrtManager) throws IOException { synchronized (running) { - writeRow(row); - sync(); + if (wal) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream outputStream = new DataOutputStream(baos); + outputStream.writeByte(TYPE.ROW.value()); + writeRow(outputStream, row); + outputStream.close(); + sync(baos.toByteArray()); + } + return nrtManager.updateDocuments(ROW_ID.createTerm(row.id), getDocs(row)); } } - public void deleteRow(String rowId) throws IOException { + public long deleteRow(boolean wal, String rowId, NRTManager nrtManager) throws IOException { synchronized (running) { - writeDelete(rowId); - sync(); + if (wal) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream outputStream = new DataOutputStream(baos); + outputStream.writeByte(TYPE.DELETE.value()); + writeDelete(outputStream, rowId); + outputStream.close(); + sync(baos.toByteArray()); + } + return nrtManager.deleteDocuments(ROW_ID.createTerm(rowId)); } } @@ -136,4 +282,39 @@ public void setWalPath(Path walPath) { public void setConfiguration(Configuration configuration) { this.configuration = configuration; } + + public void commit(IndexWriter writer) throws CorruptIndexException, IOException { + synchronized (running) { + writer.commit(); + rollLog(); + } + } + + private List getDocs(Row row) { + List records = row.records; + int size = records.size(); + final String rowId = row.id; + final StringBuilder builder = new StringBuilder(); + List docs = new ArrayList(size); + for (int i = 0; i < size; i++) { + Document document = convert(rowId, records.get(i), builder); + if (i == 0) { + document.add(PRIME_DOC_FIELD); + } + docs.add(document); + } + return docs; + } + + private Document convert(String rowId, Record record, StringBuilder builder) { + Document document = new Document(); + document.add(new Field(BlurConstants.ROW_ID, rowId, Store.YES, Index.NOT_ANALYZED_NO_NORMS)); + document.add(new Field(BlurConstants.RECORD_ID, record.recordId, Store.YES, Index.NOT_ANALYZED_NO_NORMS)); + RowIndexWriter.addColumns(document, analyzer, builder, record.family, record.columns); + return document; + } + + public void setAnalyzer(BlurAnalyzer analyzer) { + this.analyzer = analyzer; + } } diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/thrift/ThriftBlurShardServer.java b/src/blur-core/src/main/java/com/nearinfinity/blur/thrift/ThriftBlurShardServer.java index 816a4ae4..cb355079 100644 --- a/src/blur-core/src/main/java/com/nearinfinity/blur/thrift/ThriftBlurShardServer.java +++ b/src/blur-core/src/main/java/com/nearinfinity/blur/thrift/ThriftBlurShardServer.java @@ -143,6 +143,8 @@ public static void main(String[] args) throws TTransportException, IOException, BlurFilterCache filterCache = getFilterCache(configuration); BlurIndexWarmup indexWarmup = getIndexWarmup(configuration); IndexDeletionPolicy indexDeletionPolicy = getIndexDeletionPolicy(configuration); + + String walLogPath = configuration.get(BLUR_SHARD_WAL_PATH, "hdfs://localhost"); final DistributedIndexServer indexServer = new DistributedIndexServer(); indexServer.setBlurMetrics(blurMetrics); @@ -157,9 +159,7 @@ public static void main(String[] args) throws TTransportException, IOException, indexServer.setSafeModeDelay(configuration.getLong(BLUR_SHARD_SAFEMODEDELAY, 60000)); indexServer.setWarmup(indexWarmup); indexServer.setIndexDeletionPolicy(indexDeletionPolicy); - String walLogPath = configuration.get(BLUR_SHARD_WAL_PATH, "hdfs://localhost"); - String walLogPathStr = walLogPath + "/" + nodeName.replace(":", "_") + "-" + System.currentTimeMillis() + ".wal"; - indexServer.setWalPath(new Path(walLogPathStr)); + indexServer.setWalPath(new Path(walLogPath)); indexServer.init(); final IndexManager indexManager = new IndexManager(); diff --git a/src/blur-core/src/test/java/com/nearinfinity/blur/manager/IndexManagerTest.java b/src/blur-core/src/test/java/com/nearinfinity/blur/manager/IndexManagerTest.java index 38f511a1..58c83d26 100644 --- a/src/blur-core/src/test/java/com/nearinfinity/blur/manager/IndexManagerTest.java +++ b/src/blur-core/src/test/java/com/nearinfinity/blur/manager/IndexManagerTest.java @@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicLongArray; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -78,16 +79,13 @@ public class IndexManagerTest { private static final String FAMILY = "test-family"; private LocalIndexServer server; private IndexManager indexManager; - private BlurIndexRefresher refresher; @Before - public void setUp() throws BlurException, IOException { + public void setUp() throws BlurException, IOException, InterruptedException { File file = new File("./tmp/indexer-manager-test"); rm(file); new File(new File(file, TABLE), SHARD_NAME).mkdirs(); - refresher = new BlurIndexRefresher(); - refresher.init(); - server = new LocalIndexServer(file); + server = new LocalIndexServer(file,new Path("./tmp/indexer-manager-test")); indexManager = new IndexManager(); indexManager.setStatusCleanupTimerDelay(1000); @@ -95,13 +93,11 @@ public void setUp() throws BlurException, IOException { indexManager.setThreadCount(1); indexManager.setBlurMetrics(new BlurMetrics(new Configuration())); indexManager.init(); - setupData(); } @After public void teardown() { - refresher.close(); indexManager.close(); } diff --git a/src/blur-core/src/test/java/com/nearinfinity/blur/manager/writer/TransactionRecorderTest.java b/src/blur-core/src/test/java/com/nearinfinity/blur/manager/writer/TransactionRecorderTest.java new file mode 100644 index 00000000..3c684026 --- /dev/null +++ b/src/blur-core/src/test/java/com/nearinfinity/blur/manager/writer/TransactionRecorderTest.java @@ -0,0 +1,86 @@ +package com.nearinfinity.blur.manager.writer; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.lucene.analysis.KeywordAnalyzer; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.util.Version; +import org.junit.Test; + +import com.nearinfinity.blur.analysis.BlurAnalyzer; +import com.nearinfinity.blur.index.IndexWriter; +import com.nearinfinity.blur.thrift.generated.Column; +import com.nearinfinity.blur.thrift.generated.Record; +import com.nearinfinity.blur.thrift.generated.Row; + +public class TransactionRecorderTest { + + @Test + public void testReplay() throws IOException { + String tmpPath = "./tmp/transaction-recorder/wal"; + rm(new File(tmpPath)); + + KeywordAnalyzer analyzer = new KeywordAnalyzer(); + Configuration configuration = new Configuration(); + BlurAnalyzer blurAnalyzer = new BlurAnalyzer(analyzer); + + TransactionRecorder transactionRecorder = new TransactionRecorder(); + transactionRecorder.setAnalyzer(blurAnalyzer); + transactionRecorder.setConfiguration(configuration); + + transactionRecorder.setWalPath(new Path(tmpPath)); + transactionRecorder.init(); + transactionRecorder.open(); + try { + transactionRecorder.replaceRow(true, genRow(), null); + fail("Should NPE"); + } catch (NullPointerException e) { + } + transactionRecorder.close(); //this is done so that the rawfs will flush the file to disk for reading + + RAMDirectory directory = new RAMDirectory(); + IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, analyzer); + IndexWriter writer = new IndexWriter(directory, conf); + + TransactionRecorder replayTransactionRecorder = new TransactionRecorder(); + replayTransactionRecorder.setAnalyzer(blurAnalyzer); + replayTransactionRecorder.setConfiguration(configuration); + replayTransactionRecorder.setWalPath(new Path(tmpPath)); + replayTransactionRecorder.init(); + + replayTransactionRecorder.replay(writer); + IndexReader reader = IndexReader.open(directory); + assertEquals(1, reader.numDocs()); + } + + private void rm(File file) { + if (!file.exists()) { + return; + } + if (file.isDirectory()) { + for (File f : file.listFiles()) { + rm(f); + } + } + file.delete(); + } + + private Row genRow() { + Row row = new Row(); + row.id = "1"; + Record record = new Record(); + record.recordId = "1"; + record.family = "test"; + record.addToColumns(new Column("name", "value")); + row.addToRecords(record); + return row; + } + +}