From dff7f37dfc96c69da4a6df470d37fffd9a49f43e Mon Sep 17 00:00:00 2001 From: Jerven Bolleman Date: Tue, 30 Apr 2024 21:54:57 +0200 Subject: [PATCH] GH-4899 Fix a bug in the PathIteration due strings not matching between different constructions. Introduce optimise-able data-structures in the collection factory for ValuePair used in the PathIteration without exposing the type by wrapping making it extend BindingSet. --- .../factory/api/CollectionFactory.java | 21 +- .../mapdb/MapDb3CollectionFactory.java | 16 ++ .../evaluation/iterator/PathIteration.java | 197 ++++++++++++++++-- 3 files changed, 210 insertions(+), 24 deletions(-) diff --git a/core/collection-factory/api/src/main/java/org/eclipse/rdf4j/collection/factory/api/CollectionFactory.java b/core/collection-factory/api/src/main/java/org/eclipse/rdf4j/collection/factory/api/CollectionFactory.java index 27e5bdf324d..c95ac3e1757 100644 --- a/core/collection-factory/api/src/main/java/org/eclipse/rdf4j/collection/factory/api/CollectionFactory.java +++ b/core/collection-factory/api/src/main/java/org/eclipse/rdf4j/collection/factory/api/CollectionFactory.java @@ -84,7 +84,7 @@ public default Set createSetOfBindingSets() { * @return a set that may be optimised and/or disk based */ public Set createSetOfBindingSets(Supplier create, - Function> getHas, Function> getget, + Function> getHas, Function> getGet, Function> getSet); /** @@ -116,6 +116,25 @@ public Set createSetOfBindingSets(Supplier create */ public Queue createValueQueue(); + /** + * @return a new queue that may be optimized and may use the functions passed in. + */ + public default Queue createBindingSetQueue(Supplier create, + Function> getHas, Function> getget, + Function> getSet) { + return createQueue(); + } + + /** + * @return a new queue optimized for bindingsets + */ + public default Queue createBindingSetQueue() { + Function> gethas = (n) -> (b) -> b.hasBinding(n); + Function> getget = (n) -> (b) -> b.getValue(n); + Function> getSet = (n) -> (v, b) -> b.setBinding(n, v); + return createBindingSetQueue(MapBindingSet::new, gethas, getget, getSet); + } + @InternalUseOnly public Map createGroupByMap(); diff --git a/core/collection-factory/mapdb3/src/main/java/org/eclipse/rdf4j/collection/factory/mapdb/MapDb3CollectionFactory.java b/core/collection-factory/mapdb3/src/main/java/org/eclipse/rdf4j/collection/factory/mapdb/MapDb3CollectionFactory.java index 96731e7bcd4..d4e4e8889d6 100644 --- a/core/collection-factory/mapdb3/src/main/java/org/eclipse/rdf4j/collection/factory/mapdb/MapDb3CollectionFactory.java +++ b/core/collection-factory/mapdb3/src/main/java/org/eclipse/rdf4j/collection/factory/mapdb/MapDb3CollectionFactory.java @@ -259,6 +259,22 @@ public Queue createValueQueue() { } } + @Override + public Queue createBindingSetQueue(Supplier create, + Function> getHas, Function> getget, + Function> getSet) { + if (iterationCacheSyncThreshold > 0) { + init(); + Serializer s = createBindingSetSerializer(create, getHas, getget, getSet); + Map m = db.hashMap(Long.toHexString(colectionId++), new SerializerLong(), s).create(); + return new MemoryTillSizeXQueue<>(delegate.createBindingSetQueue(create, getHas, getget, getSet), 128, + () -> new MapDb3BackedQueue<>(m)); + + } else { + return delegate.createBindingSetQueue(); + } + } + @Override public void close() throws RDF4JException { if (db != null) { diff --git a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/PathIteration.java b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/PathIteration.java index a3f1cb9ee27..237c33c41be 100644 --- a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/PathIteration.java +++ b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/PathIteration.java @@ -11,13 +11,19 @@ package org.eclipse.rdf4j.query.algebra.evaluation.iterator; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; import java.util.Queue; import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Predicate; import org.eclipse.rdf4j.collection.factory.api.CollectionFactory; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.iteration.LookAheadIteration; import org.eclipse.rdf4j.model.Value; +import org.eclipse.rdf4j.query.Binding; import org.eclipse.rdf4j.query.BindingSet; import org.eclipse.rdf4j.query.MutableBindingSet; import org.eclipse.rdf4j.query.QueryEvaluationException; @@ -29,9 +35,14 @@ import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy; import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet; import org.eclipse.rdf4j.query.algebra.helpers.AbstractQueryModelVisitor; +import org.eclipse.rdf4j.query.impl.SimpleBinding; public class PathIteration extends LookAheadIteration { + // Should never be seen by code outside of this iterator + private static final String END = "$end_from_path_iteration"; + private static final String START = "$start_from_path_iteration"; + /** * */ @@ -53,11 +64,11 @@ public class PathIteration extends LookAheadIteration { private final boolean endVarFixed; - private final Queue valueQueue; + private final Queue valueQueue; - private final Set reportedValues; + private final Set reportedValues; - private final Set unreportedValues; + private final Set unreportedValues; private final TupleExpr pathExpression; @@ -70,6 +81,10 @@ public class PathIteration extends LookAheadIteration { private final Set namedIntermediateJoins = new HashSet<>(); private final CollectionFactory collectionFactory; + private static volatile int PATH_ITERATOR_ID_GENERATOR = 0; + + private final int pathIteratorId = PATH_ITERATOR_ID_GENERATOR++; + private final String endVarName = "END_" + JOINVAR_PREFIX + pathIteratorId; public PathIteration(EvaluationStrategy strategy, Scope scope, Var startVar, TupleExpr pathExpression, Var endVar, Var contextVar, long minLength, BindingSet bindings) @@ -89,13 +104,53 @@ public PathIteration(EvaluationStrategy strategy, Scope scope, Var startVar, this.bindings = bindings; collectionFactory = strategy.getCollectionFactory().get(); - this.reportedValues = collectionFactory.createSet(); - this.unreportedValues = collectionFactory.createSet(); - this.valueQueue = collectionFactory.createQueue(); + + // This is all necessary for optimized collections to be usable. This only becomes important on very large + // stores with large intermediary results. + this.reportedValues = collectionFactory.createSetOfBindingSets(ValuePair::new, PathIteration::getHas, + PathIteration::getGet, PathIteration::getSet); + this.unreportedValues = collectionFactory.createSetOfBindingSets(ValuePair::new, PathIteration::getHas, + PathIteration::getGet, PathIteration::getSet); + this.valueQueue = collectionFactory.createBindingSetQueue(ValuePair::new, PathIteration::getHas, + PathIteration::getGet, PathIteration::getSet); createIteration(); } + private static final BiConsumer getSet(String s) { + switch (s) { + case START: + return (v, vp) -> ((ValuePair) vp).startValue = v; + case END: + return (v, vp) -> ((ValuePair) vp).endValue = v; + default: + return (v, vp) -> { + }; + } + } + + private static final Function getGet(String s) { + switch (s) { + case START: + return (vp) -> ((ValuePair) vp).startValue; + case END: + return (vp) -> ((ValuePair) vp).endValue; + default: + return (vp) -> null; + } + }; + + private static final Predicate getHas(String s) { + switch (s) { + case START: + return (vp) -> ((ValuePair) vp).startValue != null; + case END: + return (vp) -> ((ValuePair) vp).endValue != null; + default: + return (vp) -> false; + } + }; + @Override protected BindingSet getNextElement() throws QueryEvaluationException { again: while (true) { @@ -183,11 +238,8 @@ protected BindingSet getNextElement() throws QueryEvaluationException { } } - // if we're done, throw away the cached lists of values to avoid - // hogging resources - reportedValues.clear(); - unreportedValues.clear(); - valueQueue.clear(); + // We are done but let the close deal with clearing up resources. + // That method knows how to do it in the cheapest way possible. return null; } } @@ -202,10 +254,10 @@ private ValuePair valuePairFromStartAndEnd(MutableBindingSet nextElement) { if (startVarFixed && endVarFixed && currentLength > 2) { v1 = getVarValue(startVar, startVarFixed, nextElement); - v2 = nextElement.getValue("END_" + JOINVAR_PREFIX + this.hashCode()); + v2 = nextElement.getValue(endVarName); } else if (startVarFixed && endVarFixed && currentLength == 2) { v1 = getVarValue(startVar, startVarFixed, nextElement); - v2 = nextElement.getValue(JOINVAR_PREFIX + (currentLength - 1) + "_" + this.hashCode()); + v2 = nextElement.getValue(varNameAtPathLengthOf(currentLength - 1)); } else { v1 = getVarValue(startVar, startVarFixed, nextElement); v2 = getVarValue(endVar, endVarFixed, nextElement); @@ -229,7 +281,7 @@ protected void handleClose() throws QueryEvaluationException { * @param valueQueue2 * @param vp */ - protected boolean addToQueue(Queue valueQueue2, ValuePair vp) throws QueryEvaluationException { + protected boolean addToQueue(Queue valueQueue2, ValuePair vp) throws QueryEvaluationException { return valueQueue2.add(vp); } @@ -237,7 +289,7 @@ protected boolean addToQueue(Queue valueQueue2, ValuePair vp) throws * @param valueSet * @param vp */ - protected boolean add(Set valueSet, ValuePair vp) throws QueryEvaluationException { + protected boolean add(Set valueSet, ValuePair vp) throws QueryEvaluationException { return valueSet.add(vp); } @@ -278,7 +330,7 @@ private void createIteration() throws QueryEvaluationException { TupleExpr pathExprClone = pathExpression.clone(); if (startVarFixed && endVarFixed) { - String varName = JOINVAR_PREFIX + currentLength + "_" + this.hashCode(); + String varName = varNameAtPathLengthOf(currentLength); Var replacement = createAnonVar(varName, null, true); VarReplacer replacer = new VarReplacer(endVar, replacement, 0, false); @@ -288,7 +340,7 @@ private void createIteration() throws QueryEvaluationException { currentLength++; } else { - currentVp = valueQueue.poll(); + currentVp = (ValuePair) valueQueue.poll(); if (currentVp != null) { @@ -297,9 +349,9 @@ private void createIteration() throws QueryEvaluationException { if (startVarFixed && endVarFixed) { Value v = currentVp.getEndValue(); - Var startReplacement = createAnonVar(JOINVAR_PREFIX + currentLength + "_" + this.hashCode(), v, + Var startReplacement = createAnonVar(varNameAtPathLengthOf(currentLength), v, false); - Var endReplacement = createAnonVar("END_" + JOINVAR_PREFIX + this.hashCode(), null, false); + Var endReplacement = createAnonVar(endVarName, null, false); VarReplacer replacer = new VarReplacer(startVar, startReplacement, 0, false); pathExprClone.visit(replacer); @@ -317,7 +369,7 @@ private void createIteration() throws QueryEvaluationException { v = currentVp.getStartValue(); } - String varName = JOINVAR_PREFIX + currentLength + "-" + this.hashCode(); + String varName = varNameAtPathLengthOf(currentLength); Var replacement = createAnonVar(varName, v, true); VarReplacer replacer = new VarReplacer(toBeReplaced, replacement, 0, false); @@ -333,6 +385,10 @@ private void createIteration() throws QueryEvaluationException { } } + private String varNameAtPathLengthOf(long atLength) { + return JOINVAR_PREFIX + atLength + "_" + pathIteratorId; + } + protected boolean isUnbound(Var var, BindingSet bindings) { if (var == null) { return false; @@ -341,11 +397,16 @@ protected boolean isUnbound(Var var, BindingSet bindings) { } } - protected static class ValuePair { + protected static class ValuePair implements MutableBindingSet { + private static final long serialVersionUID = 1L; + + private Value startValue; + + private Value endValue; - private final Value startValue; + public ValuePair() { - private final Value endValue; + } public ValuePair(Value startValue, Value endValue) { this.startValue = startValue; @@ -403,6 +464,96 @@ public boolean equals(Object obj) { } return true; } + + @Override + public Iterator iterator() { + Binding sb = new SimpleBinding(START, startValue); + Binding eb = new SimpleBinding(END, endValue); + return List.of(sb, eb).iterator(); + } + + @Override + public Set getBindingNames() { + return Set.of(START, END); + } + + @Override + public Binding getBinding(String bindingName) { + switch (bindingName) { + case START: + return new SimpleBinding(START, startValue); + case END: + return new SimpleBinding(END, endValue); + default: + return null; + } + } + + @Override + public boolean hasBinding(String bindingName) { + switch (bindingName) { + case START: + return true; + case END: + return false; + default: + return false; + } + } + + @Override + public Value getValue(String bindingName) { + switch (bindingName) { + case START: + return startValue; + case END: + return endValue; + default: + return null; + } + } + + @Override + public int size() { + return 2; + } + + @Override + public void addBinding(Binding binding) { + switch (binding.getName()) { + case START: + startValue = binding.getValue(); + break; + case END: + endValue = binding.getValue(); + break; + } + } + + @Override + public void setBinding(String name, Value value) { + switch (name) { + case START: + startValue = value; + break; + case END: + endValue = value; + break; + } + + } + + @Override + public void setBinding(Binding binding) { + switch (binding.getName()) { + case START: + startValue = binding.getValue(); + break; + case END: + endValue = binding.getValue(); + break; + } + } } class VarReplacer extends AbstractQueryModelVisitor {