Skip to content

Commit

Permalink
Send vertex label to user log processor for all mutations
Browse files Browse the repository at this point in the history
Fixes #3263

Signed-off-by: Oleksandr Porunov <alexandr.porunov@gmail.com>
  • Loading branch information
porunov committed Nov 4, 2022
1 parent ae07bbd commit 48cd7ae
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 24 deletions.
18 changes: 18 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,24 @@ A new optimization has been added to compute aggregations (min, max, sum and avg
If the index backend is Elasticsearch, a `double` value is used to hold the result. As a result, aggregations on long numbers greater than 2^53 are approximate.
In this case, if the accurate result is essential, the optimization can be disabled by removing the strategy `JanusGraphMixedIndexAggStrategy`: `g.traversal().withoutStrategies(JanusGraphMixedIndexAggStrategy.class)`.

##### Breaking change for transaction logs processing

[Transaction Log](advanced-topics/transaction-log.md) processing has a breaking change.
Previously for all mutated vertices a default vertex label `vertex` has been used in `ChangeState` events.
The new approach stores vertex labels in logs as well which increases storage size for +8 bytes per each relation and
makes all previously stored transaction logs incompatible with the new structure.
To pass this breaking change it's necessary to ensure that transaction logs stored via previous JanusGraph versions are
not processed via JanusGraph version >= 1.0.0.
One of the possible ways to ensure previous logs are not processed is to use `setStartTimeNow`
to process only newest logs. Another way could be to process all previous logs, store log identifier state and start
processing logs from the latest log ensuring that the latest logs are send by the JanusGraph version >= 1.0.0.
For example:
```java
LogProcessorBuilder logProcessorBuilder = logProcessorFramework
.addLogProcessor("myLogProcessorIdentifier")
.setStartTimeNow();
```

### Version 0.6.3 (Release Date: ???)

```xml tab='Maven'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,11 @@
import org.janusgraph.graphdb.types.CompositeIndexType;
import org.janusgraph.graphdb.types.StandardEdgeLabelMaker;
import org.janusgraph.graphdb.types.StandardPropertyKeyMaker;
import org.janusgraph.graphdb.types.VertexLabelVertex;
import org.janusgraph.graphdb.types.system.BaseVertexLabel;
import org.janusgraph.graphdb.types.system.ImplicitKey;
import org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex;
import org.janusgraph.graphdb.vertices.AbstractVertexUtil;
import org.janusgraph.graphdb.vertices.CacheVertex;
import org.janusgraph.testutil.FeatureFlag;
import org.janusgraph.testutil.JanusGraphFeature;
Expand Down Expand Up @@ -4871,6 +4873,8 @@ public void simpleLogTest(final boolean withLogFailure) throws InterruptedExcept

PropertyKey weight = tx.makePropertyKey("weight").dataType(Float.class).cardinality(Cardinality.SINGLE).make();
EdgeLabel knows = tx.makeEdgeLabel("knows").make();
String testVertexLabel = "testVertex";
tx.makeVertexLabel(testVertexLabel).make();
JanusGraphVertex n1 = tx.addVertex("weight", 10.5);
tx.addProperties(knows, weight);
newTx();
Expand All @@ -4885,7 +4889,7 @@ public void simpleLogTest(final boolean withLogFailure) throws InterruptedExcept
final long v1id = getId(v1);
txTimes[1] = times.getTime();
tx2 = graph.buildTransaction().logIdentifier(userLogName).start();
JanusGraphVertex v2 = tx2.addVertex("weight", 222.2);
JanusGraphVertex v2 = (JanusGraphVertex) tx2.traversal().addV(testVertexLabel).property("weight", 222.2).next();
v2.addEdge("knows", getV(tx2, v1id));
tx2.commit();
final long v2id = getId(v2);
Expand Down Expand Up @@ -4998,8 +5002,8 @@ public void read(Message message) {
assertEquals(0, userLogMsgCounter.get());
} else {
assertEquals(4, userLogMsgCounter.get());
assertEquals(7, userChangeCounter.get(Change.ADDED).get());
assertEquals(4, userChangeCounter.get(Change.REMOVED).get());
assertEquals(8, userChangeCounter.get(Change.ADDED).get());
assertEquals(5, userChangeCounter.get(Change.REMOVED).get());
}

clopen(option(VERBOSE_TX_RECOVERY), true);
Expand Down Expand Up @@ -5057,6 +5061,8 @@ public void read(Message message) {
txNo = 2;
//v2 addition transaction
assertEquals(1, Iterables.size(changes.getVertices(Change.ADDED)));
String vertexLabel = changes.getVertices(Change.ADDED).iterator().next().label();
assertEquals(testVertexLabel, vertexLabel);
assertEquals(0, Iterables.size(changes.getVertices(Change.REMOVED)));
assertEquals(2, Iterables.size(changes.getVertices(Change.ANY)));
assertEquals(2, Iterables.size(changes.getRelations(Change.ADDED)));
Expand All @@ -5077,6 +5083,8 @@ public void read(Message message) {
//v2 deletion transaction
assertEquals(0, Iterables.size(changes.getVertices(Change.ADDED)));
assertEquals(1, Iterables.size(changes.getVertices(Change.REMOVED)));
String vertexLabel = changes.getVertices(Change.REMOVED).iterator().next().label();
assertEquals(testVertexLabel, vertexLabel);
assertEquals(2, Iterables.size(changes.getVertices(Change.ANY)));
assertEquals(0, Iterables.size(changes.getRelations(Change.ADDED)));
assertEquals(2, Iterables.size(changes.getRelations(Change.REMOVED)));
Expand Down Expand Up @@ -5105,12 +5113,12 @@ public void read(Message message) {
final JanusGraphVertex v = Iterables.getOnlyElement(changes.getVertices(Change.ANY));
assertEquals(v1id, getId(v));
JanusGraphEdge e1
= Iterables.getOnlyElement(changes.getEdges(v, Change.REMOVED, Direction.OUT, "knows"));
= Iterables.getOnlyElement(changes.getEdges(v, Change.REMOVED, OUT, "knows"));
assertFalse(e1.property("weight").isPresent());
assertEquals(v, e1.vertex(Direction.IN));
e1 = Iterables.getOnlyElement(changes.getEdges(v, Change.ADDED, Direction.OUT, "knows"));
assertEquals(v, e1.vertex(IN));
e1 = Iterables.getOnlyElement(changes.getEdges(v, Change.ADDED, OUT, "knows"));
assertEquals(44.4, e1.<Float>value("weight").doubleValue(), 0.01);
assertEquals(v, e1.vertex(Direction.IN));
assertEquals(v, e1.vertex(IN));
}

//See only current state of graph in transaction
Expand All @@ -5123,7 +5131,7 @@ public void read(Message message) {
// assertTrue(txNo + " - " + v2, v2 == null || v2.isRemoved());
}
assertEquals(111.1, v11.<Float>value("weight").doubleValue(), 0.01);
assertCount(1, v11.query().direction(Direction.OUT).edges());
assertCount(1, v11.query().direction(OUT).edges());

userLogCount.incrementAndGet();
}).build();
Expand Down Expand Up @@ -8242,6 +8250,68 @@ public void testIndexStoreForceInvalidationFromDBCache() throws InterruptedExcep
graph2.close();
}

@Test
public void testIndexWithIndexOnlyConstraintForceInvalidationFromDBCache() throws InterruptedException, ExecutionException {
if (features.hasLocking() || !features.isDistributed()) {
return;
}

String indexPropName = "indexedProp";
String vertexLabelName = "vertexLabelForIndexOnlyConstraint";
String indexName = "indexWithIndexOnlyConstraint";
PropertyKey indexedProp = mgmt.makePropertyKey(indexPropName).dataType(Integer.class).cardinality(Cardinality.SINGLE).make();
VertexLabel vertexLabel = mgmt.makeVertexLabel(vertexLabelName).make();
mgmt.buildIndex(indexName, Vertex.class).addKey(indexedProp).indexOnly(vertexLabel).buildCompositeIndex();
finishSchema();
ManagementSystem.awaitGraphIndexStatus(graph, indexName).call();
mgmt.updateIndex(mgmt.getGraphIndex(indexName), SchemaAction.REINDEX).get();
finishSchema();

StandardJanusGraph graph1 = openInstanceWithDBCacheEnabled("testIndexWithIndexOnlyConstraintForceInvalidationFromDBCache1");
StandardJanusGraph graph2 = openInstanceWithDBCacheEnabled("testIndexWithIndexOnlyConstraintForceInvalidationFromDBCache2");

JanusGraphVertex v1 = graph1.addVertex(vertexLabelName);
v1.property(indexPropName, 1);

graph1.tx().commit();

// Cache data
JanusGraphTransaction tx2 = graph2.newTransaction();
assertEquals(1L, tx2.traversal().V().hasLabel(vertexLabelName).has(indexPropName, 1).count().next());
tx2.commit();

// Remove vertex
JanusGraphTransaction tx1 = graph1.newTransaction();
tx1.traversal().V(v1.id()).drop().iterate();
tx1.commit();

// Check that cached indexed vertex in graph2 was not refreshed
tx2 = graph2.newTransaction();
assertEquals(1L, tx2.traversal().V().hasLabel(vertexLabelName).has(indexPropName, 1).count().next());

// Try to invalidate data without vertex label
invalidateUpdatedVertexProperty(graph2, v1.longId(), indexPropName, 1, -1);
tx2.rollback();

tx2 = graph2.newTransaction();
// Check that invalidation didn't work
assertEquals(1L, tx2.traversal().V().hasLabel(vertexLabelName).has(indexPropName, 1).count().next());
tx2.rollback();

tx2 = graph2.newTransaction();
// Invalidate data using vertex label
invalidateUpdatedVertexProperty(graph2, v1.longId(), indexPropName, 1, -1, vertexLabelName);
tx2.commit();

tx2 = graph2.newTransaction();
// Check that invalidation worked
assertEquals(0L, tx2.traversal().V().hasLabel(vertexLabelName).has(indexPropName, 1).count().next());
tx2.rollback();

graph1.close();
graph2.close();
}

@Test
public void testFullDBCacheInvalidation() throws InterruptedException, ExecutionException {
if (features.hasLocking() || !features.isDistributed()) {
Expand Down Expand Up @@ -8339,10 +8409,18 @@ public void testFullDBCacheInvalidation() throws InterruptedException, Execution
}

private void invalidateUpdatedVertexProperty(StandardJanusGraph graph, long vertexIdUpdated, String propertyNameUpdated, Object previousPropertyValue, Object newPropertyValue){
invalidateUpdatedVertexProperty(graph, vertexIdUpdated, propertyNameUpdated, previousPropertyValue, newPropertyValue, null);
}

private void invalidateUpdatedVertexProperty(StandardJanusGraph graph, long vertexIdUpdated, String propertyNameUpdated, Object previousPropertyValue, Object newPropertyValue, String vertexLabelName){
JanusGraphTransaction tx = graph.newTransaction();
JanusGraphManagement graphMgmt = graph.openManagement();
PropertyKey propertyKey = graphMgmt.getPropertyKey(propertyNameUpdated);
CacheVertex cacheVertex = new CacheVertex((StandardJanusGraphTx) tx, vertexIdUpdated, ElementLifeCycle.Loaded);
if(vertexLabelName != null){
VertexLabel vertexLabel = graphMgmt.getVertexLabel(vertexLabelName);
AbstractVertexUtil.cacheInternalVertexLabel(cacheVertex, (VertexLabelVertex) vertexLabel);
}
StandardVertexProperty propertyPreviousVal = new StandardVertexProperty(propertyKey.longId(), propertyKey, cacheVertex, previousPropertyValue, ElementLifeCycle.Removed);
StandardVertexProperty propertyNewVal = new StandardVertexProperty(propertyKey.longId(), propertyKey, cacheVertex, newPropertyValue, ElementLifeCycle.New);
IndexSerializer indexSerializer = graph.getIndexSerializer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.janusgraph.core.VertexLabel;
import org.janusgraph.core.log.Change;
import org.janusgraph.diskstorage.ReadBuffer;
import org.janusgraph.diskstorage.StaticBuffer;
Expand All @@ -26,6 +27,7 @@
import org.janusgraph.graphdb.database.serialize.DataOutput;
import org.janusgraph.graphdb.database.serialize.Serializer;
import org.janusgraph.graphdb.internal.InternalRelation;
import org.janusgraph.graphdb.internal.InternalVertex;
import org.janusgraph.graphdb.log.StandardTransactionId;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.transaction.TransactionConfiguration;
Expand Down Expand Up @@ -85,7 +87,10 @@ public StaticBuffer serializeModifications(Serializer serializer, LogTxStatus st
private static void logRelations(DataOutput out, final Collection<InternalRelation> relations, StandardJanusGraphTx tx) {
VariableLong.writePositive(out,relations.size());
for (InternalRelation rel : relations) {
VariableLong.writePositive(out,rel.getVertex(0).longId());
InternalVertex vertex = rel.getVertex(0);
VariableLong.writePositive(out,vertex.longId());
VertexLabel vertexLabel = vertex.vertexLabel();
VariableLong.writePositive(out, vertexLabel.hasId() ? vertexLabel.longId() : 0L);
org.janusgraph.diskstorage.Entry entry = tx.getEdgeSerializer().writeRelation(rel, 0, tx);
BufferUtil.writeEntry(out,entry);
}
Expand Down Expand Up @@ -238,9 +243,10 @@ private static List<Modification> readModifications(Change state, ReadBuffer in,
long size = VariableLong.readPositive(in);
List<Modification> mods = new ArrayList<>((int) size);
for (int i = 0; i < size; i++) {
long vid = VariableLong.readPositive(in);
long vertexId = VariableLong.readPositive(in);
long labelId = VariableLong.readPositive(in);
org.janusgraph.diskstorage.Entry entry = BufferUtil.readEntry(in,serializer);
mods.add(new Modification(state,vid,entry));
mods.add(new Modification(state,vertexId,labelId,entry));
}
return mods;
}
Expand Down Expand Up @@ -269,11 +275,13 @@ public static class Modification {

public final Change state;
public final long outVertexId;
public final long outVertexLabelId;
public final org.janusgraph.diskstorage.Entry relationEntry;

private Modification(Change state, long outVertexId, org.janusgraph.diskstorage.Entry relationEntry) {
private Modification(Change state, long outVertexId, long outVertexLabelId, org.janusgraph.diskstorage.Entry relationEntry) {
this.state = state;
this.outVertexId = outVertexId;
this.outVertexLabelId = outVertexLabelId;
this.relationEntry = relationEntry;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.janusgraph.core.EdgeLabel;
import org.janusgraph.core.PropertyKey;
import org.janusgraph.core.VertexLabel;
import org.janusgraph.core.log.Change;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.graphdb.database.log.TransactionLogHeader;
import org.janusgraph.graphdb.idmanagement.IDManager;
import org.janusgraph.graphdb.internal.ElementLifeCycle;
import org.janusgraph.graphdb.internal.InternalRelation;
import org.janusgraph.graphdb.internal.InternalRelationType;
Expand All @@ -31,19 +33,30 @@
import org.janusgraph.graphdb.relations.StandardEdge;
import org.janusgraph.graphdb.relations.StandardVertexProperty;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.types.VertexLabelVertex;
import org.janusgraph.graphdb.vertices.AbstractVertex;
import org.janusgraph.graphdb.vertices.AbstractVertexUtil;

/**
* @author Matthias Broecheler (me@matthiasb.com)
*/
public class ModificationDeserializer {


public static InternalRelation parseRelation(TransactionLogHeader.Modification modification, StandardJanusGraphTx tx) {
Change state = modification.state;
assert state.isProper();
long outVertexId = modification.outVertexId;
Entry relEntry = modification.relationEntry;
InternalVertex outVertex = tx.getInternalVertex(outVertexId);
if(outVertex instanceof AbstractVertex){
long outVertexLabelId = modification.outVertexLabelId;
if(IDManager.VertexIDType.VertexLabel.is(outVertexLabelId)){
VertexLabel vertexLabel = tx.getExistingVertexLabel(outVertexLabelId);
if(vertexLabel instanceof VertexLabelVertex){
AbstractVertexUtil.cacheInternalVertexLabel((AbstractVertex) outVertex, (VertexLabelVertex) vertexLabel);
}
}
}
//Special relation parsing, compare to {@link RelationConstructor}
RelationCache relCache = tx.getEdgeSerializer().readRelation(relEntry, false, tx);
assert relCache.direction == Direction.OUT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.janusgraph.core.JanusGraphEdge;
import org.janusgraph.core.JanusGraphVertex;
import org.janusgraph.core.JanusGraphVertexProperty;
Expand All @@ -34,6 +33,7 @@
import org.janusgraph.graphdb.types.TypeDefinitionCategory;
import org.janusgraph.graphdb.types.TypeDefinitionDescription;
import org.janusgraph.graphdb.types.TypeDefinitionMap;
import org.janusgraph.graphdb.types.VertexLabelVertex;
import org.janusgraph.graphdb.types.indextype.CompositeIndexTypeWrapper;
import org.janusgraph.graphdb.types.indextype.MixedIndexTypeWrapper;
import org.janusgraph.graphdb.types.system.BaseKey;
Expand Down Expand Up @@ -73,7 +73,7 @@ public String name() {
}

@Override
protected Vertex getVertexLabelInternal() {
protected VertexLabelVertex getVertexLabelInternal() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public abstract class AbstractVertex extends AbstractElement implements Internal

private final StandardJanusGraphTx tx;

private volatile VertexLabelVertex internalCachedVertexLabel;

protected AbstractVertex(StandardJanusGraphTx tx, long id) {
super(id);
Expand Down Expand Up @@ -129,15 +130,21 @@ public String label() {
return vertexLabel().name();
}

protected Vertex getVertexLabelInternal() {
return Iterables.getOnlyElement(tx().query(this).noPartitionRestriction().type(BaseLabel.VertexLabelEdge).direction(Direction.OUT).vertices(),null);
protected VertexLabelVertex getVertexLabelInternal() {
// Only if we `internalCachedVertexLabel` is not null it means that the vertex is guaranteed to have that label
// because `label` is immutable. In all other cases we need to try to fetch the vertex label even if it's a
// repeated operation.
if(internalCachedVertexLabel == null){
internalCachedVertexLabel = (VertexLabelVertex) Iterables.getOnlyElement(tx().query(this)
.noPartitionRestriction().type(BaseLabel.VertexLabelEdge).direction(Direction.OUT).vertices(),null);
}
return internalCachedVertexLabel;
}

@Override
public VertexLabel vertexLabel() {
Vertex label = getVertexLabelInternal();
if (label==null) return BaseVertexLabel.DEFAULT_VERTEXLABEL;
else return (VertexLabelVertex)label;
VertexLabelVertex label = getVertexLabelInternal();
return label == null ? BaseVertexLabel.DEFAULT_VERTEXLABEL : label;
}

@Override
Expand Down Expand Up @@ -200,10 +207,9 @@ public <V> Iterator<VertexProperty<V>> properties(String... keys) {

public Iterator<Vertex> vertices(final Direction direction, final String... edgeLabels) {
return (Iterator)query().direction(direction).labels(edgeLabels).vertices().iterator();

}




void cacheInternalVertexLabel(VertexLabelVertex internalCachedVertexLabel){
this.internalCachedVertexLabel = internalCachedVertexLabel;
}
}
Loading

0 comments on commit 48cd7ae

Please sign in to comment.