From 632f4da55a893fb29330dbedef29681327babe46 Mon Sep 17 00:00:00 2001 From: Martin Junghanns Date: Sat, 2 Jun 2018 11:22:51 +0200 Subject: [PATCH] Add project strategy to edge induced subgraph * also adds validation strategy for regular subgraph * fixes #796 --- .../flink/model/api/epgm/LogicalGraph.java | 10 +- .../model/api/epgm/LogicalGraphOperators.java | 25 ++- .../impl/operators/subgraph/Subgraph.java | 197 +++++++++++++++--- .../impl/operators/subgraph/SubgraphTest.java | 46 ++++ 4 files changed, 240 insertions(+), 38 deletions(-) diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/api/epgm/LogicalGraph.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/api/epgm/LogicalGraph.java index 95da87bbc1b1..5da83219b6c5 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/api/epgm/LogicalGraph.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/api/epgm/LogicalGraph.java @@ -306,7 +306,7 @@ public LogicalGraph transformEdges( public LogicalGraph vertexInducedSubgraph( FilterFunction vertexFilterFunction) { Objects.requireNonNull(vertexFilterFunction); - return callForGraph(new Subgraph(vertexFilterFunction, null)); + return callForGraph(new Subgraph(vertexFilterFunction, null, Subgraph.Strategy.VERTEX_INDUCED)); } /** @@ -316,7 +316,7 @@ public LogicalGraph vertexInducedSubgraph( public LogicalGraph edgeInducedSubgraph( FilterFunction edgeFilterFunction) { Objects.requireNonNull(edgeFilterFunction); - return callForGraph(new Subgraph(null, edgeFilterFunction)); + return callForGraph(new Subgraph(null, edgeFilterFunction, Subgraph.Strategy.EDGE_INDUCED)); } /** @@ -324,11 +324,9 @@ public LogicalGraph edgeInducedSubgraph( */ @Override public LogicalGraph subgraph(FilterFunction vertexFilterFunction, - FilterFunction edgeFilterFunction) { - Objects.requireNonNull(vertexFilterFunction); - Objects.requireNonNull(edgeFilterFunction); + FilterFunction edgeFilterFunction, Subgraph.Strategy strategy) { return callForGraph( - new Subgraph(vertexFilterFunction, edgeFilterFunction)); + new Subgraph(vertexFilterFunction, edgeFilterFunction, strategy)); } /** diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/api/epgm/LogicalGraphOperators.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/api/epgm/LogicalGraphOperators.java index f48b0f2ab18e..dbfbf919f56d 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/api/epgm/LogicalGraphOperators.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/api/epgm/LogicalGraphOperators.java @@ -36,8 +36,10 @@ import org.gradoop.flink.model.impl.operators.matching.common.statistics.GraphStatistics; import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.traverser.TraverserStrategy; import org.gradoop.flink.model.impl.operators.neighborhood.Neighborhood; +import org.gradoop.flink.model.impl.operators.subgraph.Subgraph; import java.util.List; +import java.util.Objects; /** * Defines the operators that are available on a {@link LogicalGraph}. @@ -275,8 +277,29 @@ LogicalGraph transformGraphHead( * @return logical graph which fulfils the given predicates and is a subgraph * of that graph */ + default LogicalGraph subgraph(FilterFunction vertexFilterFunction, + FilterFunction edgeFilterFunction) { + Objects.requireNonNull(vertexFilterFunction); + Objects.requireNonNull(edgeFilterFunction); + return subgraph(vertexFilterFunction, edgeFilterFunction, Subgraph.Strategy.BOTH); + } + + /** + * Returns a subgraph of the logical graph which contains only those vertices + * and edges that fulfil the given vertex and edge filter function + * respectively. + * + * Note, that the operator does not verify the consistency of the resulting + * graph. Use {#toGellyGraph().subgraph()} for that behaviour. + * + * @param vertexFilterFunction vertex filter function + * @param edgeFilterFunction edge filter function + * @param strategy execution strategy for the operator + * @return logical graph which fulfils the given predicates and is a subgraph + * of that graph + */ LogicalGraph subgraph(FilterFunction vertexFilterFunction, - FilterFunction edgeFilterFunction); + FilterFunction edgeFilterFunction, Subgraph.Strategy strategy); /** * Applies the given aggregate function to the logical graph and stores the diff --git a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/subgraph/Subgraph.java b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/subgraph/Subgraph.java index 219b12bc8b19..ccc7ad3d9d7f 100644 --- a/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/subgraph/Subgraph.java +++ b/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/subgraph/Subgraph.java @@ -17,6 +17,9 @@ import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.common.model.impl.id.GradoopId; import org.gradoop.common.model.impl.pojo.Edge; import org.gradoop.common.model.impl.pojo.Vertex; import org.gradoop.flink.model.api.epgm.LogicalGraph; @@ -24,18 +27,27 @@ import org.gradoop.flink.model.impl.functions.epgm.Id; import org.gradoop.flink.model.impl.functions.epgm.SourceId; import org.gradoop.flink.model.impl.functions.epgm.TargetId; +import org.gradoop.flink.model.impl.functions.tuple.Value0Of2; +import org.gradoop.flink.model.impl.functions.tuple.Value1Of2; +import org.gradoop.flink.model.impl.functions.tuple.ValueInTuple1; import org.gradoop.flink.model.impl.functions.utils.LeftSide; import org.gradoop.flink.model.impl.functions.utils.RightSide; +import static org.gradoop.flink.model.impl.operators.subgraph.Subgraph.Strategy.BOTH; +import static org.gradoop.flink.model.impl.operators.subgraph.Subgraph.Strategy.BOTH_VERIFIED; +import static org.gradoop.flink.model.impl.operators.subgraph.Subgraph.Strategy.VERTEX_INDUCED; +import static org.gradoop.flink.model.impl.operators.subgraph.Subgraph.Strategy.EDGE_INDUCED; +import static org.gradoop.flink.model.impl.operators.subgraph.Subgraph.Strategy.EDGE_INDUCED_PROJECT_FIRST; + /** * Extracts a subgraph from a logical graph using the given filter functions. * * The operator is able to: * 1) extract vertex-induced subgraph - * 2) extract edge-induced subgraph + * 2) extract edge-induced subgraph via join + union strategy + * 2) extract edge-induced subgraph via project + union + join strategy * 3) extract subgraph based on vertex and edge filter function - * - * Note that option 3) does not verify the consistency of the resulting graph. + * 4) extract subgraph based on vertex and edge filter function without verification (no joins) */ public class Subgraph implements UnaryGraphToGraphOperator { @@ -49,6 +61,42 @@ public class Subgraph implements UnaryGraphToGraphOperator { */ private final FilterFunction edgeFilterFunction; + /** + * Execution strategy for the operator. + */ + private final Strategy strategy; + + /** + * Available execution strategies. + */ + public enum Strategy { + /** + * Applies both filter functions on the input vertex and edge data set. + */ + BOTH, + /** + * Applies both filter functions on the input vertex and edge data set. In addition, this + * strategy verifies the consistency of the output graph by triplifying it projecting the + * vertices and edges. + */ + BOTH_VERIFIED, + /** + * Only applies the vertex filter function and adds the incident edges connecting those + * vertices via a join. + */ + VERTEX_INDUCED, + /** + * Only applies the edge filter function and computes the resulting vertices via: + * (E |><| V ON e.source = v.id) U (E |><| V on e.target = v.id) + */ + EDGE_INDUCED, + /** + * Only applies the edge filter function and computes the resulting vertices via: + * DISTINCT((π_source(E) U π_target(E))) |><| V + */ + EDGE_INDUCED_PROJECT_FIRST + } + /** * Creates a new sub graph operator instance. * @@ -63,21 +111,56 @@ public class Subgraph implements UnaryGraphToGraphOperator { * * @param vertexFilterFunction vertex filter function * @param edgeFilterFunction edge filter function + * @param strategy sets the execution strategy for the operator */ public Subgraph(FilterFunction vertexFilterFunction, - FilterFunction edgeFilterFunction) { - if (vertexFilterFunction == null && edgeFilterFunction == null) { + FilterFunction edgeFilterFunction, Strategy strategy) { + + if ((strategy == BOTH || strategy == BOTH_VERIFIED) && + vertexFilterFunction == null && edgeFilterFunction == null) { throw new IllegalArgumentException("No filter functions was given."); } + + if (strategy == VERTEX_INDUCED && vertexFilterFunction == null) { + throw new IllegalArgumentException("No vertex filter functions was given."); + } + + if ((strategy == EDGE_INDUCED || strategy == EDGE_INDUCED_PROJECT_FIRST) && + edgeFilterFunction == null) { + throw new IllegalArgumentException("No vertex edge functions was given."); + } + + this.strategy = strategy; + this.vertexFilterFunction = vertexFilterFunction; this.edgeFilterFunction = edgeFilterFunction; } @Override public LogicalGraph execute(LogicalGraph superGraph) { - return vertexFilterFunction != null && edgeFilterFunction != null ? - subgraph(superGraph) : vertexFilterFunction != null ? - vertexInducedSubgraph(superGraph) : edgeInducedSubgraph(superGraph); + + LogicalGraph result; + switch (strategy) { + case BOTH: + result = subgraph(superGraph); + break; + case BOTH_VERIFIED: + result = verify(subgraph(superGraph)); + break; + case VERTEX_INDUCED: + result = vertexInducedSubgraph(superGraph); + break; + case EDGE_INDUCED: + result = edgeInducedSubgraph(superGraph); + break; + case EDGE_INDUCED_PROJECT_FIRST: + result = edgeInducedSubgraphProjectFirst(superGraph); + break; + default: + throw new IllegalArgumentException("Strategy " + strategy + " is not supported"); + } + + return result; } /** @@ -87,18 +170,16 @@ public LogicalGraph execute(LogicalGraph superGraph) { * @param superGraph supergraph * @return vertex-induced subgraph */ - private LogicalGraph vertexInducedSubgraph( - LogicalGraph superGraph) { - DataSet filteredVertices = superGraph.getVertices() - .filter(vertexFilterFunction); + private LogicalGraph vertexInducedSubgraph(LogicalGraph superGraph) { + DataSet filteredVertices = superGraph.getVertices().filter(vertexFilterFunction); DataSet newEdges = superGraph.getEdges() .join(filteredVertices) - .where(new SourceId<>()).equalTo(new Id()) - .with(new LeftSide()) + .where(new SourceId<>()).equalTo(new Id<>()) + .with(new LeftSide<>()) .join(filteredVertices) - .where(new TargetId<>()).equalTo(new Id()) - .with(new LeftSide()); + .where(new TargetId<>()).equalTo(new Id<>()) + .with(new LeftSide<>()); return superGraph.getConfig().getLogicalGraphFactory().fromDataSets(filteredVertices, newEdges); } @@ -110,22 +191,48 @@ private LogicalGraph vertexInducedSubgraph( * @param superGraph supergraph * @return edge-induced subgraph */ - private LogicalGraph edgeInducedSubgraph( - LogicalGraph superGraph) { - DataSet filteredEdges = superGraph.getEdges() - .filter(edgeFilterFunction); + private LogicalGraph edgeInducedSubgraph(LogicalGraph superGraph) { + DataSet filteredEdges = superGraph.getEdges().filter(edgeFilterFunction); - DataSet newVertices = filteredEdges + DataSet filteredVertices = filteredEdges .join(superGraph.getVertices()) - .where(new SourceId<>()).equalTo(new Id()) - .with(new RightSide()) + .where(new SourceId<>()).equalTo(new Id<>()) + .with(new RightSide<>()) .union(filteredEdges .join(superGraph.getVertices()) - .where(new TargetId<>()).equalTo(new Id()) - .with(new RightSide())) - .distinct(new Id()); + .where(new TargetId<>()).equalTo(new Id<>()) + .with(new RightSide<>())) + .distinct(new Id<>()); + + return superGraph.getConfig().getLogicalGraphFactory() + .fromDataSets(filteredVertices, filteredEdges); + } - return superGraph.getConfig().getLogicalGraphFactory().fromDataSets(newVertices, filteredEdges); + /** + * Returns the subgraph of the given supergraph that is induced by the + * edges that fulfil the given filter function. + * + * @param superGraph supergraph + * @return edge-induced subgraph + */ + private LogicalGraph edgeInducedSubgraphProjectFirst(LogicalGraph superGraph) { + DataSet filteredEdges = superGraph.getEdges().filter(edgeFilterFunction); + + DataSet> vertexIdentifiers = filteredEdges + .map(new SourceId<>()) + .map(new ValueInTuple1<>()) + .union(filteredEdges + .map(new TargetId<>()) + .map(new ValueInTuple1<>())) + .distinct(); + + DataSet filteredVertices = vertexIdentifiers + .join(superGraph.getVertices()) + .where(0).equalTo(new Id<>()) + .with(new RightSide<>()); + + return superGraph.getConfig().getLogicalGraphFactory() + .fromDataSets(filteredVertices, filteredEdges); } /** @@ -140,10 +247,38 @@ private LogicalGraph edgeInducedSubgraph( * @return subgraph */ private LogicalGraph subgraph(LogicalGraph superGraph) { - return superGraph.getConfig().getLogicalGraphFactory().fromDataSets( - superGraph.getVertices().filter(vertexFilterFunction), - superGraph.getEdges().filter(edgeFilterFunction) - ); + return superGraph.getConfig().getLogicalGraphFactory() + .fromDataSets(superGraph.getVertices().filter(vertexFilterFunction), + superGraph.getEdges().filter(edgeFilterFunction)); + } + + /** + * Verifies that the given graph is consistent, contains only edges that connect to vertices + * within the subgraph. + * + * @param subgraph supergraph + * @return verified subgraph + */ + private LogicalGraph verify(LogicalGraph subgraph) { + + DataSet, Vertex>> verifiedTriples = subgraph.getEdges() + .join(subgraph.getVertices()) + .where(new SourceId<>()).equalTo(new Id<>()) + .join(subgraph.getVertices()) + .where("0.targetId").equalTo(new Id<>()); + + DataSet verifiedEdges = verifiedTriples + .map(new Value0Of2<>()) + .map(new Value0Of2<>()); + + DataSet verifiedVertices = verifiedTriples + .map(new Value0Of2<>()) + .map(new Value1Of2<>()) + .union(verifiedTriples.map(new Value1Of2<>())) + .distinct(new Id<>()); + + return subgraph.getConfig().getLogicalGraphFactory() + .fromDataSets(verifiedVertices, verifiedEdges); } @Override diff --git a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/subgraph/SubgraphTest.java b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/subgraph/SubgraphTest.java index 85dfe1fa0713..9c2212e44696 100644 --- a/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/subgraph/SubgraphTest.java +++ b/gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/subgraph/SubgraphTest.java @@ -56,6 +56,33 @@ public void testExistingSubgraph() throws Exception { collectAndAssertTrue(output.equalsByElementData(expected)); } + @Test + public void testExistingSubgraphWithVerification() throws Exception { + FlinkAsciiGraphLoader loader = getSocialNetworkLoader(); + + loader.appendToDatabaseFromString("expected[" + + "(alice)-[akb]->(bob)-[bkc]->(carol)-[ckd]->(dave)" + + "(alice)<-[bka]-(bob)<-[ckb]-(carol)<-[dkc]-(dave)" + + "(eve)-[eka]->(alice)" + + "(eve)-[ekb]->(bob)" + + "(frank)-[fkc]->(carol)" + + "(frank)-[fkd]->(dave)" + + "]"); + + LogicalGraph input = loader.getDatabase().getDatabaseGraph(); + + LogicalGraph expected = + loader.getLogicalGraphByVariable("expected"); + + LogicalGraph output = input + .subgraph( + v -> v.getLabel().equals("Person"), + e -> e.getLabel().equals("knows"), + Subgraph.Strategy.BOTH_VERIFIED); + + collectAndAssertTrue(output.equalsByElementData(expected)); + } + /** * Extracts a subgraph where only vertices fulfill the filter function. */ @@ -139,6 +166,25 @@ public void testEdgeInducedSubgraph() throws Exception { collectAndAssertTrue(output.equalsByElementData(expected)); } + @Test + public void testEdgeInducedSubgraphProjectFirst() throws Exception { + FlinkAsciiGraphLoader loader = getSocialNetworkLoader(); + + loader.appendToDatabaseFromString("expected[" + + "(databases)<-[ghtd]-(gdbs)-[ghtg1]->(graphs)" + + "(graphs)<-[ghtg2]-(gps)-[ghth]->(hadoop)" + + "]"); + + LogicalGraph input = loader.getDatabase().getDatabaseGraph(); + + LogicalGraph expected = loader.getLogicalGraphByVariable("expected"); + + LogicalGraph output = input.subgraph(null, + e -> e.getLabel().equals("hasTag"), Subgraph.Strategy.EDGE_INDUCED_PROJECT_FIRST); + + collectAndAssertTrue(output.equalsByElementData(expected)); + } + @Test public void testCollectionSubgraph() throws Exception { FlinkAsciiGraphLoader loader = getSocialNetworkLoader();