Skip to content
Permalink
Browse files

[#1220] Retain GraphHead in operators not changing the structure (#1235)

fixes #1220
  • Loading branch information...
timo95 authored and galpha committed May 7, 2019
1 parent 2e0f574 commit 76ff80cf7cfdd023b8759345ce777322b51c4307
Showing with 159 additions and 130 deletions.
  1. +7 −3 ...p-data-integration/src/main/java/org/gradoop/dataintegration/transformation/ConnectNeighbors.java
  2. +7 −1 gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/EdgeToVertex.java
  3. +5 −2 ...gration/src/main/java/org/gradoop/dataintegration/transformation/PropagatePropertyToNeighbor.java
  4. +7 −2 gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/VertexToEdge.java
  5. +34 −19 ...ta-integration/src/test/java/org/gradoop/dataintegration/transformation/ConnectNeighborsTest.java
  6. +4 −4 ...p-data-integration/src/test/java/org/gradoop/dataintegration/transformation/EdgeToVertexTest.java
  7. +8 −7 ...ion/src/test/java/org/gradoop/dataintegration/transformation/PropagatePropertyToNeighborTest.java
  8. +3 −3 ...p-data-integration/src/test/java/org/gradoop/dataintegration/transformation/VertexToEdgeTest.java
  9. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/algorithms/gelly/hits/HITS.java
  10. +1 −1 ...oop-flink/src/main/java/org/gradoop/flink/algorithms/gelly/labelpropagation/LabelPropagation.java
  11. +3 −4 ...ink/src/main/java/org/gradoop/flink/algorithms/gelly/shortestpaths/SingleSourceShortestPaths.java
  12. +2 −2 ...p-flink/src/main/java/org/gradoop/flink/algorithms/gelly/vertexdegrees/DistinctVertexDegrees.java
  13. +5 −10 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/exclusion/Exclusion.java
  14. +12 −16 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/exclusion/ReduceExclusion.java
  15. +10 −4 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/subgraph/Subgraph.java
  16. +3 −3 gradoop-flink/src/test/java/org/gradoop/flink/algorithms/gelly/hits/HITSTest.java
  17. +3 −3 ...rc/test/java/org/gradoop/flink/algorithms/gelly/labelpropagation/GradoopLabelPropagationTest.java
  18. +7 −7 ...src/test/java/org/gradoop/flink/algorithms/gelly/shortestpaths/SingleSourceShortestPathsTest.java
  19. +3 −3 ...ink/src/test/java/org/gradoop/flink/algorithms/gelly/vertexdegrees/DistinctVertexDegreesTest.java
  20. +20 −21 gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/exclusion/ExclusionTest.java
  21. +14 −14 gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/subgraph/SubgraphTest.java
@@ -24,7 +24,9 @@
import org.gradoop.dataintegration.transformation.impl.NeighborhoodVertex;
import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.LabelIsIn;
import org.gradoop.flink.model.impl.functions.graphcontainment.AddToGraphBroadcast;

import java.util.List;
import java.util.Objects;
@@ -88,10 +90,12 @@ public LogicalGraph execute(LogicalGraph graph) {

// calculate the new edges and add them to the original graph
DataSet<Edge> newEdges = neighborhood.flatMap(
new CreateCartesianNeighborhoodEdges<>(graph.getConfig().getEdgeFactory(), newEdgeLabel));
new CreateCartesianNeighborhoodEdges<>(graph.getFactory().getEdgeFactory(), newEdgeLabel))
.map(new AddToGraphBroadcast<>())
.withBroadcastSet(graph.getGraphHead().map(new Id<>()), AddToGraphBroadcast.GRAPH_ID);

return graph.getConfig().getLogicalGraphFactory()
.fromDataSets(graph.getVertices(), graph.getEdges().union(newEdges));
return graph.getFactory()
.fromDataSets(graph.getGraphHead(), graph.getVertices(), graph.getEdges().union(newEdges));
}

@Override
@@ -24,6 +24,8 @@
import org.gradoop.dataintegration.transformation.functions.CreateVertexFromEdges;
import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.graphcontainment.AddToGraphBroadcast;
import org.gradoop.flink.model.impl.functions.tuple.Value0Of3;

import java.util.Objects;
@@ -88,15 +90,19 @@ public LogicalGraph execute(LogicalGraph graph) {

DataSet<Vertex> newVertices = newVerticesAndOriginIds
.map(new Value0Of3<>())
.map(new AddToGraphBroadcast<>())
.withBroadcastSet(graph.getGraphHead().map(new Id<>()), AddToGraphBroadcast.GRAPH_ID)
.union(graph.getVertices());

// create edges to the newly created vertex
DataSet<Edge> newEdges = newVerticesAndOriginIds
.flatMap(new CreateEdgesFromTriple<>(graph.getFactory().getEdgeFactory(),
edgeLabelSourceToNew, edgeLabelNewToTarget))
.map(new AddToGraphBroadcast<>())
.withBroadcastSet(graph.getGraphHead().map(new Id<>()), AddToGraphBroadcast.GRAPH_ID)
.union(graph.getEdges());

return graph.getFactory().fromDataSets(newVertices, newEdges);
return graph.getFactory().fromDataSets(graph.getGraphHead(), newVertices, newEdges);
}

@Override
@@ -26,6 +26,7 @@
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.LabelIsIn;
import org.gradoop.flink.model.impl.functions.epgm.SourceId;
import org.gradoop.flink.model.impl.functions.graphcontainment.AddToGraphBroadcast;

import java.util.Objects;
import java.util.Set;
@@ -125,9 +126,11 @@ public LogicalGraph execute(LogicalGraph graph) {
// Update target vertices.
.coGroup(graph.getVertices())
.where(0).equalTo(new Id<>())
.with(new AccumulatePropagatedValues<>(targetVertexPropertyKey, targetVertexLabels));
.with(new AccumulatePropagatedValues<>(targetVertexPropertyKey, targetVertexLabels))
.map(new AddToGraphBroadcast<>())
.withBroadcastSet(graph.getGraphHead().map(new Id<>()), AddToGraphBroadcast.GRAPH_ID);

return graph.getFactory().fromDataSets(newVertices, graph.getEdges());
return graph.getFactory().fromDataSets(graph.getGraphHead(), newVertices, graph.getEdges());
}

@Override
@@ -24,6 +24,8 @@
import org.gradoop.dataintegration.transformation.impl.NeighborhoodVertex;
import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.graphcontainment.AddToGraphBroadcast;
import org.gradoop.flink.model.impl.operators.neighborhood.keyselector.IdInTuple;

import java.util.List;
@@ -85,9 +87,12 @@ public LogicalGraph execute(LogicalGraph graph) {
.where(new IdInTuple<>(0))
.equalTo(new IdInTuple<>(0))
.with(new EdgesFromLocalTransitiveClosure<>(newEdgeLabel,
graph.getFactory().getEdgeFactory()));
graph.getFactory().getEdgeFactory()))
.map(new AddToGraphBroadcast<>())
.withBroadcastSet(graph.getGraphHead().map(new Id<>()), AddToGraphBroadcast.GRAPH_ID);

return graph.getFactory().fromDataSets(graph.getVertices(), graph.getEdges().union(newEdges));
return graph.getFactory()
.fromDataSets(graph.getGraphHead(), graph.getVertices(), graph.getEdges().union(newEdges));
}

@Override
@@ -30,17 +30,34 @@
/**
* The loader used to get the test graphs.
*/
private FlinkAsciiGraphLoader loader = getLoaderFromString("input[" +
"(i:V)-->(c:Center)-->(o:V)" +
"(i2:V)-->(c)-->(o2:V)" +
"(:other)-->(c)-->(:other)" +
"] expectedIncoming [" +
"(i)-[:neighbor]->(i2)" +
"(i2)-[:neighbor]->(i)" +
"] expectedOutgoing [" +
"(o)-[:neighbor]->(o2)" +
"(o2)-[:neighbor]->(o)" +
"]");
private FlinkAsciiGraphLoader loader = getLoaderFromString("input:test [" +
"(i:V)-->(c:Center)-->(o:V)" +
"(i2:V)-->(c)-->(o2:V)" +
"(:other)-->(c)-->(:other)" +
"]" +
"expectedIncoming:test [" +
"(i)-[:neighbor]->(i2)" +
"(i2)-[:neighbor]->(i)" +
"(i:V)-->(c:Center)-->(o:V)" +
"(i2:V)-->(c)-->(o2:V)" +
"(:other)-->(c)-->(:other)" +
"]" +
"expectedOutgoing:test [" +
"(o)-[:neighbor]->(o2)" +
"(o2)-[:neighbor]->(o)" +
"(i:V)-->(c:Center)-->(o:V)" +
"(i2:V)-->(c)-->(o2:V)" +
"(:other)-->(c)-->(:other)" +
"]" +
"expectedUndirected:test [" +
"(i)-[:neighbor]->(i2)" +
"(i2)-[:neighbor]->(i)" +
"(o)-[:neighbor]->(o2)" +
"(o2)-[:neighbor]->(o)" +
"(i:V)-->(c:Center)-->(o:V)" +
"(i2:V)-->(c)-->(o2:V)" +
"(:other)-->(c)-->(:other)" +
"]");

/**
* Test using incoming edges.
@@ -52,9 +69,9 @@ public void testIncoming() throws Exception {
LogicalGraph input = loader.getLogicalGraphByVariable("input");
UnaryGraphToGraphOperator operator =
new ConnectNeighbors("Center", Neighborhood.EdgeDirection.INCOMING, "V", "neighbor");
LogicalGraph expected = loader.getLogicalGraphByVariable("expectedIncoming").combine(input);
LogicalGraph expected = loader.getLogicalGraphByVariable("expectedIncoming");

collectAndAssertTrue(expected.equalsByElementData(input.callForGraph(operator)));
collectAndAssertTrue(expected.equalsByData(input.callForGraph(operator)));
}

/**
@@ -67,9 +84,9 @@ public void testOutgoing() throws Exception {
LogicalGraph input = loader.getLogicalGraphByVariable("input");
UnaryGraphToGraphOperator operator =
new ConnectNeighbors("Center", Neighborhood.EdgeDirection.OUTGOING, "V", "neighbor");
LogicalGraph expected = loader.getLogicalGraphByVariable("expectedOutgoing").combine(input);
LogicalGraph expected = loader.getLogicalGraphByVariable("expectedOutgoing");

collectAndAssertTrue(expected.equalsByElementData(input.callForGraph(operator)));
collectAndAssertTrue(expected.equalsByData(input.callForGraph(operator)));
}

/**
@@ -82,10 +99,8 @@ public void testUndirected() throws Exception {
LogicalGraph input = loader.getLogicalGraphByVariable("input");
UnaryGraphToGraphOperator operator =
new ConnectNeighbors("Center", Neighborhood.EdgeDirection.UNDIRECTED, "V", "neighbor");
LogicalGraph expected = loader.getLogicalGraphByVariable("expectedOutgoing")
.combine(loader.getLogicalGraphByVariable("expectedIncoming"))
.combine(input);
LogicalGraph expected = loader.getLogicalGraphByVariable("expectedUndirected");

collectAndAssertTrue(expected.equalsByElementData(input.callForGraph(operator)));
collectAndAssertTrue(expected.equalsByData(input.callForGraph(operator)));
}
}
@@ -31,12 +31,12 @@
/**
* The loader with the graphs used in the tests.
*/
private final FlinkAsciiGraphLoader loader = getLoaderFromString("input[" +
private final FlinkAsciiGraphLoader loader = getLoaderFromString("input:test[" +
"(a:VertexA)-[e:edgeToTransform {testProp: 1, testProp2: \"\"}]->(b:VertexB)" +
"(a)-[e2:edgeToTransform]->(b)" +
"(a)-[en:anotherEdge]->(b)" +
"]" +
"expected [" +
"expected:test [" +
"(a)-[e]->(b)" +
"(a)-[e2]->(b)" +
"(a)-[en]->(b)" +
@@ -56,7 +56,7 @@ public void testWithVertexCreation() throws Exception {
LogicalGraph result = loader.getLogicalGraphByVariable("input").callForGraph(operator);
LogicalGraph expected = loader.getLogicalGraphByVariable("expected");

collectAndAssertTrue(result.equalsByElementData(expected));
collectAndAssertTrue(result.equalsByData(expected));
}

/**
@@ -72,7 +72,7 @@ public void testWithoutVertexCreation() throws Exception {
LogicalGraph result = loader.getLogicalGraphByVariable("input").callForGraph(operator);
LogicalGraph expected = loader.getLogicalGraphByVariable("input");

collectAndAssertTrue(result.equalsByElementData(expected));
collectAndAssertTrue(result.equalsByData(expected));
}

/**
@@ -47,11 +47,12 @@
* The loader with the graphs used in this test.
*/
private FlinkAsciiGraphLoader loader = getLoaderFromString(
"input1[" + "(s1:Source {p1: 1, p2: 1.1d})-[e1:edge1]->(t:Target {t: 0})" +
"input1:test[" + "(s1:Source {p1: 1, p2: 1.1d})-[e1:edge1]->(t:Target {t: 0})" +
"(s2:Source {p1: \"\"})-[e2:edge2]->(t)" +
"(s1)-[e12:edge1]->(t2:Target2 {t: 0})" +
"(s2)-[e22:edge2]->(t2)" +
"] input2 [" +
"]" +
"input2:test [" +
"(v:Vertex {t: 1})-->(v)" +
"]");

@@ -76,7 +77,7 @@ public void testPropagateDirected() throws Exception {
LogicalGraph result = input.callForGraph(operator)
.callForGraph(new PropertyTransformation<>("t", pv -> pv,
PropagatePropertyToNeighborTest::orderListProperty, pv -> pv));
collectAndAssertTrue(expected.equalsByElementData(result));
collectAndAssertTrue(expected.equalsByData(result));
}

/**
@@ -97,7 +98,7 @@ public void testPropagateAlongCertainEdges() throws Exception {
return v;
});
LogicalGraph result = input.callForGraph(operator);
collectAndAssertTrue(expected.equalsByElementData(result));
collectAndAssertTrue(expected.equalsByData(result));
}

/**
@@ -121,7 +122,7 @@ public void testPropagateToCertainVertices() throws Exception {
LogicalGraph result = input.callForGraph(operator)
.callForGraph(new PropertyTransformation<>("t", pv -> pv,
PropagatePropertyToNeighborTest::orderListProperty, pv -> pv));
collectAndAssertTrue(expected.equalsByElementData(result));
collectAndAssertTrue(expected.equalsByData(result));
}

/**
@@ -143,7 +144,7 @@ public void testPropagateToCertainVerticesAlongCertainEdges() throws Exception {
return v;
});
LogicalGraph result = input.callForGraph(operator);
collectAndAssertTrue(expected.equalsByElementData(result));
collectAndAssertTrue(expected.equalsByData(result));
}

/**
@@ -161,7 +162,7 @@ public void testPropagateInLoops() throws Exception {
return v;
});
LogicalGraph result = input.callForGraph(operator);
collectAndAssertTrue(expected.equalsByElementData(result));
collectAndAssertTrue(expected.equalsByData(result));
}

/**
@@ -32,14 +32,14 @@
*/
@Test
public void testWithEdgeCreation() throws Exception {
FlinkAsciiGraphLoader loader = getLoaderFromString("input[" +
FlinkAsciiGraphLoader loader = getLoaderFromString("input:test[" +
"(v0:Blue {a : 3})" +
"(v1:Green {a : 2})" +
"(v2:Blue {a : 4})" +
"(v0)-[{b : 2}]->(v1)" +
"(v1)-[{b : 4}]->(v2)" +
"]" +
"expected[" +
"expected:test[" +
"(v00:Blue {a : 3})" +
"(v01:Green {a : 2})" +
"(v02:Blue {a : 4})" +
@@ -54,6 +54,6 @@ public void testWithEdgeCreation() throws Exception {
VertexToEdge transformation = new VertexToEdge("Green", "foo");
LogicalGraph transformed = input.callForGraph(transformation);

collectAndAssertTrue(transformed.equalsByElementData(expected));
collectAndAssertTrue(transformed.equalsByData(expected));
}
}
@@ -111,7 +111,7 @@ public LogicalGraph executeInGelly(Graph<GradoopId, NullValue, NullValue> graph)
.with(new HITSToAttributes(authorityPropertyKey, hubPropertyKey));

return currentGraph.getConfig().getLogicalGraphFactory()
.fromDataSets(newVertices, currentGraph.getEdges());
.fromDataSets(currentGraph.getGraphHead(), newVertices, currentGraph.getEdges());
}

}
@@ -77,7 +77,7 @@ public LogicalGraph executeInGelly(Graph<GradoopId, PropertyValue, NullValue> gr

// return labeled graph
return currentGraph.getConfig().getLogicalGraphFactory()
.fromDataSets(labeledVertices, currentGraph.getEdges());
.fromDataSets(currentGraph.getGraphHead(), labeledVertices, currentGraph.getEdges());
}

/**
@@ -73,8 +73,7 @@ public SingleSourceShortestPaths(GradoopId srcVertexId, String propertyKeyEdge,
}

@Override
public LogicalGraph executeInGelly(Graph<GradoopId, NullValue, Double> graph)
throws Exception {
public LogicalGraph executeInGelly(Graph<GradoopId, NullValue, Double> graph) {

DataSet<Vertex> newVertices = new org.apache.flink.graph.library.SingleSourceShortestPaths
<GradoopId, NullValue>(srcVertexId, iterations)
@@ -83,7 +82,7 @@ public LogicalGraph executeInGelly(Graph<GradoopId, NullValue, Double> graph)
.where(0)
.equalTo(new Id<>())
.with(new SingleSourceShortestPathsAttribute(propertyKeyVertex));
return currentGraph.getConfig().getLogicalGraphFactory().fromDataSets(newVertices,
currentGraph.getEdges());
return currentGraph.getFactory()
.fromDataSets(currentGraph.getGraphHead(), newVertices, currentGraph.getEdges());
}
}
@@ -96,7 +96,7 @@ public LogicalGraph executeInGelly(Graph<GradoopId, NullValue, NullValue> graph)
.where(0).equalTo(new Id<>())
.with(new DistinctVertexDegreesToAttribute(propertyKey, propertyKeyIn, propertyKeyOut));

return currentGraph.getConfig().getLogicalGraphFactory().fromDataSets(newVertices,
currentGraph.getEdges());
return currentGraph.getFactory()
.fromDataSets(currentGraph.getGraphHead(), newVertices, currentGraph.getEdges());
}
}
@@ -45,18 +45,12 @@

/**
* Computes the exclusion graph from two logical graphs.
* Reduces the first input graph to contain only vertices and edges that don't exist in
* the second graph. The graph head of the first graph is retained. Vertex and edge equality
* is based on their respective identifiers.
*/
public class Exclusion implements BinaryGraphToGraphOperator {

/**
* Creates a new logical graph containing only vertices and edges that exist
* in the first input graph but not in the second input graph. Vertex and edge
* equality is based on their respective identifiers.
*
* @param firstGraph first input graph
* @param secondGraph second input graph
* @return first graph without elements from second graph
*/
@Override
public LogicalGraph execute(
LogicalGraph firstGraph, LogicalGraph secondGraph) {
@@ -76,6 +70,7 @@ public LogicalGraph execute(
.equalTo(new Id<>())
.with(new LeftSide<>());

return firstGraph.getConfig().getLogicalGraphFactory().fromDataSets(newVertexSet, newEdgeSet);
return firstGraph.getFactory()
.fromDataSets(firstGraph.getGraphHead(), newVertexSet, newEdgeSet);
}
}
Oops, something went wrong.

0 comments on commit 76ff80c

Please sign in to comment.
You can’t perform that action at this time.