Skip to content

Commit

Permalink
Merge main into develop (#4600)
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad committed May 26, 2023
2 parents a8daad4 + 3852aec commit 2e9ef2a
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 296 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ protected void cleanSnapshots() throws InterruptedException {
}

// stale statement
this.statements.remove(st, i);
this.statements.optimisticRemove(st, i);
prioritiseCleaning = prioritiseSnapshotCleaningIfLowOnMemory(prioritiseCleaning);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,6 @@ public void addObjectStatement(MemStatement st) throws InterruptedException {
objectStatements.add(st);
}

@Override
public void removeObjectStatement(MemStatement st) throws InterruptedException {
objectStatements.remove(st);

}

@Override
public void cleanSnapshotsFromObjectStatements(int currentSnapshot) throws InterruptedException {
objectStatements.cleanSnapshots(currentSnapshot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,6 @@ public void addPredicateStatement(MemStatement st) throws InterruptedException {
predicateStatements.add(st);
}

/**
* Removes a statement from this MemURI's list of statements for which it is the predicate.
*/
public void removePredicateStatement(MemStatement st) throws InterruptedException {
predicateStatements.remove(st);
}

/**
* Removes statements from old snapshots (those that have expired at or before the specified snapshot version) from
* this MemValue's list of statements for which it is the predicate.
Expand All @@ -232,11 +225,6 @@ public void addObjectStatement(MemStatement st) throws InterruptedException {
objectStatements.add(st);
}

@Override
public void removeObjectStatement(MemStatement st) throws InterruptedException {
objectStatements.remove(st);
}

@Override
public void cleanSnapshotsFromObjectStatements(int currentSnapshot) throws InterruptedException {
objectStatements.cleanSnapshots(currentSnapshot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,6 @@ public void addObjectStatement(MemStatement st) throws InterruptedException {
objectStatements.add(st);
}

@Override
public void removeObjectStatement(MemStatement st) throws InterruptedException {
objectStatements.remove(st);
}

@Override
public void cleanSnapshotsFromObjectStatements(int currentSnapshot) throws InterruptedException {
objectStatements.cleanSnapshots(currentSnapshot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ public void addSubjectStatement(MemStatement st) throws InterruptedException {
subjectStatements.add(st);
}

public void removeSubjectStatement(MemStatement st) throws InterruptedException {
subjectStatements.remove(st);
}

public void cleanSnapshotsFromSubjectStatements(int currentSnapshot) throws InterruptedException {
subjectStatements.cleanSnapshots(currentSnapshot);
}
Expand All @@ -69,10 +65,6 @@ public void addContextStatement(MemStatement st) throws InterruptedException {
contextStatements.add(st);
}

public void removeContextStatement(MemStatement st) throws InterruptedException {
contextStatements.remove(st);
}

public void cleanSnapshotsFromContextStatements(int currentSnapshot) throws InterruptedException {
contextStatements.cleanSnapshots(currentSnapshot);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.StampedLock;

/**
* A dedicated data structure for storing MemStatement objects, offering operations optimized for their use in the
Expand Down Expand Up @@ -46,8 +45,6 @@ public class MemStatementList {
private volatile boolean prioritiseCleanup;
private static final VarHandle PRIORITISE_CLEANUP;

private final StampedLock addRemoveLock = new StampedLock();

private final AtomicReference<Thread> prioritisedThread = new AtomicReference<>();

public MemStatementList() {
Expand Down Expand Up @@ -75,69 +72,73 @@ public void add(MemStatement st) throws InterruptedException {
}
}

long readLock = addRemoveLock.readLock();
try {
do {

MemStatement[] statements = getStatements();
int length = statements.length;
do {

if (length > (int) SIZE.getAcquire(this)) {
MemStatement[] statements = getStatements();
int length = statements.length;

int previouslyInsertedIndex = (int) PREVIOUSLY_INSERTED_INDEX.getOpaque(this);
if (previouslyInsertedIndex >= length) {
continue;
}
boolean shouldGrowArray = true;

int i = previouslyInsertedIndex + 1 >= length ? 0 : previouslyInsertedIndex + 1;
if (length > (int) SIZE.getAcquire(this)) {

for (; i != previouslyInsertedIndex; i = (i + 1 >= length ? 0 : i + 1)) {
int previouslyInsertedIndex = (int) PREVIOUSLY_INSERTED_INDEX.getOpaque(this);
if (previouslyInsertedIndex >= length) {
continue;
}

if (statements[i] == null) {
int i = previouslyInsertedIndex + 1 >= length ? 0 : previouslyInsertedIndex + 1;

boolean success = STATEMENTS_ARRAY.compareAndSet(statements, i, null, st);
if (success) {
for (; i != previouslyInsertedIndex; i = (i + 1 >= length ? 0 : i + 1)) {

// check if the statements array has been swapped out (because it was grown) while we
// were
// inserting into it
MemStatement[] statementsAfterInsert = getStatements();
if (statementsAfterInsert != statements
&& STATEMENTS_ARRAY.getAcquire(statements, i) != st) {
// we wrote into an array while it was growing and our write was lost
break;
}
if (statements[i] == null) {

PREVIOUSLY_INSERTED_INDEX.setRelease(this, i);
SIZE.getAndAdd(this, 1);
boolean success = STATEMENTS_ARRAY.compareAndSet(statements, i, null, st);

updateGuaranteedLastIndexInUse(i);
if (success) {
shouldGrowArray = false;

return;
// check if the statements array has been swapped out (because it has grown) while we were
// inserting into it
MemStatement[] statementsAfterInsert = getStatementsWithoutInterrupt();
if (statementsAfterInsert != statements
&& STATEMENTS_ARRAY.getAcquire(statementsAfterInsert, i) != st) {
// We wrote into an array while it was growing and our write was lost.
break;
}
}

PREVIOUSLY_INSERTED_INDEX.setRelease(this, i);
SIZE.getAndAdd(this, 1);

updateGuaranteedLastIndexInUse(i);

return;
}
} else if (previouslyInsertedIndex < 0 && i == length - 1) {
// The array is full but no threads have made it to the code line where the
// PREVIOUSLY_INSERTED_INDEX is updated. Don't grow the array just yet, it is better to wait
// until PREVIOUSLY_INSERTED_INDEX is updated.
shouldGrowArray = false;
break;
}
}
}

if (shouldGrowArray && STATEMENTS.compareAndSet(this, statements, null)) {
// Grow array
MemStatement[] newArray = new MemStatement[Math.max(4, length * 2)];
if (statements != EMPTY_ARRAY) {
System.arraycopy(statements, 0, newArray, 0, length);
}

// statements array is probably full
STATEMENTS.setRelease(this, newArray);
}

if (STATEMENTS.compareAndSet(this, statements, null)) {// Grow array
MemStatement[] newArray = new MemStatement[Math.max(4, length * 2)];
if (statements != EMPTY_ARRAY) {
System.arraycopy(statements, 0, newArray, 0, length);
}
if (Thread.interrupted()) {
throw new InterruptedException();
}

} while (true);

STATEMENTS.setRelease(this, newArray);
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
} while (true);
} finally {
addRemoveLock.unlockRead(readLock);
}
}

private void updateGuaranteedLastIndexInUse(int newValue) {
Expand All @@ -151,81 +152,60 @@ private void updateGuaranteedLastIndexInUse(int newValue) {
}
}

public void remove(MemStatement st) throws InterruptedException {
public boolean optimisticRemove(MemStatement st) throws InterruptedException {

do {
MemStatement[] statements = getStatements();

boolean success = true;
for (int i = 0; i < statements.length; i++) {
if (statements[i] == st) {

success = innerRemove(st, statements, i);
MemStatement[] statements = getStatements();

break;
}
for (int i = 0; i < statements.length; i++) {
if (statements[i] == st) {
return optimisticInnerRemove(st, statements, i);
}
if (success) {
break;
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
} while (true);
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
return false;
}

public void remove(MemStatement st, int index) throws InterruptedException {

do {
MemStatement[] statements = getStatements();
public boolean optimisticRemove(MemStatement st, int index) throws InterruptedException {
MemStatement[] statements = getStatements();

if (statements[index] == st && innerRemove(st, statements, index)) {
return;
} else {
remove(st);
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
} while (true);
if (statements[index] == st && optimisticInnerRemove(st, statements, index)) {
return true;
} else {
return optimisticRemove(st);
}
}

private boolean innerRemove(MemStatement st, MemStatement[] statements, int i) throws InterruptedException {
long writeLock = addRemoveLock.writeLock();
try {
if (getStatements() != statements) {
return false;
}
private boolean optimisticInnerRemove(MemStatement toRemove, MemStatement[] statements, int i) {

boolean success = STATEMENTS_ARRAY.compareAndSet(statements, i, st, null);
if (success) {
while (true) {
int size = size();
boolean decrementedSize = SIZE.compareAndSet(this, size, size - 1);
if (decrementedSize) {
return true;
}
boolean success = STATEMENTS_ARRAY.weakCompareAndSet(statements, i, toRemove, null);
if (success) {

MemStatement[] statementsAfterRemoval = getStatementsWithoutInterrupt();
if (statementsAfterRemoval != statements) {
// We don't know if the statement was removed because the STATEMENTS_ARRAY has changed (because it
// grew). Since it can never shrink we know that if we managed to remove the statement then the index
// should either be null or a different statement
if (STATEMENTS_ARRAY.getAcquire(statementsAfterRemoval, i) == toRemove) {
return false;
}
} else {
return false;
}
} finally {
addRemoveLock.unlockWrite(writeLock);
SIZE.getAndAdd(this, -1);

return true;
} else {
return false;
}

}

public void clear() {
long writeLock = addRemoveLock.writeLock();
try {
statements = EMPTY_ARRAY;
size = 0;
previouslyInsertedIndex = -1;
guaranteedLastIndexInUse = -10;
prioritiseCleanup = false;
} finally {
addRemoveLock.unlockWrite(writeLock);
}
statements = EMPTY_ARRAY;
size = 0;
previouslyInsertedIndex = -1;
guaranteedLastIndexInUse = -10;
prioritiseCleanup = false;
}

public void cleanSnapshots(int currentSnapshot) throws InterruptedException {
Expand All @@ -244,7 +224,7 @@ public void cleanSnapshots(int currentSnapshot) throws InterruptedException {

MemStatement statement = statements[i];
if (statement != null && statement.getTillSnapshot() <= currentSnapshot) {
boolean success = innerRemove(statement, statements, i);
boolean success = optimisticInnerRemove(statement, statements, i);
if (!success) {
error = true;
break;
Expand Down Expand Up @@ -307,6 +287,15 @@ public MemStatement[] getStatements() throws InterruptedException {
return statements;
}

private MemStatement[] getStatementsWithoutInterrupt() {
MemStatement[] statements = (MemStatement[]) STATEMENTS.getAcquire(this);
while (statements == null) {
Thread.onSpinWait();
statements = (MemStatement[]) STATEMENTS.getAcquire(this);
}
return statements;
}

public int getGuaranteedLastIndexInUse() {
return ((int) GUARANTEED_LAST_INDEX_IN_USE.getAcquire(this));
}
Expand Down Expand Up @@ -382,4 +371,27 @@ public void setPrioritiseCleanup(boolean prioritiseCleanup) {
STATEMENTS_ARRAY = MethodHandles.arrayElementVarHandle(MemStatement[].class);
}

boolean verifySizeForTesting() {
MemStatement[] statements1 = getStatementsWithoutInterrupt();
int size = 0;
for (int i = 0; i < statements1.length; i++) {
if (statements1[i] != null) {
size++;
}
}
return size == size();

}

int getRealSizeForTesting() {
MemStatement[] statements1 = getStatementsWithoutInterrupt();
int size = 0;
for (int i = 0; i < statements1.length; i++) {
if (statements1[i] != null) {
size++;
}
}
return size;

}
}
Loading

0 comments on commit 2e9ef2a

Please sign in to comment.