-
Notifications
You must be signed in to change notification settings - Fork 88
/
GraphCollection.java
400 lines (342 loc) · 13.6 KB
/
GraphCollection.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.epgm;
import org.apache.commons.lang.NotImplementedException;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.id.GradoopIdSet;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.GraphHead;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.common.util.Order;
import org.gradoop.flink.io.api.DataSink;
import org.gradoop.flink.io.impl.gdl.GDLConsoleOutput;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.epgm.BaseGraphCollectionFactory;
import org.gradoop.flink.model.api.epgm.GraphCollectionOperators;
import org.gradoop.flink.model.api.functions.GraphHeadReduceFunction;
import org.gradoop.flink.model.api.layouts.GraphCollectionLayout;
import org.gradoop.flink.model.api.operators.ApplicableUnaryGraphToGraphOperator;
import org.gradoop.flink.model.api.operators.BinaryCollectionToCollectionOperator;
import org.gradoop.flink.model.api.operators.ReducibleBinaryGraphToGraphOperator;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphCollectionToBaseGraphCollectionOperator;
import org.gradoop.flink.model.api.operators.UnaryCollectionToGraphOperator;
import org.gradoop.flink.model.impl.functions.bool.Not;
import org.gradoop.flink.model.impl.functions.bool.Or;
import org.gradoop.flink.model.impl.functions.bool.True;
import org.gradoop.flink.model.impl.functions.epgm.BySameId;
import org.gradoop.flink.model.impl.functions.graphcontainment.InAnyGraph;
import org.gradoop.flink.model.impl.functions.graphcontainment.InGraph;
import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;
import org.gradoop.flink.model.impl.operators.difference.Difference;
import org.gradoop.flink.model.impl.operators.difference.DifferenceBroadcast;
import org.gradoop.flink.model.impl.operators.distinction.DistinctById;
import org.gradoop.flink.model.impl.operators.distinction.DistinctByIsomorphism;
import org.gradoop.flink.model.impl.operators.distinction.GroupByIsomorphism;
import org.gradoop.flink.model.impl.operators.equality.CollectionEquality;
import org.gradoop.flink.model.impl.operators.equality.CollectionEqualityByGraphIds;
import org.gradoop.flink.model.impl.operators.intersection.Intersection;
import org.gradoop.flink.model.impl.operators.intersection.IntersectionBroadcast;
import org.gradoop.flink.model.impl.operators.limit.Limit;
import org.gradoop.flink.model.impl.operators.matching.transactional.TransactionalPatternMatching;
import org.gradoop.flink.model.impl.operators.matching.transactional.algorithm.PatternMatchingAlgorithm;
import org.gradoop.flink.model.impl.operators.selection.Selection;
import org.gradoop.flink.model.impl.operators.tostring.functions.EdgeToDataString;
import org.gradoop.flink.model.impl.operators.tostring.functions.EdgeToIdString;
import org.gradoop.flink.model.impl.operators.tostring.functions.GraphHeadToDataString;
import org.gradoop.flink.model.impl.operators.tostring.functions.GraphHeadToEmptyString;
import org.gradoop.flink.model.impl.operators.tostring.functions.VertexToDataString;
import org.gradoop.flink.model.impl.operators.tostring.functions.VertexToIdString;
import org.gradoop.flink.model.impl.operators.union.Union;
import org.gradoop.flink.util.GradoopFlinkConfig;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
/**
* A graph collection is one of the base concepts of the Extended Property Graph Model. From
* a model perspective, the collection represents a set of logical graphs. From a data perspective
* this is reflected by providing three concepts:
*
* - a set of graph heads assigned to the graphs in that collection
* - a set of vertices which is the union of all vertex sets of the represented graphs
* - a set of edges which is the union of all edge sets of the represented graphs
*
* Furthermore, a graph collection provides operations that are performed on the underlying data.
* These operations result in either another graph collection or in a {@link LogicalGraph}.
*
* A graph collection wraps a {@link GraphCollectionLayout} which defines how the collection
* is represented in Apache Flink. Note that the GraphCollection also implements that interface and
* just forwards the calls to the layout. This is just for convenience and API synchronicity.
*/
public class GraphCollection implements
  BaseGraphCollection<GraphHead, Vertex, Edge, LogicalGraph, GraphCollection>,
  GraphCollectionOperators {

  /**
   * Layout for that graph collection
   */
  private final GraphCollectionLayout<GraphHead, Vertex, Edge> layout;

  /**
   * Configuration
   */
  private final GradoopFlinkConfig config;

  /**
   * Creates a graph collection from the given arguments.
   *
   * @param layout the graph collection layout
   * @param config the Gradoop Flink configuration
   * @throws NullPointerException if {@code layout} or {@code config} is {@code null}
   */
  GraphCollection(GraphCollectionLayout<GraphHead, Vertex, Edge> layout,
    GradoopFlinkConfig config) {
    // Name the offending argument so a failing null check is self-explanatory.
    this.layout = Objects.requireNonNull(layout, "layout must not be null");
    this.config = Objects.requireNonNull(config, "config must not be null");
  }

  //----------------------------------------------------------------------------
  // Data methods
  //----------------------------------------------------------------------------

  /**
   * Forwards to the wrapped layout.
   *
   * @return the value reported by {@code layout.isGVELayout()}
   */
  @Override
  public boolean isGVELayout() {
    return layout.isGVELayout();
  }

  /**
   * Forwards to the wrapped layout.
   *
   * @return the value reported by {@code layout.isIndexedGVELayout()}
   */
  @Override
  public boolean isIndexedGVELayout() {
    return layout.isIndexedGVELayout();
  }

  /**
   * Forwards to the wrapped layout.
   *
   * @return the value reported by {@code layout.isTransactionalLayout()}
   */
  @Override
  public boolean isTransactionalLayout() {
    return layout.isTransactionalLayout();
  }

  /**
   * Forwards to the wrapped layout.
   *
   * @return the vertex data set provided by the layout
   */
  @Override
  public DataSet<Vertex> getVertices() {
    return layout.getVertices();
  }

  /**
   * Forwards to the wrapped layout.
   *
   * @param label label passed on to the layout
   * @return the vertex data set the layout provides for that label
   */
  @Override
  public DataSet<Vertex> getVerticesByLabel(String label) {
    return layout.getVerticesByLabel(label);
  }

  /**
   * Forwards to the wrapped layout.
   *
   * @return the edge data set provided by the layout
   */
  @Override
  public DataSet<Edge> getEdges() {
    return layout.getEdges();
  }

  /**
   * Forwards to the wrapped layout.
   *
   * @param label label passed on to the layout
   * @return the edge data set the layout provides for that label
   */
  @Override
  public DataSet<Edge> getEdgesByLabel(String label) {
    return layout.getEdgesByLabel(label);
  }

  /**
   * Forwards to the wrapped layout.
   *
   * @return the graph head data set provided by the layout
   */
  @Override
  public DataSet<GraphHead> getGraphHeads() {
    return layout.getGraphHeads();
  }

  /**
   * Forwards to the wrapped layout.
   *
   * @param label label passed on to the layout
   * @return the graph head data set the layout provides for that label
   */
  @Override
  public DataSet<GraphHead> getGraphHeadsByLabel(String label) {
    return layout.getGraphHeadsByLabel(label);
  }

  /**
   * Forwards to the wrapped layout.
   *
   * @return the graph transaction data set provided by the layout
   */
  @Override
  public DataSet<GraphTransaction> getGraphTransactions() {
    return layout.getGraphTransactions();
  }

  //----------------------------------------------------------------------------
  // Logical Graph / Graph Head Getters
  //----------------------------------------------------------------------------

  /**
   * Builds a logical graph from the elements of this collection that are selected by the given
   * graph id: graph heads are filtered with {@link BySameId}, vertices and edges with
   * {@link InGraph}.
   *
   * @param graphID graph identifier used by all three filters
   * @return logical graph created from the three filtered data sets
   */
  @Override
  public LogicalGraph getGraph(final GradoopId graphID) {
    // filter vertices and edges based on given graph id
    DataSet<GraphHead> graphHead = getGraphHeads()
      .filter(new BySameId<>(graphID));
    DataSet<Vertex> vertices = getVertices()
      .filter(new InGraph<>(graphID));
    DataSet<Edge> edges = getEdges()
      .filter(new InGraph<>(graphID));

    return new LogicalGraph(
      config.getLogicalGraphFactory().fromDataSets(graphHead, vertices, edges),
      getConfig());
  }

  /**
   * Collects the given identifiers into a {@link GradoopIdSet} and delegates to
   * {@link #getGraphs(GradoopIdSet)}.
   *
   * @param identifiers graph identifiers
   * @return graph collection built for those identifiers
   */
  @Override
  public GraphCollection getGraphs(final GradoopId... identifiers) {
    GradoopIdSet graphIds = new GradoopIdSet();
    graphIds.addAll(Arrays.asList(identifiers));

    return getGraphs(graphIds);
  }

  /**
   * Builds a new graph collection from the elements selected by the given identifiers: graph
   * heads whose id is contained in the set, and the vertices and edges accepted by
   * {@link InAnyGraph} for that set.
   *
   * @param identifiers graph identifiers
   * @return graph collection created from the three filtered data sets
   */
  @Override
  public GraphCollection getGraphs(final GradoopIdSet identifiers) {
    // A static nested filter is used instead of an anonymous class so that the function shipped
    // to Flink does not drag along a hidden reference to this graph collection.
    DataSet<GraphHead> newGraphHeads = this.getGraphHeads()
      .filter(new GraphHeadIdIn(identifiers));

    // build new vertex set
    DataSet<Vertex> vertices = getVertices()
      .filter(new InAnyGraph<>(identifiers));

    // build new edge set
    DataSet<Edge> edges = getEdges()
      .filter(new InAnyGraph<>(identifiers));

    return new GraphCollection(getFactory().fromDataSets(newGraphHeads, vertices, edges),
      getConfig());
  }

  //----------------------------------------------------------------------------
  // Unary Operators
  //----------------------------------------------------------------------------

  /**
   * Runs the {@link Selection} operator with the given predicate on this collection.
   *
   * @param predicate predicate handed to the operator
   * @return result of the operator
   */
  @Override
  public GraphCollection select(final FilterFunction<GraphHead> predicate) {
    return callForCollection(new Selection(predicate));
  }

  /**
   * Not implemented.
   *
   * @param propertyKey unused
   * @param order       unused
   * @return never returns normally
   * @throws NotImplementedException always
   */
  @Override
  public GraphCollection sortBy(String propertyKey, Order order) {
    throw new NotImplementedException();
  }

  /**
   * Runs the {@link Limit} operator with the given argument on this collection.
   *
   * @param n value handed to the operator
   * @return result of the operator
   */
  @Override
  public GraphCollection limit(int n) {
    return callForCollection(new Limit(n));
  }

  /**
   * Executes {@link TransactionalPatternMatching} with the given arguments on this collection.
   *
   * @param pattern          pattern handed to the operator
   * @param algorithm        matching algorithm handed to the operator
   * @param returnEmbeddings flag handed to the operator
   * @return result of the operator
   */
  @Override
  public GraphCollection match(
    String pattern,
    PatternMatchingAlgorithm algorithm,
    boolean returnEmbeddings) {
    return new TransactionalPatternMatching(
      pattern,
      algorithm,
      returnEmbeddings).execute(this);
  }

  //----------------------------------------------------------------------------
  // Binary Operators
  //----------------------------------------------------------------------------

  /**
   * Runs the {@link Union} operator on this and the other collection.
   *
   * @param otherCollection second operand
   * @return result of the operator
   */
  @Override
  public GraphCollection union(GraphCollection otherCollection) {
    return callForCollection(new Union(), otherCollection);
  }

  /**
   * Runs the {@link Intersection} operator on this and the other collection.
   *
   * @param otherCollection second operand
   * @return result of the operator
   */
  @Override
  public GraphCollection intersect(GraphCollection otherCollection) {
    return callForCollection(new Intersection(), otherCollection);
  }

  /**
   * Runs the {@link IntersectionBroadcast} operator on this and the other collection.
   *
   * @param otherCollection second operand
   * @return result of the operator
   */
  @Override
  public GraphCollection intersectWithSmallResult(
    GraphCollection otherCollection) {
    return callForCollection(new IntersectionBroadcast(),
      otherCollection);
  }

  /**
   * Runs the {@link Difference} operator on this and the other collection.
   *
   * @param otherCollection second operand
   * @return result of the operator
   */
  @Override
  public GraphCollection difference(GraphCollection otherCollection) {
    return callForCollection(new Difference(), otherCollection);
  }

  /**
   * Runs the {@link DifferenceBroadcast} operator on this and the other collection.
   *
   * @param otherCollection second operand
   * @return result of the operator
   */
  @Override
  public GraphCollection differenceWithSmallResult(
    GraphCollection otherCollection) {
    return callForCollection(new DifferenceBroadcast(),
      otherCollection);
  }

  /**
   * Executes {@link CollectionEqualityByGraphIds} on this and the other collection.
   *
   * @param other collection to compare with
   * @return data set holding the operator's boolean result
   */
  @Override
  public DataSet<Boolean> equalsByGraphIds(GraphCollection other) {
    return new CollectionEqualityByGraphIds().execute(this, other);
  }

  /**
   * Executes {@link CollectionEquality} configured with {@link GraphHeadToEmptyString},
   * {@link VertexToIdString} and {@link EdgeToIdString}.
   *
   * @param other collection to compare with
   * @return data set holding the operator's boolean result
   */
  @Override
  public DataSet<Boolean> equalsByGraphElementIds(GraphCollection other) {
    // NOTE(review): the trailing 'true' presumably selects directed comparison - confirm
    // against the CollectionEquality constructor.
    return new CollectionEquality(
      new GraphHeadToEmptyString(),
      new VertexToIdString(),
      new EdgeToIdString(), true).execute(this, other);
  }

  /**
   * Executes {@link CollectionEquality} configured with {@link GraphHeadToEmptyString},
   * {@link VertexToDataString} and {@link EdgeToDataString}.
   *
   * @param other collection to compare with
   * @return data set holding the operator's boolean result
   */
  @Override
  public DataSet<Boolean> equalsByGraphElementData(GraphCollection other) {
    return new CollectionEquality(
      new GraphHeadToEmptyString(),
      new VertexToDataString(),
      new EdgeToDataString(), true).execute(this, other);
  }

  /**
   * Executes {@link CollectionEquality} configured with {@link GraphHeadToDataString},
   * {@link VertexToDataString} and {@link EdgeToDataString}.
   *
   * @param other collection to compare with
   * @return data set holding the operator's boolean result
   */
  @Override
  public DataSet<Boolean> equalsByGraphData(GraphCollection other) {
    return new CollectionEquality(
      new GraphHeadToDataString(),
      new VertexToDataString(),
      new EdgeToDataString(), true).execute(this, other);
  }

  //----------------------------------------------------------------------------
  // Auxiliary Operators
  //----------------------------------------------------------------------------

  /**
   * Executes the given unary operator on this collection.
   *
   * @param operator operator to execute
   * @return result of the operator
   */
  @Override
  public GraphCollection callForCollection(
    UnaryBaseGraphCollectionToBaseGraphCollectionOperator<GraphCollection> operator) {
    return operator.execute(this);
  }

  /**
   * Executes the given binary operator with this collection as first and the other collection
   * as second argument.
   *
   * @param op              operator to execute
   * @param otherCollection second argument of the operator
   * @return result of the operator
   */
  @Override
  public GraphCollection callForCollection(
    BinaryCollectionToCollectionOperator op,
    GraphCollection otherCollection) {
    return op.execute(this, otherCollection);
  }

  /**
   * Executes the given collection-to-graph operator on this collection.
   *
   * @param op operator to execute
   * @return logical graph produced by the operator
   */
  @Override
  public LogicalGraph callForGraph(UnaryCollectionToGraphOperator op) {
    return op.execute(this);
  }

  /**
   * Hands the given operator to
   * {@link #callForCollection(UnaryBaseGraphCollectionToBaseGraphCollectionOperator)}.
   *
   * @param op operator to execute
   * @return result of the operator
   */
  @Override
  public GraphCollection apply(ApplicableUnaryGraphToGraphOperator op) {
    return callForCollection(op);
  }

  /**
   * Hands the given operator to {@link #callForGraph(UnaryCollectionToGraphOperator)}.
   *
   * @param op operator to execute
   * @return logical graph produced by the operator
   */
  @Override
  public LogicalGraph reduce(ReducibleBinaryGraphToGraphOperator op) {
    return callForGraph(op);
  }

  //----------------------------------------------------------------------------
  // Utility methods
  //----------------------------------------------------------------------------

  /**
   * Returns the configuration this collection was created with.
   *
   * @return Gradoop Flink configuration
   */
  @Override
  public GradoopFlinkConfig getConfig() {
    return config;
  }

  /**
   * Returns the graph collection factory of the configuration.
   *
   * @return graph collection factory
   */
  @Override
  public BaseGraphCollectionFactory<GraphHead, Vertex, Edge, LogicalGraph, GraphCollection>
  getFactory() {
    return config.getGraphCollectionFactory();
  }

  /**
   * Derives an emptiness flag from the graph heads: every graph head is mapped with
   * {@link True}, the values are made distinct and united with a single {@code false} element
   * (so the following reduce always has at least one input), combined with {@link Or} and
   * finally mapped with {@link Not}.
   *
   * @return data set holding the resulting boolean
   */
  @Override
  public DataSet<Boolean> isEmpty() {
    return getGraphHeads()
      .map(new True<>())
      .distinct()
      .union(getConfig().getExecutionEnvironment().fromElements(false))
      .reduce(new Or())
      .map(new Not());
  }

  /**
   * Runs the {@link DistinctById} operator on this collection.
   *
   * @return result of the operator
   */
  @Override
  public GraphCollection distinctById() {
    return callForCollection(new DistinctById());
  }

  /**
   * Runs the {@link DistinctByIsomorphism} operator on this collection.
   *
   * @return result of the operator
   */
  @Override
  public GraphCollection distinctByIsomorphism() {
    return callForCollection(new DistinctByIsomorphism());
  }

  /**
   * Runs the {@link GroupByIsomorphism} operator with the given function on this collection.
   *
   * @param func graph head reduce function handed to the operator
   * @return result of the operator
   */
  @Override
  public GraphCollection groupByIsomorphism(GraphHeadReduceFunction func) {
    return callForCollection(new GroupByIsomorphism(func));
  }

  /**
   * Passes this collection to the given data sink.
   *
   * @param dataSink sink to write to
   * @throws IOException forwarded from the data sink
   */
  @Override
  public void writeTo(DataSink dataSink) throws IOException {
    dataSink.write(this);
  }

  /**
   * Passes this collection and the overwrite flag to the given data sink.
   *
   * @param dataSink  sink to write to
   * @param overWrite flag handed to the sink
   * @throws IOException forwarded from the data sink
   */
  @Override
  public void writeTo(DataSink dataSink, boolean overWrite) throws IOException {
    dataSink.write(this, overWrite);
  }

  /**
   * Prints this graph collection to the console.
   *
   * @throws Exception forwarded DataSet print() Exception.
   */
  public void print() throws Exception {
    GDLConsoleOutput.print(this);
  }

  //----------------------------------------------------------------------------
  // Helper functions
  //----------------------------------------------------------------------------

  /**
   * Accepts graph heads whose id is contained in a given id set.
   *
   * Declared as a static nested class on purpose: unlike an anonymous class created inside an
   * instance method, it has no implicit reference to the enclosing graph collection, so only
   * the id set is part of the function object handed to Flink.
   */
  private static final class GraphHeadIdIn implements FilterFunction<GraphHead> {
    /**
     * Identifiers of the graph heads to keep
     */
    private final GradoopIdSet identifiers;

    /**
     * Creates the filter.
     *
     * @param identifiers identifiers of the graph heads to keep
     */
    GraphHeadIdIn(GradoopIdSet identifiers) {
      this.identifiers = identifiers;
    }

    @Override
    public boolean filter(GraphHead graphHead) {
      return identifiers.contains(graphHead.getId());
    }
  }
}