Permalink
Browse files

[#1075] Minimal csv importer (#1099)

fixes #1075
  • Loading branch information...
cmoesler authored and galpha committed Jan 9, 2019
1 parent ff04bdc commit c9862990f533522521ffb4cafccc2677aea337fc
Showing with 926 additions and 2,732 deletions.
  1. +187 −0 ...a-integration/src/main/java/org/gradoop/dataintegration/importer/impl/csv/MinimalCSVImporter.java
  2. +94 −0 ...ion/src/main/java/org/gradoop/dataintegration/importer/impl/csv/functions/CsvRowToProperties.java
  3. +48 −0 ...ion/src/main/java/org/gradoop/dataintegration/importer/impl/csv/functions/PropertiesToVertex.java
  4. +19 −0 ...tegration/src/main/java/org/gradoop/dataintegration/importer/impl/csv/functions/package-info.java
  5. +19 −0 ...op-data-integration/src/main/java/org/gradoop/dataintegration/importer/impl/csv/package-info.java
  6. +332 −0 ...tegration/src/test/java/org/gradoop/dataintegration/importer/impl/csv/MinimalCSVImporterTest.java
  7. +8 −2 ...ta-integration/src/test/java/org/gradoop/dataintegration/transformation/impl/InvertEdgesTest.java
  8. +5 −0 gradoop-data-integration/src/test/resources/csv/expected.gdl
  9. +4 −0 gradoop-data-integration/src/test/resources/csv/expectedEmptyPropertyValues.gdl
  10. +4 −0 gradoop-data-integration/src/test/resources/csv/input.csv
  11. +5 −0 gradoop-data-integration/src/test/resources/csv/inputEmptyLines.csv
  12. +3 −0 gradoop-data-integration/src/test/resources/csv/inputEmptyPropertyValues.csv
  13. +3 −0 gradoop-data-integration/src/test/resources/csv/inputWithoutHeader.csv
  14. +5 −0 gradoop-examples/pom.xml
  15. +68 −0 gradoop-examples/src/main/java/org/gradoop/examples/minimalimport/MinimalCSVImportExample.java
  16. +19 −0 gradoop-examples/src/main/java/org/gradoop/examples/minimalimport/package-info.java
  17. +0 −1,892 gradoop-examples/src/main/resources/data/csv/foodbroker/edges.csv
  18. +0 −1 gradoop-examples/src/main/resources/data/csv/foodbroker/graphs.csv
  19. +0 −33 gradoop-examples/src/main/resources/data/csv/foodbroker/metadata.csv
  20. +0 −804 gradoop-examples/src/main/resources/data/csv/foodbroker/vertices.csv
  21. +89 −0 gradoop-examples/src/main/resources/data/csv/minimalcsv/person.csv
  22. +14 −0 pom.xml
@@ -0,0 +1,187 @@
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.dataintegration.importer.impl.csv;

import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.dataintegration.importer.impl.csv.functions.PropertiesToVertex;
import org.gradoop.dataintegration.importer.impl.csv.functions.CsvRowToProperties;
import org.gradoop.flink.io.api.DataSource;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.util.GradoopFlinkConfig;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/**
 * Reads a csv file and imports each row as a vertex in EPGM representation.
 * <p>
 * The property names of the vertices are either given explicitly or taken from the first line
 * of the file. Note that the whole file, including its first line, is imported as data; a header
 * line is only skipped when {@code checkReoccurringHeader} is set to {@code true}.
 */
public class MinimalCSVImporter implements DataSource {

  /**
   * Token delimiter of the CSV file. It is treated as a literal string, not as a regular
   * expression.
   */
  private String tokenSeparator;

  /**
   * Path to the csv file.
   */
  private String path;

  /**
   * The charset used in the csv file, e.g., "UTF-8".
   */
  private String charset;

  /**
   * The property names for all columns in the file. If {@code null}, the first line will be
   * interpreted as header row.
   */
  private List<String> columnNames;

  /**
   * Flag to specify if each row of the file should be checked for a reoccurrence of the column
   * property names.
   */
  private boolean checkReoccurringHeader;

  /**
   * Gradoop Flink configuration.
   */
  private GradoopFlinkConfig config;

  /**
   * Create a new MinimalCSVImporter instance with user defined column names.
   * Uses the UTF-8 charset as default.
   *
   * @param path the path to the csv file
   * @param tokenSeparator the token delimiter of the csv file
   * @param config GradoopFlinkConfig configuration
   * @param columnNames property identifiers for each column
   * @param checkReoccurringHeader if each row of the file should be checked for a reoccurrence of
   *                               the column property names
   */
  public MinimalCSVImporter(String path, String tokenSeparator, GradoopFlinkConfig config,
    List<String> columnNames, boolean checkReoccurringHeader) {
    this(path, tokenSeparator, config, checkReoccurringHeader);
    this.columnNames = Objects.requireNonNull(columnNames);
  }

  /**
   * Create a new MinimalCSVImporter instance by the given parameters.
   * Uses the UTF-8 charset as default. The first line of the file will be used as the property
   * names for each column.
   *
   * @param path the path to the csv file
   * @param tokenSeparator the token delimiter of the csv file
   * @param config GradoopFlinkConfig configuration
   * @param checkReoccurringHeader if each row of the file should be checked for a reoccurrence of
   *                               the column property names
   */
  public MinimalCSVImporter(String path, String tokenSeparator, GradoopFlinkConfig config,
    boolean checkReoccurringHeader) {
    this(path, tokenSeparator, config, "UTF-8", checkReoccurringHeader);
  }

  /**
   * Create a new MinimalCSVImporter with a user defined charset. The first line of the file
   * will be used as the property names for each column.
   *
   * @param path the path to the csv file
   * @param tokenSeparator the token delimiter of the csv file
   * @param config GradoopFlinkConfig configuration
   * @param charset the charset used in the csv file
   * @param checkReoccurringHeader if each row of the file should be checked for a reoccurrence of
   *                               the column property names
   */
  public MinimalCSVImporter(String path, String tokenSeparator, GradoopFlinkConfig config,
    String charset, boolean checkReoccurringHeader) {
    this.path = Objects.requireNonNull(path);
    this.tokenSeparator = Objects.requireNonNull(tokenSeparator);
    this.config = Objects.requireNonNull(config);
    this.charset = Objects.requireNonNull(charset);
    this.checkReoccurringHeader = checkReoccurringHeader;
  }

  /**
   * Import each row of the file as a vertex and create a logical graph from it.
   * If no column property names are set, the first line of the file is read as header and its
   * values are used as column names.
   *
   * @return a logical graph with a vertex per csv line and no edges
   * @throws IOException if the header row could not be read
   */
  @Override
  public LogicalGraph getLogicalGraph() throws IOException {
    List<String> propertyNames = columnNames == null ? readHeaderRow() : columnNames;
    DataSet<Vertex> vertices = readCSVFile(propertyNames, checkReoccurringHeader);

    return config.getLogicalGraphFactory().fromDataSets(vertices);
  }

  /**
   * Import the file as a logical graph and wrap it into a graph collection.
   *
   * @return a graph collection containing the imported logical graph
   * @throws IOException if the header row could not be read
   */
  @Override
  public GraphCollection getGraphCollection() throws IOException {
    return config.getGraphCollectionFactory().fromGraph(getLogicalGraph());
  }

  /**
   * Reads the csv file specified by {@link MinimalCSVImporter#path} and converts each valid line
   * to a {@link Vertex}.
   *
   * @param propertyNames list of the property identifier names
   * @param checkReoccurringHeader set to true if each row of the file should be checked for a
   *                               reoccurrence of the column property names
   * @return a {@link DataSet} of all vertices from one specific file
   */
  private DataSet<Vertex> readCSVFile(List<String> propertyNames, boolean checkReoccurringHeader) {
    return config.getExecutionEnvironment()
      // decode the rows with the configured charset, exactly like the header row
      .readTextFile(path, charset)
      .flatMap(new CsvRowToProperties(tokenSeparator, propertyNames, checkReoccurringHeader))
      .map(new PropertiesToVertex<>(config.getVertexFactory()))
      .returns(config.getVertexFactory().getType());
  }

  /**
   * Reads the first row of the csv file and creates a list including all column entries that
   * will be used as property names.
   * <p>
   * NOTE(review): the header is read through a {@link FileInputStream}, i.e. from the local file
   * system, while the rows are read through the Flink execution environment. Confirm that
   * {@link MinimalCSVImporter#path} is always a local path when no column names are given.
   *
   * @return the property names
   * @throws IOException if the file could not be opened or read, or if it has no header row
   */
  private List<String> readHeaderRow() throws IOException {
    final String headerLine;
    try (BufferedReader reader =
      new BufferedReader(new InputStreamReader(new FileInputStream(path), charset))) {
      headerLine = reader.readLine();
    } catch (IOException ex) {
      throw new IOException("Error while opening a stream to '" + path + "'.", ex);
    }

    // checked outside of the try block so that this error is not wrapped as a stream error
    if (headerLine == null || headerLine.isEmpty()) {
      throw new IOException("The csv file '" + path + "' does not contain any rows.");
    }

    // quote the separator: String#split expects a regex, but "|" or "." are plain delimiters here
    return Arrays.asList(headerLine.split(java.util.regex.Pattern.quote(tokenSeparator)));
  }
}
@@ -0,0 +1,94 @@
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.dataintegration.importer.impl.csv.functions;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.properties.Properties;
import org.gradoop.common.model.impl.properties.PropertyValue;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/**
 * Maps the values of a token separated row of a file to the properties of an EPGM element.
 * Each property value is assigned to the property name at the same column position. Empty lines
 * produce no output, and empty values produce no property.
 */
public class CsvRowToProperties implements FlatMapFunction<String, Properties> {

  /**
   * Token separator of the csv file, compiled once as a literal (non-regex) pattern.
   */
  private final java.util.regex.Pattern separatorPattern;

  /**
   * The names of the properties, one per column.
   */
  private final List<String> propertyNames;

  /**
   * True, if each row should be compared to the header row and skipped when it is equal to it.
   */
  private final boolean checkReoccurringHeader;

  /**
   * Reduce object instantiations.
   */
  private final Properties reuseProperties;

  /**
   * Create a new CsvRowToProperties function.
   *
   * @param tokenDelimiter the delimiter used in the file, interpreted as a literal string
   * @param propertyNames list of the property names
   * @param checkReoccurringHeader should rows that are equal to the column names be skipped?
   */
  public CsvRowToProperties(String tokenDelimiter, List<String> propertyNames,
    boolean checkReoccurringHeader) {
    // Pattern.LITERAL keeps delimiters such as "|" or "." from being interpreted as a regex
    this.separatorPattern = java.util.regex.Pattern.compile(
      Objects.requireNonNull(tokenDelimiter), java.util.regex.Pattern.LITERAL);
    this.propertyNames = Objects.requireNonNull(propertyNames);
    this.checkReoccurringHeader = checkReoccurringHeader;
    this.reuseProperties = new Properties();
  }

  /**
   * Splits a line into its values and emits them as properties.
   *
   * @param line one line of the csv file
   * @param out collector receiving at most one properties object per line
   */
  @Override
  public void flatMap(String line, Collector<Properties> out) {
    // Check if the line is an empty line
    if (line.isEmpty()) {
      return;
    }

    // same semantics as String#split(String): trailing empty values are dropped
    String[] propertyValues = separatorPattern.split(line);

    // If the line to read is equal to the header and the checkReoccurringHeader flag is set to
    // TRUE, we do not import this line. The comparison is position-wise, so a data row that only
    // happens to consist of column names in another order or multiplicity is still imported.
    if (checkReoccurringHeader && propertyNames.equals(Arrays.asList(propertyValues))) {
      return;
    }

    // clear the properties
    reuseProperties.clear();

    // surplus values without a matching property name are ignored instead of failing the job
    int columnCount = Math.min(propertyValues.length, propertyNames.size());
    for (int i = 0; i < columnCount; i++) {
      // if a value is empty, do not add a property
      if (!propertyValues[i].isEmpty()) {
        reuseProperties.set(propertyNames.get(i), PropertyValue.create(propertyValues[i]));
      }
    }
    out.collect(reuseProperties);
  }
}
@@ -0,0 +1,48 @@
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.dataintegration.importer.impl.csv.functions;

import org.apache.flink.api.common.functions.MapFunction;
import org.gradoop.common.model.api.entities.EPGMVertex;
import org.gradoop.common.model.api.entities.EPGMVertexFactory;
import org.gradoop.common.model.impl.properties.Properties;

/**
 * Creates a new vertex for each properties object and attaches the properties to it.
 *
 * @param <V> the vertex type
 */
public class PropertiesToVertex<V extends EPGMVertex> implements MapFunction<Properties, V> {
  /**
   * Factory used to create one vertex per input record.
   */
  private final EPGMVertexFactory<V> vertexFactory;

  /**
   * Create a new PropertiesToVertex function.
   *
   * @param vertexFactory the factory that is responsible for creating a vertex
   */
  public PropertiesToVertex(EPGMVertexFactory<V> vertexFactory) {
    this.vertexFactory = java.util.Objects.requireNonNull(vertexFactory, "vertexFactory");
  }

  /**
   * Wraps the given properties into a newly created vertex.
   *
   * @param value the properties of the vertex
   * @return a vertex created by the factory, carrying the given properties
   */
  @Override
  public V map(final Properties value) {
    // A vertex must be created per record: reusing a single instance created in the constructor
    // would hand out the very same vertex, and therefore the same id, for every row.
    V vertex = vertexFactory.createVertex();
    vertex.setProperties(value);
    return vertex;
  }
}
@@ -0,0 +1,19 @@
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Functions related to the simple csv import process.
*/
package org.gradoop.dataintegration.importer.impl.csv.functions;
@@ -0,0 +1,19 @@
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Contains all classes related to importing an external csv file into EPGM.
*/
package org.gradoop.dataintegration.importer.impl.csv;
Oops, something went wrong.

0 comments on commit c986299

Please sign in to comment.