Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Super Node Series One: Introduce proxy node type #4420

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -8846,6 +8848,86 @@ public void performReindexAndVerifyEdgeCount(String indexName, String edgeLabel,
}
}

@Test
public void testNeighborCountWithProxyNode() {
mgmt.makePropertyKey("proxies").dataType(Long.class).cardinality(Cardinality.LIST).make();
mgmt.makeVertexLabel("proxy").setProxy().make();
mgmt.commit();

newTx();

Vertex v0 = graph.addVertex("vertexId", "v0");
Vertex v1 = graph.addVertex("vertexId", "v1");
Vertex v2 = graph.addVertex("vertexId", "v2");
Vertex v3a = graph.addVertex("vertexId", "v3a");
Vertex v3b = graph.addVertex("vertexId", "v3b");
Vertex v4 = graph.addVertex("vertexId", "v4");

v0.addEdge("normal-edge", v1);
v2.addEdge("normal-edge", v3a);
v1.addEdge("labelX", v4);

// assume now v1 becomes a super node and we decide to introduce proxy node(s). The previous edges are retained.
// assume the application logic adds an edge from v1 to v2 with labelX, another edge from v1 to v3a with labelY,
// and another edge from v1 to v3b with labelY.
// what happens under the hood is v1 connects to v2/v3a/v3b via vProxy.
Vertex vProxy = graph.addVertex(T.label, "proxy", "canonicalId", v1.id());
vProxy.addEdge("labelX", v2, "runDate", "01");
vProxy.addEdge("labelY", v3a, "runDate", "02");
vProxy.addEdge("labelY", v3b, "runDate", "02");
// be careful: we need to make sure we assign id right after vertex is created
v1.property("proxies", vProxy.id());
graph.tx().commit();

// ensure proxy node is not involved when we retrieve canonical node's properties
assertEquals(ImmutableList.of("v1"), graph.traversal().V(v1).values("vertexId").toList());

// queries without label constraints
assertEquals(4, graph.traversal().V(v1).out().count().next());
assertEquals(4, graph.traversal().V(v1).out().toList().size());
assertEquals(ImmutableSet.of("v2", "v3a", "v3b", "v4"), new HashSet<>(graph.traversal().V(v1).out().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v2", "v3a", "v3b", "v4"), new HashSet<>(graph.traversal().V(v1).outE().inV().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v0", "v2", "v3a", "v3b", "v4"), new HashSet<>(graph.traversal().V(v1).both().values("vertexId").toList()));
assertTrue(graph.traversal().V(v1).outE().where(__.otherV().has("vertexId", "v2")).hasNext());

assertEquals(2, graph.traversal().V(v3a).in().count().next());
assertEquals(2, graph.traversal().V(v3a).both().count().next());
assertEquals(0, graph.traversal().V(v3a).out().count().next());
assertEquals(2, graph.traversal().V(v3a).inE().count().next());
assertEquals(2, graph.traversal().V(v3a).bothE().count().next());
assertEquals(0, graph.traversal().V(v3a).outE().count().next());
assertEquals(ImmutableSet.of("v1", "v2"), new HashSet<>(graph.traversal().V(v3a).in().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v1", "v2"), new HashSet<>(graph.traversal().V(v3a).both().values("vertexId").toList()));

assertEquals(ImmutableSet.of("v1", "v3a"), new HashSet<>(graph.traversal().V(v2).both().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v1"), new HashSet<>(graph.traversal().V(v2).in().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v3a"), new HashSet<>(graph.traversal().V(v2).out().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v3a"), new HashSet<>(graph.traversal().V(v2).outE().inV().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v3a"), new HashSet<>(graph.traversal().V(v2).outE().otherV().values("vertexId").toList()));

// queries with label constraints
assertEquals(ImmutableSet.of("v2", "v4"), new HashSet<>(graph.traversal().V(v1).out("labelX").values("vertexId").toList()));
assertEquals(ImmutableSet.of("v3a", "v3b"), new HashSet<>(graph.traversal().V(v1).out("labelY").values("vertexId").toList()));
assertEquals(ImmutableSet.of("v2", "v4"), new HashSet<>(graph.traversal().V(v1).both("labelX").values("vertexId").toList()));
assertEquals(ImmutableSet.of("v3a", "v3b"), new HashSet<>(graph.traversal().V(v1).both("labelY").values("vertexId").toList()));
assertEquals(2, graph.traversal().V(v1).out("labelX").count().next());
assertEquals(2, graph.traversal().V(v1).out("labelY").count().next());

assertEquals(ImmutableSet.of("v1"), new HashSet<>(graph.traversal().V(v3a).both("labelY").values("vertexId").toList()));
assertEquals(ImmutableSet.of("v2"), new HashSet<>(graph.traversal().V(v3a).both("normal-edge").values("vertexId").toList()));
assertEquals(ImmutableSet.of("v1"), new HashSet<>(graph.traversal().V(v3a).in("labelY").values("vertexId").toList()));
assertEquals(ImmutableSet.of("v1"), new HashSet<>(graph.traversal().V(v3a).inE("labelY").outV().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v2"), new HashSet<>(graph.traversal().V(v3a).in("normal-edge").values("vertexId").toList()));
assertEquals(4, graph.traversal().V(v3a).in("labelY").out().count().next());
assertEquals(4, graph.traversal().V(v3a).in("labelY").out().toList().size());
assertEquals(4, graph.traversal().V(v3a).in("labelY").out().values("vertexId").toList().size());
assertEquals(ImmutableSet.of("v2", "v3a", "v3b", "v4"), new HashSet<>(graph.traversal().V(v3a).in("labelY").out().values("vertexId").toList()));

assertEquals("02", graph.traversal().V(v3a).inE("labelY").where(__.otherV().has("vertexId", "v1")).next().property("runDate").value());


}

@Test
public void testMultipleOrClauses() {
clopen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public interface VertexLabel extends JanusGraphVertex, JanusGraphSchemaType {
*/
boolean isStatic();

boolean isProxy();

//TTL

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public interface VertexLabelMaker {
*/
VertexLabelMaker setStatic();

VertexLabelMaker setProxy();

/**
* Creates a {@link VertexLabel} according to the specifications of this builder.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.janusgraph.graphdb.tinkerpop.optimize.strategy.JanusGraphMixedIndexAggStrategy;
import org.janusgraph.graphdb.tinkerpop.optimize.strategy.JanusGraphMixedIndexCountStrategy;
import org.janusgraph.graphdb.tinkerpop.optimize.strategy.JanusGraphMultiQueryStrategy;
import org.janusgraph.graphdb.tinkerpop.optimize.strategy.JanusGraphProxyTraversalStrategy;
import org.janusgraph.graphdb.tinkerpop.optimize.strategy.JanusGraphStepStrategy;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.transaction.StandardTransactionBuilder;
Expand Down Expand Up @@ -160,7 +161,8 @@ public class StandardJanusGraph extends JanusGraphBlueprintsGraph {
JanusGraphMixedIndexAggStrategy.instance(),
JanusGraphMixedIndexCountStrategy.instance(),
JanusGraphStepStrategy.instance(),
JanusGraphIoRegistrationStrategy.instance());
JanusGraphIoRegistrationStrategy.instance(),
JanusGraphProxyTraversalStrategy.instance());

//Register with cache
TraversalStrategies.GlobalCache.registerStrategies(StandardJanusGraph.class, graphStrategies);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ private static IDManager.VertexIDType getVertexIDType(VertexLabel vertexLabel) {
return IDManager.VertexIDType.PartitionedVertex;
} else if (vertexLabel.isStatic()) {
return IDManager.VertexIDType.UnmodifiableVertex;
} else if (vertexLabel.isProxy()) {
return IDManager.VertexIDType.ProxyVertex;
} else {
return IDManager.VertexIDType.NormalVertex;
}
Expand All @@ -403,6 +405,7 @@ private class SimpleVertexIDBlockSizer implements IDBlockSizer {
public long getBlockSize(int idNamespace) {
switch (PoolType.getPoolType(idNamespace)) {
case NORMAL_VERTEX:
case PROXY_VERTEX:
return baseBlockSize;
case UNMODIFIABLE_VERTEX:
return Math.max(10,baseBlockSize/10);
Expand All @@ -426,7 +429,7 @@ public long getIdUpperBound(int idNamespace) {

private enum PoolType {

NORMAL_VERTEX, UNMODIFIABLE_VERTEX, PARTITIONED_VERTEX, RELATION, SCHEMA;
NORMAL_VERTEX, PROXY_VERTEX, UNMODIFIABLE_VERTEX, PARTITIONED_VERTEX, RELATION, SCHEMA;

public int getIDNamespace() {
return ordinal();
Expand All @@ -437,6 +440,7 @@ public long getCountBound(IDManager idManager) {
case NORMAL_VERTEX:
case UNMODIFIABLE_VERTEX:
case PARTITIONED_VERTEX:
case PROXY_VERTEX:
return idManager.getVertexCountBound();
case RELATION: return idManager.getRelationCountBound();
case SCHEMA: return IDManager.getSchemaCountBound();
Expand All @@ -449,6 +453,7 @@ public boolean hasOnePerPartition() {
case NORMAL_VERTEX:
case UNMODIFIABLE_VERTEX:
case RELATION:
case PROXY_VERTEX:
return true;
default: return false;
}
Expand All @@ -458,6 +463,7 @@ public static PoolType getPoolTypeFor(IDManager.VertexIDType idType) {
if (idType==IDManager.VertexIDType.NormalVertex) return NORMAL_VERTEX;
else if (idType== IDManager.VertexIDType.UnmodifiableVertex) return UNMODIFIABLE_VERTEX;
else if (idType== IDManager.VertexIDType.PartitionedVertex) return PARTITIONED_VERTEX;
else if (idType== IDManager.VertexIDType.ProxyVertex) return PROXY_VERTEX;
else if (IDManager.VertexIDType.Schema.isSubType(idType)) return SCHEMA;
else throw new IllegalArgumentException("Invalid id type: " + idType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class IDManager {
* 000 - * Normal vertices
* 010 - * Partitioned vertices
* 100 - * Unmodifiable (e.g. TTL'ed) vertices
* 110 - + Reserved for additional vertex type
* 110 - * Proxy vertices
* 1 - + Invisible
* 11 - * Invisible (user created/triggered) Vertex [for later]
* 01 - + Schema related vertices
Expand Down Expand Up @@ -125,6 +125,22 @@ final boolean isProper() {
return true;
}
},
ProxyVertex {
@Override
final long offset() {
return 3L;
}

@Override
final long suffix() {
return 6L;
}

@Override
final boolean isProper() {
return true;
}
},
Invisible {
@Override
final long offset() {
Expand Down Expand Up @@ -463,6 +479,7 @@ private static VertexIDType getUserVertexIDType(Object vertexId) {
if (VertexIDType.NormalVertex.is(vertexId)) type=VertexIDType.NormalVertex;
else if (VertexIDType.PartitionedVertex.is(vertexId)) type=VertexIDType.PartitionedVertex;
else if (VertexIDType.UnmodifiableVertex.is(vertexId)) type=VertexIDType.UnmodifiableVertex;
else if (VertexIDType.ProxyVertex.is(vertexId)) type=VertexIDType.ProxyVertex;
if (null == type) {
throw new InvalidIDException("Vertex ID " + vertexId + " has unrecognized type");
}
Expand All @@ -471,7 +488,7 @@ private static VertexIDType getUserVertexIDType(Object vertexId) {

public final boolean isUserVertexId(Object vertexId) {
if (vertexId instanceof Number) {
return (VertexIDType.NormalVertex.is(vertexId) || VertexIDType.PartitionedVertex.is(vertexId) || VertexIDType.UnmodifiableVertex.is(vertexId))
return (VertexIDType.NormalVertex.is(vertexId) || VertexIDType.PartitionedVertex.is(vertexId) || VertexIDType.UnmodifiableVertex.is(vertexId) || VertexIDType.ProxyVertex.is(vertexId))
&& ((((Number) vertexId).longValue() >>> (partitionBits+USERVERTEX_PADDING_BITWIDTH))>0);
} else {
return true;
Expand Down Expand Up @@ -639,6 +656,10 @@ public boolean isPartitionedVertex(Object id) {
return isUserVertexId(id) && VertexIDType.PartitionedVertex.is(id);
}

public boolean isProxyVertex(Object id) {
return isUserVertexId(id) && VertexIDType.ProxyVertex.is(id);
}

public long getRelationCountBound() {
return relationCountBound;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,8 @@ public boolean equals(Object other) {
public String toString() {
return "type["+ relationType.toString()+"]";
}

public RelationType getRelationType() {
return this.relationType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.janusgraph.core.BaseVertexQuery;
import org.janusgraph.core.JanusGraphEdge;
import org.janusgraph.core.JanusGraphRelation;
Expand Down Expand Up @@ -53,8 +54,10 @@
import org.janusgraph.graphdb.relations.RelationComparator;
import org.janusgraph.graphdb.relations.StandardVertexProperty;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.types.system.BaseLabel;
import org.janusgraph.graphdb.types.system.ImplicitKey;
import org.janusgraph.graphdb.types.system.SystemRelationType;
import org.janusgraph.graphdb.util.CloseableAbstractIterator;
import org.janusgraph.graphdb.vertices.CacheVertex;
import org.janusgraph.util.datastructures.Interval;
import org.janusgraph.util.datastructures.PointInterval;
Expand All @@ -69,6 +72,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -340,7 +344,8 @@ protected Iterable<JanusGraphRelation> executeRelations(InternalVertex vertex, B
return ResultSetIterator.wrap(merge,baseQuery.getLimit());
} else vertex = tx.getCanonicalVertex(vertex);
}
return executeIndividualRelations(vertex,baseQuery);
Iterator iter = new EdgeProxyProcessor(vertex, baseQuery);
return () -> iter;
}

/**
Expand Down Expand Up @@ -374,7 +379,7 @@ protected Iterable<JanusGraphRelation> loadRelationsFromCache(InternalVertex ver
merge = ResultMergeSortIterator.mergeSort(merge, iterable, relationComparator, false);
}
}

return ResultSetIterator.wrap(merge, baseQuery.getLimit());
}
} else {
Expand Down Expand Up @@ -413,7 +418,8 @@ public Iterable<JanusGraphVertex> executeVertices(InternalVertex vertex, BaseVer
return ResultSetIterator.wrap(merge,baseQuery.getLimit());
} else vertex = tx.getCanonicalVertex(vertex);
}
return executeIndividualVertices(vertex,baseQuery);
Iterator iter = new VertexProxyProcessor(vertex, baseQuery);
return () -> iter;
}

private Iterable<JanusGraphVertex> executeIndividualVertices(InternalVertex vertex,
Expand Down Expand Up @@ -859,5 +865,79 @@ private int computeLimit(int remainingConditions, int baseLimit) {
return baseLimit;
}

// TODO: we should not use CloseableAbstractIterator here since it is unmodifiable
public class VertexProxyProcessor extends CloseableAbstractIterator<JanusGraphVertex> {

private List<Long> proxyIds = new ArrayList<>();
private int offset;
private Iterator<JanusGraphVertex> iter;
private BaseVertexCentricQuery baseQuery;

public VertexProxyProcessor(InternalVertex vertex, BaseVertexCentricQuery baseQuery) {
Condition condition = baseQuery.getCondition();
if (condition instanceof And && ((RelationCategory) ((And) condition).get(0)).name().equals(RelationCategory.EDGE.name())
|| condition instanceof RelationTypeCondition && ((RelationTypeCondition) condition).getRelationType().isEdgeLabel()
&& !(((RelationTypeCondition) condition).getRelationType() instanceof BaseLabel)) {
Iterator<VertexProperty<Long>> proxyIter = vertex.properties("proxies");
while (proxyIter.hasNext()) {
proxyIds.add(proxyIter.next().value());
}
}
iter = executeIndividualVertices(vertex, baseQuery).iterator();
this.baseQuery = baseQuery;
}

@Override
protected JanusGraphVertex computeNext() {
if (iter.hasNext()) {
JanusGraphVertex v = iter.next();
if (tx.getIdInspector().isProxyVertex(v.id())) {
long canonicalId = (long) v.property("canonicalId").value();
JanusGraphVertex canonicalV = tx.getVertex(canonicalId);
return canonicalV;
}
return v;
} else if (offset < proxyIds.size()) {
InternalVertex proxyV = (InternalVertex) tx.getVertex(proxyIds.get(offset++));
iter = executeIndividualVertices(proxyV, baseQuery).iterator();
return computeNext();
}
return endOfData();
}
}

public class EdgeProxyProcessor extends CloseableAbstractIterator<JanusGraphRelation> {

private List<Long> proxyIds = new ArrayList<>();
private int offset;
private Iterator<JanusGraphRelation> iter;
private BaseVertexCentricQuery baseQuery;

public EdgeProxyProcessor(InternalVertex vertex, BaseVertexCentricQuery baseQuery) {
Condition condition = baseQuery.getCondition();
if (condition instanceof And && ((RelationCategory) ((And) condition).get(0)).name() == RelationCategory.EDGE.name()
|| condition instanceof RelationTypeCondition && ((RelationTypeCondition) condition).getRelationType().isEdgeLabel()
&& !(((RelationTypeCondition) condition).getRelationType() instanceof BaseLabel)) {
Iterator<VertexProperty<Long>> proxyIter = vertex.properties("proxies");
while (proxyIter.hasNext()) {
proxyIds.add(proxyIter.next().value());
}
}
iter = executeIndividualRelations(vertex, baseQuery).iterator();
this.baseQuery = baseQuery;
}

@Override
protected JanusGraphRelation computeNext() {
if (iter.hasNext()) {
return iter.next();
} else if (offset < proxyIds.size()) {
InternalVertex proxyV = (InternalVertex) tx.getVertex(proxyIds.get(offset++));
iter = executeIndividualRelations(proxyV, baseQuery).iterator();
return computeNext();
}
return endOfData();
}
}

}
Loading
Loading