Skip to content

Commit

Permalink
GH-4554 Correctly close changesets
Browse files Browse the repository at this point in the history
- some shallow copies of Changeset were not closed at all
- use reference counting for approved and deprecated models
  • Loading branch information
kenwenzel committed May 24, 2023
1 parent e9dac06 commit cc63c42
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package org.eclipse.rdf4j.sail.base;

import java.lang.invoke.VarHandle;
import java.lang.ref.Reference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -23,6 +24,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -51,6 +53,28 @@
@InternalUseOnly
public abstract class Changeset implements SailSink, ModelFactory {

static class CountedReference<T> {
final T referent;
AtomicInteger count = new AtomicInteger(1);

CountedReference(T referent) {
this.referent = referent;
}

CountedReference<T> retain() {
count.incrementAndGet();
return this;
}

boolean release() {
return count.decrementAndGet() == 0;
}

T get() {
return referent;
}
}

AdderBasedReadWriteLock readWriteLock = new AdderBasedReadWriteLock();
AdderBasedReadWriteLock refBacksReadWriteLock = new AdderBasedReadWriteLock();
Semaphore prependLock = new Semaphore(1);
Expand Down Expand Up @@ -78,15 +102,15 @@ public abstract class Changeset implements SailSink, ModelFactory {
* <p>
* DO NOT EXPOSE THE MODEL OUTSIDE OF THIS CLASS BECAUSE IT IS NOT THREAD-SAFE
*/
private volatile Model approved;
private volatile CountedReference<Model> approved;
private volatile boolean approvedEmpty = true;

/**
* Explicit statements that have been removed as part of a transaction, but have not yet been committed.
* <p>
* DO NOT EXPOSE THE MODEL OUTSIDE OF THIS CLASS BECAUSE IT IS NOT THREAD-SAFE
*/
private volatile Model deprecated;
private volatile CountedReference<Model> deprecated;
private volatile boolean deprecatedEmpty = true;

/**
Expand Down Expand Up @@ -132,14 +156,14 @@ public void close() throws SailException {
addedNamespaces = null;
removedPrefixes = null;
try {
if (approved instanceof AutoCloseable) {
((AutoCloseable) approved).close();
if (approved != null && approved.release() && approved.get() instanceof AutoCloseable) {
((AutoCloseable) approved.get()).close();
}
} catch (Exception e) {
throw new SailException(e);
} finally {
approved = null;
if (deprecated instanceof AutoCloseable) {
if (deprecated != null && deprecated.release() && deprecated.get() instanceof AutoCloseable) {
try {
((AutoCloseable) deprecated).close();
} catch (Exception e) {
Expand Down Expand Up @@ -184,7 +208,7 @@ boolean hasApproved(Resource subj, IRI pred, Value obj, Resource[] contexts) {

boolean readLock = readWriteLock.readLock();
try {
return approved.contains(subj, pred, obj, contexts);
return approved.get().contains(subj, pred, obj, contexts);
} finally {
readWriteLock.unlockReader(readLock);
}
Expand All @@ -206,7 +230,7 @@ boolean hasDeprecated(Resource subj, IRI pred, Value obj, Resource[] contexts) {
}
}

return deprecated.contains(subj, pred, obj, contexts);
return deprecated.get().contains(subj, pred, obj, contexts);
} finally {
readWriteLock.unlockReader(readLock);
}
Expand Down Expand Up @@ -389,7 +413,7 @@ public void clear(Resource... contexts) {
statementCleared = true;

if (approved != null) {
approved.clear();
approved.get().clear();
}
if (approvedContexts != null) {
approvedContexts.clear();
Expand All @@ -399,7 +423,7 @@ public void clear(Resource... contexts) {
deprecatedContexts = new HashSet<>();
}
if (approved != null) {
approved.remove(null, null, null, contexts);
approved.get().remove(null, null, null, contexts);
}
if (approvedContexts != null && contexts != null) {
for (Resource resource : contexts) {
Expand All @@ -410,7 +434,7 @@ public void clear(Resource... contexts) {
deprecatedContexts.addAll(Arrays.asList(contexts));
}
}
approvedEmpty = approved == null || approved.isEmpty();
approvedEmpty = approved == null || approved.get().isEmpty();
} finally {
readWriteLock.unlockWriter(writeLock);
}
Expand All @@ -425,13 +449,13 @@ public void approve(Statement statement) {
try {

if (deprecated != null) {
deprecated.remove(statement);
deprecatedEmpty = deprecated == null || deprecated.isEmpty();
deprecated.get().remove(statement);
deprecatedEmpty = deprecated == null || deprecated.get().isEmpty();
}
if (approved == null) {
approved = createEmptyModel();
approved = new CountedReference<>(createEmptyModel());
}
approved.add(statement);
approved.get().add(statement);
approvedEmpty = false;
if (statement.getContext() != null) {
if (approvedContexts == null) {
Expand All @@ -456,17 +480,17 @@ public void deprecate(Statement statement) {
long writeLock = readWriteLock.writeLock();
try {
if (approved != null) {
approved.remove(statement);
approvedEmpty = approved == null || approved.isEmpty();
approved.get().remove(statement);
approvedEmpty = approved == null || approved.get().isEmpty();
}
if (deprecated == null) {
deprecated = createEmptyModel();
deprecated = new CountedReference<>(createEmptyModel());
}
deprecated.add(statement);
deprecated.get().add(statement);
deprecatedEmpty = false;
Resource ctx = statement.getContext();
if (approvedContexts != null && approvedContexts.contains(ctx)
&& !approved.contains(null, null, null, ctx)) {
&& !approved.get().contains(null, null, null, ctx)) {
approvedContexts.remove(ctx);
}
} finally {
Expand Down Expand Up @@ -501,11 +525,11 @@ public String toString() {
sb.append(" deprecatedContexts, ");
}
if (deprecated != null) {
sb.append(deprecated.size());
sb.append(deprecated.get().size());
sb.append(" deprecated, ");
}
if (approved != null) {
sb.append(approved.size());
sb.append(approved.get().size());
sb.append(" approved, ");
}
if (sb.length() > 0) {
Expand All @@ -520,9 +544,9 @@ protected void setChangeset(Changeset from) {
assert !from.closed;

this.observed = from.observed;
this.approved = from.approved;
this.approved = from.approved != null ? from.approved.retain() : null;
this.approvedEmpty = from.approvedEmpty;
this.deprecated = from.deprecated;
this.deprecated = from.deprecated != null ? from.deprecated.retain() : null;
this.deprecatedEmpty = from.deprecatedEmpty;
this.approvedContexts = from.approvedContexts;
this.deprecatedContexts = from.deprecatedContexts;
Expand Down Expand Up @@ -689,7 +713,7 @@ List<Statement> getDeprecatedStatements() {

boolean readLock = readWriteLock.readLock();
try {
return new ArrayList<>(deprecated);
return new ArrayList<>(deprecated.get());
} finally {
readWriteLock.unlockReader(readLock);
}
Expand All @@ -704,7 +728,7 @@ List<Statement> getApprovedStatements() {

boolean readLock = readWriteLock.readLock();
try {
return new ArrayList<>(approved);
return new ArrayList<>(approved.get());
} finally {
readWriteLock.unlockReader(readLock);
}
Expand All @@ -725,7 +749,7 @@ boolean hasDeprecated(Statement statement) {
}
}
if (deprecated != null) {
return deprecated.contains(statement);
return deprecated.get().contains(statement);
} else {
return false;
}
Expand All @@ -751,7 +775,7 @@ Iterable<Statement> getApprovedStatements(Resource subj, IRI pred, Value obj,
boolean readLock = readWriteLock.readLock();
try {

Iterable<Statement> statements = approved.getStatements(subj, pred, obj, contexts);
Iterable<Statement> statements = approved.get().getStatements(subj, pred, obj, contexts);

// This is a synchronized context, users of this method will be allowed to use the results at their leisure.
// We
Expand Down Expand Up @@ -788,7 +812,8 @@ Iterable<Triple> getApprovedTriples(Resource subj, IRI pred, Value obj) {
try {
// TODO none of this is particularly well thought-out in terms of performance, but we are aiming
// for functionally complete first.
Stream<Triple> approvedSubjectTriples = approved.parallelStream()
Stream<Triple> approvedSubjectTriples = approved.get()
.parallelStream()
.filter(st -> st.getSubject().isTriple())
.map(st -> (Triple) st.getSubject())
.filter(t -> {
Expand All @@ -801,7 +826,8 @@ Iterable<Triple> getApprovedTriples(Resource subj, IRI pred, Value obj) {
return obj == null || obj.equals(t.getObject());
});

Stream<Triple> approvedObjectTriples = approved.parallelStream()
Stream<Triple> approvedObjectTriples = approved.get()
.parallelStream()
.filter(st -> st.getObject().isTriple())
.map(st -> (Triple) st.getObject())
.filter(t -> {
Expand All @@ -825,8 +851,8 @@ void removeApproved(Statement next) {
long writeLock = readWriteLock.writeLock();
try {
if (approved != null) {
approved.remove(next);
approvedEmpty = approved == null || approved.isEmpty();
approved.get().remove(next);
approvedEmpty = approved == null || approved.get().isEmpty();
}
} finally {
readWriteLock.unlockWriter(writeLock);
Expand All @@ -850,7 +876,7 @@ void sinkApproved(SailSink sink) {
boolean readLock = readWriteLock.readLock();
try {
if (approved != null) {
sink.approveAll(approved, approvedContexts);
sink.approveAll(approved.get(), approvedContexts);
}
} finally {
readWriteLock.unlockReader(readLock);
Expand All @@ -865,7 +891,7 @@ void sinkDeprecated(SailSink sink) {
boolean readLock = readWriteLock.readLock();
try {
if (deprecated != null) {
sink.deprecateAll(deprecated);
sink.deprecateAll(deprecated.get());
}
} finally {
readWriteLock.unlockReader(readLock);
Expand Down Expand Up @@ -895,12 +921,12 @@ public void approveAll(Set<Statement> approve, Set<Resource> approveContexts) {
try {

if (deprecated != null) {
deprecated.removeAll(approve);
deprecated.get().removeAll(approve);
}
if (approved == null) {
approved = createEmptyModel();
approved = new CountedReference<>(createEmptyModel());
}
approved.addAll(approve);
approved.get().addAll(approve);
approvedEmpty = approvedEmpty && approve.isEmpty();

if (approveContexts != null) {
Expand All @@ -921,19 +947,19 @@ public void deprecateAll(Set<Statement> deprecate) {
try {

if (approved != null) {
approved.removeAll(deprecate);
approvedEmpty = approved == null || approved.isEmpty();
approved.get().removeAll(deprecate);
approvedEmpty = approved == null || approved.get().isEmpty();
}
if (deprecated == null) {
deprecated = createEmptyModel();
deprecated = new CountedReference<>(createEmptyModel());
}
deprecated.addAll(deprecate);
deprecated.get().addAll(deprecate);
deprecatedEmpty = deprecatedEmpty && deprecate.isEmpty();

for (Statement statement : deprecate) {
Resource ctx = statement.getContext();
if (approvedContexts != null && approvedContexts.contains(ctx)
&& !approved.contains(null, null, null, ctx)) {
&& !approved.get().contains(null, null, null, ctx)) {
approvedContexts.remove(ctx);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,10 @@ private void flush(SailSink sink) throws SailException {
&& !isChanged((Changeset) sink)) {
// one change to apply that is not in use to an empty Changeset
Changeset dst = (Changeset) sink;
dst.setChangeset(changes.pop());
Changeset src = changes.pop();
dst.setChangeset(src);
// correctly close changeset
src.close();
} else {
Iterator<Changeset> iter = changes.iterator();
while (iter.hasNext()) {
Expand Down Expand Up @@ -517,6 +520,9 @@ private void flush(Changeset change, SailSink sink) throws SailException {

change.sinkDeprecated(sink);
change.sinkApproved(sink);

// correctly close changeset
change.close();
}

}

0 comments on commit cc63c42

Please sign in to comment.