Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid side-effect in VersionMap when assertion enabled #29585

Merged
merged 8 commits into from
Apr 19, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

/** Holds a deleted version, which just adds a timestamp to {@link VersionValue} so we know when we can expire the deletion. */

class DeleteVersionValue extends VersionValue {
final class DeleteVersionValue extends VersionValue {

private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(DeleteVersionValue.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@

import java.util.Objects;

final class TranslogVersionValue extends VersionValue {
final class IndexVersionValue extends VersionValue {

private static final long RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(TranslogVersionValue.class);
private static final long RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexVersionValue.class);

private final Translog.Location translogLocation;

TranslogVersionValue(Translog.Location translogLocation, long version, long seqNo, long term) {
IndexVersionValue(Translog.Location translogLocation, long version, long seqNo, long term) {
super(version, seqNo, term);
this.translogLocation = translogLocation;
}
Expand All @@ -45,7 +45,7 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
TranslogVersionValue that = (TranslogVersionValue) o;
IndexVersionValue that = (IndexVersionValue) o;
return Objects.equals(translogLocation, that.translogLocation);
}

Expand All @@ -56,7 +56,7 @@ public int hashCode() {

@Override
public String toString() {
return "TranslogVersionValue{" +
return "IndexVersionValue{" +
"version=" + version +
", seqNo=" + seqNo +
", term=" + term +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ private VersionValue resolveDocVersion(final Operation op) throws IOException {
assert incrementIndexVersionLookup(); // used for asserting in tests
final long currentVersion = loadCurrentVersionFromIndex(op.uid());
if (currentVersion != Versions.NOT_FOUND) {
versionValue = new VersionValue(currentVersion, SequenceNumbers.UNASSIGNED_SEQ_NO, 0L);
versionValue = new IndexVersionValue(null, currentVersion, SequenceNumbers.UNASSIGNED_SEQ_NO, 0L);
}
} else if (engineConfig.isEnableGcDeletes() && versionValue.isDelete() &&
(engineConfig.getThreadPool().relativeTimeInMillis() - ((DeleteVersionValue)versionValue).time) > getGcDeletesInMillis()) {
Expand Down Expand Up @@ -785,8 +785,9 @@ public IndexResult index(Index index) throws IOException {
indexResult.setTranslogLocation(location);
}
if (plan.indexIntoLucene && indexResult.hasFailure() == false) {
versionMap.maybePutUnderLock(index.uid().bytes(),
getVersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm(), indexResult.getTranslogLocation()));
final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
versionMap.maybePutIndexUnderLock(index.uid().bytes(),
new IndexVersionValue(translogLocation, plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm()));
}
if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo());
Expand Down Expand Up @@ -937,13 +938,6 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
}
}

private VersionValue getVersionValue(long version, long seqNo, long term, Translog.Location location) {
if (location != null && trackTranslogLocation.get()) {
return new TranslogVersionValue(location, version, seqNo, term);
}
return new VersionValue(version, seqNo, term);
}

/**
* returns true if the indexing operation may have already be processed by this engine.
* Note that it is OK to rarely return true even if this is not the case. However a `false`
Expand Down Expand Up @@ -1193,7 +1187,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
indexWriter.deleteDocuments(delete.uid());
numDocDeletes.inc();
}
versionMap.putUnderLock(delete.uid().bytes(),
versionMap.putDeleteUnderLock(delete.uid().bytes(),
new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(),
engineConfig.getThreadPool().relativeTimeInMillis()));
return new DeleteResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ VersionValue getUnderLock(final BytesRef uid) {
}

private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) {
assert keyedLock.isHeldByCurrentThread(uid);
assert assertKeyedLockHeldByCurrentThread(uid);
// First try to get the "live" value:
VersionValue value = currentMaps.current.get(uid);
if (value != null) {
Expand Down Expand Up @@ -306,44 +306,36 @@ boolean isSafeAccessRequired() {
/**
* Adds this uid/version to the pending adds map iff the map needs safe access.
*/
void maybePutUnderLock(BytesRef uid, VersionValue version) {
assert keyedLock.isHeldByCurrentThread(uid);
void maybePutIndexUnderLock(BytesRef uid, IndexVersionValue version) {
assert assertKeyedLockHeldByCurrentThread(uid);
Maps maps = this.maps;
if (maps.isSafeAccessMode()) {
putUnderLock(uid, version, maps);
putIndexUnderLock(uid, version);
} else {
maps.current.markAsUnsafe();
assert putAssertionMap(uid, version);
}
}

private boolean putAssertionMap(BytesRef uid, VersionValue version) {
putUnderLock(uid, version, unsafeKeysMap);
return true;
void putIndexUnderLock(BytesRef uid, IndexVersionValue version) {
assert assertKeyedLockHeldByCurrentThread(uid);
assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length;
maps.put(uid, version);
removeTombstoneUnderLock(uid);
}

/**
* Adds this uid/version to the pending adds map.
*/
void putUnderLock(BytesRef uid, VersionValue version) {
Maps maps = this.maps;
putUnderLock(uid, version, maps);
private boolean putAssertionMap(BytesRef uid, IndexVersionValue version) {
assert assertKeyedLockHeldByCurrentThread(uid);
assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length;
unsafeKeysMap.put(uid, version);
return true;
}

/**
* Adds this uid/version to the pending adds map.
*/
private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) {
assert keyedLock.isHeldByCurrentThread(uid);
void putDeleteUnderLock(BytesRef uid, DeleteVersionValue version) {
assert assertKeyedLockHeldByCurrentThread(uid);
assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length;
if (version.isDelete() == false) {
maps.put(uid, version);
removeTombstoneUnderLock(uid);
} else {
DeleteVersionValue versionValue = (DeleteVersionValue) version;
putTombstone(uid, versionValue);
maps.remove(uid, versionValue);
}
putTombstone(uid, version);
maps.remove(uid, version);
}

private void putTombstone(BytesRef uid, DeleteVersionValue version) {
Expand All @@ -365,7 +357,7 @@ private void putTombstone(BytesRef uid, DeleteVersionValue version) {
* Removes this uid from the pending deletes map.
*/
void removeTombstoneUnderLock(BytesRef uid) {
assert keyedLock.isHeldByCurrentThread(uid);
assert assertKeyedLockHeldByCurrentThread(uid);
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
final VersionValue prev = tombstones.remove(uid);
if (prev != null) {
Expand Down Expand Up @@ -465,4 +457,9 @@ Map<BytesRef, DeleteVersionValue> getAllTombstones() {
Releasable acquireLock(BytesRef uid) {
return keyedLock.acquire(uid);
}

private boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) {
assert keyedLock.isHeldByCurrentThread(uid) : "Thread [" + Thread.currentThread().getName() + "], uid [" + uid.utf8ToString() + "]";
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.Collection;
import java.util.Collections;

class VersionValue implements Accountable {
abstract class VersionValue implements Accountable {

private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(VersionValue.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.util.RamUsageTester;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
Expand All @@ -47,9 +48,8 @@ public void testRamBytesUsed() throws Exception {
for (int i = 0; i < 100000; ++i) {
BytesRefBuilder uid = new BytesRefBuilder();
uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20));
VersionValue version = new VersionValue(randomLong(), randomLong(), randomLong());
try (Releasable r = map.acquireLock(uid.toBytesRef())) {
map.putUnderLock(uid.toBytesRef(), version);
map.putIndexUnderLock(uid.toBytesRef(), randomIndexVersionValue());
}
}
long actualRamBytesUsed = RamUsageTester.sizeOf(map);
Expand All @@ -64,9 +64,8 @@ public void testRamBytesUsed() throws Exception {
for (int i = 0; i < 100000; ++i) {
BytesRefBuilder uid = new BytesRefBuilder();
uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20));
VersionValue version = new VersionValue(randomLong(), randomLong(), randomLong());
try (Releasable r = map.acquireLock(uid.toBytesRef())) {
map.putUnderLock(uid.toBytesRef(), version);
map.putIndexUnderLock(uid.toBytesRef(), randomIndexVersionValue());
}
}
actualRamBytesUsed = RamUsageTester.sizeOf(map);
Expand Down Expand Up @@ -100,14 +99,15 @@ private BytesRef uid(String string) {
public void testBasics() throws IOException {
LiveVersionMap map = new LiveVersionMap();
try (Releasable r = map.acquireLock(uid("test"))) {
map.putUnderLock(uid("test"), new VersionValue(1,1,1));
assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
Translog.Location tlogLoc = randomTranslogLocation();
map.putIndexUnderLock(uid("test"), new IndexVersionValue(tlogLoc, 1, 1, 1));
assertEquals(new IndexVersionValue(tlogLoc, 1, 1, 1), map.getUnderLock(uid("test")));
map.beforeRefresh();
assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
assertEquals(new IndexVersionValue(tlogLoc, 1, 1, 1), map.getUnderLock(uid("test")));
map.afterRefresh(randomBoolean());
assertNull(map.getUnderLock(uid("test")));

map.putUnderLock(uid("test"), new DeleteVersionValue(1,1,1,1));
map.putDeleteUnderLock(uid("test"), new DeleteVersionValue(1,1,1,1));
assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
map.beforeRefresh();
assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
Expand Down Expand Up @@ -154,21 +154,24 @@ public void testConcurrently() throws IOException, InterruptedException {
BytesRef bytesRef = randomFrom(random(), keyList);
try (Releasable r = map.acquireLock(bytesRef)) {
VersionValue versionValue = values.computeIfAbsent(bytesRef,
v -> new VersionValue(randomLong(), maxSeqNo.incrementAndGet(), randomLong()));
v -> new IndexVersionValue(
randomTranslogLocation(), randomLong(), maxSeqNo.incrementAndGet(), randomLong()));
boolean isDelete = versionValue instanceof DeleteVersionValue;
if (isDelete) {
map.removeTombstoneUnderLock(bytesRef);
deletes.remove(bytesRef);
}
if (isDelete == false && rarely()) {
versionValue = new DeleteVersionValue(versionValue.version + 1, maxSeqNo.incrementAndGet(),
versionValue.term, clock.getAndIncrement());
versionValue = new DeleteVersionValue(versionValue.version + 1,
maxSeqNo.incrementAndGet(), versionValue.term, clock.getAndIncrement());
deletes.put(bytesRef, (DeleteVersionValue) versionValue);
map.putDeleteUnderLock(bytesRef, (DeleteVersionValue) versionValue);
} else {
versionValue = new VersionValue(versionValue.version + 1, maxSeqNo.incrementAndGet(), versionValue.term);
versionValue = new IndexVersionValue(randomTranslogLocation(),
versionValue.version + 1, maxSeqNo.incrementAndGet(), versionValue.term);
map.putIndexUnderLock(bytesRef, (IndexVersionValue) versionValue);
}
values.put(bytesRef, versionValue);
map.putUnderLock(bytesRef, versionValue);
}
if (rarely()) {
final long pruneSeqNo = randomLongBetween(0, maxSeqNo.get());
Expand Down Expand Up @@ -268,7 +271,7 @@ public void testCarryOnSafeAccess() throws IOException {
}

try (Releasable r = map.acquireLock(uid(""))) {
map.maybePutUnderLock(new BytesRef(""), new VersionValue(randomLong(), randomLong(), randomLong()));
map.maybePutIndexUnderLock(new BytesRef(""), randomIndexVersionValue());
}
assertFalse(map.isUnsafe());
assertEquals(1, map.getAllCurrent().size());
Expand All @@ -278,7 +281,7 @@ public void testCarryOnSafeAccess() throws IOException {
assertFalse(map.isUnsafe());
assertFalse(map.isSafeAccessRequired());
try (Releasable r = map.acquireLock(uid(""))) {
map.maybePutUnderLock(new BytesRef(""), new VersionValue(randomLong(), randomLong(), randomLong()));
map.maybePutIndexUnderLock(new BytesRef(""), randomIndexVersionValue());
}
assertTrue(map.isUnsafe());
assertFalse(map.isSafeAccessRequired());
Expand All @@ -288,7 +291,7 @@ public void testCarryOnSafeAccess() throws IOException {
public void testRefreshTransition() throws IOException {
LiveVersionMap map = new LiveVersionMap();
try (Releasable r = map.acquireLock(uid("1"))) {
map.maybePutUnderLock(uid("1"), new VersionValue(randomLong(), randomLong(), randomLong()));
map.maybePutIndexUnderLock(uid("1"), randomIndexVersionValue());
assertTrue(map.isUnsafe());
assertNull(map.getUnderLock(uid("1")));
map.beforeRefresh();
Expand All @@ -299,7 +302,7 @@ public void testRefreshTransition() throws IOException {
assertFalse(map.isUnsafe());

map.enforceSafeAccess();
map.maybePutUnderLock(uid("1"), new VersionValue(randomLong(), randomLong(), randomLong()));
map.maybePutIndexUnderLock(uid("1"), randomIndexVersionValue());
assertFalse(map.isUnsafe());
assertNotNull(map.getUnderLock(uid("1")));
map.beforeRefresh();
Expand All @@ -320,9 +323,10 @@ public void testAddAndDeleteRefreshConcurrently() throws IOException, Interrupte
AtomicLong version = new AtomicLong();
CountDownLatch start = new CountDownLatch(2);
BytesRef uid = uid("1");
VersionValue initialVersion = new VersionValue(version.incrementAndGet(), 1, 1);
VersionValue initialVersion;
try (Releasable ignore = map.acquireLock(uid)) {
map.putUnderLock(uid, initialVersion);
initialVersion = new IndexVersionValue(randomTranslogLocation(), version.incrementAndGet(), 1, 1);
map.putIndexUnderLock(uid, (IndexVersionValue) initialVersion);
}
Thread t = new Thread(() -> {
start.countDown();
Expand All @@ -337,14 +341,13 @@ public void testAddAndDeleteRefreshConcurrently() throws IOException, Interrupte
} else {
underLock = nextVersionValue;
}
if (underLock.isDelete()) {
nextVersionValue = new VersionValue(version.incrementAndGet(), 1, 1);
} else if (randomBoolean()) {
nextVersionValue = new VersionValue(version.incrementAndGet(), 1, 1);
if (underLock.isDelete() || randomBoolean()) {
nextVersionValue = new IndexVersionValue(randomTranslogLocation(), version.incrementAndGet(), 1, 1);
map.putIndexUnderLock(uid, (IndexVersionValue) nextVersionValue);
} else {
nextVersionValue = new DeleteVersionValue(version.incrementAndGet(), 1, 1, 0);
map.putDeleteUnderLock(uid, (DeleteVersionValue) nextVersionValue);
}
map.putUnderLock(uid, nextVersionValue);
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -375,7 +378,7 @@ public void testPruneTombstonesWhileLocked() throws InterruptedException, IOExce
BytesRef uid = uid("1");
;
try (Releasable ignore = map.acquireLock(uid)) {
map.putUnderLock(uid, new DeleteVersionValue(0, 0, 0, 0));
map.putDeleteUnderLock(uid, new DeleteVersionValue(0, 0, 0, 0));
map.beforeRefresh(); // refresh otherwise we won't prune since it's tracked by the current map
map.afterRefresh(false);
Thread thread = new Thread(() -> {
Expand All @@ -392,4 +395,16 @@ public void testPruneTombstonesWhileLocked() throws InterruptedException, IOExce
thread.join();
assertEquals(0, map.getAllTombstones().size());
}

IndexVersionValue randomIndexVersionValue() {
return new IndexVersionValue(randomTranslogLocation(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
}

Translog.Location randomTranslogLocation() {
if (randomBoolean()) {
return null;
} else {
return new Translog.Location(randomNonNegativeLong(), randomNonNegativeLong(), randomInt());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@
package org.elasticsearch.index.engine;

import org.apache.lucene.util.RamUsageTester;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;

public class VersionValueTests extends ESTestCase {

public void testRamBytesUsed() {
VersionValue versionValue = new VersionValue(randomLong(), randomLong(), randomLong());
public void testIndexRamBytesUsed() {
Translog.Location translogLoc = null;
if (randomBoolean()) {
translogLoc = new Translog.Location(randomNonNegativeLong(), randomNonNegativeLong(), randomInt());
}
IndexVersionValue versionValue = new IndexVersionValue(translogLoc, randomLong(), randomLong(), randomLong());
assertEquals(RamUsageTester.sizeOf(versionValue), versionValue.ramBytesUsed());
}

Expand Down