Navigation Menu

Skip to content

Commit

Permalink
Add project strategy to edge induced subgraph
Browse files Browse the repository at this point in the history
* also adds validation strategy for regular subgraph
* fixes #796
  • Loading branch information
s1ck committed Jun 2, 2018
1 parent 64c46a0 commit 632f4da
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 38 deletions.
Expand Up @@ -306,7 +306,7 @@ public LogicalGraph transformEdges(
public LogicalGraph vertexInducedSubgraph(
FilterFunction<Vertex> vertexFilterFunction) {
Objects.requireNonNull(vertexFilterFunction);
return callForGraph(new Subgraph(vertexFilterFunction, null));
return callForGraph(new Subgraph(vertexFilterFunction, null, Subgraph.Strategy.VERTEX_INDUCED));
}

/**
Expand All @@ -316,19 +316,17 @@ public LogicalGraph vertexInducedSubgraph(
public LogicalGraph edgeInducedSubgraph(
FilterFunction<Edge> edgeFilterFunction) {
Objects.requireNonNull(edgeFilterFunction);
return callForGraph(new Subgraph(null, edgeFilterFunction));
return callForGraph(new Subgraph(null, edgeFilterFunction, Subgraph.Strategy.EDGE_INDUCED));
}

/**
* {@inheritDoc}
*/
@Override
public LogicalGraph subgraph(FilterFunction<Vertex> vertexFilterFunction,
FilterFunction<Edge> edgeFilterFunction) {
Objects.requireNonNull(vertexFilterFunction);
Objects.requireNonNull(edgeFilterFunction);
FilterFunction<Edge> edgeFilterFunction, Subgraph.Strategy strategy) {
return callForGraph(
new Subgraph(vertexFilterFunction, edgeFilterFunction));
new Subgraph(vertexFilterFunction, edgeFilterFunction, strategy));
}

/**
Expand Down
Expand Up @@ -36,8 +36,10 @@
import org.gradoop.flink.model.impl.operators.matching.common.statistics.GraphStatistics;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.traverser.TraverserStrategy;
import org.gradoop.flink.model.impl.operators.neighborhood.Neighborhood;
import org.gradoop.flink.model.impl.operators.subgraph.Subgraph;

import java.util.List;
import java.util.Objects;

/**
* Defines the operators that are available on a {@link LogicalGraph}.
Expand Down Expand Up @@ -275,8 +277,29 @@ LogicalGraph transformGraphHead(
* @return logical graph which fulfils the given predicates and is a subgraph
* of that graph
*/
default LogicalGraph subgraph(FilterFunction<Vertex> vertexFilterFunction,
FilterFunction<Edge> edgeFilterFunction) {
Objects.requireNonNull(vertexFilterFunction);
Objects.requireNonNull(edgeFilterFunction);
return subgraph(vertexFilterFunction, edgeFilterFunction, Subgraph.Strategy.BOTH);
}

/**
* Returns a subgraph of the logical graph which contains only those vertices
* and edges that fulfil the given vertex and edge filter function
* respectively.
*
* Note, that the operator does not verify the consistency of the resulting
* graph. Use {#toGellyGraph().subgraph()} for that behaviour.
*
* @param vertexFilterFunction vertex filter function
* @param edgeFilterFunction edge filter function
* @param strategy execution strategy for the operator
* @return logical graph which fulfils the given predicates and is a subgraph
* of that graph
*/
LogicalGraph subgraph(FilterFunction<Vertex> vertexFilterFunction,
FilterFunction<Edge> edgeFilterFunction);
FilterFunction<Edge> edgeFilterFunction, Subgraph.Strategy strategy);

/**
* Applies the given aggregate function to the logical graph and stores the
Expand Down
Expand Up @@ -17,25 +17,37 @@

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.SourceId;
import org.gradoop.flink.model.impl.functions.epgm.TargetId;
import org.gradoop.flink.model.impl.functions.tuple.Value0Of2;
import org.gradoop.flink.model.impl.functions.tuple.Value1Of2;
import org.gradoop.flink.model.impl.functions.tuple.ValueInTuple1;
import org.gradoop.flink.model.impl.functions.utils.LeftSide;
import org.gradoop.flink.model.impl.functions.utils.RightSide;

import static org.gradoop.flink.model.impl.operators.subgraph.Subgraph.Strategy.BOTH;
import static org.gradoop.flink.model.impl.operators.subgraph.Subgraph.Strategy.BOTH_VERIFIED;
import static org.gradoop.flink.model.impl.operators.subgraph.Subgraph.Strategy.VERTEX_INDUCED;
import static org.gradoop.flink.model.impl.operators.subgraph.Subgraph.Strategy.EDGE_INDUCED;
import static org.gradoop.flink.model.impl.operators.subgraph.Subgraph.Strategy.EDGE_INDUCED_PROJECT_FIRST;

/**
* Extracts a subgraph from a logical graph using the given filter functions.
*
* The operator is able to:
* 1) extract vertex-induced subgraph
* 2) extract edge-induced subgraph
* 2) extract edge-induced subgraph via join + union strategy
* 2) extract edge-induced subgraph via project + union + join strategy
* 3) extract subgraph based on vertex and edge filter function
*
* Note that option 3) does not verify the consistency of the resulting graph.
* 4) extract subgraph based on vertex and edge filter function without verification (no joins)
*/
public class Subgraph implements UnaryGraphToGraphOperator {

Expand All @@ -49,6 +61,42 @@ public class Subgraph implements UnaryGraphToGraphOperator {
*/
private final FilterFunction<Edge> edgeFilterFunction;

/**
* Execution strategy for the operator.
*/
private final Strategy strategy;

/**
* Available execution strategies.
*/
public enum Strategy {
/**
* Applies both filter functions on the input vertex and edge data set.
*/
BOTH,
/**
* Applies both filter functions on the input vertex and edge data set. In addition, this
* strategy verifies the consistency of the output graph by triplifying it projecting the
* vertices and edges.
*/
BOTH_VERIFIED,
/**
* Only applies the vertex filter function and adds the incident edges connecting those
* vertices via a join.
*/
VERTEX_INDUCED,
/**
* Only applies the edge filter function and computes the resulting vertices via:
* (E |><| V ON e.source = v.id) U (E |><| V on e.target = v.id)
*/
EDGE_INDUCED,
/**
* Only applies the edge filter function and computes the resulting vertices via:
* DISTINCT((π_source(E) U π_target(E))) |><| V
*/
EDGE_INDUCED_PROJECT_FIRST
}

/**
* Creates a new sub graph operator instance.
*
Expand All @@ -63,21 +111,56 @@ public class Subgraph implements UnaryGraphToGraphOperator {
*
* @param vertexFilterFunction vertex filter function
* @param edgeFilterFunction edge filter function
* @param strategy sets the execution strategy for the operator
*/
public Subgraph(FilterFunction<Vertex> vertexFilterFunction,
FilterFunction<Edge> edgeFilterFunction) {
if (vertexFilterFunction == null && edgeFilterFunction == null) {
FilterFunction<Edge> edgeFilterFunction, Strategy strategy) {

if ((strategy == BOTH || strategy == BOTH_VERIFIED) &&
vertexFilterFunction == null && edgeFilterFunction == null) {
throw new IllegalArgumentException("No filter functions was given.");
}

if (strategy == VERTEX_INDUCED && vertexFilterFunction == null) {
throw new IllegalArgumentException("No vertex filter functions was given.");
}

if ((strategy == EDGE_INDUCED || strategy == EDGE_INDUCED_PROJECT_FIRST) &&
edgeFilterFunction == null) {
throw new IllegalArgumentException("No vertex edge functions was given.");
}

this.strategy = strategy;

this.vertexFilterFunction = vertexFilterFunction;
this.edgeFilterFunction = edgeFilterFunction;
}

@Override
public LogicalGraph execute(LogicalGraph superGraph) {
return vertexFilterFunction != null && edgeFilterFunction != null ?
subgraph(superGraph) : vertexFilterFunction != null ?
vertexInducedSubgraph(superGraph) : edgeInducedSubgraph(superGraph);

LogicalGraph result;
switch (strategy) {
case BOTH:
result = subgraph(superGraph);
break;
case BOTH_VERIFIED:
result = verify(subgraph(superGraph));
break;
case VERTEX_INDUCED:
result = vertexInducedSubgraph(superGraph);
break;
case EDGE_INDUCED:
result = edgeInducedSubgraph(superGraph);
break;
case EDGE_INDUCED_PROJECT_FIRST:
result = edgeInducedSubgraphProjectFirst(superGraph);
break;
default:
throw new IllegalArgumentException("Strategy " + strategy + " is not supported");
}

return result;
}

/**
Expand All @@ -87,18 +170,16 @@ public LogicalGraph execute(LogicalGraph superGraph) {
* @param superGraph supergraph
* @return vertex-induced subgraph
*/
private LogicalGraph vertexInducedSubgraph(
LogicalGraph superGraph) {
DataSet<Vertex> filteredVertices = superGraph.getVertices()
.filter(vertexFilterFunction);
private LogicalGraph vertexInducedSubgraph(LogicalGraph superGraph) {
DataSet<Vertex> filteredVertices = superGraph.getVertices().filter(vertexFilterFunction);

DataSet<Edge> newEdges = superGraph.getEdges()
.join(filteredVertices)
.where(new SourceId<>()).equalTo(new Id<Vertex>())
.with(new LeftSide<Edge, Vertex>())
.where(new SourceId<>()).equalTo(new Id<>())
.with(new LeftSide<>())
.join(filteredVertices)
.where(new TargetId<>()).equalTo(new Id<Vertex>())
.with(new LeftSide<Edge, Vertex>());
.where(new TargetId<>()).equalTo(new Id<>())
.with(new LeftSide<>());

return superGraph.getConfig().getLogicalGraphFactory().fromDataSets(filteredVertices, newEdges);
}
Expand All @@ -110,22 +191,48 @@ private LogicalGraph vertexInducedSubgraph(
* @param superGraph supergraph
* @return edge-induced subgraph
*/
private LogicalGraph edgeInducedSubgraph(
LogicalGraph superGraph) {
DataSet<Edge> filteredEdges = superGraph.getEdges()
.filter(edgeFilterFunction);
private LogicalGraph edgeInducedSubgraph(LogicalGraph superGraph) {
DataSet<Edge> filteredEdges = superGraph.getEdges().filter(edgeFilterFunction);

DataSet<Vertex> newVertices = filteredEdges
DataSet<Vertex> filteredVertices = filteredEdges
.join(superGraph.getVertices())
.where(new SourceId<>()).equalTo(new Id<Vertex>())
.with(new RightSide<Edge, Vertex>())
.where(new SourceId<>()).equalTo(new Id<>())
.with(new RightSide<>())
.union(filteredEdges
.join(superGraph.getVertices())
.where(new TargetId<>()).equalTo(new Id<Vertex>())
.with(new RightSide<Edge, Vertex>()))
.distinct(new Id<Vertex>());
.where(new TargetId<>()).equalTo(new Id<>())
.with(new RightSide<>()))
.distinct(new Id<>());

return superGraph.getConfig().getLogicalGraphFactory()
.fromDataSets(filteredVertices, filteredEdges);
}

return superGraph.getConfig().getLogicalGraphFactory().fromDataSets(newVertices, filteredEdges);
/**
* Returns the subgraph of the given supergraph that is induced by the
* edges that fulfil the given filter function.
*
* @param superGraph supergraph
* @return edge-induced subgraph
*/
private LogicalGraph edgeInducedSubgraphProjectFirst(LogicalGraph superGraph) {
DataSet<Edge> filteredEdges = superGraph.getEdges().filter(edgeFilterFunction);

DataSet<Tuple1<GradoopId>> vertexIdentifiers = filteredEdges
.map(new SourceId<>())
.map(new ValueInTuple1<>())
.union(filteredEdges
.map(new TargetId<>())
.map(new ValueInTuple1<>()))
.distinct();

DataSet<Vertex> filteredVertices = vertexIdentifiers
.join(superGraph.getVertices())
.where(0).equalTo(new Id<>())
.with(new RightSide<>());

return superGraph.getConfig().getLogicalGraphFactory()
.fromDataSets(filteredVertices, filteredEdges);
}

/**
Expand All @@ -140,10 +247,38 @@ private LogicalGraph edgeInducedSubgraph(
* @return subgraph
*/
private LogicalGraph subgraph(LogicalGraph superGraph) {
return superGraph.getConfig().getLogicalGraphFactory().fromDataSets(
superGraph.getVertices().filter(vertexFilterFunction),
superGraph.getEdges().filter(edgeFilterFunction)
);
return superGraph.getConfig().getLogicalGraphFactory()
.fromDataSets(superGraph.getVertices().filter(vertexFilterFunction),
superGraph.getEdges().filter(edgeFilterFunction));
}

/**
* Verifies that the given graph is consistent, contains only edges that connect to vertices
* within the subgraph.
*
* @param subgraph supergraph
* @return verified subgraph
*/
private LogicalGraph verify(LogicalGraph subgraph) {

DataSet<Tuple2<Tuple2<Edge, Vertex>, Vertex>> verifiedTriples = subgraph.getEdges()
.join(subgraph.getVertices())
.where(new SourceId<>()).equalTo(new Id<>())
.join(subgraph.getVertices())
.where("0.targetId").equalTo(new Id<>());

DataSet<Edge> verifiedEdges = verifiedTriples
.map(new Value0Of2<>())
.map(new Value0Of2<>());

DataSet<Vertex> verifiedVertices = verifiedTriples
.map(new Value0Of2<>())
.map(new Value1Of2<>())
.union(verifiedTriples.map(new Value1Of2<>()))
.distinct(new Id<>());

return subgraph.getConfig().getLogicalGraphFactory()
.fromDataSets(verifiedVertices, verifiedEdges);
}

@Override
Expand Down

0 comments on commit 632f4da

Please sign in to comment.