Permalink
Browse files

Merge pull request #715 from dbs-leipzig/graph_construction_poc

[#703] Add basic graph construction via pattern
  • Loading branch information...
s1ck committed Dec 21, 2017
2 parents 811fde9 + 874d089 commit 5d06a757bf24f0174c251461f357736effffdc5c
@@ -67,6 +67,7 @@
import org.gradoop.flink.model.impl.operators.tostring.functions.VertexToIdString;
import org.gradoop.flink.model.impl.operators.transformation.Transformation;
import org.gradoop.flink.util.GradoopFlinkConfig;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import java.io.IOException;
import java.util.List;
@@ -178,6 +179,14 @@ public GraphCollection cypher(String query) {
return cypher(query, new GraphStatistics(1, 1, 1, 1));
}
/**
* {@inheritDoc}
*/
@Override
public GraphCollection cypher(String query, String constructionPattern) {
return cypher(query, constructionPattern, new GraphStatistics(1, 1, 1, 1));
}
/**
* {@inheritDoc}
*/
@@ -187,14 +196,34 @@ public GraphCollection cypher(String query, GraphStatistics graphStatistics) {
MatchStrategy.HOMOMORPHISM, MatchStrategy.ISOMORPHISM, graphStatistics);
}
/**
* {@inheritDoc}
*/
@Override
public GraphCollection cypher(String query, String constructionPattern,
GraphStatistics graphStatistics) {
return cypher(query, constructionPattern, true,
MatchStrategy.HOMOMORPHISM, MatchStrategy.ISOMORPHISM, graphStatistics);
}
/**
* {@inheritDoc}
*/
@Override
public GraphCollection cypher(String query, boolean attachData, MatchStrategy vertexStrategy,
MatchStrategy edgeStrategy, GraphStatistics graphStatistics) {
return callForCollection(new CypherPatternMatching(query, attachData,
vertexStrategy, edgeStrategy, graphStatistics));
return cypher(query, null, attachData, vertexStrategy, edgeStrategy, graphStatistics);
}
/**
* {@inheritDoc}
*/
@Override
public GraphCollection cypher(String query, String constructionPattern, boolean attachData,
MatchStrategy vertexStrategy, MatchStrategy edgeStrategy, GraphStatistics graphStatistics) {
return callForCollection(new CypherPatternMatching(query, constructionPattern, attachData,
vertexStrategy, edgeStrategy, graphStatistics));
}
/**
@@ -36,6 +36,7 @@
import org.gradoop.flink.model.impl.operators.matching.common.statistics.GraphStatistics;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.traverser.TraverserStrategy;
import org.gradoop.flink.model.impl.operators.neighborhood.Neighborhood;
import org.s1ck.gdl.model.Graph;
import java.util.List;
@@ -62,6 +63,35 @@
*/
GraphCollection cypher(String query);
/**
* Evaluates the given query using the Cypher query engine. The engine uses default morphism
* strategies, which is vertex homomorphism and edge isomorphism. The vertex and edge data of
* the data graph elements is attached to the resulting vertices.
*
* Note, that this method used no statistics about the data graph which may result in bad
* runtime performance. Use {@link LogicalGraphOperators#cypher(String, GraphStatistics)} to
* provide statistics for the query planner.
*
* In addition, the operator can be supplied with a construction pattern allowing the creation
* of new graph elements based on variable bindings of the match pattern. Consider the following
* example:
*
* <pre>
* <code>graph.cypher(
* "MATCH (a:Author)-[:WROTE]->(:Paper)<-[:WROTE]-(b:Author) WHERE a <> b",
* "(a)-[:CO_AUTHOR]->(b)")
* </code>
* </pre>
*
* The query pattern is looking for pairs of authors that worked on the same paper. The
* construction pattern defines a new edge of type CO_AUTHOR between the two entities.
*
* @param query Cypher query string
* @param constructionPattern Construction pattern
* @return graph collection containing the output of the construct pattern
*/
GraphCollection cypher(String query, String constructionPattern);
/**
* Evaluates the given query using the Cypher query engine. The engine uses default morphism
* strategies, which is vertex homomorphism and edge isomorphism. The vertex and edge data of
@@ -73,6 +103,32 @@
*/
GraphCollection cypher(String query, GraphStatistics graphStatistics);
/**
* Evaluates the given query using the Cypher query engine. The engine uses default morphism
* strategies, which is vertex homomorphism and edge isomorphism. The vertex and edge data of
* the data graph elements is attached to the resulting vertices.
*
* In addition, the operator can be supplied with a construction pattern allowing the creation
* of new graph elements based on variable bindings of the match pattern. Consider the following
* example:
*
* <pre>
* <code>graph.cypher(
* "MATCH (a:Author)-[:WROTE]->(:Paper)<-[:WROTE]-(b:Author) WHERE a <> b",
* "(a)-[:CO_AUTHOR]->(b)")
* </code>
* </pre>
*
* The query pattern is looking for pairs of authors that worked on the same paper. The
* construction pattern defines a new edge of type CO_AUTHOR between the two entities.
*
* @param query Cypher query
* @param constructionPattern Construction pattern
* @param graphStatistics statistics about the data graph
* @return graph collection containing the output of the construct pattern
*/
GraphCollection cypher(String query, String constructionPattern, GraphStatistics graphStatistics);
/**
* Evaluates the given query using the Cypher query engine.
*
@@ -86,6 +142,20 @@
GraphCollection cypher(String query, boolean attachData,
MatchStrategy vertexStrategy, MatchStrategy edgeStrategy, GraphStatistics graphStatistics);
/**
* Evaluates the given query using the Cypher query engine.
*
* @param query Cypher query
* @param constructionPattern Construction pattern
* @param attachData attach original vertex and edge data to the result
* @param vertexStrategy morphism setting for vertex mapping
* @param edgeStrategy morphism setting for edge mapping
* @param graphStatistics statistics about the data graph
* @return graph collection containing matching subgraphs
*/
GraphCollection cypher(String query, String constructionPattern, boolean attachData,
MatchStrategy vertexStrategy, MatchStrategy edgeStrategy, GraphStatistics graphStatistics);
/**
* Evaluates the given GDL query using the Traverser query engine.
*
@@ -19,6 +19,7 @@
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.pojo.EdgeFactory;
import org.gradoop.common.model.impl.pojo.GraphElement;
import org.gradoop.common.model.impl.id.GradoopIdList;
@@ -55,6 +56,9 @@ public void reduce(Iterable<GE> values, Collector<GE> out) throws Exception {
@Override
public GE join(GE first, GE second) throws Exception {
if(first == null) {
return second;
}
first.getGraphIds().addAll(second.getGraphIds());
return first;
}
@@ -35,6 +35,7 @@
import org.gradoop.flink.model.impl.functions.utils.Cast;
import org.gradoop.flink.model.impl.functions.utils.IsInstance;
import org.gradoop.flink.model.impl.functions.utils.RightSide;
import org.gradoop.flink.model.impl.operators.matching.common.query.QueryHandler;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.functions.EdgeTriple;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.tuples.FatVertex;
import org.gradoop.flink.util.GradoopFlinkConfig;
@@ -97,13 +98,14 @@ public static GraphCollection extractGraphCollectionWithData(
// attach data by joining first and merging the graph head ids
DataSet<Vertex> newVertices = inputGraph.getVertices()
.join(collection.getVertices())
.rightOuterJoin(collection.getVertices())
.where(new Id<>()).equalTo(new Id<>())
.with(new MergedGraphIds<>())
.withForwardedFieldsFirst("id;label;properties;");
DataSet<Edge> newEdges = inputGraph.getEdges()
.join(collection.getEdges())
.rightOuterJoin(collection.getEdges())
.where(new Id<>()).equalTo(new Id<>())
.with(new MergedGraphIds<>())
.withForwardedFieldsFirst("id;label;properties");
@@ -157,7 +159,7 @@ public static GraphCollection extractGraphCollectionWithData(
* @return EPGM vertices
*/
public static DataSet<Vertex> extractVertices(DataSet<FatVertex> result,
EPGMVertexFactory<Vertex> epgmVertexFactory) {
EPGMVertexFactory<Vertex> epgmVertexFactory) {
return extractVertexIds(result).map(new VertexFromId(epgmVertexFactory));
}
@@ -17,6 +17,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.tuple.Pair;
import org.gradoop.common.util.GradoopConstants;
import org.gradoop.flink.model.impl.operators.matching.common.query.predicates.CNF;
import org.gradoop.flink.model.impl.operators.matching.common.query.predicates.QueryPredicate;
@@ -121,6 +122,34 @@ public QueryHandler(String gdlString) {
getVertexById(e.getSourceVertexId()), e, getVertexById(e.getTargetVertexId())))
.collect(Collectors.toList());
}
/**
* Returns all variables contained in the pattern.
*
* @return all query variables
*/
public Set<String> getAllVariables() {
return Sets.union(getVertexVariables(), getEdgeVariables());
}
/**
* Returns all vertex variables contained in the pattern.
*
* @return all vertex variables
*/
public Set<String> getVertexVariables() {
return gdlHandler.getVertexCache().keySet();
}
/**
* Returns all edge variables contained in the pattern.
*
* @return all edge variables
*/
public Set<String> getEdgeVariables() {
return gdlHandler.getEdgeCache().keySet();
}
/**
* Returns all available predicates in Conjunctive Normal Form {@link CNF}. If there are no
* predicated defined in the query, a CNF containing zero predicates is returned.
@@ -454,6 +483,38 @@ public Edge getEdgeByVariable(String variable) {
return neighbors;
}
/**
* Returns a mapping between the given variables (if existent) and the corresponding element
* label.
*
* @param variables query variables
* @return mapping between existing variables and their corresponding label
*/
public Map<String, String> getLabelsForVariables(Collection<String> variables) {
return variables.stream()
.filter(var -> isEdge(var) || isVertex(var))
.map(var -> {
if (isEdge(var)) {
return Pair.of(var, getEdgeByVariable(var).getLabel());
} else {
return Pair.of(var, getVertexByVariable(var).getLabel());
}
}).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
}
/**
* Returns a mapping from edge variable to the corresponding source and target variables.
*
* @return mapping from edge variable to source/target variable
*/
public Map<String, Pair<String, String>> getSourceTargetVariables() {
return getEdges().stream()
.map(e -> Pair.of(e.getVariable(), Pair
.of(getVertexById(e.getSourceVertexId()).getVariable(),
getVertexById(e.getTargetVertexId()).getVariable())))
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
}
/**
* Returns the ids of the given graph elements (vertex/edge).
*
Oops, something went wrong.

0 comments on commit 5d06a75

Please sign in to comment.