/
CSVDataSink.java
131 lines (113 loc) · 4.64 KB
/
CSVDataSink.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.io.impl.csv;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.gradoop.flink.io.api.DataSink;
import org.gradoop.flink.io.impl.csv.functions.EdgeToCSVEdge;
import org.gradoop.flink.io.impl.csv.functions.GraphHeadToCSVGraphHead;
import org.gradoop.flink.io.impl.csv.functions.VertexToCSVVertex;
import org.gradoop.flink.io.impl.csv.metadata.CSVMetaDataSink;
import org.gradoop.flink.io.impl.csv.metadata.CSVMetaDataSource;
import org.gradoop.flink.io.impl.csv.tuples.CSVEdge;
import org.gradoop.flink.io.impl.csv.tuples.CSVGraphHead;
import org.gradoop.flink.io.impl.csv.tuples.CSVVertex;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.util.GradoopFlinkConfig;
import java.io.IOException;
/**
 * A {@link DataSink} implementation that stores a logical graph or a graph collection as CSV
 * files: one file each for graph heads, vertices and edges, plus a meta data file.
 */
public class CSVDataSink extends CSVBase implements DataSink {

  /**
   * Path to an existing meta data file used to write the output. If {@code null} or empty, the
   * meta data is derived from the graph collection that is written.
   */
  private final String metaDataPath;

  /**
   * Creates a new CSV data sink without a predefined meta data file. The meta data is computed
   * from the graph (collection) handed to one of the {@code write} methods.
   *
   * @param csvPath directory to write to
   * @param config  Gradoop Flink configuration
   */
  public CSVDataSink(String csvPath, GradoopFlinkConfig config) {
    this(csvPath, null, config);
  }

  /**
   * Creates a new CSV data sink that uses the specified meta data file to write the CSV output.
   *
   * @param csvPath      directory to write CSV files to
   * @param metaDataPath path to meta data CSV file; {@code null} or empty means that the meta
   *                     data is computed from the written graph (collection)
   * @param config       Gradoop Flink configuration
   */
  public CSVDataSink(String csvPath, String metaDataPath, GradoopFlinkConfig config) {
    super(csvPath, config);
    this.metaDataPath = metaDataPath;
  }

  /**
   * Writes the given logical graph without overwriting existing files.
   *
   * @param logicalGraph graph to write
   * @throws IOException passed on from {@link #write(LogicalGraph, boolean)}
   */
  @Override
  public void write(LogicalGraph logicalGraph) throws IOException {
    write(logicalGraph, false);
  }

  /**
   * Writes the given graph collection without overwriting existing files.
   *
   * @param graphCollection graph collection to write
   * @throws IOException passed on from {@link #write(GraphCollection, boolean)}
   */
  @Override
  public void write(GraphCollection graphCollection) throws IOException {
    write(graphCollection, false);
  }

  /**
   * Writes the given logical graph by turning it into a graph collection first and delegating to
   * {@link #write(GraphCollection, boolean)}.
   *
   * @param logicalGraph graph to write
   * @param overwrite    {@code true} to overwrite existing files
   * @throws IOException passed on from {@link #write(GraphCollection, boolean)}
   */
  @Override
  public void write(LogicalGraph logicalGraph, boolean overwrite) throws IOException {
    GraphCollection singleGraphCollection =
      logicalGraph.getCollectionFactory().fromGraph(logicalGraph);
    write(singleGraphCollection, overwrite);
  }

  /**
   * Writes the given graph collection as CSV. Graph heads, vertices and edges are mapped to
   * their CSV tuple representation with the meta data attached as broadcast set, and each of
   * the three data sets is written to its own CSV path. The meta data file is written as well,
   * unless an existing meta data file located at the target meta data path is reused.
   *
   * @param graphCollection graph collection to write
   * @param overwrite       {@code true} selects {@link FileSystem.WriteMode#OVERWRITE},
   *                        {@code false} selects {@link FileSystem.WriteMode#NO_OVERWRITE}
   * @throws IOException declared by the sink contract; may be raised while the meta data or
   *                     the outputs are prepared
   */
  @Override
  public void write(GraphCollection graphCollection, boolean overwrite) throws IOException {
    final FileSystem.WriteMode mode;
    if (overwrite) {
      mode = FileSystem.WriteMode.OVERWRITE;
    } else {
      mode = FileSystem.WriteMode.NO_OVERWRITE;
    }

    // Either read the meta data from the configured file or derive it from the collection.
    final CSVMetaDataSource metaDataSource = new CSVMetaDataSource();
    final DataSet<Tuple3<String, String, String>> metaDataTuples;
    if (reuseMetadata()) {
      metaDataTuples = metaDataSource.readDistributed(metaDataPath, getConfig());
    } else {
      metaDataTuples = metaDataSource.tuplesFromCollection(graphCollection);
    }

    DataSet<CSVGraphHead> graphHeadRows = graphCollection
      .getGraphHeads()
      .map(new GraphHeadToCSVGraphHead())
      .withBroadcastSet(metaDataTuples, BC_METADATA);

    DataSet<CSVVertex> vertexRows = graphCollection
      .getVertices()
      .map(new VertexToCSVVertex())
      .withBroadcastSet(metaDataTuples, BC_METADATA);

    DataSet<CSVEdge> edgeRows = graphCollection
      .getEdges()
      .map(new EdgeToCSVEdge())
      .withBroadcastSet(metaDataTuples, BC_METADATA);

    // The meta data file is skipped only when a reused meta data file already sits at the
    // target meta data path; in every other case it is written.
    boolean metaDataAlreadyInPlace = getMetaDataPath().equals(metaDataPath) && reuseMetadata();
    if (!metaDataAlreadyInPlace) {
      new CSVMetaDataSink().writeDistributed(getMetaDataPath(), metaDataTuples, mode);
    }

    graphHeadRows.writeAsCsv(
      getGraphHeadCSVPath(), CSVConstants.ROW_DELIMITER, CSVConstants.TOKEN_DELIMITER, mode);
    vertexRows.writeAsCsv(
      getVertexCSVPath(), CSVConstants.ROW_DELIMITER, CSVConstants.TOKEN_DELIMITER, mode);
    edgeRows.writeAsCsv(
      getEdgeCSVPath(), CSVConstants.ROW_DELIMITER, CSVConstants.TOKEN_DELIMITER, mode);
  }

  /**
   * Tells whether an existing meta data file shall be reused, i.e. whether a non-empty meta
   * data path was configured.
   *
   * @return {@code true} iff the meta data path is neither {@code null} nor empty
   */
  private boolean reuseMetadata() {
    if (metaDataPath == null) {
      return false;
    }
    return !metaDataPath.isEmpty();
  }
}