diff --git a/server/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java b/server/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java index d2b2e24c616a8..9f094197b8d9c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java +++ b/server/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java @@ -23,7 +23,7 @@ /** Holds a deleted version, which just adds a timestamp to {@link VersionValue} so we know when we can expire the deletion. */ -class DeleteVersionValue extends VersionValue { +final class DeleteVersionValue extends VersionValue { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(DeleteVersionValue.class); diff --git a/server/src/main/java/org/elasticsearch/index/engine/TranslogVersionValue.java b/server/src/main/java/org/elasticsearch/index/engine/IndexVersionValue.java similarity index 86% rename from server/src/main/java/org/elasticsearch/index/engine/TranslogVersionValue.java rename to server/src/main/java/org/elasticsearch/index/engine/IndexVersionValue.java index 67415ea6139a6..4f67372926712 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/TranslogVersionValue.java +++ b/server/src/main/java/org/elasticsearch/index/engine/IndexVersionValue.java @@ -24,13 +24,13 @@ import java.util.Objects; -final class TranslogVersionValue extends VersionValue { +final class IndexVersionValue extends VersionValue { - private static final long RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(TranslogVersionValue.class); + private static final long RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexVersionValue.class); private final Translog.Location translogLocation; - TranslogVersionValue(Translog.Location translogLocation, long version, long seqNo, long term) { + IndexVersionValue(Translog.Location translogLocation, long version, long seqNo, long term) { super(version, seqNo, term); this.translogLocation = translogLocation; } @@ -45,7 +45,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; if (!super.equals(o)) return false; - TranslogVersionValue that = (TranslogVersionValue) o; + IndexVersionValue that = (IndexVersionValue) o; return Objects.equals(translogLocation, that.translogLocation); } @@ -56,7 +56,7 @@ public int hashCode() { @Override public String toString() { - return "TranslogVersionValue{" + + return "IndexVersionValue{" + "version=" + version + ", seqNo=" + seqNo + ", term=" + term + diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index b28a5cd59e25b..ab13639ce2fd2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -623,7 +623,7 @@ private VersionValue resolveDocVersion(final Operation op) throws IOException { assert incrementIndexVersionLookup(); // used for asserting in tests final long currentVersion = loadCurrentVersionFromIndex(op.uid()); if (currentVersion != Versions.NOT_FOUND) { - versionValue = new VersionValue(currentVersion, SequenceNumbers.UNASSIGNED_SEQ_NO, 0L); + versionValue = new IndexVersionValue(null, currentVersion, SequenceNumbers.UNASSIGNED_SEQ_NO, 0L); } } else if (engineConfig.isEnableGcDeletes() && versionValue.isDelete() && (engineConfig.getThreadPool().relativeTimeInMillis() - ((DeleteVersionValue)versionValue).time) > getGcDeletesInMillis()) { @@ -785,8 +785,9 @@ public IndexResult index(Index index) throws IOException { indexResult.setTranslogLocation(location); } if (plan.indexIntoLucene && indexResult.hasFailure() == false) { - versionMap.maybePutUnderLock(index.uid().bytes(), - getVersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm(), indexResult.getTranslogLocation())); + final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null; + versionMap.maybePutIndexUnderLock(index.uid().bytes(), + new IndexVersionValue(translogLocation, plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm())); } if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo()); @@ -937,13 +938,6 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) } } - private VersionValue getVersionValue(long version, long seqNo, long term, Translog.Location location) { - if (location != null && trackTranslogLocation.get()) { - return new TranslogVersionValue(location, version, seqNo, term); - } - return new VersionValue(version, seqNo, term); - } - /** * returns true if the indexing operation may have already be processed by this engine. * Note that it is OK to rarely return true even if this is not the case. However a `false` @@ -1193,7 +1187,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) indexWriter.deleteDocuments(delete.uid()); numDocDeletes.inc(); } - versionMap.putUnderLock(delete.uid().bytes(), + versionMap.putDeleteUnderLock(delete.uid().bytes(), new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(), engineConfig.getThreadPool().relativeTimeInMillis())); return new DeleteResult( diff --git a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index 7c5dcfa5c9050..6d9dc4a38974c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -268,7 +268,7 @@ VersionValue getUnderLock(final BytesRef uid) { } private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) { - assert keyedLock.isHeldByCurrentThread(uid); + assert assertKeyedLockHeldByCurrentThread(uid); // First try to get the "live" value: VersionValue value = currentMaps.current.get(uid); if (value != null) { @@ -306,44 +306,36 @@ boolean isSafeAccessRequired() { /** * Adds this uid/version to the pending adds map iff the map needs safe access. */ - void maybePutUnderLock(BytesRef uid, VersionValue version) { - assert keyedLock.isHeldByCurrentThread(uid); + void maybePutIndexUnderLock(BytesRef uid, IndexVersionValue version) { + assert assertKeyedLockHeldByCurrentThread(uid); Maps maps = this.maps; if (maps.isSafeAccessMode()) { - putUnderLock(uid, version, maps); + putIndexUnderLock(uid, version); } else { maps.current.markAsUnsafe(); assert putAssertionMap(uid, version); } } - private boolean putAssertionMap(BytesRef uid, VersionValue version) { - putUnderLock(uid, version, unsafeKeysMap); - return true; + void putIndexUnderLock(BytesRef uid, IndexVersionValue version) { + assert assertKeyedLockHeldByCurrentThread(uid); + assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length; + maps.put(uid, version); + removeTombstoneUnderLock(uid); } - /** - * Adds this uid/version to the pending adds map. - */ - void putUnderLock(BytesRef uid, VersionValue version) { - Maps maps = this.maps; - putUnderLock(uid, version, maps); + private boolean putAssertionMap(BytesRef uid, IndexVersionValue version) { + assert assertKeyedLockHeldByCurrentThread(uid); + assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length; + unsafeKeysMap.put(uid, version); + return true; } - /** - * Adds this uid/version to the pending adds map. - */ - private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) { - assert keyedLock.isHeldByCurrentThread(uid); + void putDeleteUnderLock(BytesRef uid, DeleteVersionValue version) { + assert assertKeyedLockHeldByCurrentThread(uid); assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length; - if (version.isDelete() == false) { - maps.put(uid, version); - removeTombstoneUnderLock(uid); - } else { - DeleteVersionValue versionValue = (DeleteVersionValue) version; - putTombstone(uid, versionValue); - maps.remove(uid, versionValue); - } + putTombstone(uid, version); + maps.remove(uid, version); } private void putTombstone(BytesRef uid, DeleteVersionValue version) { @@ -365,7 +357,7 @@ private void putTombstone(BytesRef uid, DeleteVersionValue version) { * Removes this uid from the pending deletes map. */ void removeTombstoneUnderLock(BytesRef uid) { - assert keyedLock.isHeldByCurrentThread(uid); + assert assertKeyedLockHeldByCurrentThread(uid); long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; final VersionValue prev = tombstones.remove(uid); if (prev != null) { @@ -465,4 +457,9 @@ Map getAllTombstones() { Releasable acquireLock(BytesRef uid) { return keyedLock.acquire(uid); } + + private boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) { + assert keyedLock.isHeldByCurrentThread(uid) : "Thread [" + Thread.currentThread().getName() + "], uid [" + uid.utf8ToString() + "]"; + return true; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java b/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java index d63306486732e..567a7964186ad 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java +++ b/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java @@ -27,7 +27,7 @@ import java.util.Collection; import java.util.Collections; -class VersionValue implements Accountable { +abstract class VersionValue implements Accountable { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(VersionValue.class); diff --git a/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java b/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java index ce3ddff00dade..e0efcf9f0f73f 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java @@ -25,6 +25,7 @@ import org.apache.lucene.util.RamUsageTester; import org.apache.lucene.util.TestUtil; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -47,9 +48,8 @@ public void testRamBytesUsed() throws Exception { for (int i = 0; i < 100000; ++i) { BytesRefBuilder uid = new BytesRefBuilder(); uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20)); - VersionValue version = new VersionValue(randomLong(), randomLong(), randomLong()); try (Releasable r = map.acquireLock(uid.toBytesRef())) { - map.putUnderLock(uid.toBytesRef(), version); + map.putIndexUnderLock(uid.toBytesRef(), randomIndexVersionValue()); } } long actualRamBytesUsed = RamUsageTester.sizeOf(map); @@ -64,9 +64,8 @@ public void testRamBytesUsed() throws Exception { for (int i = 0; i < 100000; ++i) { BytesRefBuilder uid = new BytesRefBuilder(); uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20)); - VersionValue version = new VersionValue(randomLong(), randomLong(), randomLong()); try (Releasable r = map.acquireLock(uid.toBytesRef())) { - map.putUnderLock(uid.toBytesRef(), version); + map.putIndexUnderLock(uid.toBytesRef(), randomIndexVersionValue()); } } actualRamBytesUsed = RamUsageTester.sizeOf(map); @@ -100,14 +99,15 @@ private BytesRef uid(String string) { public void testBasics() throws IOException { LiveVersionMap map = new LiveVersionMap(); try (Releasable r = map.acquireLock(uid("test"))) { - map.putUnderLock(uid("test"), new VersionValue(1,1,1)); - assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test"))); + Translog.Location tlogLoc = randomTranslogLocation(); + map.putIndexUnderLock(uid("test"), new IndexVersionValue(tlogLoc, 1, 1, 1)); + assertEquals(new IndexVersionValue(tlogLoc, 1, 1, 1), map.getUnderLock(uid("test"))); map.beforeRefresh(); - assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test"))); + assertEquals(new IndexVersionValue(tlogLoc, 1, 1, 1), map.getUnderLock(uid("test"))); map.afterRefresh(randomBoolean()); assertNull(map.getUnderLock(uid("test"))); - map.putUnderLock(uid("test"), new DeleteVersionValue(1,1,1,1)); + map.putDeleteUnderLock(uid("test"), new DeleteVersionValue(1,1,1,1)); assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test"))); map.beforeRefresh(); assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test"))); @@ -154,21 +154,24 @@ public void testConcurrently() throws IOException, InterruptedException { BytesRef bytesRef = randomFrom(random(), keyList); try (Releasable r = map.acquireLock(bytesRef)) { VersionValue versionValue = values.computeIfAbsent(bytesRef, - v -> new VersionValue(randomLong(), maxSeqNo.incrementAndGet(), randomLong())); + v -> new IndexVersionValue( + randomTranslogLocation(), randomLong(), maxSeqNo.incrementAndGet(), randomLong())); boolean isDelete = versionValue instanceof DeleteVersionValue; if (isDelete) { map.removeTombstoneUnderLock(bytesRef); deletes.remove(bytesRef); } if (isDelete == false && rarely()) { - versionValue = new DeleteVersionValue(versionValue.version + 1, maxSeqNo.incrementAndGet(), - versionValue.term, clock.getAndIncrement()); + versionValue = new DeleteVersionValue(versionValue.version + 1, + maxSeqNo.incrementAndGet(), versionValue.term, clock.getAndIncrement()); deletes.put(bytesRef, (DeleteVersionValue) versionValue); + map.putDeleteUnderLock(bytesRef, (DeleteVersionValue) versionValue); } else { - versionValue = new VersionValue(versionValue.version + 1, maxSeqNo.incrementAndGet(), versionValue.term); + versionValue = new IndexVersionValue(randomTranslogLocation(), + versionValue.version + 1, maxSeqNo.incrementAndGet(), versionValue.term); + map.putIndexUnderLock(bytesRef, (IndexVersionValue) versionValue); } values.put(bytesRef, versionValue); - map.putUnderLock(bytesRef, versionValue); } if (rarely()) { final long pruneSeqNo = randomLongBetween(0, maxSeqNo.get()); @@ -268,7 +271,7 @@ public void testCarryOnSafeAccess() throws IOException { } try (Releasable r = map.acquireLock(uid(""))) { - map.maybePutUnderLock(new BytesRef(""), new VersionValue(randomLong(), randomLong(), randomLong())); + map.maybePutIndexUnderLock(new BytesRef(""), randomIndexVersionValue()); } assertFalse(map.isUnsafe()); assertEquals(1, map.getAllCurrent().size()); @@ -278,7 +281,7 @@ public void testCarryOnSafeAccess() throws IOException { assertFalse(map.isUnsafe()); assertFalse(map.isSafeAccessRequired()); try (Releasable r = map.acquireLock(uid(""))) { - map.maybePutUnderLock(new BytesRef(""), new VersionValue(randomLong(), randomLong(), randomLong())); + map.maybePutIndexUnderLock(new BytesRef(""), randomIndexVersionValue()); } assertTrue(map.isUnsafe()); assertFalse(map.isSafeAccessRequired()); @@ -288,7 +291,7 @@ public void testCarryOnSafeAccess() throws IOException { public void testRefreshTransition() throws IOException { LiveVersionMap map = new LiveVersionMap(); try (Releasable r = map.acquireLock(uid("1"))) { - map.maybePutUnderLock(uid("1"), new VersionValue(randomLong(), randomLong(), randomLong())); + map.maybePutIndexUnderLock(uid("1"), randomIndexVersionValue()); assertTrue(map.isUnsafe()); assertNull(map.getUnderLock(uid("1"))); map.beforeRefresh(); @@ -299,7 +302,7 @@ public void testRefreshTransition() throws IOException { assertFalse(map.isUnsafe()); map.enforceSafeAccess(); - map.maybePutUnderLock(uid("1"), new VersionValue(randomLong(), randomLong(), randomLong())); + map.maybePutIndexUnderLock(uid("1"), randomIndexVersionValue()); assertFalse(map.isUnsafe()); assertNotNull(map.getUnderLock(uid("1"))); map.beforeRefresh(); @@ -320,9 +323,10 @@ public void testAddAndDeleteRefreshConcurrently() throws IOException, Interrupte AtomicLong version = new AtomicLong(); CountDownLatch start = new CountDownLatch(2); BytesRef uid = uid("1"); - VersionValue initialVersion = new VersionValue(version.incrementAndGet(), 1, 1); + VersionValue initialVersion; try (Releasable ignore = map.acquireLock(uid)) { - map.putUnderLock(uid, initialVersion); + initialVersion = new IndexVersionValue(randomTranslogLocation(), version.incrementAndGet(), 1, 1); + map.putIndexUnderLock(uid, (IndexVersionValue) initialVersion); } Thread t = new Thread(() -> { start.countDown(); @@ -337,14 +341,13 @@ public void testAddAndDeleteRefreshConcurrently() throws IOException, Interrupte } else { underLock = nextVersionValue; } - if (underLock.isDelete()) { - nextVersionValue = new VersionValue(version.incrementAndGet(), 1, 1); - } else if (randomBoolean()) { - nextVersionValue = new VersionValue(version.incrementAndGet(), 1, 1); + if (underLock.isDelete() || randomBoolean()) { + nextVersionValue = new IndexVersionValue(randomTranslogLocation(), version.incrementAndGet(), 1, 1); + map.putIndexUnderLock(uid, (IndexVersionValue) nextVersionValue); } else { nextVersionValue = new DeleteVersionValue(version.incrementAndGet(), 1, 1, 0); + map.putDeleteUnderLock(uid, (DeleteVersionValue) nextVersionValue); } - map.putUnderLock(uid, nextVersionValue); } } } catch (Exception e) { @@ -375,7 +378,7 @@ public void testPruneTombstonesWhileLocked() throws InterruptedException, IOExce BytesRef uid = uid("1"); ; try (Releasable ignore = map.acquireLock(uid)) { - map.putUnderLock(uid, new DeleteVersionValue(0, 0, 0, 0)); + map.putDeleteUnderLock(uid, new DeleteVersionValue(0, 0, 0, 0)); map.beforeRefresh(); // refresh otherwise we won't prune since it's tracked by the current map map.afterRefresh(false); Thread thread = new Thread(() -> { @@ -392,4 +395,16 @@ public void testPruneTombstonesWhileLocked() throws InterruptedException, IOExce thread.join(); assertEquals(0, map.getAllTombstones().size()); } + + IndexVersionValue randomIndexVersionValue() { + return new IndexVersionValue(randomTranslogLocation(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + } + + Translog.Location randomTranslogLocation() { + if (randomBoolean()) { + return null; + } else { + return new Translog.Location(randomNonNegativeLong(), randomNonNegativeLong(), randomInt()); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java b/server/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java index 3b953edece1b4..242a568295dd6 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java @@ -20,12 +20,17 @@ package org.elasticsearch.index.engine; import org.apache.lucene.util.RamUsageTester; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESTestCase; public class VersionValueTests extends ESTestCase { - public void testRamBytesUsed() { - VersionValue versionValue = new VersionValue(randomLong(), randomLong(), randomLong()); + public void testIndexRamBytesUsed() { + Translog.Location translogLoc = null; + if (randomBoolean()) { + translogLoc = new Translog.Location(randomNonNegativeLong(), randomNonNegativeLong(), randomInt()); + } + IndexVersionValue versionValue = new IndexVersionValue(translogLoc, randomLong(), randomLong(), randomLong()); assertEquals(RamUsageTester.sizeOf(versionValue), versionValue.ramBytesUsed()); }