Skip to content

Commit

Permalink
FTS index upgrade fails after any index write operations #53
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Subbotin committed Mar 5, 2019
1 parent b86f941 commit 67c81f2
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.morphology.LuceneMorphology;
import org.apache.lucene.search.SearcherManager;
import org.springframework.stereotype.Component;

import javax.inject.Inject;
Expand Down Expand Up @@ -65,6 +64,7 @@ protected IndexWriter createWriter() {
try {
IndexWriterConfig config = new IndexWriterConfig(analyzer);
config.setIndexDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
config.setMergePolicy(new LiveUpgradeMergePolicy(config.getMergePolicy()));
return new IndexWriter(directoryProvider.getDirectory(), config);
} catch (IOException e) {
throw new RuntimeException("Error on IndexWriter creation", e);
Expand Down
124 changes: 124 additions & 0 deletions modules/core/src/com/haulmont/fts/core/sys/LiveUpgradeMergePolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright (c) 2008-2019 Haulmont. All rights reserved.
* Use is subject to license terms, see http://www.cuba-platform.com/commercial-software-license for details.
*/

package com.haulmont.fts.core.sys;

import org.apache.lucene.index.*;
import org.apache.lucene.util.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;

/** This {@link MergePolicy} is used for upgrading all existing segments of
* an index when calling {@link IndexWriter#forceMerge(int)}.
* This allows for an as-cheap-as possible upgrade of an older index by only upgrading segments that
* are created by previous Lucene versions. forceMerge does no longer really merge;
* it is just used to "forceMerge" older segment versions away.
* <p>For a fully customizeable upgrade, you can use this like any other {@code MergePolicy}
* and call {@link IndexWriter#forceMerge(int)}:
* <pre class="prettyprint lang-java">
* IndexWriterConfig iwc = new IndexWriterConfig(Version.LUCENE_XX, new KeywordAnalyzer());
* iwc.setMergePolicy(new LiveUpgradeMergePolicy(iwc.getMergePolicy()));
* IndexWriter w = new IndexWriter(dir, iwc);
* w.forceMerge(1);
* w.close();
* </pre>
* <p><b>Warning:</b> This merge policy may reorder documents if the index was partially
* upgraded before calling forceMerge (e.g., documents were added). If your application relies
* on &quot;monotonicity&quot; of doc IDs (which means that the order in which the documents
* were added to the index is preserved), do a forceMerge(1) instead. Please note, the
* delegate {@code MergePolicy} may also reorder documents.
*/
//https://issues.apache.org/jira/browse/LUCENE-7671?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=15952832#comment-15952832
//https://github.com/apache/lucene-solr/pull/151/files
public class LiveUpgradeMergePolicy extends FilterMergePolicy {
// True if the next merge request should do segment upgrades:
private volatile boolean upgradeInProgress;

private static final Logger log = LoggerFactory.getLogger(LiveUpgradeMergePolicy.class);

/**
* Creates a new merge policy instance.
*
* @param in the wrapped {@link MergePolicy}
*/
public LiveUpgradeMergePolicy(MergePolicy in) {
super(in);
}

public void setUpgradeInProgress(boolean upgradeInProgress) {
this.upgradeInProgress = upgradeInProgress;
}

/**
* Returns if the given segment should be upgraded. The default implementation
* will return {@code !Version.LATEST.equals(si.getVersion())},
* so all segments created with a different version number than this Lucene version will
* get upgraded.
*/
protected boolean shouldUpgradeSegment(SegmentCommitInfo si) {
return !Version.LATEST.equals(si.info.getVersion());
}

@Override
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount,
Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
MergeSpecification spec = in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext);

if (upgradeInProgress) {
try {
// first find all old segments
final Map<SegmentCommitInfo, Boolean> oldSegments = new HashMap<>();
for (final SegmentCommitInfo si : segmentInfos) {
final Boolean v = segmentsToMerge.get(si);
if (v != null && shouldUpgradeSegment(si)) {
oldSegments.put(si, v);
}
}

log.trace("findForcedMerges: segmentsToUpgrade={}", oldSegments);

if (oldSegments.isEmpty()) {
return spec;
}

if (spec != null) {
// remove all segments that are in merge specification from oldSegments,
// the resulting set contains all segments that are left over
// and will be merged to one additional segment:
for (final OneMerge om : spec.merges) {
oldSegments.keySet().removeAll(om.segments);
}
}

if (!oldSegments.isEmpty()) {
log.trace("findForcedMerges: {} does not want to merge all old segments, merge remaining ones into new segment: {}",
in.getClass().getSimpleName(), oldSegments);

if (spec == null) {
spec = new MergeSpecification();
}

for (final SegmentCommitInfo si : segmentInfos) {
if (oldSegments.containsKey(si)) {
// Add a merge of only the upgrading segment to the spec
// We don't want to merge, just upgrade
spec.add(new OneMerge(Collections.singletonList(si)));
}
}
}

return spec;

} finally {
upgradeInProgress = false;
}
} else {
return spec;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,29 @@
import com.haulmont.cuba.security.app.Authentication;
import com.haulmont.fts.global.FtsConfig;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.lucene.index.IndexUpgrader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.inject.Inject;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;

@Component(LuceneIndexMaintenance.NAME)
public class LuceneIndexMaintenanceBean implements LuceneIndexMaintenance{
public class LuceneIndexMaintenanceBean implements LuceneIndexMaintenance {

@Inject
protected FtsConfig ftsConfig;

@Inject
protected IndexWriterProvider indexWriterProvider;

@Inject
protected DirectoryProvider directoryProvider;

@Inject
protected Authentication authentication;

protected final ReentrantLock optimizeLock = new ReentrantLock();

private final Logger log = LoggerFactory.getLogger(LuceneIndexMaintenanceBean.class);

@Override
Expand All @@ -56,9 +55,14 @@ public String optimize() {
log.debug("Start optimize");
authentication.begin();
try {
IndexWriter indexWriter = indexWriterProvider.getIndexWriter();
indexWriter.forceMerge(1);
indexWriter.commit();
optimizeLock.lock();
try {
IndexWriter indexWriter = indexWriterProvider.getIndexWriter();
indexWriter.forceMerge(1);
indexWriter.commit();
} finally {
optimizeLock.unlock();
}
return "Done";
} catch (Throwable e) {
log.error("Error", e);
Expand All @@ -70,13 +74,33 @@ public String optimize() {

@Override
public String upgrade() {
IndexUpgrader upgrader = new IndexUpgrader(directoryProvider.getDirectory());
if (!AppContext.isStarted())
return "Application is not started";

log.debug("Start upgrade");
authentication.begin();
try {
upgrader.upgrade();
} catch (IOException e) {
IndexWriter indexWriter = indexWriterProvider.getIndexWriter();
MergePolicy mergePolicy = indexWriter.getConfig().getMergePolicy();
optimizeLock.lock();
try {
if (mergePolicy instanceof LiveUpgradeMergePolicy) {
((LiveUpgradeMergePolicy) mergePolicy).setUpgradeInProgress(true);
}
indexWriter.forceMerge(1);
indexWriter.commit();
} finally {
if (mergePolicy instanceof LiveUpgradeMergePolicy) {
((LiveUpgradeMergePolicy) mergePolicy).setUpgradeInProgress(false);
}
optimizeLock.unlock();
}
return "Done";
} catch (Throwable e) {
log.error("Error", e);
return ExceptionUtils.getStackTrace(e);
} finally {
authentication.end();
}
return "successful";
}
}

0 comments on commit 67c81f2

Please sign in to comment.