Skip to content
Permalink
Browse files

[#1011] Remaining data-integration operators (#1123)

fixes #1011
  • Loading branch information...
merando authored and galpha committed Feb 14, 2019
1 parent f428cbe commit f113b2468d5487243dd5a68b969515bbdb1a4e92
Showing with 2,452 additions and 22 deletions.
  1. +101 −0 ...p-data-integration/src/main/java/org/gradoop/dataintegration/transformation/ConnectNeighbors.java
  2. +106 −0 gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/EdgeToVertex.java
  3. +137 −0 ...gration/src/main/java/org/gradoop/dataintegration/transformation/PropagatePropertyToNeighbor.java
  4. +97 −0 gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/VertexToEdge.java
  5. +91 −0 ...rc/main/java/org/gradoop/dataintegration/transformation/functions/AccumulatePropagatedValues.java
  6. +73 −0 ...src/main/java/org/gradoop/dataintegration/transformation/functions/BuildIdPropertyValuePairs.java
  7. +38 −0 ...a/org/gradoop/dataintegration/transformation/functions/BuildTargetVertexIdPropertyValuePairs.java
  8. +89 −0 ...n/java/org/gradoop/dataintegration/transformation/functions/CreateCartesianNeighborhoodEdges.java
  9. +97 −0 ...ion/src/main/java/org/gradoop/dataintegration/transformation/functions/CreateEdgesFromTriple.java
  10. +94 −0 ...ration/src/main/java/org/gradoop/dataintegration/transformation/functions/CreateNeighborList.java
  11. +76 −0 ...ion/src/main/java/org/gradoop/dataintegration/transformation/functions/CreateVertexFromEdges.java
  12. +123 −0 ...in/java/org/gradoop/dataintegration/transformation/functions/EdgesFromLocalTransitiveClosure.java
  13. +19 −0 ...-integration/src/main/java/org/gradoop/dataintegration/transformation/functions/package-info.java
  14. +101 −0 ...-data-integration/src/main/java/org/gradoop/dataintegration/transformation/impl/Neighborhood.java
  15. +55 −0 ...integration/src/main/java/org/gradoop/dataintegration/transformation/impl/NeighborhoodVertex.java
  16. +19 −0 gradoop-data-integration/src/main/java/org/gradoop/dataintegration/transformation/package-info.java
  17. +91 −0 ...ta-integration/src/test/java/org/gradoop/dataintegration/transformation/ConnectNeighborsTest.java
  18. +103 −0 ...p-data-integration/src/test/java/org/gradoop/dataintegration/transformation/EdgeToVertexTest.java
  19. +182 −0 ...ion/src/test/java/org/gradoop/dataintegration/transformation/PropagatePropertyToNeighborTest.java
  20. +59 −0 ...p-data-integration/src/test/java/org/gradoop/dataintegration/transformation/VertexToEdgeTest.java
  21. +64 −0 ...est/java/org/gradoop/dataintegration/transformation/functions/AccumulatePropagatedValuesTest.java
  22. +58 −0 ...test/java/org/gradoop/dataintegration/transformation/functions/BuildIdPropertyValuePairsTest.java
  23. +65 −0 ...g/gradoop/dataintegration/transformation/functions/BuildTargetVertexIdPropertyValuePairsTest.java
  24. +111 −0 ...va/org/gradoop/dataintegration/transformation/functions/CreateCartesianNeighborhoodEdgesTest.java
  25. +88 −0 ...src/test/java/org/gradoop/dataintegration/transformation/functions/CreateEdgesFromTripleTest.java
  26. +65 −0 ...src/test/java/org/gradoop/dataintegration/transformation/functions/CreateVertexFromEdgesTest.java
  27. +119 −0 ...egration/src/test/java/org/gradoop/dataintegration/transformation/functions/NeighborhoodTest.java
  28. +10 −6 .../src/test/java/org/gradoop/dataintegration/transformation/impl/ExtractPropertyFromVertexTest.java
  29. +8 −9 ...ta-integration/src/test/java/org/gradoop/dataintegration/transformation/impl/InvertEdgesTest.java
  30. +1 −1 gradoop-examples/src/main/java/org/gradoop/benchmark/sna/SocialNetworkAnalyticsExample.java
  31. +19 −6 .../java/org/gradoop/flink/model/impl/{operators/subgraph/functions → functions/epgm}/LabelIsIn.java
  32. +93 −0 gradoop-flink/src/test/java/org/gradoop/flink/model/impl/functions/epgm/LabelIsInTest.java
@@ -0,0 +1,101 @@
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.dataintegration.transformation;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.dataintegration.transformation.functions.CreateCartesianNeighborhoodEdges;
import org.gradoop.dataintegration.transformation.impl.Neighborhood;
import org.gradoop.dataintegration.transformation.impl.NeighborhoodVertex;
import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.epgm.LabelIsIn;

import java.util.List;
import java.util.Objects;

/**
 * This graph transformation adds new edges to the graph. Those edges are created if a vertex of a
 * user-defined label has two neighbors of another user-defined label. A bidirectional (two edges
 * in gradoop) edge is then created between those two neighbors.
 * <p>
 * All vertices and edges of the input graph are kept; the result only contains additional edges.
 */
public class ConnectNeighbors implements UnaryGraphToGraphOperator {

  /**
   * The label of the vertices whose neighborhood is connected.
   */
  private final String sourceVertexLabel;

  /**
   * The edge direction to consider.
   */
  private final Neighborhood.EdgeDirection edgeDirection;

  /**
   * The label of the neighboring vertices that should be connected.
   */
  private final String neighborhoodVertexLabel;

  /**
   * The label of the created edge between the neighbors.
   */
  private final String newEdgeLabel;

  /**
   * The constructor to connect the neighbors of vertices with a certain label.
   *
   * @param sourceVertexLabel       The label of the vertices whose neighborhood is connected.
   * @param edgeDirection           The edge direction to consider.
   * @param neighborhoodVertexLabel The label of the neighboring vertices that should be connected.
   * @param newEdgeLabel            The label of the created edge between the neighbors.
   * @throws NullPointerException if any of the arguments is {@code null}.
   */
  public ConnectNeighbors(String sourceVertexLabel, Neighborhood.EdgeDirection edgeDirection,
    String neighborhoodVertexLabel, String newEdgeLabel) {
    this.sourceVertexLabel = Objects.requireNonNull(sourceVertexLabel);
    this.edgeDirection = Objects.requireNonNull(edgeDirection);
    this.neighborhoodVertexLabel = Objects.requireNonNull(neighborhoodVertexLabel);
    this.newEdgeLabel = Objects.requireNonNull(newEdgeLabel);
  }

  /**
   * Computes the neighborhood of every vertex with the source label and adds the edges created
   * from those neighborhoods to the input graph.
   *
   * @param graph The input graph.
   * @return A graph with the vertices of the input graph and the union of its edges and the
   *         newly created edges.
   */
  @Override
  public LogicalGraph execute(LogicalGraph graph) {

    // determine the vertices the neighborhood should be calculated for
    DataSet<Vertex> verticesByLabel = graph.getVerticesByLabel(sourceVertexLabel);

    // only vertices carrying one of the two relevant labels can take part in a neighborhood,
    // so the neighborhood is computed on the subgraph induced by those vertices
    LogicalGraph reducedGraph = graph
      .vertexInducedSubgraph(new LabelIsIn<>(sourceVertexLabel, neighborhoodVertexLabel));

    // determine the neighborhood by edge direction
    DataSet<Tuple2<Vertex, List<NeighborhoodVertex>>> neighborhood =
      Neighborhood.getPerVertex(reducedGraph, verticesByLabel, edgeDirection);

    // calculate the new edges; the edge factory is obtained from the input graph's own factory
    DataSet<Edge> newEdges = neighborhood.flatMap(
      new CreateCartesianNeighborhoodEdges<>(graph.getFactory().getEdgeFactory(), newEdgeLabel));

    // add the new edges to the original (not the reduced) graph
    return graph.getFactory()
      .fromDataSets(graph.getVertices(), graph.getEdges().union(newEdges));
  }

  @Override
  public String getName() {
    return ConnectNeighbors.class.getName();
  }
}
@@ -0,0 +1,106 @@
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.dataintegration.transformation;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.dataintegration.transformation.functions.CreateEdgesFromTriple;
import org.gradoop.dataintegration.transformation.functions.CreateVertexFromEdges;
import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.Value0Of3;

import java.util.Objects;

/**
 * Structural transformation that turns edges of a given label into vertices.
 * <p>
 * For each edge with the configured label a new vertex is created from that edge, together with
 * two new edges that follow the direction of the original edge. The labels of the new vertex
 * and of both new edges are user-defined.
 * <p>
 * The original edges are still part of the resulting graph.
 * Use a {@link org.apache.flink.api.common.functions.FilterFunction} on the original label to
 * remove them.
 */
public class EdgeToVertex implements UnaryGraphToGraphOperator {

  /**
   * Label selecting the edges that are transformed.
   */
  private final String edgeLabel;

  /**
   * Label assigned to each newly created vertex.
   */
  private final String newVertexLabel;

  /**
   * Label of the new edge pointing to the newly created vertex.
   */
  private final String edgeLabelSourceToNew;

  /**
   * Label of the new edge starting at the newly created vertex.
   */
  private final String edgeLabelNewToTarget;

  /**
   * Creates the edge-to-vertex transformation.
   *
   * @param edgeLabel            Label selecting the edges that are transformed.
   *                             (No edges will be transformed if this parameter is {@code null}).
   * @param newVertexLabel       Label assigned to each newly created vertex.
   * @param edgeLabelSourceToNew Label of the new edge pointing to the newly created vertex.
   * @param edgeLabelNewToTarget Label of the new edge starting at the newly created vertex.
   */
  public EdgeToVertex(String edgeLabel, String newVertexLabel, String edgeLabelSourceToNew,
    String edgeLabelNewToTarget) {
    // the edge label is the only optional argument, all labels of new elements are required
    this.newVertexLabel = Objects.requireNonNull(newVertexLabel);
    this.edgeLabelSourceToNew = Objects.requireNonNull(edgeLabelSourceToNew);
    this.edgeLabelNewToTarget = Objects.requireNonNull(edgeLabelNewToTarget);
    this.edgeLabel = edgeLabel;
  }

  @Override
  public LogicalGraph execute(LogicalGraph graph) {
    // one triple per selected edge: the vertex created from it plus two ids taken from the edge
    DataSet<Tuple3<Vertex, GradoopId, GradoopId>> vertexTriples = graph
      .getEdgesByLabel(edgeLabel)
      .map(new CreateVertexFromEdges<>(newVertexLabel, graph.getFactory().getVertexFactory()));

    // vertices of the result: the created vertices followed by all original vertices
    DataSet<Vertex> createdVertices = vertexTriples.map(new Value0Of3<>());
    DataSet<Vertex> resultVertices = createdVertices.union(graph.getVertices());

    // edges of the result: the edges derived from the triples followed by all original edges
    DataSet<Edge> createdEdges = vertexTriples.flatMap(
      new CreateEdgesFromTriple<>(graph.getFactory().getEdgeFactory(), edgeLabelSourceToNew,
        edgeLabelNewToTarget));
    DataSet<Edge> resultEdges = createdEdges.union(graph.getEdges());

    return graph.getFactory().fromDataSets(resultVertices, resultEdges);
  }

  @Override
  public String getName() {
    return EdgeToVertex.class.getName();
  }
}
@@ -0,0 +1,137 @@
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.dataintegration.transformation;

import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.dataintegration.transformation.functions.AccumulatePropagatedValues;
import org.gradoop.dataintegration.transformation.functions.BuildIdPropertyValuePairs;
import org.gradoop.dataintegration.transformation.functions.BuildTargetVertexIdPropertyValuePairs;
import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.LabelIsIn;
import org.gradoop.flink.model.impl.functions.epgm.SourceId;

import java.util.Objects;
import java.util.Set;

/**
 * Propagates a property of a vertex to its neighbors, where the received values are aggregated
 * in a Property List.
 */
public class PropagatePropertyToNeighbor implements UnaryGraphToGraphOperator {

  /**
   * Label of the vertices that own the property to propagate.
   */
  private final String vertexLabel;

  /**
   * Key of the property to propagate.
   */
  private final String propertyKey;

  /**
   * Key under which the PropertyValue list is stored at the target vertices.
   */
  private final String targetVertexPropertyKey;

  /**
   * Labels of the edges used for propagation, or {@code null} to use edges of every label.
   */
  private final Set<String> propagatingEdgeLabels;

  /**
   * Labels of the vertices that store the propagated values, or {@code null} if all vertices
   * should store them.
   */
  private final Set<String> targetVertexLabels;

  /**
   * Creates the transformation without any label restriction: properties will be propagated
   * along all edges and to all target vertices.
   * <p>
   * Using this constructor is equivalent to {@code PropagatePropertyToNeighbor(vertexLabel,
   * propertyKey, targetVertexPropertyKey, null, null)}. Use
   * {@link #PropagatePropertyToNeighbor(String, String, String, Set, Set)} when properties
   * should only be propagated along certain edges and / or to certain vertices (both selected
   * by their label).
   *
   * @param vertexLabel             Label of the vertices that own the property to propagate.
   * @param propertyKey             Key of the property to propagate.
   * @param targetVertexPropertyKey Key under which the PropertyValue list is stored at the
   *                                target vertices.
   */
  public PropagatePropertyToNeighbor(String vertexLabel, String propertyKey,
    String targetVertexPropertyKey) {
    this(vertexLabel, propertyKey, targetVertexPropertyKey, null, null);
  }

  /**
   * Creates the transformation with optional restrictions on the edge labels used for
   * propagation and / or on the labels of the vertices receiving the values.
   *
   * @param vertexLabel             Label of the vertices that own the property to propagate.
   * @param propertyKey             Key of the property to propagate.
   * @param targetVertexPropertyKey Key under which the PropertyValue list is stored at the
   *                                target vertices.
   * @param propagatingEdges        Labels of the edges used for propagation, or {@code null}
   *                                to use edges of every label.
   * @param targetVertexLabels      Labels of the vertices that store the propagated values,
   *                                or {@code null} if all vertices should store them.
   */
  public PropagatePropertyToNeighbor(String vertexLabel, String propertyKey,
    String targetVertexPropertyKey, Set<String> propagatingEdges,
    Set<String> targetVertexLabels) {
    // the two label sets are optional filters and therefore may be null
    this.propagatingEdgeLabels = propagatingEdges;
    this.targetVertexLabels = targetVertexLabels;
    this.vertexLabel = Objects.requireNonNull(vertexLabel);
    this.propertyKey = Objects.requireNonNull(propertyKey);
    this.targetVertexPropertyKey = Objects.requireNonNull(targetVertexPropertyKey);
  }

  @Override
  public LogicalGraph execute(LogicalGraph graph) {
    DataSet<Vertex> updatedVertices = graph.getVertices()
      // collect the values to propagate from the vertices
      .flatMap(new BuildIdPropertyValuePairs<>(vertexLabel, propertyKey))
      // match the first tuple field with the source id of the selected edges
      .join(selectPropagatingEdges(graph))
      .where(0).equalTo(new SourceId<>())
      .with(new BuildTargetVertexIdPropertyValuePairs<>())
      // group the propagated pairs with the vertex whose id equals their first field
      .coGroup(graph.getVertices())
      .where(0).equalTo(new Id<>())
      .with(new AccumulatePropagatedValues<>(targetVertexPropertyKey, targetVertexLabels));

    // edges are passed through unchanged
    return graph.getFactory().fromDataSets(updatedVertices, graph.getEdges());
  }

  /**
   * Selects the edges along which values are propagated.
   *
   * @param graph The input graph.
   * @return All edges of the graph if no edge labels were given, otherwise only the edges
   *         accepted by the label filter.
   */
  private DataSet<Edge> selectPropagatingEdges(LogicalGraph graph) {
    DataSet<Edge> allEdges = graph.getEdges();
    if (propagatingEdgeLabels == null) {
      return allEdges;
    }
    return allEdges.filter(new LabelIsIn<>(propagatingEdgeLabels));
  }

  @Override
  public String getName() {
    return PropagatePropertyToNeighbor.class.getName();
  }
}
Oops, something went wrong.

0 comments on commit f113b24

Please sign in to comment.
You can’t perform that action at this time.