Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#841] encoding entity type within csv field in csv metadata #842

Merged
merged 3 commits on Jun 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -32,11 +32,11 @@ public class CSVConstants {
*/
public static final String ROW_DELIMITER = System.getProperty("line.separator");
/**
* Used to specify labels in metadata (vertex or edge label)
* Used to specify entity type (vertex or edge)
*/
public static final String VERTEX_PREFIX = "v_";
public static final String VERTEX_TYPE = "v";
/**
* Used to specify labels in metadata (vertex or edge label)
* Used to specify entity type (vertex or edge)
*/
public static final String EDGE_PREFIX = "e_";
public static final String EDGE_TYPE = "e";
}
Expand Up @@ -17,7 +17,7 @@

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.core.fs.FileSystem;
import org.gradoop.common.model.impl.pojo.Element;
Expand Down Expand Up @@ -83,7 +83,7 @@ public void write(LogicalGraph logicalGraph, boolean overwrite) throws IOExcepti
FileSystem.WriteMode writeMode = overwrite ?
FileSystem.WriteMode.OVERWRITE : FileSystem.WriteMode.NO_OVERWRITE;

DataSet<Tuple2<String, String>> metaData;
DataSet<Tuple3<String, String, String>> metaData;
if (!reuseMetadata()) {
metaData = createMetaData(logicalGraph);
} else {
Expand Down Expand Up @@ -132,7 +132,7 @@ private boolean reuseMetadata() {
* @param graph logical graph
* @return meta data information
*/
private DataSet<Tuple2<String, String>> createMetaData(LogicalGraph graph) {
private DataSet<Tuple3<String, String, String>> createMetaData(LogicalGraph graph) {
return createMetaData(graph.getVertices())
.union(createMetaData(graph.getEdges()));
}
Expand All @@ -144,15 +144,19 @@ private DataSet<Tuple2<String, String>> createMetaData(LogicalGraph graph) {
* @param <E> EPGM element type
* @return meta data information
*/
private <E extends Element> DataSet<Tuple2<String, String>> createMetaData(DataSet<E> elements) {
private <E extends Element> DataSet<Tuple3<String, String, String>> createMetaData(
DataSet<E> elements) {
return elements
.map(new ElementToPropertyMetaData<>())
.groupBy(0)
.groupBy(1)
.combineGroup(new ReducePropertyMetaData())
.groupBy(0)
.groupBy(1)
.reduceGroup(new ReducePropertyMetaData())
.map(tuple -> Tuple2.of(tuple.f0, MetaDataParser.getPropertiesMetaData(tuple.f1)))
.returns(new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
.withForwardedFields("f0");
.map(tuple -> Tuple3.of(tuple.f0, tuple.f1, MetaDataParser.getPropertiesMetaData(tuple.f2)))
.returns(new TupleTypeInfo<>(
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO))
.withForwardedFields("f1");
}
}
Expand Up @@ -16,7 +16,7 @@
package org.gradoop.flink.io.impl.csv;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.flink.io.api.DataSource;
Expand Down Expand Up @@ -52,7 +52,8 @@ public CSVDataSource(String csvPath, GradoopFlinkConfig config) {

@Override
public LogicalGraph getLogicalGraph() {
DataSet<Tuple2<String, String>> metaData = MetaData.fromFile(getMetaDataPath(), getConfig());
DataSet<Tuple3<String, String, String>> metaData =
MetaData.fromFile(getMetaDataPath(), getConfig());

DataSet<Vertex> vertices = getConfig().getExecutionEnvironment()
.readTextFile(getVertexCSVPath())
Expand Down
Expand Up @@ -18,7 +18,6 @@
import org.gradoop.common.model.api.entities.EPGMEdgeFactory;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.flink.io.impl.csv.CSVConstants;
import org.gradoop.flink.io.impl.csv.metadata.MetaData;

/**
Expand Down Expand Up @@ -53,9 +52,4 @@ public Edge map(String csvLine) throws Exception {
GradoopId.fromString(tokens[2]),
parseProperties(tokens[3], tokens[4]));
}

@Override
String getPrefix() {
return CSVConstants.EDGE_PREFIX;
}
}
Expand Up @@ -55,12 +55,6 @@ abstract class CSVLineToElement<E extends Element> extends RichMapFunction<Strin
this.properties = Properties.create();
}

/**
* Returns prefix for labels used in metadata
*
* @return prefix of the given elements
*/
abstract String getPrefix();

@Override
public void open(Configuration parameters) throws Exception {
Expand All @@ -79,7 +73,7 @@ public void open(Configuration parameters) throws Exception {
*/
Properties parseProperties(String label, String propertyValueString) {
String[] propertyValues = propertyValueString.split(valueDelimiter);
List<PropertyMetaData> metaDataList = metaData.getPropertyMetaData(getPrefix() + label);
List<PropertyMetaData> metaDataList = metaData.getPropertyMetaData(label);
properties.clear();
for (int i = 0; i < propertyValues.length; i++) {
if (propertyValues[i].length() > 0) {
Expand Down
Expand Up @@ -18,7 +18,6 @@
import org.gradoop.common.model.api.entities.EPGMVertexFactory;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.flink.io.impl.csv.CSVConstants;
import org.gradoop.flink.io.impl.csv.metadata.MetaData;

/**
Expand Down Expand Up @@ -53,9 +52,4 @@ public Vertex map(String csvLine) throws Exception {
tokens[1],
parseProperties(tokens[1], tokens[2]));
}

@Override
String getPrefix() {
return CSVConstants.VERTEX_PREFIX;
}
}
Expand Up @@ -17,7 +17,6 @@

import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.flink.io.impl.csv.CSVConstants;
import org.gradoop.flink.io.impl.csv.tuples.CSVEdge;

/**
Expand All @@ -43,9 +42,4 @@ public CSVEdge map(Edge edge) throws Exception {
csvEdge.setProperties(getPropertyString(edge));
return csvEdge;
}

@Override
String getPrefix() {
return CSVConstants.EDGE_PREFIX;
}
}
Expand Up @@ -32,7 +32,8 @@
* @param <E> EPGM element type
* @param <T> output tuple type
*/
public abstract class ElementToCSV<E extends Element, T extends Tuple> extends RichMapFunction<E , T> {
public abstract class ElementToCSV<E extends Element, T extends Tuple>
extends RichMapFunction<E , T> {
/**
* Constant for an empty string.
*/
Expand All @@ -49,21 +50,14 @@ public void open(Configuration parameters) throws Exception {
.getBroadcastVariable(CSVDataSource.BC_METADATA));
}

/**
* Returns prefix for labels used in metadata
*
* @return prefix of the given elements
*/
abstract String getPrefix();

/**
* Returns the concatenated property values of the specified element according to the meta data.
*
* @param element EPGM element
* @return property value string
*/
String getPropertyString(E element) {
return metaData.getPropertyMetaData(getPrefix() + element.getLabel()).stream()
return metaData.getPropertyMetaData(element.getLabel()).stream()
.map(metaData -> element.getPropertyValue(metaData.getKey()))
.map(value -> value == null ? EMPTY_STRING : value.toString())
.collect(Collectors.joining(CSVConstants.VALUE_DELIMITER));
Expand Down
Expand Up @@ -17,7 +17,7 @@

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.Element;
import org.gradoop.common.model.impl.pojo.Vertex;
Expand All @@ -29,38 +29,39 @@
import java.util.Set;

/**
* (element) -> (elementLabel, {key_1:type_1,key_2:type_2,...,key_n:type_n})
* (element) -> (elementType, elementLabel, {key_1:type_1,key_2:type_2,...,key_n:type_n})
*
* @param <E> EPGM element type
*/
@FunctionAnnotation.ForwardedFields("label->f0")
public class ElementToPropertyMetaData<E extends Element> implements MapFunction<E, Tuple2<String, Set<String>>> {
@FunctionAnnotation.ForwardedFields("label->f1")
public class ElementToPropertyMetaData<E extends Element>
implements MapFunction<E, Tuple3<String, String, Set<String>>> {
/**
* Reduce object instantiations.
*/
private final Tuple2<String, Set<String>> reuseTuple;
private final Tuple3<String, String, Set<String>> reuseTuple;
/**
* Constructor
*/
public ElementToPropertyMetaData() {
reuseTuple = new Tuple2<>();
reuseTuple.f1 = new HashSet<>();
reuseTuple = new Tuple3<>();
reuseTuple.f2 = new HashSet<>();
}

@Override
public Tuple2<String, Set<String>> map(E e) throws Exception {
public Tuple3<String, String, Set<String>> map(E e) throws Exception {
if (e.getClass() == Edge.class) {
reuseTuple.f0 = CSVConstants.EDGE_PREFIX + e.getLabel();
reuseTuple.f0 = CSVConstants.EDGE_TYPE;
} else if (e.getClass() == Vertex.class) {
reuseTuple.f0 = CSVConstants.VERTEX_PREFIX + e.getLabel();
reuseTuple.f0 = CSVConstants.VERTEX_TYPE;
} else {
throw new Exception("Unsupported element class");
}

reuseTuple.f1.clear();
reuseTuple.f1 = e.getLabel();
reuseTuple.f2.clear();
if (e.getProperties() != null) {
for (Property property : e.getProperties()) {
reuseTuple.f1.add(MetaDataParser.getPropertyMetaData(property));
reuseTuple.f2.add(MetaDataParser.getPropertyMetaData(property));
}
}
return reuseTuple;
Expand Down
Expand Up @@ -18,7 +18,7 @@
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

import java.util.Iterator;
Expand All @@ -27,35 +27,36 @@
/**
* Reduces all property meta data to a single element per label.
*/
@FunctionAnnotation.ForwardedFields("f0")
@FunctionAnnotation.ForwardedFields({"f0", "f1"})
public class ReducePropertyMetaData implements
GroupCombineFunction<Tuple2<String, Set<String>>, Tuple2<String, Set<String>>>,
GroupReduceFunction<Tuple2<String, Set<String>>, Tuple2<String, Set<String>>> {
GroupCombineFunction<Tuple3<String, String, Set<String>>, Tuple3<String, String, Set<String>>>,
GroupReduceFunction<Tuple3<String, String, Set<String>>, Tuple3<String, String, Set<String>>> {
/**
* Reduce object instantiations
*/
private final Tuple2<String, Set<String>> tuple = new Tuple2<>();
private final Tuple3<String, String, Set<String>> tuple = new Tuple3<>();

@Override
public void combine(Iterable<Tuple2<String, Set<String>>> iterable,
Collector<Tuple2<String, Set<String>>> collector) throws Exception {
public void combine(Iterable<Tuple3<String, String, Set<String>>> iterable,
Collector<Tuple3<String, String, Set<String>>> collector) throws Exception {

Iterator<Tuple2<String, Set<String>>> iterator = iterable.iterator();
Tuple2<String, Set<String>> first = iterator.next();
Set<String> keys = first.f1;
Iterator<Tuple3<String, String, Set<String>>> iterator = iterable.iterator();
Tuple3<String, String, Set<String>> first = iterator.next();
Set<String> keys = first.f2;

while (iterator.hasNext()) {
keys.addAll(iterator.next().f1);
keys.addAll(iterator.next().f2);
}

tuple.f0 = first.f0;
tuple.f1 = keys;
tuple.f1 = first.f1;
tuple.f2 = keys;
collector.collect(tuple);
}

@Override
public void reduce(Iterable<Tuple2<String, Set<String>>> iterable,
Collector<Tuple2<String, Set<String>>> collector) throws Exception {
public void reduce(Iterable<Tuple3<String, String, Set<String>>> iterable,
Collector<Tuple3<String, String, Set<String>>> collector) throws Exception {
combine(iterable, collector);
}
}
Expand Up @@ -17,7 +17,6 @@

import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.flink.io.impl.csv.CSVConstants;
import org.gradoop.flink.io.impl.csv.tuples.CSVVertex;

/**
Expand All @@ -41,9 +40,4 @@ public CSVVertex map(Vertex vertex) throws Exception {
csvVertex.setProperties(getPropertyString(vertex));
return csvVertex;
}

@Override
String getPrefix() {
return CSVConstants.VERTEX_PREFIX;
}
}