Skip to content

Commit

Permalink
Aggregation refactoring, fixes #311 (#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
André Petermann authored and s1ck committed Aug 29, 2016
1 parent 5a22952 commit 540fa5d
Show file tree
Hide file tree
Showing 61 changed files with 2,187 additions and 1,614 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,17 @@
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.pojo.GraphHead;
import org.gradoop.examples.utils.ExampleOutput;
import org.gradoop.flink.model.api.functions.ApplyAggregateFunction;
import org.gradoop.flink.model.api.functions.TransformationFunction;
import org.gradoop.flink.model.api.functions.VertexAggregateFunction;
import org.gradoop.flink.model.impl.GraphCollection;
import org.gradoop.flink.model.impl.LogicalGraph;
import org.gradoop.flink.algorithms.btgs.BusinessTransactionGraphs;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.impl.operators.aggregation.ApplyAggregation;
import org.gradoop.flink.model.impl.operators.aggregation.functions.count.Count;
import org.gradoop.flink.model.impl.operators.aggregation.functions.bool.Or;
import org.gradoop.flink.model.impl.operators.transformation.ApplyTransformation;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.Vertex;
Expand Down Expand Up @@ -72,13 +68,12 @@ public static void main(String[] args) throws Exception {
.callForCollection(new BusinessTransactionGraphs());

btgs = btgs
.apply(new ApplyAggregation("isClosed", new IsClosedAggregateFunction()));
.apply(new ApplyAggregation(new IsClosedAggregateFunction()));

btgs = btgs.select(new IsClosedPredicateFunction());

btgs = btgs
.apply(new ApplyAggregation(
"soCount", new CountSalesOrdersAggregateFunction()));
.apply(new ApplyAggregation(new CountSalesOrdersAggregateFunction()));

out.add("Business Transaction Graphs with Measures", btgs);

Expand Down Expand Up @@ -126,52 +121,22 @@ public static LogicalGraph getIntegratedInstanceGraph() throws IOException {
/**
* Aggregate function to determine "isClosed" measure
*/
private static class IsClosedAggregateFunction
implements ApplyAggregateFunction {
private static class IsClosedAggregateFunction extends Or
implements VertexAggregateFunction {

@Override
public DataSet<Tuple2<GradoopId, PropertyValue>> execute(
GraphCollection collection) {

return collection.getVertices()
.flatMap(new FlatMapFunction<Vertex, Tuple2<GradoopId, Integer>>() {
@Override
public void flatMap(Vertex vertex,
Collector<Tuple2<GradoopId, Integer>> collector) throws Exception {

for (GradoopId graphId : vertex.getGraphIds()) {
Integer openQuotation = vertex.getLabel().equals("Quotation") &&
vertex.getPropertyValue("status").toString().equals("open") ?
1 : 0;

collector.collect(new Tuple2<>(graphId, openQuotation));
}
}
})
.groupBy(0).sum(1)
.map(
new MapFunction<Tuple2<GradoopId, Integer>,
Tuple2<GradoopId, PropertyValue>>() {

@Override
public Tuple2<GradoopId, PropertyValue> map(
Tuple2<GradoopId, Integer> openQuotations) throws
Exception {

Boolean isClosed = openQuotations.f1.equals(0);

return new Tuple2<>(
openQuotations.f0, PropertyValue.create(isClosed));
}
});
public String getAggregatePropertyKey() {
return "isClosed";
}

/**
* {@inheritDoc}
*/
@Override
public Number getDefaultValue() {
return 0;
public PropertyValue getVertexIncrement(Vertex vertex) {

boolean isClosedQuotation =
vertex.getLabel().equals("SalesQuotation") &&
!vertex.getPropertyValue("status").toString().equals("open");

return PropertyValue.create(isClosedQuotation);
}
}

Expand All @@ -191,48 +156,17 @@ public boolean filter(GraphHead graphHead) throws Exception {
* Aggregate function to count sales orders per graph.
*/
private static class CountSalesOrdersAggregateFunction
implements ApplyAggregateFunction {
extends Count implements VertexAggregateFunction {

@Override
public DataSet<Tuple2<GradoopId, PropertyValue>> execute(
GraphCollection collection) {

return collection.getVertices()
.flatMap(new FlatMapFunction<Vertex, Tuple2<GradoopId, Integer>>() {
@Override
public void flatMap(Vertex vertex,
Collector<Tuple2<GradoopId, Integer>> collector) throws Exception {

for (GradoopId graphId : vertex.getGraphIds()) {
Integer foundSalesOrder =
vertex.getLabel().equals("SalesOrder") ? 1 : 0;

collector.collect(new Tuple2<>(graphId, foundSalesOrder));
}
}
})
.groupBy(0).sum(1)
.map(
new MapFunction<Tuple2<GradoopId, Integer>,
Tuple2<GradoopId, PropertyValue>>() {

@Override
public Tuple2<GradoopId, PropertyValue> map(
Tuple2<GradoopId, Integer> salesOrderCount) throws
Exception {

return new Tuple2<>(
salesOrderCount.f0, PropertyValue.create(salesOrderCount.f1));
}
});
public PropertyValue getVertexIncrement(Vertex vertex) {
return PropertyValue.create(
vertex.getLabel().equals("SalesOrder") ? 1 : 0);
}

/**
* {@inheritDoc}
*/
@Override
public Number getDefaultValue() {
return 0;
public String getAggregatePropertyKey() {
return "salesOrderCount";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,34 @@

import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.util.Time;
import org.gradoop.common.cache.DistributedCache;
import org.gradoop.common.cache.api.DistributedCacheServer;
import org.gradoop.common.model.api.entities.EPGMAttributed;
import org.gradoop.common.model.api.entities.EPGMLabeled;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.GraphHead;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.common.model.impl.properties.PropertyValues;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.algorithms.btgs.BusinessTransactionGraphs;
import org.gradoop.flink.algorithms.fsm.TransactionalFSM;
import org.gradoop.flink.algorithms.fsm.config.FSMConfig;
import org.gradoop.flink.algorithms.fsm.gspan.functions.DecodeDFSCodes;
import org.gradoop.flink.io.impl.dot.DOTDataSink;
import org.gradoop.flink.io.impl.json.JSONDataSource;
import org.gradoop.flink.model.api.functions.ApplyAggregateFunction;
import org.gradoop.flink.model.api.functions.TransformationFunction;
import org.gradoop.flink.model.api.functions.VertexAggregateFunction;
import org.gradoop.flink.model.impl.GraphCollection;
import org.gradoop.flink.model.impl.LogicalGraph;
import org.gradoop.flink.model.impl.operators.aggregation.ApplyAggregation;
import org.gradoop.flink.model.impl.operators.aggregation.functions.sum.Sum;
import org.gradoop.flink.model.impl.operators.transformation
.ApplyTransformation;
import org.gradoop.flink.util.GradoopFlinkConfig;

import java.math.BigDecimal;
import java.util.Iterator;

/**
* Example workflow of paper "Scalable Business Intelligence with Graph
Expand Down Expand Up @@ -126,7 +120,7 @@ public static void main(String[] args) throws Exception {
.callForCollection(new BusinessTransactionGraphs());

// (3) aggregate financial result
btgs = btgs.apply(new ApplyAggregation(RESULT_KEY, new Result()));
btgs = btgs.apply(new ApplyAggregation(new Result()));

// (4) filter by loss (negative financialResult)
btgs = btgs.select(new Loss());
Expand All @@ -151,7 +145,7 @@ public static void main(String[] args) throws Exception {
// (7) Check, if frequent subgraph contains master data

frequentSubgraphs = frequentSubgraphs.apply(
new ApplyAggregation(MASTERDATA_KEY, new DetermineMasterDataSurplus()));
new ApplyAggregation(new DetermineMasterDataSurplus()));

// (8) Select graphs containing master data

Expand Down Expand Up @@ -186,7 +180,8 @@ public static void main(String[] args) throws Exception {
/**
* Calculate the financial result of business transaction graphs.
*/
private static class Result implements ApplyAggregateFunction {
private static class Result
extends Sum implements VertexAggregateFunction {

/**
* Property key for revenue values.
Expand All @@ -197,108 +192,47 @@ private static class Result implements ApplyAggregateFunction {
*/
private static final String EXPENSE_KEY = "expense";


@Override
public DataSet<Tuple2<GradoopId, PropertyValue>> execute(
GraphCollection collection) {

return collection.getVertices()
.flatMap(new FlatMapFunction<Vertex, Tuple2<GradoopId, BigDecimal>>() {

@Override
public void flatMap(Vertex value,
Collector<Tuple2<GradoopId, BigDecimal>> out) throws Exception {

if (value.hasProperty(REVENUE_KEY)) {
for (GradoopId graphId : value.getGraphIds()) {
out.collect(new Tuple2<>(
graphId, value.getPropertyValue(REVENUE_KEY).getBigDecimal())
);
}
} else if (value.hasProperty(EXPENSE_KEY)) {
for (GradoopId graphId : value.getGraphIds()) {
out.collect(new Tuple2<>(
graphId, value.getPropertyValue(EXPENSE_KEY).getBigDecimal()
.multiply(BigDecimal.valueOf(-1)))
);
}
}
}
})
.groupBy(0)
.reduceGroup(
new GroupReduceFunction<Tuple2<GradoopId, BigDecimal>,
Tuple2<GradoopId, PropertyValue>>() {

@Override
public void reduce(Iterable<Tuple2<GradoopId, BigDecimal>> values,
Collector<Tuple2<GradoopId, PropertyValue>> out) throws
Exception {

Iterator<Tuple2<GradoopId, BigDecimal>> iterator = values
.iterator();

Tuple2<GradoopId, BigDecimal> first = iterator.next();

GradoopId graphId = first.f0;
BigDecimal sum = first.f1;

while (iterator.hasNext()) {
sum = sum.add(iterator.next().f1);
}

out.collect(new Tuple2<>(graphId, PropertyValue.create(sum)));
}
});
public PropertyValue getVertexIncrement(Vertex vertex) {
PropertyValue increment;

if (vertex.hasProperty(REVENUE_KEY)) {
increment = vertex.getPropertyValue(REVENUE_KEY);

} else if (vertex.hasProperty(EXPENSE_KEY)) {
PropertyValue expense = vertex.getPropertyValue(EXPENSE_KEY);
increment = PropertyValues.Numeric
.multiply(expense, PropertyValue.create(-1));

} else {
increment = PropertyValue.create(0);
}

return increment;
}

@Override
public Number getDefaultValue() {
return BigDecimal.valueOf(0);
public String getAggregatePropertyKey() {
return RESULT_KEY;
}
}

/**
* Counts master data vertices less than the number of transactional vertices.
*/
private static class DetermineMasterDataSurplus
implements ApplyAggregateFunction {
extends Sum implements VertexAggregateFunction {

@Override
public DataSet<Tuple2<GradoopId, PropertyValue>> execute(
GraphCollection collection) {
return collection
.getVertices()
.map(new MapFunction<Vertex, Tuple2<GradoopId, Integer>>() {
@Override

public Tuple2<GradoopId, Integer> map(Vertex value) throws
Exception {

GradoopId graphId = value.getGraphIds().iterator().next();

return value.getLabel().startsWith(MASTER_PREFIX) ?
new Tuple2<>(graphId, 1) :
new Tuple2<>(graphId, -1);
}
})
.groupBy(0)
.sum(1)
.map(
new MapFunction<Tuple2<GradoopId, Integer>,
Tuple2<GradoopId, PropertyValue>>() {


@Override
public Tuple2<GradoopId, PropertyValue> map(
Tuple2<GradoopId, Integer> value) throws Exception {
return new Tuple2<>(value.f0, PropertyValue.create(value.f1));
}
});
public PropertyValue getVertexIncrement(Vertex vertex) {
return vertex.getLabel().startsWith(MASTER_PREFIX) ?
PropertyValue.create(1) : PropertyValue.create(-1);
}

@Override
public Number getDefaultValue() {
return 0;
public String getAggregatePropertyKey() {
return MASTERDATA_KEY;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ public boolean filter(Edge edge) throws Exception {
}
})
.groupBy(Lists.newArrayList("gender", "city"))
.aggregate("vertexCount", new VertexCount())
.aggregate("edgeCount", new EdgeCount());
.aggregate(new VertexCount())
.aggregate(new EdgeCount());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public Edge execute(Edge current, Edge transformed) {
// 3b) separate communities
.splitBy(label)
// 4) compute vertex count per community
.apply(new ApplyAggregation(vertexCount, new VertexCount()))
.apply(new ApplyAggregation(new VertexCount()))
// 5) select graphs with more than minClusterSize vertices
.select(new FilterFunction<GraphHead>() {
@Override
Expand All @@ -231,9 +231,9 @@ public boolean filter(GraphHead g) throws Exception {
// 7) group that graph by vertex properties
.groupBy(Lists.newArrayList(city, gender))
// 8a) count vertices of grouped graph
.aggregate(vertexCount, new VertexCount())
.aggregate(new VertexCount())
// 8b) count edges of grouped graph
.aggregate(edgeCount, new EdgeCount());
.aggregate(new EdgeCount());
}

@Override
Expand Down
Loading

0 comments on commit 540fa5d

Please sign in to comment.