Skip to content
Permalink
Browse files

[#1345] Added changes from p-f to prepare gradoop-flink with generali…

…zations for the new gradoop-temporal module (#1346)

fixes #1345
  • Loading branch information...
ChrizZz110 committed Aug 9, 2019
1 parent 87224ea commit 183e31ca3669a4dcf34e59409d04af0373b4bfe9
@@ -20,12 +20,12 @@
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.hadoop.conf.Configuration;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.impl.metadata.MetaData;
import org.gradoop.common.model.impl.metadata.PropertyMetaData;
import org.gradoop.common.model.impl.pojo.EPGMElement;
import org.gradoop.flink.io.api.metadata.functions.ElementToPropertyMetaData;
import org.gradoop.flink.io.api.metadata.functions.ReducePropertyMetaData;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.util.GradoopFlinkConfig;

@@ -70,7 +70,8 @@
* @param graphs graph collection
* @return meta data information
*/
default DataSet<Tuple3<String, String, String>> tuplesFromCollection(GraphCollection graphs) {
default DataSet<Tuple3<String, String, String>> tuplesFromCollection(
BaseGraphCollection<?, ?, ?, ?, ?> graphs) {
return tuplesFromElements(graphs.getVertices())
.union(tuplesFromElements(graphs.getEdges()))
.union(tuplesFromElements(graphs.getGraphHeads()));
@@ -83,7 +84,7 @@
* @param <E> EPGM element type
* @return meta data information
*/
static <E extends EPGMElement> DataSet<Tuple3<String, String, String>> tuplesFromElements(
static <E extends Element> DataSet<Tuple3<String, String, String>> tuplesFromElements(
DataSet<E> elements) {
return elements
.map(new ElementToPropertyMetaData<>())
@@ -18,10 +18,10 @@
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.common.model.impl.pojo.EPGMElement;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.impl.properties.Property;
import org.gradoop.flink.io.api.metadata.MetaDataSource;
import org.gradoop.flink.io.impl.csv.CSVConstants;
@@ -34,10 +34,10 @@
/**
* (element) -> (elementType, elementLabel, {key_1:type_1,key_2:type_2,...,key_n:type_n})
*
* @param <E> EPGM element type
* @param <E> element type
*/
@FunctionAnnotation.ForwardedFields("label->f1")
public class ElementToPropertyMetaData<E extends EPGMElement>
public class ElementToPropertyMetaData<E extends Element>
implements MapFunction<E, Tuple3<String, String, Set<String>>> {
/**
* Reduce object instantiations.
@@ -53,16 +53,14 @@ public ElementToPropertyMetaData() {

@Override
public Tuple3<String, String, Set<String>> map(E e) throws Exception {
Class<? extends EPGMElement> type = e.getClass();

if (type == EPGMEdge.class) {
if (e instanceof Edge) {
reuseTuple.f0 = MetaDataSource.EDGE_TYPE;
} else if (type == EPGMVertex.class) {
} else if (e instanceof Vertex) {
reuseTuple.f0 = MetaDataSource.VERTEX_TYPE;
} else if (type == EPGMGraphHead.class) {
} else if (e instanceof GraphHead) {
reuseTuple.f0 = MetaDataSource.GRAPH_TYPE;
} else {
throw new Exception("Unsupported element class");
throw new Exception("Unsupported element class: " + e.getClass().getName());
}
reuseTuple.f1 = StringEscaper.escape(e.getLabel(), CSVConstants.ESCAPED_CHARACTERS);
reuseTuple.f2.clear();
@@ -40,7 +40,7 @@
/**
* Path to meta data file that is used to write the output.
*/
private final String metaDataPath;
protected final String metaDataPath;

/**
* Creates a new CSV data sink. Computes the meta data based on the given graph.
@@ -125,7 +125,7 @@ public void write(GraphCollection graphCollection, boolean overwrite) throws IOE
*
* @return true, iff reuse is possible
*/
private boolean reuseMetadata() {
protected boolean reuseMetadata() {
return this.metaDataPath != null && !this.metaDataPath.isEmpty();
}
}
@@ -17,14 +17,18 @@

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.flink.io.api.DataSource;
import org.gradoop.flink.io.impl.csv.functions.CSVLineToEdge;
import org.gradoop.flink.io.impl.csv.functions.CSVLineToElement;
import org.gradoop.flink.io.impl.csv.functions.CSVLineToGraphHead;
import org.gradoop.flink.io.impl.csv.functions.CSVLineToVertex;
import org.gradoop.flink.io.impl.csv.metadata.CSVMetaDataSource;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.epgm.BaseGraphCollectionFactory;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.GraphCollectionFactory;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
@@ -70,25 +74,57 @@ public LogicalGraph getLogicalGraph() {

@Override
public GraphCollection getGraphCollection() {
GraphCollectionFactory factory = getConfig().getGraphCollectionFactory();
GraphCollectionFactory collectionFactory = getConfig().getGraphCollectionFactory();
return getCollection(
new CSVLineToGraphHead(collectionFactory.getGraphHeadFactory()),
new CSVLineToVertex(collectionFactory.getVertexFactory()),
new CSVLineToEdge(collectionFactory.getEdgeFactory()),
collectionFactory);
}

/**
* Create a graph collection from CSV lines using {@link CSVLineToElement} mapper functions.
*
* @param csvToGraphHead A function mapping a CSV line to a graph head.
* @param csvToVertex A function mapping a CSV line to a vertex.
* @param csvToEdge A function mapping a CSV line to an edge.
* @param collectionFactory A factory used to create the final graph collection.
* @param <G> The graph head type.
* @param <V> The vertex type.
* @param <E> The edge type.
* @param <LG> The graph type.
* @param <GC> The graph collection type.
* @return A graph collection representing the graph data stored as CSV.
*/
protected <
G extends GraphHead,
V extends Vertex,
E extends Edge,
LG extends BaseGraph<G, V, E, LG, GC>,
GC extends BaseGraphCollection<G, V, E, LG, GC>> GC getCollection(
CSVLineToElement<G> csvToGraphHead,
CSVLineToElement<V> csvToVertex,
CSVLineToElement<E> csvToEdge,
BaseGraphCollectionFactory<G, V, E, LG, GC> collectionFactory) {

// Read the meta data
DataSet<Tuple3<String, String, String>> metaData =
new CSVMetaDataSource().readDistributed(getMetaDataPath(), getConfig());

DataSet<EPGMGraphHead> graphHeads = getConfig().getExecutionEnvironment()
// Read the datasets of each graph element
DataSet<G> graphHeads = getConfig().getExecutionEnvironment()
.readTextFile(getGraphHeadCSVPath())
.map(new CSVLineToGraphHead(factory.getGraphHeadFactory()))
.withBroadcastSet(metaData, BC_METADATA);
.map(csvToGraphHead).withBroadcastSet(metaData, BC_METADATA);

DataSet<EPGMVertex> vertices = getConfig().getExecutionEnvironment()
DataSet<V> vertices = getConfig().getExecutionEnvironment()
.readTextFile(getVertexCSVPath())
.map(new CSVLineToVertex(factory.getVertexFactory()))
.withBroadcastSet(metaData, BC_METADATA);
.map(csvToVertex).withBroadcastSet(metaData, BC_METADATA);

DataSet<EPGMEdge> edges = getConfig().getExecutionEnvironment()
DataSet<E> edges = getConfig().getExecutionEnvironment()
.readTextFile(getEdgeCSVPath())
.map(new CSVLineToEdge(factory.getEdgeFactory()))
.withBroadcastSet(metaData, BC_METADATA);
.map(csvToEdge).withBroadcastSet(metaData, BC_METADATA);

return factory.fromDataSets(graphHeads, vertices, edges);
// Create the graph
return collectionFactory.fromDataSets(graphHeads, vertices, edges);
}
}
@@ -17,11 +17,11 @@

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.id.GradoopIdSet;
import org.gradoop.common.model.impl.metadata.MetaData;
import org.gradoop.common.model.impl.metadata.PropertyMetaData;
import org.gradoop.common.model.impl.pojo.EPGMElement;
import org.gradoop.common.model.impl.properties.Properties;
import org.gradoop.flink.io.impl.csv.CSVConstants;
import org.gradoop.flink.io.impl.csv.CSVDataSource;
@@ -31,25 +31,25 @@
import java.util.List;

/**
* Base class for reading an {@link EPGMElement} from CSV. Handles the {@link MetaData} which is
* Base class for reading an {@link Element} from CSV. Handles the {@link MetaData} which is
* required to parse the property values.
*
* @param <E> EPGM element type
* @param <E> element type
*/
abstract class CSVLineToElement<E extends EPGMElement> extends RichMapFunction<String, E> {
public abstract class CSVLineToElement<E extends Element> extends RichMapFunction<String, E> {
/**
* Stores the properties for the {@link EPGMElement} to be parsed.
* Stores the properties for the {@link Element} to be parsed.
*/
private final Properties properties;
/**
* Meta data that provides parsers for a specific {@link EPGMElement}.
* Meta data that provides parsers for a specific {@link Element}.
*/
private CSVMetaData metaData;

/**
* Constructor
*/
CSVLineToElement() {
public CSVLineToElement() {
this.properties = Properties.create();
}

@@ -70,7 +70,7 @@ public void open(Configuration parameters) throws Exception {
* @param propertyValueString string representation of elements' property values
* @return parsed properties
*/
Properties parseProperties(String type, String label, String propertyValueString) {
protected Properties parseProperties(String type, String label, String propertyValueString) {
String[] propertyValues = StringEscaper
.split(propertyValueString, CSVConstants.VALUE_DELIMITER);
List<PropertyMetaData> metaDataList = metaData.getPropertyMetaData(type, label);
@@ -90,7 +90,7 @@ Properties parseProperties(String type, String label, String propertyValueString
* @param gradoopIdsString The csv token string.
* @return gradoop ids contained in the string
*/
GradoopIdSet parseGradoopIds(String gradoopIdsString) {
protected GradoopIdSet parseGradoopIds(String gradoopIdsString) {
String[] gradoopIds = gradoopIdsString
.substring(1, gradoopIdsString.length() - 1)
.split(CSVConstants.LIST_DELIMITER);
@@ -61,7 +61,7 @@ public void open(Configuration parameters) throws Exception {
* @param type element type
* @return property value string
*/
String getPropertyString(E element, String type) {
protected String getPropertyString(E element, String type) {
return metaData.getPropertyMetaData(type, element.getLabel()).stream()
.map(propertyMetaData -> this.getPropertyValueString(propertyMetaData, element))
.collect(Collectors.joining(CSVConstants.VALUE_DELIMITER));
@@ -109,7 +109,7 @@ private String propertyValueToCsvString(PropertyValue p) {
* @param collection collection
* @return CSV string
*/
String collectionToCsvString(Collection<?> collection) {
protected String collectionToCsvString(Collection<?> collection) {
return collection.stream()
.map(o -> o instanceof PropertyValue ? escape((PropertyValue) o) : o.toString())
.collect(Collectors.joining(CSVConstants.LIST_DELIMITER, "[", "]"));
@@ -23,7 +23,6 @@
import org.gradoop.common.model.api.entities.ElementFactoryProvider;
import org.gradoop.flink.model.api.layouts.GraphCollectionLayoutFactory;
import org.gradoop.flink.model.api.layouts.LogicalGraphLayout;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;

import java.util.Collection;
@@ -46,6 +45,13 @@
LG extends BaseGraph<G, V, E, LG, GC>,
GC extends BaseGraphCollection<G, V, E, LG, GC>> extends ElementFactoryProvider<G, V, E> {

/**
* Get the layout factory responsible for creating a graph collection layout.
*
* @return The graph collection layout factory.
*/
GraphCollectionLayoutFactory<G, V, E> getLayoutFactory();

/**
* Sets the layout factory that is responsible for creating a graph collection layout.
*
@@ -108,7 +114,7 @@ GC fromIndexedDataSets(Map<String, DataSet<G>> graphHeads, Map<String, DataSet<V
* @param logicalGraphLayout input graphs
* @return graph collection
*/
GC fromGraphs(LogicalGraph... logicalGraphLayout);
GC fromGraphs(LogicalGraphLayout<G, V, E>... logicalGraphLayout);

/**
* Creates a graph collection from a graph transaction dataset.
@@ -42,6 +42,13 @@
LG extends BaseGraph<G, V, E, LG, GC>,
GC extends BaseGraphCollection<G, V, E, LG, GC>> extends ElementFactoryProvider<G, V, E> {

/**
* Get the layout factory responsible for creating a graph layout.
*
* @return The graph layout factory.
*/
LogicalGraphLayoutFactory<G, V, E> getLayoutFactory();

/**
* Sets the layout factory that is responsible for creating a graph layout.
*
@@ -60,6 +60,11 @@ public GraphCollectionFactory(GradoopFlinkConfig config) {
this.config = config;
}

@Override
public GraphCollectionLayoutFactory<EPGMGraphHead, EPGMVertex, EPGMEdge> getLayoutFactory() {
return layoutFactory;
}

@Override
public void setLayoutFactory(
GraphCollectionLayoutFactory<EPGMGraphHead, EPGMVertex, EPGMEdge> layoutFactory) {
@@ -116,7 +121,8 @@ public GraphCollection fromGraph(
}

@Override
public GraphCollection fromGraphs(LogicalGraph... logicalGraphLayouts) {
public GraphCollection fromGraphs(
LogicalGraphLayout<EPGMGraphHead, EPGMVertex, EPGMEdge>... logicalGraphLayouts) {
if (logicalGraphLayouts.length != 0) {
DataSet<EPGMGraphHead> graphHeads = null;
DataSet<EPGMVertex> vertices = null;
@@ -126,7 +132,7 @@ public GraphCollection fromGraphs(LogicalGraph... logicalGraphLayouts) {
return fromGraph(logicalGraphLayouts[0]);
}

for (LogicalGraph logicalGraph : logicalGraphLayouts) {
for (LogicalGraphLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> logicalGraph : logicalGraphLayouts) {
graphHeads = (graphHeads == null) ?
logicalGraph.getGraphHead() : graphHeads.union(logicalGraph.getGraphHead());
vertices = (vertices == null) ?
@@ -54,6 +54,11 @@ public LogicalGraphFactory(GradoopFlinkConfig config) {
this.config = config;
}

@Override
public LogicalGraphLayoutFactory<EPGMGraphHead, EPGMVertex, EPGMEdge> getLayoutFactory() {
return layoutFactory;
}

@Override
public void setLayoutFactory(LogicalGraphLayoutFactory<EPGMGraphHead, EPGMVertex, EPGMEdge> layoutFactory) {
Objects.requireNonNull(layoutFactory);
@@ -16,16 +16,16 @@
package org.gradoop.flink.model.impl.functions.graphcontainment;

import org.apache.flink.api.common.functions.MapFunction;
import org.gradoop.common.model.api.entities.GraphElement;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.EPGMGraphElement;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;

/**
* Adds the given graph head identifier to the graph element.
*
* @param <GE> EPGM graph element
* @param <GE> graph element
*/
public class AddToGraph<GE extends EPGMGraphElement> implements MapFunction<GE, GE> {
public class AddToGraph<GE extends GraphElement> implements MapFunction<GE, GE> {
/**
* Graph head identifier which gets added to the graph element.
*/
@@ -36,7 +36,7 @@
*
* @param graphHead graph head used for updating
*/
public AddToGraph(EPGMGraphHead graphHead) {
public AddToGraph(GraphHead graphHead) {
this.graphHeadId = graphHead.getId();
}

0 comments on commit 183e31c

Please sign in to comment.
You can’t perform that action at this time.