Skip to content
Permalink
Browse files

[#1411] added methods TemporalGraph.fromGraph(...) with TimeExtractors (

#1432)

fixes #1411
  • Loading branch information
xnqio committed Feb 10, 2020
1 parent a32d88c commit 27b12fa5c6e36c466335af69d5b4f0e2a95b0cab
@@ -184,7 +184,7 @@ public static TemporalGraph getTemporalGraph(GradoopFlinkConfig config) {
LogicalGraph networkGraph = loader.getLogicalGraph();

// transform to temporal graph by extracting time intervals from vertices
return TemporalGraph.fromLogicalGraph(networkGraph)
return TemporalGraph.fromGraph(networkGraph)
.transformVertices(TemporalCitiBikeGraph::extractTripPeriod);
}

@@ -18,8 +18,12 @@
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Preconditions;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.flink.io.impl.gdl.GDLConsoleOutput;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.epgm.BaseGraphCollectionFactory;
import org.gradoop.flink.model.api.epgm.BaseGraphFactory;
import org.gradoop.flink.model.api.layouts.LogicalGraphLayout;
@@ -31,9 +35,10 @@
import org.gradoop.flink.model.impl.operators.tostring.functions.GraphHeadToEmptyString;
import org.gradoop.temporal.io.api.TemporalDataSink;
import org.gradoop.temporal.model.api.TemporalGraphOperators;
import org.gradoop.temporal.model.api.functions.TimeIntervalExtractor;
import org.gradoop.temporal.model.impl.functions.tpgm.TemporalEdgeToEdge;
import org.gradoop.temporal.model.impl.functions.tpgm.TemporalGraphHeadToGraphHead;
import org.gradoop.temporal.model.impl.functions.tpgm.TemporalVertexToVertex;
import org.gradoop.temporal.model.impl.functions.tpgm.TemporalGraphHeadToGraphHead;
import org.gradoop.temporal.model.impl.operators.tostring.TemporalEdgeToDataString;
import org.gradoop.temporal.model.impl.operators.tostring.TemporalGraphHeadToDataString;
import org.gradoop.temporal.model.impl.operators.tostring.TemporalVertexToDataString;
@@ -213,16 +218,56 @@ public LogicalGraph toLogicalGraph() {
}

/**
* Convenience API function to create a {@link TemporalGraph} from an existing {@link LogicalGraph} with
* Convenience API function to create a {@link TemporalGraph} from an existing {@link BaseGraph} with
* default values for the temporal attributes.
*
* @param logicalGraph the existing logical graph instance
* @param baseGraph the existing graph instance
* @param <G> The graph head type.
* @param <V> The vertex type.
* @param <E> The edge type.
* @param <LG> The type of the graph.
* @param <GC> The type of the Graph collection
* @return a temporal graph with default temporal values
* @see TemporalGraphFactory#fromNonTemporalGraph(BaseGraph)
*/
public static TemporalGraph fromLogicalGraph(LogicalGraph logicalGraph) {
return TemporalGradoopConfig.fromGradoopFlinkConfig(logicalGraph.getConfig()).getTemporalGraphFactory()
.fromNonTemporalGraph(logicalGraph);
public static <
G extends GraphHead,
V extends Vertex,
E extends Edge,
LG extends BaseGraph<G, V, E, LG, GC>,
GC extends BaseGraphCollection<G, V, E, LG, GC>> TemporalGraph fromGraph(LG baseGraph) {
return TemporalGradoopConfig.fromGradoopFlinkConfig(baseGraph.getConfig()).getTemporalGraphFactory()
.fromNonTemporalGraph(baseGraph);
}

/**
* Function to create a {@link TemporalGraph} from an existing {@link BaseGraph} with valid times
* depending on the three given {@link TimeIntervalExtractor} instances
*
* @param baseGraph the existing Graph
* @param graphTimeExtractor mapFunction to generate valid times for graphHead
* @param vertexTimeExtractor mapFunction to generate valid times for vertices
* @param edgeTimeExtractor mapFunction to generate valid times for edges
* @param <G> The graph head type.
* @param <V> The vertex type.
* @param <E> The edge type.
* @param <LG> The type of the graph.
* @param <GC> The type of the Graph collection
* @return a temporal graph with new valid time values
*/
public static <
G extends GraphHead,
V extends Vertex, E extends Edge,
LG extends BaseGraph<G, V, E, LG, GC>,
GC extends BaseGraphCollection<G, V, E, LG, GC>> TemporalGraph fromGraph(LG baseGraph,
TimeIntervalExtractor<G> graphTimeExtractor,
TimeIntervalExtractor<V> vertexTimeExtractor,
TimeIntervalExtractor<E> edgeTimeExtractor) {
TemporalGradoopConfig temporalGradoopConfig = TemporalGradoopConfig.fromGradoopFlinkConfig(
baseGraph.getConfig());
return temporalGradoopConfig.getTemporalGraphFactory().fromNonTemporalDataSets(
baseGraph.getGraphHead(), graphTimeExtractor, baseGraph.getVertices(), vertexTimeExtractor,
baseGraph.getEdges(), edgeTimeExtractor);
}

/**
@@ -17,7 +17,11 @@

import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Preconditions;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.flink.io.impl.gdl.GDLConsoleOutput;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.epgm.BaseGraphCollectionFactory;
import org.gradoop.flink.model.api.epgm.BaseGraphFactory;
@@ -32,9 +36,10 @@
import org.gradoop.flink.util.GradoopFlinkConfig;
import org.gradoop.temporal.io.api.TemporalDataSink;
import org.gradoop.temporal.model.api.TemporalGraphCollectionOperators;
import org.gradoop.temporal.model.api.functions.TimeIntervalExtractor;
import org.gradoop.temporal.model.impl.functions.tpgm.TemporalEdgeToEdge;
import org.gradoop.temporal.model.impl.functions.tpgm.TemporalGraphHeadToGraphHead;
import org.gradoop.temporal.model.impl.functions.tpgm.TemporalVertexToVertex;
import org.gradoop.temporal.model.impl.functions.tpgm.TemporalGraphHeadToGraphHead;
import org.gradoop.temporal.model.impl.operators.tostring.TemporalEdgeToDataString;
import org.gradoop.temporal.model.impl.operators.tostring.TemporalGraphHeadToDataString;
import org.gradoop.temporal.model.impl.operators.tostring.TemporalVertexToDataString;
@@ -224,15 +229,63 @@ public GraphCollection toGraphCollection() {

/**
* Convenience API function to create a {@link TemporalGraphCollection} from an existing
* {@link GraphCollection} with default values for the temporal attributes.
* {@link BaseGraphCollection} with default values for the temporal attributes.
*
* @param baseGraphCollection the existing {@link BaseGraphCollection} instance
* @param <G> The graph head type.
* @param <V> The vertex type.
* @param <E> The edge type.
* @param <LG> The type of the graph.
* @param <GC> The type of the graph collection.
* @return a temporal graph collection with default temporal values
* @see TemporalGraphCollectionFactory#fromNonTemporalGraphCollection(BaseGraphCollection)
*/
public static <
G extends GraphHead,
V extends Vertex,
E extends Edge,
LG extends BaseGraph<G, V, E, LG, GC>,
GC extends BaseGraphCollection<G, V, E, LG, GC>>
TemporalGraphCollection fromGraphCollection(GC baseGraphCollection) {
return TemporalGradoopConfig.fromGradoopFlinkConfig(baseGraphCollection.getConfig())
.getTemporalGraphCollectionFactory().fromNonTemporalGraphCollection(baseGraphCollection);
}

/**
* Convenience API function to create a {@link TemporalGraphCollection} from an existing
* {@link BaseGraphCollection} with valid times depending on the three given
* {@link TimeIntervalExtractor} functions
*
* @param graphCollection the existing graph collection instance
* @return a temporal graph colection with default temporal values
* @param baseGraphCollection the existing BaseGraphCollection instance
* @param graphTimeExtractor the time extractor function for the Graph Heads in the
* graph collection instance
* @param vertexTimeExtractor the time extractor function for the vertices in the graph
* collection instance
* @param edgeTimeExtractor the time extractor function for the edges in the graph
* collection instance
* @param <G> The graph head type.
* @param <V> The vertex type.
* @param <E> The edge type.
* @param <LG> The type of the graph.
* @param <GC> The type of the graph collection.
* @return a temporal graph collection with assigned valid time values
* @see TemporalGraphCollectionFactory#fromNonTemporalGraphCollection(BaseGraphCollection)
*/
public static TemporalGraphCollection fromGraphCollection(GraphCollection graphCollection) {
return TemporalGradoopConfig.fromGradoopFlinkConfig(graphCollection.getConfig())
.getTemporalGraphCollectionFactory().fromNonTemporalGraphCollection(graphCollection);
public static <
G extends GraphHead,
V extends Vertex,
E extends Edge,
LG extends BaseGraph<G, V, E, LG, GC>,
GC extends BaseGraphCollection<G, V, E, LG, GC>>
TemporalGraphCollection fromGraphCollection(GC baseGraphCollection,
TimeIntervalExtractor<G> graphTimeExtractor,
TimeIntervalExtractor<V> vertexTimeExtractor,
TimeIntervalExtractor<E> edgeTimeExtractor) {
TemporalGradoopConfig temporalGradoopConfig = TemporalGradoopConfig.
fromGradoopFlinkConfig(baseGraphCollection.getConfig());
return temporalGradoopConfig.getTemporalGraphCollectionFactory().fromNonTemporalDataSets(
baseGraphCollection.getGraphHeads(), graphTimeExtractor, baseGraphCollection.getVertices(),
vertexTimeExtractor, baseGraphCollection.getEdges(), edgeTimeExtractor);
}

/**
@@ -20,6 +20,7 @@
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.temporal.io.api.TemporalDataSink;
import org.gradoop.temporal.io.api.TemporalDataSource;
@@ -29,6 +30,7 @@
import org.gradoop.temporal.model.impl.pojo.TemporalGraphHead;
import org.gradoop.temporal.model.impl.pojo.TemporalVertex;
import org.gradoop.temporal.util.TemporalGradoopTestBase;
import org.gradoop.temporal.util.TemporalGradoopTestUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -224,7 +226,7 @@ public void testToGraphCollection() throws Exception {
}

/**
* Test the {@link TemporalGraphCollection#fromGraphCollection(GraphCollection)} method.
* Test the {@link TemporalGraphCollection#fromGraphCollection(BaseGraphCollection)} method.
*/
@Test
public void testFromGraphCollection() throws Exception {
@@ -262,4 +264,27 @@ public void testFromGraphCollection() throws Exception {
loadedVertices.forEach(this::checkDefaultTemporalElement);
loadedEdges.forEach(this::checkDefaultTemporalElement);
}

/**
* Test the
* {@link TemporalGraphCollection#fromGraphCollection} method with TimeInterval
* extractors as parameters
*
* @throws Exception if loading the graph from the csv data source fails
*/
@Test
public void testFromGraphCollectionWithTimeExtractors() throws Exception {

String path = getFilePath("/data/csv/socialnetwork/");
TemporalCSVDataSource csvDataSource = new TemporalCSVDataSource(path, getConfig());
TemporalGraphCollection expected = csvDataSource.getTemporalGraphCollection();
GraphCollection graphCollection = getTemporalSocialNetworkLoader().getGraphCollection();

TemporalGraphCollection check = TemporalGraphCollection.fromGraphCollection(graphCollection,
g -> TemporalGradoopTestUtils.extractTime(g),
v -> TemporalGradoopTestUtils.extractTime(v),
e -> TemporalGradoopTestUtils.extractTime(e));

collectAndAssertTrue(check.equalsByGraphElementData(expected));
}
}
@@ -20,10 +20,14 @@
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.combination.ReduceCombination;
import org.gradoop.temporal.io.api.TemporalDataSource;
import org.gradoop.temporal.io.impl.csv.TemporalCSVDataSource;
import org.gradoop.temporal.model.impl.pojo.TemporalEdge;
import org.gradoop.temporal.model.impl.pojo.TemporalGraphHead;
import org.gradoop.temporal.model.impl.pojo.TemporalVertex;
import org.gradoop.temporal.util.TemporalGradoopTestBase;
import org.gradoop.temporal.util.TemporalGradoopTestUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@@ -156,11 +160,13 @@ public void testToLogicalGraph() throws Exception {
}

/**
* Test the {@link TemporalGraph#fromLogicalGraph(LogicalGraph)} method.
* Test the {@link TemporalGraph#fromGraph} method.
*
* @throws Exception if loading the graph fails
*/
@Test
public void testFromLogicalGraph() throws Exception {
TemporalGraph temporalGraph = TemporalGraph.fromLogicalGraph(testLogicalGraph);
public void testFromGraph() throws Exception {
TemporalGraph temporalGraph = TemporalGraph.fromGraph(testLogicalGraph);

Collection<TemporalGraphHead> loadedGraphHeads = new ArrayList<>();
Collection<TemporalVertex> loadedVertices = new ArrayList<>();
@@ -194,4 +200,27 @@ public void testFromLogicalGraph() throws Exception {
loadedVertices.forEach(this::checkDefaultTemporalElement);
loadedEdges.forEach(this::checkDefaultTemporalElement);
}

/**
* Test the {@link TemporalGraph#fromGraph} method with TimeInterval Extractors as parameters
*
* @throws Exception if loading the graph from the csv data source fails
*/
@Test
public void testFromGraphWithTimeIntervalExtractors() throws Exception {

String path = getFilePath("/data/csv/socialnetwork/");
TemporalDataSource csvDataSource = new TemporalCSVDataSource(path, getConfig());
TemporalGraphCollection temporalGraphCollection = csvDataSource.getTemporalGraphCollection();
TemporalGraph expected = temporalGraphCollection.reduce(new ReduceCombination<>());
LogicalGraph logicalGraph =
getTemporalSocialNetworkLoader().getGraphCollection().reduce(new ReduceCombination<>());

TemporalGraph check = TemporalGraph.fromGraph(logicalGraph,
g -> TemporalGradoopTestUtils.extractTime(g),
v -> TemporalGradoopTestUtils.extractTime(v),
e -> TemporalGradoopTestUtils.extractTime(e));

collectAndAssertTrue(check.equalsByElementData(expected));
}
}
@@ -233,7 +233,7 @@ public void testWithMinAggregation() throws Exception {
@Test
public void testDurationWithDefaultValues() throws Exception {
LogicalGraph logicalGraph = getSocialNetworkLoader().getLogicalGraph();
TemporalGraph temporalGraph = TemporalGraph.fromLogicalGraph(logicalGraph);
TemporalGraph temporalGraph = TemporalGraph.fromGraph(logicalGraph);
temporalGraph = temporalGraph.aggregate(
new MinDuration("minDur", VALID_TIME),
new MaxDuration("maxDur", VALID_TIME));
@@ -0,0 +1,13 @@
5db060a0d5be072518839393;[5db060a0d5be072518839378,5db060a0d5be072518839376];5db060a0d5be072518839380;5db060a0d5be07251883937f;knows;1543600000000|2014;(1571840160143,9223372036854775807),(1543600000000,9223372036854775807)
5db060a0d5be072518839394;[5db060a0d5be072518839377];5db060a0d5be072518839384;5db060a0d5be072518839381;knows;1543900000000|2015;(1571840160143,9223372036854775807),(1543900000000,9223372036854775807)
5db060a0d5be072518839391;[5db060a0d5be072518839378,5db060a0d5be072518839376];5db060a0d5be07251883937f;5db060a0d5be072518839380;knows;1543600000000|2014;(1571840160143,9223372036854775807),(1543600000000,9223372036854775807)
5db060a0d5be072518839392;[5db060a0d5be072518839376];5db060a0d5be072518839383;5db060a0d5be072518839380;knows;1543900000000|2015;(1571840160143,9223372036854775807),(1543900000000,9223372036854775807)
5db060a0d5be072518839397;[5db060a0d5be072518839377,5db060a0d5be072518839378];5db060a0d5be072518839382;5db060a0d5be072518839381;knows;1543700000000|2014;(1571840160143,9223372036854775807),(1543700000000,9223372036854775807)
5db060a0d5be072518839398;[5db060a0d5be072518839378];5db060a0d5be072518839380;5db060a0d5be072518839381;knows;1543800000000|2013;(1571840160143,9223372036854775807),(1543800000000,9223372036854775807)
5db060a0d5be072518839395;[5db060a0d5be072518839377,5db060a0d5be072518839378,5db060a0d5be072518839379];5db060a0d5be072518839381;5db060a0d5be072518839382;knows;1543700000000|2014;(1571840160143,9223372036854775807),(1543700000000,9223372036854775807)
5db060a0d5be072518839396;[5db060a0d5be072518839377];5db060a0d5be072518839384;5db060a0d5be072518839382;knows;1543900000000|2015;(1571840160143,9223372036854775807),(1543900000000,9223372036854775807)
5db060a0d5be07251883939b;[5db060a0d5be072518839379];5db060a0d5be07251883937e;5db060a0d5be072518839382;hasMember;1543600000000|1543800000000;(1571840160143,9223372036854775807),(1543600000000,1543800000000)
5db060a0d5be07251883939c;[5db060a0d5be072518839379];5db060a0d5be07251883937e;5db060a0d5be072518839381;hasMember;|;(1571840160143,9223372036854775807),(-9223372036854775808,9223372036854775807)
5db060a0d5be072518839399;[5db060a0d5be072518839378];5db060a0d5be072518839381;5db060a0d5be072518839380;knows;1543800000000|2013;(1571840160143,9223372036854775807),(1543800000000,9223372036854775807)
5db060a0d5be07251883939a;[5db060a0d5be072518839379];5db060a0d5be07251883937e;5db060a0d5be072518839382;hasModerator;1543700000000|1543800000000|2013;(1571840160143,9223372036854775807),(1543700000000,1543800000000)
5db060a0d5be072518839390;[5db060a0d5be072518839376];5db060a0d5be072518839383;5db060a0d5be07251883937f;knows;1543800000000|2013;(1571840160143,9223372036854775807),(1543800000000,9223372036854775807)
@@ -0,0 +1,4 @@
5db060a0d5be072518839377;Community;Hadoop|3;(1571840160140,9223372036854775807),(-9223372036854775808,9223372036854775807)
5db060a0d5be072518839378;Community;Graphs|4;(1571840160140,9223372036854775807),(-9223372036854775808,9223372036854775807)
5db060a0d5be072518839376;Community;Databases|3;(1571840160140,9223372036854775807),(-9223372036854775808,9223372036854775807)
5db060a0d5be072518839379;Forum;;(1571840160140,9223372036854775807),(-9223372036854775808,9223372036854775807)
@@ -0,0 +1,7 @@
v;Forum;title:string
e;hasModerator;__valFrom:long,__valTo:long,since:int
e;knows;__valFrom:long,since:int
v;Person;__valFrom:long,age:int,city:string,gender:string,locIP:string,name:string,speaks:string
g;Community;interest:string,vertexCount:int
g;Forum;
e;hasMember;__valFrom:long,__valTo:long
@@ -0,0 +1,7 @@
5db060a0d5be072518839383;[5db060a0d5be072518839376];Person;1543800000000|35|Dresden|f||Eve|English;(1571840160141,9223372036854775807),(1543800000000,9223372036854775807)
5db060a0d5be072518839384;[5db060a0d5be072518839377];Person;1543900000000|35|Berlin|m|127.0.0.1|Frank|;(1571840160141,9223372036854775807),(1543900000000,9223372036854775807)
5db060a0d5be072518839381;[5db060a0d5be072518839377,5db060a0d5be072518839378,5db060a0d5be072518839379];Person;1543600000000|30|Dresden|f||Carol|;(1571840160141,9223372036854775807),(1543600000000,9223372036854775807)
5db060a0d5be072518839382;[5db060a0d5be072518839377,5db060a0d5be072518839378,5db060a0d5be072518839379];Person;1543700000000|40|Dresden|m||Dave|;(1571840160141,9223372036854775807),(1543700000000,9223372036854775807)
5db060a0d5be07251883937f;[5db060a0d5be072518839378,5db060a0d5be072518839376];Person;1543400000000|20|Leipzig|f||Alice|;(1571840160141,9223372036854775807),(1543400000000,9223372036854775807)
5db060a0d5be072518839380;[5db060a0d5be072518839378,5db060a0d5be072518839376];Person;1543500000000|30|Leipzig|m||Bob|;(1571840160141,9223372036854775807),(1543500000000,9223372036854775807)
5db060a0d5be07251883937e;[5db060a0d5be072518839379];Forum;Graph Processing;(1571840160141,9223372036854775807),(-9223372036854775808,9223372036854775807)

0 comments on commit 27b12fa

Please sign in to comment.
You can’t perform that action at this time.