Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

/**
* Thrown by lucene on detecting that Thread.interrupt() had been called. Unlike Java's
* InterruptedException, this exception is not checked..
* InterruptedException, this exception is not checked.
*/
public final class ThreadInterruptedException extends RuntimeException {
public ThreadInterruptedException(InterruptedException ie) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.lucene.index;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import java.io.IOException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
Expand All @@ -37,23 +38,25 @@
import org.apache.lucene.tests.analysis.MockAnalyzer;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.tests.index.SuppressingConcurrentMergeScheduler;
import org.apache.lucene.tests.mockfile.HandleLimitFS;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.apache.lucene.tests.store.MockDirectoryWrapper;
import org.apache.lucene.tests.util.LineFileDocs;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.SuppressForbidden;
import org.apache.lucene.util.ThreadInterruptedException;

/** MultiThreaded IndexWriter tests */
@LuceneTestCase.SuppressCodecs("SimpleText")
// Some of these tests spin uncoordinated threads that generate lots of documents and have very
// small limits on in-memory buffers, etc. They occasionally can exceed the default
// max handles limit...
@HandleLimitFS.MaxOpenHandles(limit = HandleLimitFS.MaxOpenHandles.MAX_OPEN_FILES * 4)
public class TestIndexWriterWithThreads extends LuceneTestCase {

// Used by test cases below
private static class IndexerThread extends Thread {

private final CyclicBarrier syncStart;
Throwable error;
IndexWriter writer;
Expand All @@ -66,7 +69,6 @@ public IndexerThread(IndexWriter writer, boolean noErrors, CyclicBarrier syncSta
this.syncStart = syncStart;
}

@SuppressForbidden(reason = "Thread sleep")
@Override
public void run() {
try {
Expand Down Expand Up @@ -97,14 +99,11 @@ public void run() {
System.out.println("TEST: expected exc:");
ioe.printStackTrace(System.out);
}
// System.out.println(Thread.currentThread().getName() + ": hit exc");
// ioe.printStackTrace(System.out);

if (ioe.getMessage().startsWith("fake disk full at")
|| ioe.getMessage().equals("now failing on purpose")) {
try {
Thread.sleep(1);
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
if (Thread.currentThread().isInterrupted()) {
throw new ThreadInterruptedException(new InterruptedException());
}
if (fullCount++ >= 5) break;
} else {
Expand Down Expand Up @@ -188,77 +187,76 @@ public void testImmediateDiskFullWithThreads() throws Exception {
// threads are trying to add documents. Strictly
// speaking, this isn't valid us of Lucene's APIs, but we
// still want to be robust to this case:
@SuppressForbidden(reason = "Thread sleep")
public void testCloseWithThreads() throws Exception {
int NUM_THREADS = 3;
int numIterations = TEST_NIGHTLY ? 7 : 3;
for (int iter = 0; iter < numIterations; iter++) {
if (VERBOSE) {
System.out.println("\nTEST: iter=" + iter);
}
Directory dir = newDirectory();
try (Directory dir = newDirectory()) {
IndexerThread[] threads = new IndexerThread[NUM_THREADS];
try (IndexWriter writer =
new IndexWriter(
dir,
newIndexWriterConfig(new MockAnalyzer(random()))
.setMaxBufferedDocs(10)
.setMergeScheduler(new ConcurrentMergeScheduler())
.setMergePolicy(newLogMergePolicy(4))
.setCommitOnClose(false))) {
((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler())
.setSuppressExceptions();

CyclicBarrier syncStart = new CyclicBarrier(NUM_THREADS + 1);
for (int i = 0; i < NUM_THREADS; i++) {
threads[i] = new IndexerThread(writer, false, syncStart);
threads[i].start();
}
syncStart.await();

IndexWriter writer =
new IndexWriter(
dir,
newIndexWriterConfig(new MockAnalyzer(random()))
.setMaxBufferedDocs(10)
.setMergeScheduler(new ConcurrentMergeScheduler())
.setMergePolicy(newLogMergePolicy(4))
.setCommitOnClose(false));
((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions();
// wait for at least this many documents to be added, then resume.
int minDocsAdded = RandomizedTest.randomIntBetween(1, 50);
while (true) {
Thread.yield(); // intentional, spin loop instead of wall-clock wait.

CyclicBarrier syncStart = new CyclicBarrier(NUM_THREADS + 1);
IndexerThread[] threads = new IndexerThread[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
threads[i] = new IndexerThread(writer, false, syncStart);
threads[i].start();
}
syncStart.await();
int docsAdded = 0;
for (int i = 0; i < NUM_THREADS; i++) {
docsAdded += threads[i].addCount;

boolean done = false;
while (!done) {
Thread.sleep(100);
for (int i = 0; i < NUM_THREADS; i++)
// only stop when at least one thread has added a doc
if (threads[i].addCount > 0) {
done = true;
break;
} else if (!threads[i].isAlive()) {
fail("thread failed before indexing a single document");
}
}
if (!threads[i].isAlive()) {
fail("thread failed before indexing a single document: " + threads[i]);
}
}

if (VERBOSE) {
System.out.println("\nTEST: now close");
}
try {
writer.commit();
} finally {
writer.close();
}
if (docsAdded > minDocsAdded) {
break;
}
}

// Make sure threads that are adding docs are not hung:
for (int i = 0; i < NUM_THREADS; i++) {
// Without fix for LUCENE-1130: one of the
// threads will hang
threads[i].join();
// now commit and then close the index writer, while background threads still keep
// adding documents.
writer.commit();
}

// [DW] this is unreachable once join() returns a thread cannot be alive.
if (threads[i].isAlive()) fail("thread seems to be hung");
}
// Make sure threads that are adding docs are not hung.
for (int i = 0; i < NUM_THREADS; i++) {
// Without fix for LUCENE-1130: one of the threads will hang
if (threads[i] != null) {
threads[i].join();
}
}

// Quick test to make sure index is not corrupt:
IndexReader reader = DirectoryReader.open(dir);
PostingsEnum tdocs = TestUtil.docs(random(), reader, "field", new BytesRef("aaa"), null, 0);
int count = 0;
while (tdocs.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
count++;
// Quick test to make sure index is not corrupt.
try (IndexReader reader = DirectoryReader.open(dir)) {
PostingsEnum tdocs =
TestUtil.docs(random(), reader, "field", new BytesRef("aaa"), null, 0);
int count = 0;
while (tdocs.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
count++;
}
assertTrue(count > 0);
}
}
assertTrue(count > 0);
reader.close();

dir.close();
}
}

Expand Down Expand Up @@ -564,7 +562,7 @@ public void testRollbackAndCommitWithThreads() throws Exception {
writerRef.get().commit();
final LineFileDocs docs = new LineFileDocs(random());
final Thread[] threads = new Thread[threadCount];
final int iters = atLeast(100);
final int iters = atLeast(25);
final AtomicBoolean failed = new AtomicBoolean();
final Lock rollbackLock = new ReentrantLock();
final Lock commitLock = new ReentrantLock();
Expand All @@ -574,7 +572,6 @@ public void testRollbackAndCommitWithThreads() throws Exception {
@Override
public void run() {
for (int iter = 0; iter < iters && !failed.get(); iter++) {
// final int x = random().nextInt(5);
final int x = random().nextInt(3);
try {
switch (x) {
Expand Down Expand Up @@ -637,8 +634,8 @@ public void run() {
threads[threadID].start();
}

for (int threadID = 0; threadID < threadCount; threadID++) {
threads[threadID].join();
for (var t : threads) {
t.join();
}

assertTrue(!failed.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ public static IndexWriterConfig newIndexWriterConfig(Analyzer a) {
public static IndexWriterConfig newIndexWriterConfig(Random r, Analyzer a) {
IndexWriterConfig c = new IndexWriterConfig(a);
c.setSimilarity(classEnvRule.similarity);
if (VERBOSE) {
if (INFOSTREAM) {
// Even though TestRuleSetupAndRestoreClassEnv calls
// InfoStream.setDefault, we do it again here so that
// the PrintStreamInfoStream.messageID increments so
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private FileSystem initializeFileSystem() {
avoid.addAll(Arrays.asList(a.value()));
}
FileSystem fs = FileSystems.getDefault();
if (LuceneTestCase.VERBOSE && allowed(avoid, VerboseFS.class)) {
if (LuceneTestCase.INFOSTREAM && allowed(avoid, VerboseFS.class)) {
fs =
new VerboseFS(
fs,
Expand Down