diff --git a/solr/core/src/java/org/apache/solr/update/VersionBucket.java b/solr/core/src/java/org/apache/solr/update/VersionBucket.java index c0c28267ce87..8709b9cca6b0 100644 --- a/solr/core/src/java/org/apache/solr/update/VersionBucket.java +++ b/solr/core/src/java/org/apache/solr/update/VersionBucket.java @@ -16,12 +16,27 @@ */ package org.apache.solr.update; +import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + // TODO: make inner? // TODO: store the highest possible in the index on a commit (but how to not block adds?) // TODO: could also store highest possible in the transaction log after a commit. // Or on a new index, just scan "version" for the max? /** @lucene.internal */ public class VersionBucket { + private int versionLockInMill; + + public VersionBucket(int versionLockInMill) { + this.versionLockInMill = versionLockInMill; + } + + private final Lock lock = new ReentrantLock(true); + private final Condition condition = lock.newCondition(); + public long highest; public void updateHighest(long val) { @@ -29,4 +44,44 @@ public void updateHighest(long val) { highest = Math.max(highest, Math.abs(val)); } } + + public int getVersionLockInMill() { + return versionLockInMill; + } + + @VisibleForTesting + public void setVersionLockInMill(int versionLockInMill) { + this.versionLockInMill = versionLockInMill; + } + + public boolean tryLock() { + if (versionLockInMill > 0) { + try { + return lock.tryLock(versionLockInMill, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } else { + lock.lock(); + return true; + } + } + + public void unlock() { + lock.unlock(); + } + + public void wakeUpAll() { + condition.signalAll(); + } + + public void awaitNanos(long nanosTimeout) { + try { + condition.awaitNanos(nanosTimeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } } diff --git a/solr/core/src/java/org/apache/solr/update/VersionInfo.java b/solr/core/src/java/org/apache/solr/update/VersionInfo.java index 7697be4faf54..e94aaaecb2c0 100644 --- a/solr/core/src/java/org/apache/solr/update/VersionInfo.java +++ b/solr/core/src/java/org/apache/solr/update/VersionInfo.java @@ -45,8 +45,13 @@ import static org.apache.solr.common.params.CommonParams.VERSION_FIELD; public class VersionInfo { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String SYS_PROP_VERSION_LOCK_TIMEOUT_IN_MILL = "versionLockTimeoutInMill"; + + /** + * same as default client read timeout: 10 mins + */ + private static final int DEFAULT_VERSION_LOCK_TIMEOUT_IN_MILL = 19 * 60 * 1000; private final UpdateLog ulog; private final VersionBucket[] buckets; @@ -54,6 +59,8 @@ public class VersionInfo { private SchemaField idField; final ReadWriteLock lock = new ReentrantReadWriteLock(true); + private int versionLockInMill; + /** * Gets and returns the {@link org.apache.solr.common.params.CommonParams#VERSION_FIELD} from the specified * schema, after verifying that it is indexed, stored, and single-valued. @@ -94,9 +101,11 @@ public VersionInfo(UpdateLog ulog, int nBuckets) { IndexSchema schema = ulog.uhandler.core.getLatestSchema(); versionField = getAndCheckVersionField(schema); idField = schema.getUniqueKeyField(); + versionLockInMill = ulog.uhandler.core.getSolrConfig().getInt("updateHandler/versionLock/timeoutInMill", + Integer.parseInt(System.getProperty(SYS_PROP_VERSION_LOCK_TIMEOUT_IN_MILL, "" + DEFAULT_VERSION_LOCK_TIMEOUT_IN_MILL))); buckets = new VersionBucket[ BitUtil.nextHighestPowerOfTwo(nBuckets) ]; for (int i=0; i 0) ) { - // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd - // specified it must exist (versionOnUpdate==1) and it does. - } else { - throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId() + " expected=" + versionOnUpdate + " actual=" + foundVersion); - } + if (versionOnUpdate != 0) { + Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); + long foundVersion = lastVersion == null ? -1 : lastVersion; + if ( versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0) || (versionOnUpdate==1 && foundVersion > 0) ) { + // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd + // specified it must exist (versionOnUpdate==1) and it does. + } else { + throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId() + " expected=" + versionOnUpdate + " actual=" + foundVersion); } + } - long version = vinfo.getNewClock(); - cmd.setVersion(version); - cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version); - bucket.updateHighest(version); - } else { - // The leader forwarded us this update. - cmd.setVersion(versionOnUpdate); + long version = vinfo.getNewClock(); + cmd.setVersion(version); + cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version); + bucket.updateHighest(version); + } else { + // The leader forwarded us this update. + cmd.setVersion(versionOnUpdate); - if (shouldBufferUpdate(cmd, isReplayOrPeersync, ulog.getState())) { - // we're not in an active state, and this update isn't from a replay, so buffer it. - cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); - ulog.add(cmd); - return true; - } + if (shouldBufferUpdate(cmd, isReplayOrPeersync, ulog.getState())) { + // we're not in an active state, and this update isn't from a replay, so buffer it. + cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); + ulog.add(cmd); + return true; + } - if (cmd.isInPlaceUpdate()) { - long prev = cmd.prevVersion; - Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); - if (lastVersion == null || Math.abs(lastVersion) < prev) { - // this was checked for (in waitForDependentUpdates()) before entering the synchronized block. - // So we shouldn't be here, unless what must've happened is: - // by the time synchronization block was entered, the prev update was deleted by DBQ. Since - // now that update is not in index, the vinfo.lookupVersion() is possibly giving us a version - // from the deleted list (which might be older than the prev update!) - UpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd, versionOnUpdate); - - if (fetchedFromLeader instanceof DeleteUpdateCommand) { - log.info("In-place update of {} failed to find valid lastVersion to apply to, and the document" - + " was deleted at the leader subsequently.", idBytes.utf8ToString()); - versionDelete((DeleteUpdateCommand)fetchedFromLeader); - return true; - } else { - assert fetchedFromLeader instanceof AddUpdateCommand; - // Newer document was fetched from the leader. Apply that document instead of this current in-place update. - log.info("In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}", - idBytes.utf8ToString(), fetchedFromLeader); - - // Make this update to become a non-inplace update containing the full document obtained from the leader - cmd.solrDoc = ((AddUpdateCommand)fetchedFromLeader).solrDoc; - cmd.prevVersion = -1; - cmd.setVersion((long)cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD)); - assert cmd.isInPlaceUpdate() == false; - } + if (cmd.isInPlaceUpdate()) { + long prev = cmd.prevVersion; + Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); + if (lastVersion == null || Math.abs(lastVersion) < prev) { + // this was checked for (in waitForDependentUpdates()) before entering the synchronized block. + // So we shouldn't be here, unless what must've happened is: + // by the time synchronization block was entered, the prev update was deleted by DBQ. Since + // now that update is not in index, the vinfo.lookupVersion() is possibly giving us a version + // from the deleted list (which might be older than the prev update!) + UpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd, versionOnUpdate); + + if (fetchedFromLeader instanceof DeleteUpdateCommand) { + log.info("In-place update of {} failed to find valid lastVersion to apply to, and the document" + + " was deleted at the leader subsequently.", idBytes.utf8ToString()); + versionDelete((DeleteUpdateCommand)fetchedFromLeader); + return true; } else { - if (lastVersion != null && Math.abs(lastVersion) > prev) { - // this means we got a newer full doc update and in that case it makes no sense to apply the older - // inplace update. Drop this update - log.info("Update was applied on version: " + prev + ", but last version I have is: " + lastVersion - + ". Dropping current update."); - return true; - } else { - // We're good, we should apply this update. First, update the bucket's highest. - if (bucketVersion != 0 && bucketVersion < versionOnUpdate) { - bucket.updateHighest(versionOnUpdate); - } - } + assert fetchedFromLeader instanceof AddUpdateCommand; + // Newer document was fetched from the leader. Apply that document instead of this current in-place update. + log.info("In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}", + idBytes.utf8ToString(), fetchedFromLeader); + + // Make this update to become a non-inplace update containing the full document obtained from the leader + cmd.solrDoc = ((AddUpdateCommand)fetchedFromLeader).solrDoc; + cmd.prevVersion = -1; + cmd.setVersion((long)cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD)); + assert cmd.isInPlaceUpdate() == false; } } else { - // if we aren't the leader, then we need to check that updates were not re-ordered - if (bucketVersion != 0 && bucketVersion < versionOnUpdate) { - // we're OK... this update has a version higher than anything we've seen - // in this bucket so far, so we know that no reordering has yet occurred. - bucket.updateHighest(versionOnUpdate); + if (lastVersion != null && Math.abs(lastVersion) > prev) { + // this means we got a newer full doc update and in that case it makes no sense to apply the older + // inplace update. Drop this update + log.info("Update was applied on version: " + prev + ", but last version I have is: " + lastVersion + + ". Dropping current update."); + return true; } else { - // there have been updates higher than the current update. we need to check - // the specific version for this id. - Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); - if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) { - // This update is a repeat, or was reordered. We need to drop this update. - log.debug("Dropping add update due to version {}", idBytes.utf8ToString()); - return true; + // We're good, we should apply this update. First, update the bucket's highest. + if (bucketVersion != 0 && bucketVersion < versionOnUpdate) { + bucket.updateHighest(versionOnUpdate); } } } - if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { - cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER); + } else { + // if we aren't the leader, then we need to check that updates were not re-ordered + if (bucketVersion != 0 && bucketVersion < versionOnUpdate) { + // we're OK... this update has a version higher than anything we've seen + // in this bucket so far, so we know that no reordering has yet occurred. + bucket.updateHighest(versionOnUpdate); + } else { + // there have been updates higher than the current update. we need to check + // the specific version for this id. + Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); + if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) { + // This update is a repeat, or was reordered. We need to drop this update. + log.debug("Dropping add update due to version {}", idBytes.utf8ToString()); + return true; + } } } + if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { + cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER); + } } - - boolean willDistrib = isLeader && nodes != null && nodes.size() > 0; - - SolrInputDocument clonedDoc = null; - if (willDistrib && cloneRequiredOnLeader) { - clonedDoc = cmd.solrDoc.deepCopy(); - } - - // TODO: possibly set checkDeleteByQueries as a flag on the command? - doLocalAdd(cmd); - - if (willDistrib && cloneRequiredOnLeader) { - cmd.solrDoc = clonedDoc; - } + } + + boolean willDistrib = isLeader && nodes != null && nodes.size() > 0; + + SolrInputDocument clonedDoc = null; + if (willDistrib && cloneRequiredOnLeader) { + clonedDoc = cmd.solrDoc.deepCopy(); + } - } // end synchronized (bucket) - } finally { + // TODO: possibly set checkDeleteByQueries as a flag on the command? + doLocalAdd(cmd); + + if (willDistrib && cloneRequiredOnLeader) { + cmd.solrDoc = clonedDoc; + } + } + finally { + if(vBucketLocked) { + bucket.unlock(); + } vinfo.unlockForUpdate(); } return false; } + /** + * @return true if able to get lock of VersionBucket, else throws SolrException + */ + @VisibleForTesting + protected boolean tryLockElseThrow(VersionBucket bucket) { + boolean vBucketLocked = bucket.tryLock(); + if (!vBucketLocked) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Unable to get version bucket lock in " + bucket.getVersionLockInMill() + " mill seconds"); + } + return vBucketLocked; + } + @VisibleForTesting boolean shouldBufferUpdate(AddUpdateCommand cmd, boolean isReplayOrPeersync, UpdateLog.State state) { if (state == UpdateLog.State.APPLYING_BUFFERED @@ -1195,8 +1212,9 @@ private long waitForDependentUpdates(AddUpdateCommand cmd, long versionOnUpdate, TimeOut waitTimeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME); vinfo.lockForUpdate(); + boolean vBucketLocked = false; try { - synchronized (bucket) { + vBucketLocked = tryLockElseThrow(bucket); Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId()); lastFoundVersion = lookedUpVersion == null ? 0L: lookedUpVersion; @@ -1206,19 +1224,14 @@ private long waitForDependentUpdates(AddUpdateCommand cmd, long versionOnUpdate, } while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) { - try { - long timeLeft = waitTimeout.timeLeft(TimeUnit.MILLISECONDS); - if (timeLeft > 0) { // wait(0) waits forever until notified, but we don't want that. - bucket.wait(timeLeft); - } - } catch (InterruptedException ie) { - throw new RuntimeException(ie); - } + bucket.awaitNanos(waitTimeout.timeLeft(TimeUnit.NANOSECONDS)); lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId()); lastFoundVersion = lookedUpVersion == null ? 0L: lookedUpVersion; } - } } finally { + if (vBucketLocked) { + bucket.unlock(); + } vinfo.unlockForUpdate(); } @@ -1778,82 +1791,83 @@ protected boolean versionDelete(DeleteUpdateCommand cmd) throws IOException { VersionBucket bucket = vinfo.bucket(bucketHash); vinfo.lockForUpdate(); + boolean vBucketLocked = false; try { + vBucketLocked = tryLockElseThrow(bucket); + if (versionsStored) { + long bucketVersion = bucket.highest; - synchronized (bucket) { - if (versionsStored) { - long bucketVersion = bucket.highest; + if (leaderLogic) { - if (leaderLogic) { + if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) { + // forwarded from a collection but we are not buffering so strip original version and apply our own + // see SOLR-5308 + log.info("Removing version field from doc: " + cmd.getId()); + versionOnUpdate = signedVersionOnUpdate = 0; + } - if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) { - // forwarded from a collection but we are not buffering so strip original version and apply our own - // see SOLR-5308 - log.info("Removing version field from doc: " + cmd.getId()); - versionOnUpdate = signedVersionOnUpdate = 0; - } + // leaders can also be in buffering state during "migrate" API call, see SOLR-5308 + if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE + && !isReplayOrPeersync) { + // we're not in an active state, and this update isn't from a replay, so buffer it. + log.info("Leader logic applied but update log is buffering: " + cmd.getId()); + cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); + ulog.delete(cmd); + return true; + } - // leaders can also be in buffering state during "migrate" API call, see SOLR-5308 - if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE - && !isReplayOrPeersync) { - // we're not in an active state, and this update isn't from a replay, so buffer it. - log.info("Leader logic applied but update log is buffering: " + cmd.getId()); - cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); - ulog.delete(cmd); - return true; + if (signedVersionOnUpdate != 0) { + Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); + long foundVersion = lastVersion == null ? -1 : lastVersion; + if ( (signedVersionOnUpdate == foundVersion) || (signedVersionOnUpdate < 0 && foundVersion < 0) || (signedVersionOnUpdate == 1 && foundVersion > 0) ) { + // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd + // specified it must exist (versionOnUpdate==1) and it does. + } else { + throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected=" + signedVersionOnUpdate + " actual=" + foundVersion); } + } - if (signedVersionOnUpdate != 0) { - Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); - long foundVersion = lastVersion == null ? -1 : lastVersion; - if ( (signedVersionOnUpdate == foundVersion) || (signedVersionOnUpdate < 0 && foundVersion < 0) || (signedVersionOnUpdate == 1 && foundVersion > 0) ) { - // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd - // specified it must exist (versionOnUpdate==1) and it does. - } else { - throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected=" + signedVersionOnUpdate + " actual=" + foundVersion); - } - } + long version = vinfo.getNewClock(); + cmd.setVersion(-version); + bucket.updateHighest(version); + } else { + cmd.setVersion(-versionOnUpdate); - long version = vinfo.getNewClock(); - cmd.setVersion(-version); - bucket.updateHighest(version); - } else { - cmd.setVersion(-versionOnUpdate); + if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) { + // we're not in an active state, and this update isn't from a replay, so buffer it. + cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); + ulog.delete(cmd); + return true; + } - if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) { - // we're not in an active state, and this update isn't from a replay, so buffer it. - cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); - ulog.delete(cmd); + // if we aren't the leader, then we need to check that updates were not re-ordered + if (bucketVersion != 0 && bucketVersion < versionOnUpdate) { + // we're OK... this update has a version higher than anything we've seen + // in this bucket so far, so we know that no reordering has yet occured. + bucket.updateHighest(versionOnUpdate); + } else { + // there have been updates higher than the current update. we need to check + // the specific version for this id. + Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); + if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) { + // This update is a repeat, or was reordered. We need to drop this update. + log.debug("Dropping delete update due to version {}", idBytes.utf8ToString()); return true; } + } - // if we aren't the leader, then we need to check that updates were not re-ordered - if (bucketVersion != 0 && bucketVersion < versionOnUpdate) { - // we're OK... this update has a version higher than anything we've seen - // in this bucket so far, so we know that no reordering has yet occured. - bucket.updateHighest(versionOnUpdate); - } else { - // there have been updates higher than the current update. we need to check - // the specific version for this id. - Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); - if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) { - // This update is a repeat, or was reordered. We need to drop this update. - log.debug("Dropping delete update due to version {}", idBytes.utf8ToString()); - return true; - } - } - - if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { - cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER); - } + if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { + cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER); } } + } - doLocalDelete(cmd); - return false; - } // end synchronized (bucket) - + doLocalDelete(cmd); + return false; } finally { + if(vBucketLocked) { + bucket.unlock(); + } vinfo.unlockForUpdate(); } } diff --git a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java index 8f56d689fe5c..bc6a3fff7e84 100644 --- a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java +++ b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java @@ -17,20 +17,42 @@ package org.apache.solr.update.processor; +import static org.hamcrest.CoreMatchers.is; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.function.Function; import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.request.LocalSolrQueryRequest; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.update.AddUpdateCommand; +import org.apache.solr.update.DeleteUpdateCommand; import org.apache.solr.update.UpdateLog; +import org.apache.solr.update.VersionBucket; import org.junit.BeforeClass; import org.junit.Test; public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 { + private static ExecutorService executor; + @BeforeClass public static void beforeClass() throws Exception { - initCore("solr/collection1/conf/solrconfig.xml","solr/collection1/conf/schema-minimal.xml"); + executor = Executors.newCachedThreadPool(); + initCore("solr/collection1/conf/solrconfig.xml","solr/collection1/conf/schema-minimal-with-another-uniqkey.xml"); + } + + public static void AfterClass() { + executor.shutdown(); } @Test @@ -40,6 +62,9 @@ public void testShouldBufferUpdate() { req, null, null, null); AddUpdateCommand cmd = new AddUpdateCommand(req); + cmd.solrDoc = new SolrInputDocument(); + cmd.solrDoc.setField("notid", "10"); + // applying buffer updates, isReplayOrPeerSync flag doesn't matter assertFalse(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED)); assertFalse(processor.shouldBufferUpdate(cmd, true, UpdateLog.State.APPLYING_BUFFERED)); @@ -50,5 +75,98 @@ public void testShouldBufferUpdate() { assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED)); } + @Test + public void testVersionAdd() throws IOException { + SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams()); + int threads = 5; + Function versionAddFunc = (DistributedUpdateProcessor process) -> { + try { + AddUpdateCommand cmd = new AddUpdateCommand(req); + cmd.solrDoc = new SolrInputDocument(); + cmd.solrDoc.setField("notid", "10"); + return process.versionAdd(cmd); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + int failed = runCommands(threads, 1000, req, versionAddFunc); + // only one should succeed + assertThat(failed, is(threads - 1)); + + failed = runCommands(threads, -1, req, versionAddFunc); + // all should succeed + assertThat(failed, is(0)); + } + + @Test + public void testVersionDelete() throws IOException { + SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams()); + + int threads = 5; + Function versionDeleteFunc = (DistributedUpdateProcessor process) -> { + try { + DeleteUpdateCommand cmd = new DeleteUpdateCommand(req); + cmd.id = "1"; + return process.versionDelete(cmd); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + int failed = runCommands(threads, 1000, req, versionDeleteFunc); + // only one should succeed + assertThat(failed, is(threads - 1)); + + failed = runCommands(threads, -1, req, versionDeleteFunc); + // all should succeed + assertThat(failed, is(0)); + } + + public int runCommands(int threads, int versionLockInMill, SolrQueryRequest req, + Function function) + throws IOException { + try (DistributedUpdateProcessor processor = new DistributedUpdateProcessor( + req, null, null, null) { + @Override + protected boolean tryLockElseThrow(VersionBucket bucket) { + bucket.setVersionLockInMill(versionLockInMill); + + boolean locked = super.tryLockElseThrow(bucket); + if (locked) { + try { + // simulate the case: it takes 5 seconds to add the doc + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } else { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Unable to get version bucket lock in " + bucket.getVersionLockInMill() + " mill seconds"); + } + return locked; + } + }) { + CountDownLatch latch = new CountDownLatch(1); + Collection> futures = new ArrayList<>(); + for (int t = 0; t < threads; ++t) { + futures.add(executor.submit(() -> { + latch.await(); + return function.apply(processor); + })); + } + latch.countDown(); + + int failed = 0; + for (Future f : futures) { + try { + f.get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + failed++; + } + } + return failed; + } + } }