Skip to content

Commit

Permalink
GH-4899 Fix a bug in the PathIteration due to strings not matching between
Browse files Browse the repository at this point in the history
different constructions.

Introduce optimisable data structures in the collection factory for the
ValuePair used in the PathIteration, without exposing the type, by
wrapping it: ValuePair is made to implement (Mutable)BindingSet.
  • Loading branch information
JervenBolleman committed May 1, 2024
1 parent e283cde commit dff7f37
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public default Set<BindingSet> createSetOfBindingSets() {
* @return a set that may be optimised and/or disk based
*/
public Set<BindingSet> createSetOfBindingSets(Supplier<MutableBindingSet> create,
Function<String, Predicate<BindingSet>> getHas, Function<String, Function<BindingSet, Value>> getget,
Function<String, Predicate<BindingSet>> getHas, Function<String, Function<BindingSet, Value>> getGet,
Function<String, BiConsumer<Value, MutableBindingSet>> getSet);

/**
Expand Down Expand Up @@ -116,6 +116,25 @@ public Set<BindingSet> createSetOfBindingSets(Supplier<MutableBindingSet> create
*/
public Queue<Value> createValueQueue();

/**
 * Creates a queue for binding sets. This default implementation does not use any of the supplied functions and
 * simply hands out a general purpose queue from {@code createQueue()}; implementations may override it to
 * build a more compact or disk based queue from the functions.
 *
 * @param create supplies new, empty mutable binding sets
 * @param getHas given a binding name, yields a predicate on binding sets for that name
 * @param getget given a binding name, yields a function extracting a value from a binding set
 * @param getSet given a binding name, yields a consumer that stores a value into a mutable binding set
 * @return a new queue that may be optimized and may use the functions passed in.
 */
public default Queue<BindingSet> createBindingSetQueue(Supplier<MutableBindingSet> create,
		Function<String, Predicate<BindingSet>> getHas, Function<String, Function<BindingSet, Value>> getget,
		Function<String, BiConsumer<Value, MutableBindingSet>> getSet) {
	// No optimisation in the default: the accessor functions are deliberately left unused.
	final Queue<BindingSet> generalPurposeQueue = createQueue();
	return generalPurposeQueue;
}

/**
 * Convenience variant of the four argument {@code createBindingSetQueue}: it supplies {@code MapBindingSet}
 * instances and accessors that go through the regular {@code hasBinding}, {@code getValue} and
 * {@code setBinding} methods.
 *
 * @return a new queue optimized for bindingsets
 */
public default Queue<BindingSet> createBindingSetQueue() {
	return createBindingSetQueue(
			MapBindingSet::new,
			name -> bindingSet -> bindingSet.hasBinding(name),
			name -> bindingSet -> bindingSet.getValue(name),
			name -> (value, target) -> target.setBinding(name, value));
}

@InternalUseOnly
public <E> Map<BindingSetKey, E> createGroupByMap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,22 @@ public Queue<Value> createValueQueue() {
}
}

/**
 * Creates a queue of binding sets. When {@code iterationCacheSyncThreshold} is positive the delegate's queue is
 * wrapped in a {@code MemoryTillSizeXQueue} together with a supplier of a MapDB backed queue; otherwise the
 * delegate's queue is returned as is.
 * <p>
 * In both cases the caller supplied functions are forwarded to the delegate, so a delegate that optimises on
 * them behaves the same whether or not the MapDB wrapper is used.
 *
 * @param create supplies new, empty mutable binding sets
 * @param getHas given a binding name, yields a predicate on binding sets for that name
 * @param getget given a binding name, yields a function extracting a value from a binding set
 * @param getSet given a binding name, yields a consumer that stores a value into a mutable binding set
 * @return a queue of binding sets
 */
@Override
public Queue<BindingSet> createBindingSetQueue(Supplier<MutableBindingSet> create,
		Function<String, Predicate<BindingSet>> getHas, Function<String, Function<BindingSet, Value>> getget,
		Function<String, BiConsumer<Value, MutableBindingSet>> getSet) {
	if (iterationCacheSyncThreshold > 0) {
		init();
		Serializer<BindingSet> s = createBindingSetSerializer(create, getHas, getget, getSet);
		Map<Long, BindingSet> m = db.hashMap(Long.toHexString(colectionId++), new SerializerLong(), s).create();
		// NOTE(review): 128 is presumably the size up to which the queue stays purely in memory - confirm
		// against MemoryTillSizeXQueue.
		return new MemoryTillSizeXQueue<>(delegate.createBindingSetQueue(create, getHas, getget, getSet), 128,
				() -> new MapDb3BackedQueue<>(m));
	}
	// Forward the functions here too; the no-arg variant would silently replace them with generic
	// MapBindingSet based accessors.
	return delegate.createBindingSetQueue(create, getHas, getget, getSet);
}

@Override
public void close() throws RDF4JException {
if (db != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@
package org.eclipse.rdf4j.query.algebra.evaluation.iterator;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;

import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.query.Binding;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.MutableBindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
Expand All @@ -29,9 +35,14 @@
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;
import org.eclipse.rdf4j.query.algebra.helpers.AbstractQueryModelVisitor;
import org.eclipse.rdf4j.query.impl.SimpleBinding;

public class PathIteration extends LookAheadIteration<BindingSet> {

// Should never be seen by code outside of this iterator
private static final String END = "$end_from_path_iteration";
private static final String START = "$start_from_path_iteration";

/**
*
*/
Expand All @@ -53,11 +64,11 @@ public class PathIteration extends LookAheadIteration<BindingSet> {

private final boolean endVarFixed;

private final Queue<ValuePair> valueQueue;
private final Queue<BindingSet> valueQueue;

private final Set<ValuePair> reportedValues;
private final Set<BindingSet> reportedValues;

private final Set<ValuePair> unreportedValues;
private final Set<BindingSet> unreportedValues;

private final TupleExpr pathExpression;

Expand All @@ -70,6 +81,10 @@ public class PathIteration extends LookAheadIteration<BindingSet> {
private final Set<String> namedIntermediateJoins = new HashSet<>();

private final CollectionFactory collectionFactory;
// Hands out the ids that make the generated variable names of each PathIteration distinct. An AtomicInteger is
// used because "++" on a volatile int is a separate read and write, so two iterations constructed at the same
// time could otherwise receive the same id and thus the same variable names.
private static final AtomicInteger PATH_ITERATOR_ID_GENERATOR = new AtomicInteger();

// Id of this iteration; the first iteration created gets 0.
private final int pathIteratorId = PATH_ITERATOR_ID_GENERATOR.getAndIncrement();
// Name of the anonymous end variable, computed once instead of on every use.
private final String endVarName = "END_" + JOINVAR_PREFIX + pathIteratorId;

public PathIteration(EvaluationStrategy strategy, Scope scope, Var startVar,
TupleExpr pathExpression, Var endVar, Var contextVar, long minLength, BindingSet bindings)
Expand All @@ -89,13 +104,53 @@ public PathIteration(EvaluationStrategy strategy, Scope scope, Var startVar,
this.bindings = bindings;

collectionFactory = strategy.getCollectionFactory().get();
this.reportedValues = collectionFactory.createSet();
this.unreportedValues = collectionFactory.createSet();
this.valueQueue = collectionFactory.createQueue();

// This is all necessary for optimized collections to be usable. This only becomes important on very large
// stores with large intermediary results.
this.reportedValues = collectionFactory.createSetOfBindingSets(ValuePair::new, PathIteration::getHas,
PathIteration::getGet, PathIteration::getSet);
this.unreportedValues = collectionFactory.createSetOfBindingSets(ValuePair::new, PathIteration::getHas,
PathIteration::getGet, PathIteration::getSet);
this.valueQueue = collectionFactory.createBindingSetQueue(ValuePair::new, PathIteration::getHas,
PathIteration::getGet, PathIteration::getSet);

createIteration();
}

/**
 * Looks up the setter for one of the two pseudo binding names of a {@link ValuePair}.
 *
 * @param s the binding name, expected to be {@code START} or {@code END}
 * @return a consumer that stores the value into the start or end field of the (cast) {@code ValuePair}; for
 *         any other name a consumer that does nothing
 */
private static final BiConsumer<Value, MutableBindingSet> getSet(String s) {
	if (s.equals(START)) {
		return (value, target) -> {
			((ValuePair) target).startValue = value;
		};
	}
	if (s.equals(END)) {
		return (value, target) -> {
			((ValuePair) target).endValue = value;
		};
	}
	// Unknown names are ignored rather than rejected.
	return (value, target) -> {
	};
}

/**
 * Looks up the getter for one of the two pseudo binding names of a {@link ValuePair}.
 *
 * @param s the binding name, expected to be {@code START} or {@code END}
 * @return a function reading the start or end field of the (cast) {@code ValuePair}; for any other name a
 *         function that always yields {@code null}
 */
private static final Function<BindingSet, Value> getGet(String s) {
	if (s.equals(START)) {
		return pair -> ((ValuePair) pair).startValue;
	}
	if (s.equals(END)) {
		return pair -> ((ValuePair) pair).endValue;
	}
	return pair -> null;
}

/**
 * Looks up the "is bound" test for one of the two pseudo binding names of a {@link ValuePair}.
 *
 * @param s the binding name, expected to be {@code START} or {@code END}
 * @return a predicate that is true when the start or end field of the (cast) {@code ValuePair} is not
 *         {@code null}; for any other name a predicate that is always false
 */
private static final Predicate<BindingSet> getHas(String s) {
	if (s.equals(START)) {
		return pair -> ((ValuePair) pair).startValue != null;
	}
	if (s.equals(END)) {
		return pair -> ((ValuePair) pair).endValue != null;
	}
	return pair -> false;
}

@Override
protected BindingSet getNextElement() throws QueryEvaluationException {
again: while (true) {
Expand Down Expand Up @@ -183,11 +238,8 @@ protected BindingSet getNextElement() throws QueryEvaluationException {
}
}

// if we're done, throw away the cached lists of values to avoid
// hogging resources
reportedValues.clear();
unreportedValues.clear();
valueQueue.clear();
// We are done but let the close deal with clearing up resources.
// That method knows how to do it in the cheapest way possible.
return null;
}
}
Expand All @@ -202,10 +254,10 @@ private ValuePair valuePairFromStartAndEnd(MutableBindingSet nextElement) {

if (startVarFixed && endVarFixed && currentLength > 2) {
v1 = getVarValue(startVar, startVarFixed, nextElement);
v2 = nextElement.getValue("END_" + JOINVAR_PREFIX + this.hashCode());
v2 = nextElement.getValue(endVarName);
} else if (startVarFixed && endVarFixed && currentLength == 2) {
v1 = getVarValue(startVar, startVarFixed, nextElement);
v2 = nextElement.getValue(JOINVAR_PREFIX + (currentLength - 1) + "_" + this.hashCode());
v2 = nextElement.getValue(varNameAtPathLengthOf(currentLength - 1));
} else {
v1 = getVarValue(startVar, startVarFixed, nextElement);
v2 = getVarValue(endVar, endVarFixed, nextElement);
Expand All @@ -229,15 +281,15 @@ protected void handleClose() throws QueryEvaluationException {
* @param valueQueue2
* @param vp
*/
protected boolean addToQueue(Queue<ValuePair> valueQueue2, ValuePair vp) throws QueryEvaluationException {
protected boolean addToQueue(Queue<BindingSet> valueQueue2, ValuePair vp) throws QueryEvaluationException {
return valueQueue2.add(vp);
}

/**
* @param valueSet
* @param vp
*/
protected boolean add(Set<ValuePair> valueSet, ValuePair vp) throws QueryEvaluationException {
protected boolean add(Set<BindingSet> valueSet, ValuePair vp) throws QueryEvaluationException {
return valueSet.add(vp);
}

Expand Down Expand Up @@ -278,7 +330,7 @@ private void createIteration() throws QueryEvaluationException {
TupleExpr pathExprClone = pathExpression.clone();

if (startVarFixed && endVarFixed) {
String varName = JOINVAR_PREFIX + currentLength + "_" + this.hashCode();
String varName = varNameAtPathLengthOf(currentLength);
Var replacement = createAnonVar(varName, null, true);

VarReplacer replacer = new VarReplacer(endVar, replacement, 0, false);
Expand All @@ -288,7 +340,7 @@ private void createIteration() throws QueryEvaluationException {
currentLength++;
} else {

currentVp = valueQueue.poll();
currentVp = (ValuePair) valueQueue.poll();

if (currentVp != null) {

Expand All @@ -297,9 +349,9 @@ private void createIteration() throws QueryEvaluationException {
if (startVarFixed && endVarFixed) {

Value v = currentVp.getEndValue();
Var startReplacement = createAnonVar(JOINVAR_PREFIX + currentLength + "_" + this.hashCode(), v,
Var startReplacement = createAnonVar(varNameAtPathLengthOf(currentLength), v,
false);
Var endReplacement = createAnonVar("END_" + JOINVAR_PREFIX + this.hashCode(), null, false);
Var endReplacement = createAnonVar(endVarName, null, false);

VarReplacer replacer = new VarReplacer(startVar, startReplacement, 0, false);
pathExprClone.visit(replacer);
Expand All @@ -317,7 +369,7 @@ private void createIteration() throws QueryEvaluationException {
v = currentVp.getStartValue();
}

String varName = JOINVAR_PREFIX + currentLength + "-" + this.hashCode();
String varName = varNameAtPathLengthOf(currentLength);
Var replacement = createAnonVar(varName, v, true);

VarReplacer replacer = new VarReplacer(toBeReplaced, replacement, 0, false);
Expand All @@ -333,6 +385,10 @@ private void createIteration() throws QueryEvaluationException {
}
}

/**
 * Builds the name of the generated join variable for a given path length.
 *
 * @param atLength the path length to embed in the name
 * @return {@code JOINVAR_PREFIX}, the length, an underscore and the id of this iteration, concatenated
 */
private String varNameAtPathLengthOf(long atLength) {
	final String lengthAndId = atLength + "_" + pathIteratorId;
	return JOINVAR_PREFIX + lengthAndId;
}

protected boolean isUnbound(Var var, BindingSet bindings) {
if (var == null) {
return false;
Expand All @@ -341,11 +397,16 @@ protected boolean isUnbound(Var var, BindingSet bindings) {
}
}

protected static class ValuePair {
protected static class ValuePair implements MutableBindingSet {
private static final long serialVersionUID = 1L;

private Value startValue;

private Value endValue;

private final Value startValue;
public ValuePair() {

private final Value endValue;
}

public ValuePair(Value startValue, Value endValue) {
this.startValue = startValue;
Expand Down Expand Up @@ -403,6 +464,96 @@ public boolean equals(Object obj) {
}
return true;
}

/**
 * Iterates over the bindings of this pair: the start value under {@code START} followed by the end value under
 * {@code END}. A value that is {@code null} is treated as unbound and skipped, which is the same test
 * {@code getHas} applies.
 *
 * @return an iterator over zero, one or two bindings
 */
@Override
public Iterator<Binding> iterator() {
	// NOTE(review): SimpleBinding is believed to reject null values (assertion) - confirm; either way a
	// binding without a value should not be reported.
	if (startValue == null) {
		return endValue == null
				? List.<Binding>of().iterator()
				: List.<Binding>of(new SimpleBinding(END, endValue)).iterator();
	}
	if (endValue == null) {
		return List.<Binding>of(new SimpleBinding(START, startValue)).iterator();
	}
	return List.<Binding>of(new SimpleBinding(START, startValue), new SimpleBinding(END, endValue)).iterator();
}

/**
 * Names of the bindings that currently have a value. A {@code null} start or end value counts as unbound, so
 * its name is left out.
 *
 * @return an immutable set containing {@code START} and/or {@code END}, possibly empty
 */
@Override
public Set<String> getBindingNames() {
	if (startValue == null) {
		return endValue == null ? Set.of() : Set.of(END);
	}
	return endValue == null ? Set.of(START) : Set.of(START, END);
}

/**
 * Returns the binding stored under the given name.
 *
 * @param bindingName {@code START} or {@code END}; any other name has no binding
 * @return a new binding for the start or end value, or {@code null} when the name is unknown or the
 *         corresponding value is {@code null} (unbound)
 */
@Override
public Binding getBinding(String bindingName) {
	// Resolve through getValue so that an unbound (null) value yields "no binding" instead of a binding
	// object wrapping null.
	final Value value = getValue(bindingName);
	return value == null ? null : new SimpleBinding(bindingName, value);
}

/**
 * Tells whether a value is bound under the given name.
 * <p>
 * The answer reflects the actual field contents: previously {@code START} was always reported as bound and
 * {@code END} never, which contradicted both {@code getValue} and the {@code getHas} predicates.
 *
 * @param bindingName {@code START} or {@code END}; any other name is never bound
 * @return {@code true} when the corresponding value is not {@code null}
 */
@Override
public boolean hasBinding(String bindingName) {
	switch (bindingName) {
	case START:
		return startValue != null;
	case END:
		return endValue != null;
	default:
		return false;
	}
}

/**
 * Returns the value stored under the given name.
 *
 * @param bindingName {@code START} or {@code END}
 * @return the start or end value (which may be {@code null}), or {@code null} for any other name
 */
@Override
public Value getValue(String bindingName) {
	if (bindingName.equals(START)) {
		return startValue;
	}
	if (bindingName.equals(END)) {
		return endValue;
	}
	return null;
}

/**
 * Number of bindings that currently have a value. A {@code null} start or end value counts as unbound, in line
 * with the {@code != null} test used by {@code getHas}.
 *
 * @return 0, 1 or 2
 */
@Override
public int size() {
	int bound = 0;
	if (startValue != null) {
		bound++;
	}
	if (endValue != null) {
		bound++;
	}
	return bound;
}

/**
 * Stores the value of the given binding as start or end value, depending on the binding's name. Bindings with
 * any other name are ignored.
 *
 * @param binding the binding to take the value from
 */
@Override
public void addBinding(Binding binding) {
	final String name = binding.getName();
	if (name.equals(START)) {
		startValue = binding.getValue();
	} else if (name.equals(END)) {
		endValue = binding.getValue();
	}
}

/**
 * Stores the value as start or end value, depending on the name. Any other name is ignored.
 *
 * @param name  {@code START} or {@code END}
 * @param value the value to store
 */
@Override
public void setBinding(String name, Value value) {
	if (name.equals(START)) {
		startValue = value;
	} else if (name.equals(END)) {
		endValue = value;
	}
}

/**
 * Stores the value of the given binding as start or end value, depending on the binding's name. Bindings with
 * any other name are ignored.
 *
 * @param binding the binding to take the value from
 */
@Override
public void setBinding(Binding binding) {
	final String name = binding.getName();
	if (name.equals(START)) {
		startValue = binding.getValue();
	} else if (name.equals(END)) {
		endValue = binding.getValue();
	}
}
}

class VarReplacer extends AbstractQueryModelVisitor<QueryEvaluationException> {
Expand Down

0 comments on commit dff7f37

Please sign in to comment.