Skip to content

Commit

Permalink
HSEARCH-3775 Implement forced refreshes in LuceneWriteWorkProcessor
Browse files Browse the repository at this point in the history
Essentially, after this change, it is possible to write to the index and
force the change to be visible to search queries without requesting a
commit.

Before this change, we would systematically commit in order to make the
changes visible, which 1) is overkill and 2) is not effective when a
refresh_interval different from 0 is configured.
  • Loading branch information
yrodiere committed Feb 13, 2020
1 parent 4a3b31b commit baec608
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 94 deletions.
Expand Up @@ -252,8 +252,8 @@ SearchException unableToDeleteEntryFromIndex(String tenantId, String id,
SearchException unableToCommitIndex(@Param EventContext context, @Cause Exception e);

@Message(id = ID_OFFSET_2 + 22,
value = "Could not open an index reader.")
SearchException unableToCreateIndexReader(@Param EventContext context, @Cause Exception e);
value = "Unable to refresh.")
SearchException unableToRefreshIndex(@Param EventContext context, @Cause Exception e);

@Message(id = ID_OFFSET_2 + 24,
value = "A multi-index scope cannot include both a Lucene index and another type of index."
Expand Down
Expand Up @@ -38,6 +38,11 @@ public interface IndexAccessor {
*/
void commit() throws IOException;

/**
* Refreshes the underlying index readers.
*/
void refresh() throws IOException;

/**
* @return The index writer delegator.
*/
Expand Down
Expand Up @@ -96,6 +96,11 @@ public void commit() throws IOException {
}
}

@Override
public void refresh() throws IOException {
indexReaderProvider.refresh();
}

@Override
public IndexWriterDelegator getIndexWriterDelegator() throws IOException {
return indexWriterProvider.getOrCreate();
Expand Down
Expand Up @@ -19,6 +19,12 @@ public interface IndexReaderProvider {
*/
void clear() throws IOException;

/**
* Make sure the index reader returned by the next call to {@link #getOrCreate()}
* will return an up-to-date view of the index.
*/
void refresh() throws IOException;

/**
* @return A ready-to-use index reader, with its reference count already increased.
* Callers are responsible for calling {@link DirectoryReader#decRef()} when they are done with the index reader.
Expand Down
Expand Up @@ -48,6 +48,15 @@ public synchronized void clear() throws IOException {
setCurrentReaderEntry( null );
}

@Override
public void refresh() throws IOException {
IndexReaderEntry entry = currentReaderEntry;

if ( entry != null ) {
entry.forceRefresh();
}
}

@Override
public DirectoryReader getOrCreate() throws IOException {
IndexReaderEntry entry = currentReaderEntry;
Expand Down Expand Up @@ -103,13 +112,29 @@ private static class IndexReaderEntry {
private final TimingSource timingSource;
private final long expiration;

private volatile boolean refreshForced = false;

private IndexReaderEntry(DirectoryReader reader, TimingSource timingSource, int refreshInterval) {
this.reader = reader;
this.timingSource = timingSource;
this.expiration = refreshInterval == 0 ? 0 : timingSource.getMonotonicTimeEstimate() + refreshInterval;
}

public void forceRefresh() {
refreshForced = true;
}

/**
* @return {@code true} if the reader is still fresh enough to be used,
* i.e. if it is completely up-to-date with the state of the index writer
* OR is out-of-date by less than the configured refresh interval,
* and refresh wasn't forced by a previous write.
* @throws IOException If an I/O failure occurs.
*/
boolean isFresh() throws IOException {
if ( refreshForced ) {
return false;
}
if ( expiration == 0 || expiration < timingSource.getMonotonicTimeEstimate() ) {
// The last refresh was a long time ago. Let's check if the reader is really fresh.
return reader.isCurrent();
Expand Down
Expand Up @@ -29,6 +29,11 @@ public void clear() {
// Nothing to do
}

@Override
public void refresh() {
// Nothing to do
}

@Override
public DirectoryReader getOrCreate() throws IOException {
return DirectoryReader.open( directoryHolder.get() );
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.hibernate.search.engine.reporting.IndexFailureContext;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.util.common.AssertionFailure;
import org.hibernate.search.util.common.impl.Throwables;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;
import org.hibernate.search.util.common.reporting.EventContext;

Expand All @@ -42,6 +43,7 @@ public class LuceneWriteWorkProcessor implements BatchingExecutor.WorkProcessor
private List<LuceneWriteWork<?>> previousWorkSetsUncommittedWorks = new ArrayList<>();

private boolean workSetForcesCommit;
private boolean workSetForcesRefresh;
private List<LuceneWriteWork<?>> workSetUncommittedWorks = new ArrayList<>();
private boolean workSetHasFailure;

Expand Down Expand Up @@ -79,10 +81,8 @@ public CompletableFuture<?> endBatch() {
}

public void beforeWorkSet(DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
workSetForcesCommit = DocumentCommitStrategy.FORCE.equals( commitStrategy )
// We need to commit in order to make the changes visible
// TODO HSEARCH-3775 this may not be true with the NRT implementation from Search 5
|| DocumentRefreshStrategy.FORCE.equals( refreshStrategy );
workSetForcesCommit = DocumentCommitStrategy.FORCE.equals( commitStrategy );
workSetForcesRefresh = DocumentRefreshStrategy.FORCE.equals( refreshStrategy );
workSetUncommittedWorks.clear();
workSetHasFailure = false;
}
Expand Down Expand Up @@ -128,6 +128,7 @@ public void afterSuccessfulWorkSet() {
}
catch (RuntimeException e) {
cleanUpAfterFailure( e, "Commit after a set of index works" );
// We'll skip the refresh, but that's okay: we just reset the writer/reader anyway.
throw e;
}
finally {
Expand All @@ -140,6 +141,21 @@ public void afterSuccessfulWorkSet() {
previousWorkSetsUncommittedWorks.addAll( workSetUncommittedWorks );
workSetUncommittedWorks.clear();
}

if ( workSetForcesRefresh ) {
// In case of failure, just propagate the exception:
// we don't expect a refresh failure to affect the writer.
refresh();
}
}

private void refresh() {
try {
indexAccessor.refresh();
}
catch (RuntimeException | IOException e) {
throw log.unableToRefreshIndex( eventContext, e );
}
}

private void commit() {
Expand Down

0 comments on commit baec608

Please sign in to comment.