Skip to content
Permalink
Browse files

[#1323] Use getFactory instead of getConfig in operators, when possib…

…le. (#1328)

fixes #1323
  • Loading branch information...
boggledekoog authored and ChrizZz110 committed Jul 17, 2019
1 parent de49d51 commit 26eabd0829b5d2fd8afc9ad30a272f0793b21b46
Showing with 106 additions and 103 deletions.
  1. +2 −1 ...a-integration/src/main/java/org/gradoop/dataintegration/importer/impl/csv/MinimalCSVImporter.java
  2. +2 −1 ...integration/src/main/java/org/gradoop/dataintegration/importer/impl/json/MinimalJSONImporter.java
  3. +1 −2 ...tion/src/main/java/org/gradoop/dataintegration/transformation/impl/ExtractPropertyFromVertex.java
  4. +2 −2 ...gration/src/test/java/org/gradoop/dataintegration/importer/impl/json/MinimalJSONImporterTest.java
  5. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/algorithms/btgs/BusinessTransactionGraphs.java
  6. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/algorithms/fsm/TransactionalFSM.java
  7. +1 −1 ...-flink/src/main/java/org/gradoop/flink/algorithms/fsm/transactional/tle/TransactionalFSMBase.java
  8. +1 −1 ...radoop/flink/algorithms/gelly/clusteringcoefficient/GellyGlobalClusteringCoefficientDirected.java
  9. +1 −1 ...doop/flink/algorithms/gelly/clusteringcoefficient/GellyGlobalClusteringCoefficientUndirected.java
  10. +1 −1 ...gradoop/flink/algorithms/gelly/clusteringcoefficient/GellyLocalClusteringCoefficientDirected.java
  11. +1 −1 ...adoop/flink/algorithms/gelly/clusteringcoefficient/GellyLocalClusteringCoefficientUndirected.java
  12. +1 −1 ...ava/org/gradoop/flink/algorithms/gelly/connectedcomponents/AnnotateWeaklyConnectedComponents.java
  13. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/algorithms/gelly/hits/HITS.java
  14. +1 −1 ...oop-flink/src/main/java/org/gradoop/flink/algorithms/gelly/labelpropagation/LabelPropagation.java
  15. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/algorithms/gelly/pagerank/PageRank.java
  16. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/algorithms/gelly/randomjump/KRandomJumpGellyVCI.java
  17. +1 −1 ...link/src/main/java/org/gradoop/flink/algorithms/gelly/trianglecounting/GellyTriangleCounting.java
  18. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/io/impl/csv/CSVDataSink.java
  19. +1 −2 gradoop-flink/src/main/java/org/gradoop/flink/io/impl/csv/CSVDataSource.java
  20. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/io/impl/csv/indexed/IndexedCSVDataSink.java
  21. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/io/impl/deprecated/json/JSONDataSink.java
  22. +2 −1 ...src/main/java/org/gradoop/flink/io/impl/deprecated/logicalgraphcsv/LogicalGraphCSVDataSource.java
  23. +7 −4 ...n/java/org/gradoop/flink/io/impl/deprecated/logicalgraphcsv/LogicalGraphIndexedCSVDataSource.java
  24. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/io/impl/dot/DOTDataSink.java
  25. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/io/impl/gdl/GDLDataSink.java
  26. +8 −4 gradoop-flink/src/main/java/org/gradoop/flink/io/impl/graph/GraphDataSource.java
  27. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/io/impl/tlf/TLFDataSink.java
  28. +2 −2 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/aggregation/ApplyAggregation.java
  29. +1 −1 ...in/java/org/gradoop/flink/model/impl/operators/base/BinaryCollectionToCollectionOperatorBase.java
  30. +1 −1 ...oop-flink/src/main/java/org/gradoop/flink/model/impl/operators/combination/ReduceCombination.java
  31. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/distinction/DistinctById.java
  32. +4 −3 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/equality/GraphEquality.java
  33. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/exclusion/ReduceExclusion.java
  34. +2 −2 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/fusion/VertexFusion.java
  35. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/limit/Limit.java
  36. +3 −5 ...flink/model/impl/operators/matching/single/preserving/explorative/ExplorativePatternMatching.java
  37. +10 −11 ...n/java/org/gradoop/flink/model/impl/operators/matching/single/simulation/dual/DualSimulation.java
  38. +2 −2 ...a/org/gradoop/flink/model/impl/operators/matching/transactional/TransactionalPatternMatching.java
  39. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/overlap/ReduceOverlap.java
  40. +7 −7 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/rollup/RollUp.java
  41. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/PageRankSampling.java
  42. +1 −1 ...src/main/java/org/gradoop/flink/model/impl/operators/sampling/RandomNonUniformVertexSampling.java
  43. +1 −1 ...gradoop/flink/model/impl/operators/sampling/functions/FilterVerticesWithDegreeOtherThanGiven.java
  44. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/selection/Selection.java
  45. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/selection/SelectionBase.java
  46. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/split/Split.java
  47. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/statistics/AverageDegree.java
  48. +1 −1 ...-flink/src/main/java/org/gradoop/flink/model/impl/operators/statistics/AverageIncomingDegree.java
  49. +1 −1 ...-flink/src/main/java/org/gradoop/flink/model/impl/operators/statistics/AverageOutgoingDegree.java
  50. +1 −1 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/statistics/GraphDensity.java
  51. +5 −8 ...link/src/main/java/org/gradoop/flink/model/impl/operators/transformation/ApplyTransformation.java
  52. +1 −1 gradoop-flink/src/test/java/org/gradoop/flink/io/impl/csv/CSVDataSinkTest.java
  53. +1 −1 gradoop-flink/src/test/java/org/gradoop/flink/io/impl/csv/indexed/IndexedCSVDataSinkTest.java
  54. +2 −2 gradoop-flink/src/test/java/org/gradoop/flink/model/impl/GradoopFlinkTestUtils.java
  55. +3 −3 gradoop-flink/src/test/java/org/gradoop/flink/model/impl/GraphTransactionTest.java
  56. +1 −1 gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/difference/DifferenceTest.java
  57. +1 −1 gradoop-flink/src/test/java/org/gradoop/flink/model/impl/operators/fusion/VertexFusionUtils.java
  58. +1 −1 ...p-store/gradoop-accumulo/src/main/java/org/gradoop/storage/impl/accumulo/io/AccumuloDataSink.java
  59. +1 −1 gradoop-store/gradoop-hbase/src/main/java/org/gradoop/storage/impl/hbase/io/HBaseDataSink.java
@@ -147,7 +147,8 @@ public LogicalGraph getLogicalGraph() throws IOException {

@Override
public GraphCollection getGraphCollection() throws IOException {
return config.getGraphCollectionFactory().fromGraph(getLogicalGraph());
LogicalGraph logicalGraph = getLogicalGraph();
return logicalGraph.getCollectionFactory().fromGraph(logicalGraph);
}

/**
@@ -65,6 +65,7 @@ public LogicalGraph getLogicalGraph() {

@Override
public GraphCollection getGraphCollection() {
return config.getGraphCollectionFactory().fromGraph(getLogicalGraph());
LogicalGraph logicalGraph = getLogicalGraph();
return logicalGraph.getCollectionFactory().fromGraph(logicalGraph);
}
}
@@ -165,8 +165,7 @@ public LogicalGraph execute(LogicalGraph logicalGraph) {
.union(edges);
}

return logicalGraph.getConfig()
.getLogicalGraphFactory()
return logicalGraph.getFactory()
.fromDataSets(logicalGraph.getGraphHead(), vertices, edges);
}
}
@@ -61,7 +61,7 @@ public void testReadDir() throws Exception {
LogicalGraph read = dataImport.getLogicalGraph();
LogicalGraph expected = loader.getLogicalGraph();
GraphCollection expectedCollection =
getConfig().getGraphCollectionFactory().fromGraph(expected);
expected.getCollectionFactory().fromGraph(expected);

collectAndAssertTrue(expected.equalsByElementData(read));
collectAndAssertTrue(dataImport.getGraphCollection()
@@ -79,7 +79,7 @@ public void testReadFile() throws Exception {
LogicalGraph read = dataImport.getLogicalGraph();
LogicalGraph expected = loader.getLogicalGraphByVariable("expected2");
GraphCollection expectedCollection =
getConfig().getGraphCollectionFactory().fromGraph(expected);
expected.getCollectionFactory().fromGraph(expected);

collectAndAssertTrue(expected.equalsByElementData(read));
collectAndAssertTrue(dataImport.getGraphCollection()
@@ -144,7 +144,7 @@ public GraphCollection execute(LogicalGraph iig) {
.where(new Id<>()).equalTo(0)
.with(new SetBtgIds<>());

return iig.getConfig().getGraphCollectionFactory()
return iig.getCollectionFactory()
.fromDataSets(graphHeads, transVertices.union(masterVertices), btgEdges);
}
}
@@ -66,7 +66,7 @@ public GraphCollection execute(GraphCollection collection) {
DataSet<GraphTransaction> output = dimSpan.execute(input);

// convert to Gradoop graph collection
return collection.getConfig().getGraphCollectionFactory().fromTransactions(output);
return collection.getFactory().fromTransactions(output);
}

@Override
@@ -77,7 +77,7 @@ public GraphCollection execute(GraphCollection collection) {

DataSet<GraphTransaction> output = execute(input);

return config.getGraphCollectionFactory()
return collection.getFactory()
.fromTransactions(output);
}

@@ -61,7 +61,7 @@ protected LogicalGraph executeInternal(Graph<GradoopId, NullValue, NullValue> ge
.map(new WritePropertyToGraphHeadMap(ClusteringCoefficientBase.PROPERTY_KEY_GLOBAL,
PropertyValue.create(globalValue)));

return currentGraph.getConfig().getLogicalGraphFactory().fromDataSets(
return currentGraph.getFactory().fromDataSets(
resultHead, currentGraph.getVertices(), currentGraph.getEdges());
}
}
@@ -62,7 +62,7 @@ protected LogicalGraph executeInternal(Graph<GradoopId, NullValue, NullValue> ge
.map(new WritePropertyToGraphHeadMap(ClusteringCoefficientBase.PROPERTY_KEY_GLOBAL,
PropertyValue.create(globalValue)));

return currentGraph.getConfig().getLogicalGraphFactory().fromDataSets(
return currentGraph.getFactory().fromDataSets(
resultHead, currentGraph.getVertices(), currentGraph.getEdges());
}
}
@@ -57,7 +57,7 @@ protected LogicalGraph executeInternal(Graph<GradoopId, NullValue, NullValue> ge
.where(0).equalTo(new Id<>())
.with(new LocalCCResultTupleToVertexJoin());

return currentGraph.getConfig().getLogicalGraphFactory().fromDataSets(
return currentGraph.getFactory().fromDataSets(
currentGraph.getGraphHead(), resultVertices, currentGraph.getEdges());
}
}
@@ -58,7 +58,7 @@ protected LogicalGraph executeInternal(Graph<GradoopId, NullValue, NullValue> ge
.where(0).equalTo(new Id<>())
.with(new LocalCCResultTupleToVertexJoin());

return currentGraph.getConfig().getLogicalGraphFactory().fromDataSets(
return currentGraph.getFactory().fromDataSets(
currentGraph.getGraphHead(), resultVertices, currentGraph.getEdges());
}
}
@@ -104,7 +104,7 @@ public LogicalGraph executeInGelly(Graph<GradoopId, GradoopId, NullValue> graph)
.with(new VertexPropertyToEdgePropertyJoin(propertyKey));
}

return currentGraph.getConfig().getLogicalGraphFactory().fromDataSets(
return currentGraph.getFactory().fromDataSets(
currentGraph.getGraphHead(), annotatedVertices, edges);
}
}
@@ -110,7 +110,7 @@ public LogicalGraph executeInGelly(Graph<GradoopId, NullValue, NullValue> graph)
.where(new HitsResultKeySelector()).equalTo(new Id<>())
.with(new HITSToAttributes(authorityPropertyKey, hubPropertyKey));

return currentGraph.getConfig().getLogicalGraphFactory()
return currentGraph.getFactory()
.fromDataSets(currentGraph.getGraphHead(), newVertices, currentGraph.getEdges());
}

@@ -76,7 +76,7 @@ public LogicalGraph executeInGelly(Graph<GradoopId, PropertyValue, NullValue> gr
.with(new LPVertexJoin(propertyKey));

// return labeled graph
return currentGraph.getConfig().getLogicalGraphFactory()
return currentGraph.getFactory()
.fromDataSets(currentGraph.getGraphHead(), labeledVertices, currentGraph.getEdges());
}

@@ -96,7 +96,7 @@ public LogicalGraph executeInGelly(Graph<GradoopId, NullValue, NullValue> graph)
.join(currentGraph.getVertices())
.where(new PageRankResultKey()).equalTo(new Id<>())
.with(new PageRankToAttribute(propertyKey));
return currentGraph.getConfig().getLogicalGraphFactory().fromDataSets(
return currentGraph.getFactory().fromDataSets(
currentGraph.getGraphHead(), newVertices, currentGraph.getEdges());
}
}
@@ -220,7 +220,7 @@ public LogicalGraph executeInGelly(Graph<Long, VCIVertexValue, Long> gellyGraph)
.with(new VertexWithVisitedSourceTargetIdJoin(SamplingConstants.PROPERTY_KEY_SAMPLED));

// return graph
return currentGraph.getConfig().getLogicalGraphFactory().fromDataSets(
return currentGraph.getFactory().fromDataSets(
currentGraph.getGraphHead(), visitedVertices, visitedEdges);
}

@@ -62,7 +62,7 @@ public LogicalGraph executeInGelly(Graph<GradoopId, NullValue, NullValue> graph)
.map(new WritePropertyToGraphHeadMap(
PROPERTY_KEY_TRIANGLES, PropertyValue.create(triangles.count())));

return currentGraph.getConfig().getLogicalGraphFactory().fromDataSets(
return currentGraph.getFactory().fromDataSets(
resultHead, currentGraph.getVertices(), currentGraph.getEdges());
}
}
@@ -77,7 +77,7 @@ public void write(GraphCollection graphCollection) throws IOException {

@Override
public void write(LogicalGraph logicalGraph, boolean overwrite) throws IOException {
write(logicalGraph.getConfig().getGraphCollectionFactory().fromGraph(logicalGraph), overwrite);
write(logicalGraph.getCollectionFactory().fromGraph(logicalGraph), overwrite);
}

@Override
@@ -63,7 +63,7 @@ public CSVDataSource(String csvPath, GradoopFlinkConfig config) {
@Override
public LogicalGraph getLogicalGraph() {
GraphCollection collection = getGraphCollection();
return getConfig().getLogicalGraphFactory()
return collection.getGraphFactory()
.fromDataSets(
collection.getGraphHeads().first(1), collection.getVertices(), collection.getEdges());
}
@@ -89,7 +89,6 @@ public GraphCollection getGraphCollection() {
.map(new CSVLineToEdge(factory.getEdgeFactory()))
.withBroadcastSet(metaData, BC_METADATA);


return factory.fromDataSets(graphHeads, vertices, edges);
}
}
@@ -80,7 +80,7 @@ public void write(GraphCollection graphCollection) throws IOException {

@Override
public void write(LogicalGraph logicalGraph, boolean overwrite) throws IOException {
write(logicalGraph.getConfig().getGraphCollectionFactory().fromGraph(logicalGraph), overwrite);
write(logicalGraph.getCollectionFactory().fromGraph(logicalGraph), overwrite);
}

@Override
@@ -76,7 +76,7 @@ public void write(GraphCollection graphCollection) throws IOException {

@Override
public void write(LogicalGraph logicalGraph, boolean overwrite) throws IOException {
write(logicalGraph.getConfig().getGraphCollectionFactory().fromGraph(logicalGraph), overwrite);
write(logicalGraph.getCollectionFactory().fromGraph(logicalGraph), overwrite);
}

@Override
@@ -69,7 +69,8 @@ public LogicalGraph getLogicalGraph() {

@Override
public GraphCollection getGraphCollection() {
return getConfig().getGraphCollectionFactory().fromGraph(getLogicalGraph());
LogicalGraph logicalGraph = getLogicalGraph();
return logicalGraph.getCollectionFactory().fromGraph(logicalGraph);
}
}

@@ -26,6 +26,7 @@
import org.gradoop.flink.io.api.DataSource;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.epgm.LogicalGraphFactory;
import org.gradoop.flink.util.GradoopFlinkConfig;

import java.io.IOException;
@@ -81,8 +82,9 @@ public LogicalGraph getLogicalGraph() throws IOException {
MetaData metaData = MetaData.fromFile(getMetaDataPath(), hdfsConfig);

ExecutionEnvironment env = getConfig().getExecutionEnvironment();
VertexFactory<EPGMVertex> vertexFactory = getConfig().getLogicalGraphFactory().getVertexFactory();
EdgeFactory<EPGMEdge> edgeFactory = getConfig().getLogicalGraphFactory().getEdgeFactory();
LogicalGraphFactory factory = getConfig().getLogicalGraphFactory();
VertexFactory<EPGMVertex> vertexFactory = factory.getVertexFactory();
EdgeFactory<EPGMEdge> edgeFactory = factory.getEdgeFactory();

Map<String, DataSet<EPGMVertex>> vertices = metaData.getVertexLabels().stream()
.map(l -> Tuple2.of(l, env.readTextFile(getVertexCSVPath(l))
@@ -96,11 +98,12 @@ public LogicalGraph getLogicalGraph() throws IOException {
.withBroadcastSet(MetaData.fromFile(getMetaDataPath(), getConfig()), BC_METADATA)))
.collect(Collectors.toMap(t -> t.f0, t -> t.f1));

return getConfig().getLogicalGraphFactory().fromIndexedDataSets(vertices, edges);
return factory.fromIndexedDataSets(vertices, edges);
}

@Override
public GraphCollection getGraphCollection() throws IOException {
return getConfig().getGraphCollectionFactory().fromGraph(getLogicalGraph());
LogicalGraph logicalGraph = getLogicalGraph();
return logicalGraph.getCollectionFactory().fromGraph(logicalGraph);
}
}
@@ -87,7 +87,7 @@ public void write(GraphCollection graphCollection) throws

@Override
public void write(LogicalGraph graph, boolean overwrite) throws IOException {
write(graph.getConfig().getGraphCollectionFactory().fromGraph(graph), overwrite);
write(graph.getCollectionFactory().fromGraph(graph), overwrite);
}

@Override
@@ -57,7 +57,7 @@ public void write(GraphCollection graphCollection) throws IOException {

@Override
public void write(LogicalGraph logicalGraph, boolean overwrite) throws IOException {
write(logicalGraph.getConfig().getGraphCollectionFactory().fromGraph(logicalGraph), overwrite);
write(logicalGraph.getCollectionFactory().fromGraph(logicalGraph), overwrite);
}

@Override
@@ -31,6 +31,7 @@
import org.gradoop.flink.io.impl.graph.tuples.ImportVertex;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.epgm.LogicalGraphFactory;
import org.gradoop.flink.model.impl.functions.tuple.Project3To0And1;
import org.gradoop.flink.model.impl.functions.tuple.Value2Of3;
import org.gradoop.flink.util.GradoopFlinkConfig;
@@ -115,9 +116,11 @@ public LogicalGraph getLogicalGraph() {
TypeInformation<K> externalIdType = ((TupleTypeInfo<?>) importVertices
.getType()).getTypeAt(0);

LogicalGraphFactory factory = config.getLogicalGraphFactory();

DataSet<Tuple3<K, GradoopId, EPGMVertex>> vertexTriples = importVertices
.map(new InitVertex<K>(
config.getLogicalGraphFactory().getVertexFactory(), lineagePropertyKey, externalIdType));
factory.getVertexFactory(), lineagePropertyKey, externalIdType));

DataSet<EPGMVertex> epgmVertices = vertexTriples
.map(new Value2Of3<K, GradoopId, EPGMVertex>());
@@ -129,16 +132,17 @@ public LogicalGraph getLogicalGraph() {
.join(vertexIdPair)
.where(1).equalTo(0)
.with(new InitEdge<K>(
config.getLogicalGraphFactory().getEdgeFactory(), lineagePropertyKey, externalIdType))
factory.getEdgeFactory(), lineagePropertyKey, externalIdType))
.join(vertexIdPair)
.where(0).equalTo(0)
.with(new UpdateEdge<EPGMEdge, K>());

return config.getLogicalGraphFactory().fromDataSets(epgmVertices, epgmEdges);
return factory.fromDataSets(epgmVertices, epgmEdges);
}

@Override
public GraphCollection getGraphCollection() throws IOException {
return config.getGraphCollectionFactory().fromGraph(getLogicalGraph());
LogicalGraph logicalGraph = getLogicalGraph();
return logicalGraph.getCollectionFactory().fromGraph(logicalGraph);
}
}
@@ -74,7 +74,7 @@ public void write(GraphCollection graphCollection) throws

@Override
public void write(LogicalGraph logicalGraph, boolean overwrite) throws IOException {
write(logicalGraph.getConfig().getGraphCollectionFactory().fromGraph(logicalGraph), overwrite);
write(logicalGraph.getCollectionFactory().fromGraph(logicalGraph), overwrite);
}

@Override
@@ -82,7 +82,7 @@ public GraphCollection executeForGVELayout(GraphCollection collection) {
.where(new Id<>()).equalTo(0)
.with(new SetAggregateProperties(aggregateFunctions));

return collection.getConfig().getGraphCollectionFactory()
return collection.getFactory()
.fromDataSets(graphHeads, collection.getVertices(), collection.getEdges());
}

@@ -91,7 +91,7 @@ public GraphCollection executeForTxLayout(GraphCollection collection) {
DataSet<GraphTransaction> updatedTransactions = collection.getGraphTransactions()
.map(new AggregateTransactions(aggregateFunctions));

return collection.getConfig().getGraphCollectionFactory().fromTransactions(updatedTransactions);
return collection.getFactory().fromTransactions(updatedTransactions);
}

/**
@@ -51,7 +51,7 @@ public GraphCollection execute(
final DataSet<EPGMVertex> newVertices = computeNewVertices(newGraphHeads);
final DataSet<EPGMEdge> newEdges = computeNewEdges(newVertices);

return firstCollection.getConfig().getGraphCollectionFactory()
return firstCollection.getFactory()
.fromDataSets(newGraphHeads, newVertices, newEdges);
}

@@ -28,7 +28,7 @@

@Override
public LogicalGraph execute(GraphCollection collection) {
return collection.getConfig().getLogicalGraphFactory().fromDataSets(
return collection.getGraphFactory().fromDataSets(
collection.getVertices(), collection.getEdges());
}
}
@@ -27,7 +27,7 @@

@Override
public GraphCollection execute(GraphCollection collection) {
return collection.getConfig().getGraphCollectionFactory().fromDataSets(
return collection.getFactory().fromDataSets(
collection.getGraphHeads().distinct(new Id<>()),
collection.getVertices(),
collection.getEdges());
@@ -19,8 +19,9 @@
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.flink.model.api.epgm.BaseGraphCollectionFactory;
import org.gradoop.flink.model.api.operators.BinaryBaseGraphToValueOperator;
import org.gradoop.flink.model.impl.epgm.GraphCollectionFactory;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.tostring.api.EdgeToString;
import org.gradoop.flink.model.impl.operators.tostring.api.GraphHeadToString;
@@ -61,8 +62,8 @@ public GraphEquality(GraphHeadToString<EPGMGraphHead> graphHeadToString,

@Override
public DataSet<Boolean> execute(LogicalGraph firstGraph, LogicalGraph secondGraph) {
GraphCollectionFactory collectionFactory = firstGraph.getConfig()
.getGraphCollectionFactory();
BaseGraphCollectionFactory<EPGMGraphHead, EPGMVertex, EPGMEdge, LogicalGraph, GraphCollection>
collectionFactory = firstGraph.getCollectionFactory();
return collectionEquality
.execute(collectionFactory.fromGraph(firstGraph), collectionFactory.fromGraph(secondGraph));
}
@@ -68,7 +68,7 @@ public LogicalGraph execute(GraphCollection collection) {
.filter(new NotInGraphsBroadcast<>())
.withBroadcastSet(excludedGraphIds, NotInGraphsBroadcast.GRAPH_IDS);

return collection.getConfig().getLogicalGraphFactory()
return collection.getGraphFactory()
.fromDataSets(collection.getGraphHeads().filter(new BySameId<>(startId)), vertices, edges);
}
}

0 comments on commit 26eabd0

Please sign in to comment.
You can’t perform that action at this time.