Skip to content

Commit

Permalink
Enable multiQuery optimization for has step
Browse files Browse the repository at this point in the history
Fixes #3244

Signed-off-by: Oleksandr Porunov <alexandr.porunov@gmail.com>
  • Loading branch information
porunov committed Apr 20, 2023
1 parent 8da859c commit 9f31967
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4738,18 +4738,29 @@ public void testLimitBatchSizeForMultiQuery() {
bs[i] = graph.addVertex();
cs[i] = graph.addVertex();
cs[i].property("foo", "bar");
cs[i].property("fooBar", "Bar");
a.addEdge("knows", bs[i]);
bs[i].addEdge("knows", cs[i]);
}

int barrierSize = 27;
int limit = 40;
Supplier<GraphTraversal<?, ?>> traversal;
TraversalMetrics profile;

// test batching for `has()`
traversal = () -> graph.traversal().V(cs).barrier(barrierSize).has("foo", "bar").has("fooBar", "Bar");
assertEqualResultWithAndWithoutLimitBatchSize(traversal);
clopen(option(USE_MULTIQUERY), true, option(LIMIT_BATCH_SIZE), true);
profile = traversal.get().profile().next();
assertEquals(3, countBackendQueriesOfSize(barrierSize * 2, profile.getMetrics()));
assertEquals(1, countBackendQueriesOfSize((numV - 3 * barrierSize) * 2, profile.getMetrics()));

// test batching for `out()`
Supplier<GraphTraversal<?, ?>> traversal = () -> graph.traversal().V(bs).barrier(barrierSize).out();
traversal = () -> graph.traversal().V(bs).barrier(barrierSize).out();
assertEqualResultWithAndWithoutLimitBatchSize(traversal);
clopen(option(USE_MULTIQUERY), true, option(LIMIT_BATCH_SIZE), true);
TraversalMetrics profile = traversal.get().profile().next();
profile = traversal.get().profile().next();
assertEquals(3, countBackendQueriesOfSize(barrierSize * 2, profile.getMetrics()));
assertEquals(1, countBackendQueriesOfSize((numV - 3 * barrierSize) * 2, profile.getMetrics()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.janusgraph.graphdb.query.vertex.BasicVertexCentricQueryBuilder;
import org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphTraversalUtil;
import org.janusgraph.graphdb.tinkerpop.profile.TP3ProfileWrapper;
import org.janusgraph.graphdb.util.CopyStepUtil;

import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -47,7 +48,7 @@ public class JanusGraphEdgeVertexStep extends EdgeVertexStep implements Profilin

/**
 * Builds a JanusGraph edge-vertex step from an existing {@link EdgeVertexStep}.
 *
 * <p>The traversal and direction are taken from {@code originalStep}, and its step labels are
 * transferred to this instance.
 *
 * @param originalStep      the step whose traversal, direction and labels are reused
 * @param txVertexCacheSize value stored in this step's {@code txVertexCacheSize} field
 */
public JanusGraphEdgeVertexStep(EdgeVertexStep originalStep, int txVertexCacheSize) {
    super(originalStep.getTraversal(), originalStep.getDirection());
    // The helper adds every label of originalStep to this step, so no separate
    // hand-written label loop is needed here.
    CopyStepUtil.copyAbstractStepModifiableFields(originalStep, this);
    this.txVertexCacheSize = txVertexCacheSize;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2023 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.graphdb.tinkerpop.optimize.step;

import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Profiling;
import org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
import org.apache.tinkerpop.gremlin.process.traversal.util.MutableMetrics;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.janusgraph.core.BaseVertexQuery;
import org.janusgraph.core.JanusGraphException;
import org.janusgraph.core.JanusGraphMultiVertexQuery;
import org.janusgraph.core.JanusGraphVertex;
import org.janusgraph.graphdb.query.profile.QueryProfiler;
import org.janusgraph.graphdb.query.vertex.BasicVertexCentricQueryBuilder;
import org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphTraversalUtil;
import org.janusgraph.graphdb.tinkerpop.profile.TP3ProfileWrapper;
import org.janusgraph.graphdb.util.CopyStepUtil;

import java.util.HashSet;
import java.util.Set;

/**
 * A {@link HasStep} that can issue a multi-query prefetch for a batch of vertices before the
 * inherited {@code has} filter is evaluated on each of them.
 *
 * <p>Vertices announced through {@link #registerFutureVertexForPrefetching(Vertex)} are collected
 * and prefetched together the first time {@link #filter(Traverser.Admin)} meets a vertex that is
 * not part of the most recently prefetched batch. The filtering decision itself is always
 * delegated to {@link HasStep}.
 *
 * @param <S> the element type flowing through the step
 */
public class JanusGraphHasStep<S extends Element> extends HasStep<S> implements Profiling, MultiQueriable<S,S> {

    private boolean useMultiQuery = false;
    private boolean batchPropertyPrefetching = false;
    private int txVertexCacheSize = 20000;

    /** Vertices registered for the next prefetch round. */
    private final Set<JanusGraphVertex> verticesToPrefetch = new HashSet<>();

    /** Vertices covered by the most recent successful prefetch. */
    private final Set<Vertex> prefetchedVertices = new HashSet<>();

    private QueryProfiler queryProfiler = QueryProfiler.NO_OP;

    /**
     * Creates a step with the traversal, has-containers and labels of {@code originalStep}.
     * When {@code originalStep} is itself a {@code JanusGraphHasStep}, its multi-query flag is
     * carried over as well.
     *
     * @param originalStep the step to copy from
     */
    public JanusGraphHasStep(HasStep<S> originalStep){
        this(originalStep.getTraversal(), originalStep.getHasContainers().toArray(new HasContainer[0]));
        CopyStepUtil.copyAbstractStepModifiableFields(originalStep, this);

        if (originalStep instanceof JanusGraphHasStep) {
            // Parameterised cast instead of a raw-typed local.
            this.useMultiQuery = ((JanusGraphHasStep<S>) originalStep).useMultiQuery;
        }
    }

    /**
     * Creates a step for the given traversal and has-containers.
     *
     * @param traversal     the traversal this step belongs to
     * @param hasContainers the conditions passed to {@link HasStep}
     */
    public JanusGraphHasStep(Traversal.Admin traversal, HasContainer... hasContainers) {
        super(traversal, hasContainers);
    }

    /**
     * Triggers a batch prefetch when multi-query is enabled and the incoming vertex is not in the
     * last prefetched batch, then applies the inherited filter.
     *
     * @param traverser the traverser to test
     * @return the result of {@link HasStep}'s filter
     */
    @Override
    protected boolean filter(final Traverser.Admin<S> traverser) {
        if (useMultiQuery && traverser.get() instanceof Vertex) {
            final JanusGraphVertex vertex = JanusGraphTraversalUtil.getJanusGraphVertex(traverser);
            if (!prefetchedVertices.contains(vertex)) {
                prefetchNextBatch(vertex);
            }
        }
        return super.filter(traverser);
    }

    /**
     * Attaches this step's profiler to the given query.
     *
     * @param query the query to configure; it is cast to {@link BasicVertexCentricQueryBuilder}
     * @param <Q>   the concrete query type
     * @return the same {@code query} instance
     */
    public <Q extends BaseVertexQuery> Q makeQuery(Q query) {
        if (!batchPropertyPrefetching) {
            //TODO: return only specific properties (we should recursively traverse hasContainers and return any property keys)
            // final String[] keys = getPropertyKeys();
            // query.keys(keys);
        }
        // TODO: do we need to apply has containers to this multi query?
        //  if yes, then skipped vertices in the result should be filtered always as `false` and non-filtered vertices
        //  should be filtered as `true`

        // for (final HasContainer condition : getHasContainers()) {
        //     query.has(condition.getKey(), JanusGraphPredicateUtils.convert(condition.getBiPredicate()), condition.getValue());
        // }
        ((BasicVertexCentricQueryBuilder) query).profiler(queryProfiler);
        return query;
    }

    /**
     * Prefetches all registered vertices plus {@code requiredVertex} with a single multi-query.
     * Called from {@link #filter(Traverser.Admin)} when the current vertex is not part of the
     * last prefetched batch.
     *
     * <p>Nothing is prefetched when {@code txVertexCacheSize} is below 1.
     *
     * @param requiredVertex the vertex that triggered the prefetch; it is added to the batch
     * @throws TraversalInterruptedException if the prefetch failed because of an interruption
     */
    private void prefetchNextBatch(final JanusGraphVertex requiredVertex) {
        if (txVertexCacheSize < 1) {
            // Registered vertices are never consumed on this path; drop them so the set
            // does not keep growing for the lifetime of the step.
            verticesToPrefetch.clear();
            return;
        }
        //TODO: prefetch both properties and / or vertex labels of the vertices depending on internal hasContainers

        verticesToPrefetch.add(requiredVertex);
        final JanusGraphMultiVertexQuery multiQuery = JanusGraphTraversalUtil.getTx(getTraversal()).multiQuery(verticesToPrefetch);

        makeQuery(multiQuery);

        try {
            multiQuery.preFetch();
            prefetchedVertices.clear();
            prefetchedVertices.addAll(verticesToPrefetch);
            verticesToPrefetch.clear();
        } catch (JanusGraphException janusGraphException) {
            if (janusGraphException.isCausedBy(InterruptedException.class)) {
                // Keep the originating exception so the interruption can be diagnosed.
                final TraversalInterruptedException interrupted = new TraversalInterruptedException();
                interrupted.initCause(janusGraphException);
                throw interrupted;
            }
            // NOTE(review): any other prefetch failure is intentionally not propagated here, as
            // before; presumably the inherited filter then loads the data itself - confirm.
        }
    }

    @Override
    public void setUseMultiQuery(boolean useMultiQuery) {
        this.useMultiQuery = useMultiQuery;
    }

    /**
     * Adds a vertex to the batch used by the next prefetch.
     *
     * @param futureVertex the vertex to register; it is cast to {@link JanusGraphVertex}
     */
    @Override
    public void registerFutureVertexForPrefetching(Vertex futureVertex) {
        verticesToPrefetch.add((JanusGraphVertex) futureVertex);
    }

    /**
     * Wraps the given metrics in a {@link TP3ProfileWrapper} that is attached to queries built by
     * {@link #makeQuery(BaseVertexQuery)}.
     *
     * @param metrics the metrics to wrap
     */
    @Override
    public void setMetrics(MutableMetrics metrics) {
        queryProfiler = new TP3ProfileWrapper(metrics);
    }

    public void setBatchPropertyPrefetching(boolean batchPropertyPrefetching) {
        this.batchPropertyPrefetching = batchPropertyPrefetching;
    }

    public void setTxVertexCacheSize(int txVertexCacheSize) {
        this.txVertexCacheSize = txVertexCacheSize;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.janusgraph.graphdb.query.vertex.BasicVertexCentricQueryBuilder;
import org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphTraversalUtil;
import org.janusgraph.graphdb.tinkerpop.profile.TP3ProfileWrapper;
import org.janusgraph.graphdb.util.CopyStepUtil;

import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -63,7 +64,7 @@ public class JanusGraphPropertiesStep<E> extends PropertiesStep<E> implements Ha

public JanusGraphPropertiesStep(PropertiesStep<E> originalStep) {
super(originalStep.getTraversal(), originalStep.getReturnType(), originalStep.getPropertyKeys());
originalStep.getLabels().forEach(this::addLabel);
CopyStepUtil.copyAbstractStepModifiableFields(originalStep, this);

if (originalStep instanceof JanusGraphPropertiesStep) {
JanusGraphPropertiesStep originalJanusGraphPropertiesStep = (JanusGraphPropertiesStep) originalStep;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.janusgraph.graphdb.query.vertex.BasicVertexCentricQueryBuilder;
import org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphTraversalUtil;
import org.janusgraph.graphdb.tinkerpop.profile.TP3ProfileWrapper;
import org.janusgraph.graphdb.util.CopyStepUtil;

import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -61,7 +62,7 @@ public class JanusGraphVertexStep<E extends Element> extends VertexStep<E> imple

public JanusGraphVertexStep(VertexStep<E> originalStep) {
super(originalStep.getTraversal(), originalStep.getReturnClass(), originalStep.getDirection(), originalStep.getEdgeLabels());
originalStep.getLabels().forEach(this::addLabel);
CopyStepUtil.copyAbstractStepModifiableFields(originalStep, this);

if (originalStep instanceof JanusGraphVertexStep) {
JanusGraphVertexStep originalJanusGraphVertexStep = (JanusGraphVertexStep) originalStep;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.tinkerpop.gremlin.process.traversal.Traversal.Admin;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertiesStep;
Expand All @@ -31,6 +32,7 @@
import org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphTraversalUtil;
import org.janusgraph.graphdb.tinkerpop.optimize.step.HasStepFolder;
import org.janusgraph.graphdb.tinkerpop.optimize.step.JanusGraphEdgeVertexStep;
import org.janusgraph.graphdb.tinkerpop.optimize.step.JanusGraphHasStep;
import org.janusgraph.graphdb.tinkerpop.optimize.step.JanusGraphPropertiesStep;
import org.janusgraph.graphdb.tinkerpop.optimize.step.JanusGraphVertexStep;
import org.janusgraph.graphdb.tinkerpop.optimize.step.MultiQueriable;
Expand Down Expand Up @@ -64,6 +66,7 @@ public void apply(final Traversal.Admin<?, ?> traversal) {

applyJanusGraphVertexSteps(traversal, batchPropertyPrefetching, txVertexCacheSize);
applyJanusGraphPropertiesSteps(traversal);
applyJanusGraphHasSteps(traversal, batchPropertyPrefetching, txVertexCacheSize);
inspectLocalTraversals(traversal);
}

Expand Down Expand Up @@ -91,6 +94,18 @@ private void applyJanusGraphVertexSteps(Admin<?, ?> traversal, boolean batchProp
});
}

/**
 * Replaces each {@link HasStep} found in the traversal with a configured
 * {@link JanusGraphHasStep}. Steps that already are {@code JanusGraphHasStep} instances are
 * left in place.
 *
 * @param traversal                the traversal whose has-steps are replaced
 * @param batchPropertyPrefetching forwarded to each replacement step
 * @param txVertexCacheSize        forwarded to each replacement step
 */
private void applyJanusGraphHasSteps(Admin<?, ?> traversal, boolean batchPropertyPrefetching, int txVertexCacheSize) {
    TraversalHelper.getStepsOfAssignableClass(HasStep.class, traversal).forEach(hasStep -> {
        // Only plain has-steps need converting.
        if (!(hasStep instanceof JanusGraphHasStep)) {
            final JanusGraphHasStep replacement = new JanusGraphHasStep(hasStep);
            replacement.setBatchPropertyPrefetching(batchPropertyPrefetching);
            replacement.setTxVertexCacheSize(txVertexCacheSize);
            TraversalHelper.replaceStep(hasStep, replacement, hasStep.getTraversal());
        }
    });
}

private void applyJanusGraphPropertiesSteps(Admin<?, ?> traversal) {
TraversalHelper.getStepsOfAssignableClass(PropertiesStep.class, traversal).forEach(originalStep -> {
final JanusGraphPropertiesStep propertiesStep = new JanusGraphPropertiesStep(originalStep);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2023 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.graphdb.util;

import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;

/**
 * Static helper for transferring state from one step to a newly created replacement step.
 */
public class CopyStepUtil {

    /**
     * Adds every label of {@code originalStep} to {@code copiedStep}.
     * Labels are the only state copied by this method.
     *
     * @param originalStep the step whose labels are read
     * @param copiedStep   the step that receives the labels
     */
    public static void copyAbstractStepModifiableFields(AbstractStep<?,?> originalStep, AbstractStep<?,?> copiedStep){
        originalStep.getLabels().forEach(stepLabel -> copiedStep.addLabel(stepLabel));
    }

}

0 comments on commit 9f31967

Please sign in to comment.