Skip to content
Permalink
Browse files

[1303] Generify Apply

  • Loading branch information...
timo95 committed Jun 26, 2019
1 parent 359633a commit 278b61874ec0943b895e8458c8413c5e42cc5895

This file was deleted.

@@ -19,9 +19,9 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.api.entities.GraphElement;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.id.GradoopIdSet;
import org.gradoop.common.model.impl.pojo.EPGMGraphElement;

/**
* graphIds (BC)
@@ -31,8 +31,8 @@
*
* @param <EL> graph element type
*/
public class ElementsOfSelectedGraphs<EL extends EPGMGraphElement> extends
RichFlatMapFunction<EL, Tuple2<GradoopId, EL>> {
public class ElementsOfSelectedGraphs<EL extends GraphElement>
extends RichFlatMapFunction<EL, Tuple2<GradoopId, EL>> {

/**
* constant string for "graph ids"
@@ -64,8 +64,7 @@ public void open(Configuration parameters) throws Exception {
}

@Override
public void flatMap(EL el, Collector
<Tuple2<GradoopId, EL>> collector) throws Exception {
public void flatMap(EL el, Collector<Tuple2<GradoopId, EL>> collector) throws Exception {
for (GradoopId graphId : el.getGraphIds()) {
if (graphIds.contains(graphId)) {
reuse.f0 = graphId;
@@ -17,14 +17,15 @@

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.operators.ApplicableUnaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.api.functions.AggregateFunction;
import org.gradoop.flink.model.api.operators.ApplicableUnaryGraphToGraphOperator;
import org.gradoop.flink.model.impl.functions.epgm.ElementsOfSelectedGraphs;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;
@@ -42,12 +43,23 @@
import static com.google.common.base.Preconditions.checkNotNull;

/**
* Takes a collection of logical graphs and user defined aggregate functions as
* input. The aggregate functions are applied on each logical graph contained in
* Takes a collection of base graphs and user defined aggregate functions as
* input. The aggregate functions are applied on each base graph contained in
* the collection and the aggregate is stored as additional properties at the graphs.
*
* @param <G> type of the graph head
* @param <V> the vertex type
* @param <E> the edge type
* @param <LG> type of the logical graph instance
* @param <GC> type of the graph collection
*/
public class ApplyAggregation
implements ApplicableUnaryGraphToGraphOperator {
public class ApplyAggregation<
G extends GraphHead,
V extends Vertex,
E extends Edge,
LG extends BaseGraph<G, V, E, LG, GC>,
GC extends BaseGraphCollection<G, V, E, LG, GC>>
implements ApplicableUnaryBaseGraphToBaseGraphOperator<GC> {

/**
* User-defined aggregate functions which get applied on a graph collection.
@@ -67,8 +79,8 @@ public ApplyAggregation(final AggregateFunction... aggregateFunctions) {
}

@Override
public GraphCollection executeForGVELayout(GraphCollection collection) {
DataSet<EPGMGraphHead> graphHeads = collection.getGraphHeads();
public GC executeForGVELayout(GC collection) {
DataSet<G> graphHeads = collection.getGraphHeads();
DataSet<GradoopId> graphIds = graphHeads.map(new Id<>());

DataSet<Tuple2<GradoopId, Map<String, PropertyValue>>> aggregate =
@@ -80,18 +92,17 @@ public GraphCollection executeForGVELayout(GraphCollection collection) {
graphHeads = graphHeads
.coGroup(aggregate)
.where(new Id<>()).equalTo(0)
.with(new SetAggregateProperties(aggregateFunctions));
.with(new SetAggregateProperties<>(aggregateFunctions));

return collection.getConfig().getGraphCollectionFactory()
.fromDataSets(graphHeads, collection.getVertices(), collection.getEdges());
return collection.getFactory().fromDataSets(graphHeads, collection.getVertices(), collection.getEdges());
}

@Override
public GraphCollection executeForTxLayout(GraphCollection collection) {
public GC executeForTxLayout(GC collection) {
DataSet<GraphTransaction> updatedTransactions = collection.getGraphTransactions()
.map(new AggregateTransactions(aggregateFunctions));

return collection.getConfig().getGraphCollectionFactory().fromTransactions(updatedTransactions);
return collection.getFactory().fromTransactions(updatedTransactions);
}

/**
@@ -102,7 +113,7 @@ public GraphCollection executeForTxLayout(GraphCollection collection) {
* @return partition aggregate value
*/
private DataSet<Tuple2<GradoopId, Map<String, PropertyValue>>> aggregateVertices(
DataSet<EPGMVertex> vertices, DataSet<GradoopId> graphIds) {
DataSet<V> vertices, DataSet<GradoopId> graphIds) {
return vertices
.flatMap(new ElementsOfSelectedGraphs<>())
.withBroadcastSet(graphIds, ElementsOfSelectedGraphs.GRAPH_IDS)
@@ -120,7 +131,7 @@ public GraphCollection executeForTxLayout(GraphCollection collection) {
* @return partition aggregate value
*/
private DataSet<Tuple2<GradoopId, Map<String, PropertyValue>>> aggregateEdges(
DataSet<EPGMEdge> edges, DataSet<GradoopId> graphIds) {
DataSet<E> edges, DataSet<GradoopId> graphIds) {
return edges
.flatMap(new ElementsOfSelectedGraphs<>())
.withBroadcastSet(graphIds, ElementsOfSelectedGraphs.GRAPH_IDS)
@@ -18,8 +18,8 @@
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.EPGMElement;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.api.functions.AggregateFunction;

@@ -34,8 +34,8 @@
*
* @param <T> element type
*/
public class ApplyAggregateElements<T extends EPGMElement> implements GroupCombineFunction
<Tuple2<GradoopId, T>, Tuple2<GradoopId, Map<String, PropertyValue>>> {
public class ApplyAggregateElements<T extends Element>
implements GroupCombineFunction<Tuple2<GradoopId, T>, Tuple2<GradoopId, Map<String, PropertyValue>>> {

/**
* Aggregate functions.
@@ -18,8 +18,8 @@
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.api.functions.AggregateFunction;

@@ -30,9 +30,11 @@

/**
* Sets aggregate values of graph heads.
*
* @param <G> type of the graph head
*/
public class SetAggregateProperties implements
CoGroupFunction<EPGMGraphHead, Tuple2<GradoopId, Map<String, PropertyValue>>, EPGMGraphHead> {
public class SetAggregateProperties<G extends GraphHead>
implements CoGroupFunction<G, Tuple2<GradoopId, Map<String, PropertyValue>>, G> {

/**
* default values used to replace aggregate values in case of NULL.
@@ -61,10 +63,10 @@ public SetAggregateProperties(final Set<AggregateFunction> aggregateFunctions) {
}

@Override
public void coGroup(Iterable<EPGMGraphHead> left,
Iterable<Tuple2<GradoopId, Map<String, PropertyValue>>> right, Collector<EPGMGraphHead> out) {
public void coGroup(Iterable<G> left,
Iterable<Tuple2<GradoopId, Map<String, PropertyValue>>> right, Collector<G> out) {

for (EPGMGraphHead leftElem : left) {
for (G leftElem : left) {
boolean rightEmpty = true;
for (Tuple2<GradoopId, Map<String, PropertyValue>> rightElem : right) {
Map<String, PropertyValue> values = rightElem.f1;
@@ -16,24 +16,33 @@
package org.gradoop.flink.model.impl.operators.transformation;

import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.operators.ApplicableUnaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.api.functions.TransformationFunction;
import org.gradoop.flink.model.api.operators.ApplicableUnaryGraphToGraphOperator;
import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;
import org.gradoop.flink.model.impl.operators.transformation.functions.TransformGraphTransaction;
import org.gradoop.flink.util.GradoopFlinkConfig;

/**
* Applies the transformation operator on on all logical graphs in a graph
* collection.
* Applies the transformation operator on on all base graphs in a graph collection.
*
* @param <G> type of the graph head
* @param <V> the vertex type
* @param <E> the edge type
* @param <LG> type of the logical graph instance
* @param <GC> type of the graph collection
*/
public class ApplyTransformation
extends Transformation<EPGMGraphHead, EPGMVertex, EPGMEdge, LogicalGraph, GraphCollection>
implements ApplicableUnaryGraphToGraphOperator {
public class ApplyTransformation<
G extends GraphHead,
V extends Vertex,
E extends Edge,
LG extends BaseGraph<G, V, E, LG, GC>,
GC extends BaseGraphCollection<G, V, E, LG, GC>> extends Transformation<G, V, E, LG, GC>
implements ApplicableUnaryBaseGraphToBaseGraphOperator<GC> {

/**
* Creates a new operator instance.
@@ -42,21 +51,21 @@
* @param vertexModFunc vertex transformation function
* @param edgeModFunc edge transformation function
*/
public ApplyTransformation(TransformationFunction<EPGMGraphHead> graphHeadModFunc,
TransformationFunction<EPGMVertex> vertexModFunc,
TransformationFunction<EPGMEdge> edgeModFunc) {
public ApplyTransformation(TransformationFunction<G> graphHeadModFunc,
TransformationFunction<V> vertexModFunc,
TransformationFunction<E> edgeModFunc) {
super(graphHeadModFunc, vertexModFunc, edgeModFunc);
}


@Override
public GraphCollection executeForGVELayout(GraphCollection collection) {
// the resulting logical graph holds multiple graph heads
LogicalGraph modifiedGraph = executeInternal(
public GC executeForGVELayout(GC collection) {
// the resulting graph holds multiple graph heads
LG modifiedGraph = executeInternal(
collection.getGraphHeads(),
collection.getVertices(),
collection.getEdges(),
collection.getConfig().getLogicalGraphFactory());
collection.getGraphFactory());

return collection.getFactory().fromDataSets(
modifiedGraph.getGraphHead(),
@@ -65,21 +74,23 @@ public GraphCollection executeForGVELayout(GraphCollection collection) {
}

@Override
public GraphCollection executeForTxLayout(GraphCollection collection) {
DataSet<GraphTransaction> graphTransactions = collection.getGraphTransactions();

GradoopFlinkConfig config = collection.getConfig();
public GC executeForTxLayout(GC collection) {
if (collection instanceof GraphCollection) {
DataSet<GraphTransaction> graphTransactions = collection.getGraphTransactions();

DataSet<GraphTransaction> transformedGraphTransactions = graphTransactions
.map(new TransformGraphTransaction(
collection.getFactory().getGraphHeadFactory(),
graphHeadTransFunc,
collection.getFactory().getVertexFactory(),
vertexTransFunc,
collection.getFactory().getEdgeFactory(),
edgeTransFunc
));
DataSet<GraphTransaction> transformedGraphTransactions = graphTransactions
.map(new TransformGraphTransaction<>(
collection.getFactory().getGraphHeadFactory(),
graphHeadTransFunc,
collection.getFactory().getVertexFactory(),
vertexTransFunc,
collection.getFactory().getEdgeFactory(),
edgeTransFunc
));

return config.getGraphCollectionFactory().fromTransactions(transformedGraphTransactions);
return collection.getFactory().fromTransactions(transformedGraphTransactions);
} else {
return executeForGVELayout(collection);
}
}
}

0 comments on commit 278b618

Please sign in to comment.
You can’t perform that action at this time.