[FLINK-4647] Read bipartite graph #2987

Closed
wants to merge 1 commit
137 changes: 135 additions & 2 deletions docs/dev/libs/gelly/bipartite_graph.md
@@ -86,7 +86,7 @@ DataSet<Vertex<String, Long>> topVertices = ...

DataSet<Vertex<String, Long>> bottomVertices = ...

DataSet<Edge<String, String, Double>> edges = ...
DataSet<BipartiteEdge<String, String, Double>> edges = ...

BipartiteGraph<String, String, Long, Long, Double> graph = BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, env);
{% endhighlight %}
@@ -100,6 +100,139 @@ Graph<String, String, Long, Long, Double> graph = BipartiteGraph.fromDataSet(top
</div>


* from a `DataSet` of `Tuple2` representing the edges. Gelly will convert each `Tuple2` to a `BipartiteEdge`, where the first field will be the top vertex ID and the second field will be the bottom vertex ID. Both vertex values and edge values will be set to `NullValue`.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<String, String>> edges = ...

BipartiteGraph<String, String, NullValue, NullValue, NullValue> graph = BipartiteGraph.fromTuple2DataSet(edges, env);
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
// TODO: Should be added when Scala interface is implemented
{% endhighlight %}
</div>
</div>
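Conceptually, the `fromTuple2DataSet` conversion just maps every ID pair to an edge whose value is the `NullValue` placeholder. A minimal stdlib-only sketch of that conversion (plain Java, no Flink; `SimpleBipartiteEdge` and `fromPairs` are hypothetical stand-ins, not the Gelly API):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Tuple2ToEdgeSketch {
    // Hypothetical stand-in for Gelly's BipartiteEdge<KT, KB, EV>; the names
    // are illustrative only. A null value mirrors NullValue.
    record SimpleBipartiteEdge<KT, KB, EV>(KT top, KB bottom, EV value) {}

    // What fromTuple2DataSet does per element: the pair's first field becomes
    // the top vertex ID, the second the bottom vertex ID, and the edge value
    // is the placeholder.
    static List<SimpleBipartiteEdge<String, String, Void>> fromPairs(List<Map.Entry<String, String>> pairs) {
        return pairs.stream()
                .map(p -> new SimpleBipartiteEdge<String, String, Void>(p.getKey(), p.getValue(), null))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        var pairs = List.<Map.Entry<String, String>>of(
                new SimpleEntry<>("user1", "movie1"),
                new SimpleEntry<>("user2", "movie1"));
        var edges = fromPairs(pairs);
        System.out.println(edges.size());       // 2
        System.out.println(edges.get(0).top()); // user1
    }
}
```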

* from a `DataSet` of `Tuple3` and optional `DataSet`s of `Tuple2`. In this case, Gelly will convert each `Tuple3` to a `BipartiteEdge`, where the first field will be the top vertex ID, the second field will be the bottom vertex ID and the third field will be the edge value. Equivalently, each `Tuple2` will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<Long, String>> topVertexTuples = env.readCsvFile("path/to/top/vertex/input").types(Long.class, String.class);
DataSet<Tuple2<Long, String>> bottomVertexTuples = env.readCsvFile("path/to/bottom/vertex/input").types(Long.class, String.class);

DataSet<Tuple3<Long, Long, Double>> edgeTuples = env.readCsvFile("path/to/edge/input").types(Long.class, Long.class, Double.class);

BipartiteGraph<Long, Long, String, String, Double> graph = BipartiteGraph.fromTupleDataSet(topVertexTuples, bottomVertexTuples, edgeTuples, env);
{% endhighlight %}

</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
// TODO: Should be added when Scala interface is implemented
{% endhighlight %}
</div>
</div>

* from a CSV file of Edge data and an optional CSV file of Vertex data. In this case, Gelly will convert each row from the Edge CSV file to a `BipartiteEdge`, where the first field will be the top vertex ID, the second field will be the bottom vertex ID and the third field (if present) will be the edge value. Equivalently, each row from the optional Vertex CSV files will be converted to a `Vertex`, where the first field will be the vertex ID and the second field (if present) will be the vertex value. In order to get a `BipartiteGraph` from a `BipartiteGraphCsvReader` one has to specify the types, using one of the following methods:

- `types(Class<KT> topVertexKey, Class<KB> bottomVertexKey, Class<VVT> topVertexValue, Class<VVB> bottomVertexValue, Class<EV> edgeValue)`: both vertex and edge values are present.
- `edgeTypes(Class<KT> topVertexKey, Class<KB> bottomVertexKey, Class<EV> edgeValue)`: the Graph has edge values, but no vertex values.
- `vertexTypes(Class<KT> topVertexKey, Class<KB> bottomVertexKey, Class<VVT> topVertexValue, Class<VVB> bottomVertexValue)`: the Graph has vertex values, but no edge values.
- `keyType(Class<KT> topVertexKey, Class<KB> bottomVertexKey)`: the Graph has no vertex values and no edge values.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// create a BipartiteGraph with Long Vertex IDs, String Vertex values and Double Edge values
BipartiteGraph<Long, Long, String, String, Double> graph = BipartiteGraph.fromCsvReader("path/to/top/vertex/input", "path/to/bottom/vertex/input", "path/to/edge/input", env)
				.types(Long.class, Long.class, String.class, String.class, Double.class);


// create a BipartiteGraph with neither Vertex nor Edge values
BipartiteGraph<Long, String, NullValue, NullValue, NullValue> simpleGraph
		= BipartiteGraph.fromCsvReader("path/to/edge/input", env).keyType(Long.class, String.class);
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
In this case, Gelly will convert each row from the Edge CSV file to a `BipartiteEdge`. The first field of each row will be the top vertex ID, the second field will be the bottom vertex ID and the third field (if present) will be the edge value. If the edges have no associated value, set the edge value type parameter (3rd type argument) to `NullValue`. If you provide a path to a CSV file via `pathVertices`, each row of this file will be converted to a `Vertex`: the first field of each row will be the vertex ID and the second field will be the vertex value. If you instead provide two `MapFunction`s via the `topVertexValueInitializer` and `bottomVertexValueInitializer` parameters, these functions are used to generate the vertex values and the set of vertices will be created automatically from the edges input. If the vertices have no associated value, set the vertex value type parameter (2nd type argument) to `NullValue`; the vertices will then be automatically created from the edges input with vertex values of type `NullValue`.

{% highlight scala %}
// Scala API is not yet supported
{% endhighlight %}
</div>
</div>
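Per row, the CSV conversion described above boils down to: field one is the top vertex ID, field two is the bottom vertex ID, and an optional field three is the edge value. A stdlib-only sketch of that per-row logic, assuming comma-delimited rows (illustrative; not the Flink CSV reader itself, and `parseEdgeRow` is a hypothetical helper):

```java
public class EdgeCsvRowSketch {
    // Parse one CSV edge row of the form "topId,bottomId[,value]".
    // Returns {top, bottom, value-or-null}; a null value corresponds to the
    // no-edge-value case, where the type parameter would be NullValue.
    static Object[] parseEdgeRow(String row) {
        String[] fields = row.split(",");
        String top = fields[0];
        String bottom = fields[1];
        Double value = fields.length > 2 ? Double.valueOf(fields[2]) : null; // third field is optional
        return new Object[]{top, bottom, value};
    }

    public static void main(String[] args) {
        Object[] withValue = parseEdgeRow("u1,m7,4.5");
        Object[] withoutValue = parseEdgeRow("u2,m3");
        System.out.println(withValue[2]);    // 4.5
        System.out.println(withoutValue[2]); // null
    }
}
```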


* from a `Collection` of edges and optional `Collection`s of top and bottom vertices:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

List<Vertex<Long, String>> topVertexList = new ArrayList...
List<Vertex<Long, String>> bottomVertexList = new ArrayList...

List<BipartiteEdge<Long, Long, String>> edgeList = new ArrayList...

BipartiteGraph<Long, Long, String, String, String> graph = BipartiteGraph.fromCollection(topVertexList, bottomVertexList, edgeList, env);
{% endhighlight %}

If no vertex input is provided during BipartiteGraph creation, Gelly will automatically produce the `Vertex` `DataSet` from the edge input. In this case, the created vertices will have no values. Alternatively, you can provide two `MapFunction`s as arguments to the creation method, in order to initialize the top and bottom `Vertex` values:

{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// initialize the vertex value to be equal to the vertex ID
BipartiteGraph<Long, Long, Long, Long, String> graph = BipartiteGraph.fromCollection(edgeList,
new MapFunction<Long, Long>() {
public Long map(Long value) {
return value;
}
},
new MapFunction<Long, Long>() {
public Long map(Long value) {
return value;
}
},
env);
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
// TODO: Add Scala support
{% endhighlight %}

If no vertex input is provided during BipartiteGraph creation, Gelly will automatically produce the `Vertex` `DataSet` from the edge input. In this case, the created vertices will have no values. Alternatively, you can provide two `MapFunction`s as arguments to the creation method, in order to initialize the `Vertex` values:

{% highlight scala %}
// TODO: Add Scala support
{% endhighlight %}
</div>
</div>
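The automatic vertex derivation works the same way in both tabs: collect the distinct top and bottom IDs from the edge list, then run each ID through the initializer to produce its value. A stdlib-only sketch of that derivation with an identity initializer, matching the `MapFunction`s in the example above (illustrative; not Gelly's implementation):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class VerticesFromEdgesSketch {
    // Hypothetical minimal edge: top and bottom vertex IDs only.
    record Edge(long top, long bottom) {}

    // Build one side's vertex map: distinct IDs taken from the edges, each
    // given a value by the initializer (identity here, i.e. value == ID).
    static Map<Long, Long> vertices(List<Edge> edges, boolean topSide,
                                    Function<Long, Long> initializer) {
        Map<Long, Long> out = new LinkedHashMap<>();
        for (Edge e : edges) {
            long id = topSide ? e.top() : e.bottom();
            out.putIfAbsent(id, initializer.apply(id));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Edge> edges = List.of(new Edge(1, 10), new Edge(1, 20), new Edge(2, 10));
        System.out.println(vertices(edges, true, id -> id));  // {1=1, 2=2}
        System.out.println(vertices(edges, false, id -> id)); // {10=10, 20=20}
    }
}
```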

{% top %}

Graph Transformations
---------------------

@@ -126,7 +259,7 @@ DataSet<Vertex<Long, String>> bottomVertices = ...

// Edges that connect vertex 2 to vertex 1 and vertex 4 to vertex 1:
// (1, 2, "1-2-edge"); (1, 4, "1-4-edge")
DataSet<Edge<Long, Long, String>> edges = ...
DataSet<BipartiteEdge<Long, Long, String>> edges = ...

BipartiteGraph<Long, Long, String, String, String> graph = BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, env);

@@ -95,7 +95,7 @@ public class Graph<K, VV, EV> {
*
* @param vertices a DataSet of vertices.
* @param edges a DataSet of edges.
* @param context the flink execution environment.
* @param context the Flink execution environment.
*/
private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
this.vertices = vertices;
@@ -108,7 +108,7 @@ private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, Execu
*
* @param vertices a Collection of vertices.
* @param edges a Collection of edges.
* @param context the flink execution environment.
* @param context the Flink execution environment.
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Vertex<K, VV>> vertices,
@@ -124,7 +124,7 @@ public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Vertex<K, V
* NullValue.
*
* @param edges a Collection of edges.
* @param context the flink execution environment.
* @param context the Flink execution environment.
* @return the newly created graph.
*/
public static <K, EV> Graph<K, NullValue, EV> fromCollection(Collection<Edge<K, EV>> edges,
@@ -141,7 +141,7 @@ public static <K, EV> Graph<K, NullValue, EV> fromCollection(Collection<Edge<K,
* @param edges a Collection of edges.
* @param vertexValueInitializer a map function that initializes the vertex values.
* It allows to apply a map transformation on the vertex ID to produce an initial vertex value.
* @param context the flink execution environment.
* @param context the Flink execution environment.
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Edge<K, EV>> edges,
@@ -155,7 +155,7 @@ public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Edge<K, EV>
*
* @param vertices a DataSet of vertices.
* @param edges a DataSet of edges.
* @param context the flink execution environment.
* @param context the Flink execution environment.
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Vertex<K, VV>> vertices,
@@ -170,7 +170,7 @@ public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Vertex<K, VV>> ve
* NullValue.
*
* @param edges a DataSet of edges.
* @param context the flink execution environment.
* @param context the Flink execution environment.
* @return the newly created graph.
*/
public static <K, EV> Graph<K, NullValue, EV> fromDataSet(
@@ -205,7 +205,7 @@ public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) {
* @param edges a DataSet of edges.
* @param vertexValueInitializer the mapper function that initializes the vertex values.
* It allows to apply a map transformation on the vertex ID to produce an initial vertex value.
* @param context the flink execution environment.
* @param context the Flink execution environment.
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Edge<K, EV>> edges,
@@ -262,7 +262,7 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
*
* @param vertices a DataSet of Tuple2 representing the vertices.
* @param edges a DataSet of Tuple3 representing the edges.
* @param context the flink execution environment.
* @param context the Flink execution environment.
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple2<K, VV>> vertices,
@@ -289,7 +289,7 @@ public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple2<K, VV
* Vertices are created automatically and their values are set to NullValue.
*
* @param edges a DataSet of Tuple3 representing the edges.
* @param context the flink execution environment.
* @param context the Flink execution environment.
* @return the newly created graph.
*/
public static <K, EV> Graph<K, NullValue, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
@@ -315,7 +315,7 @@ public static <K, EV> Graph<K, NullValue, EV> fromTupleDataSet(DataSet<Tuple3<K,
* @param edges a DataSet of Tuple3.
* @param vertexValueInitializer the mapper function that initializes the vertex values.
* It allows to apply a map transformation on the vertex ID to produce an initial vertex value.
* @param context the flink execution environment.
* @param context the Flink execution environment.
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
Expand All @@ -336,7 +336,7 @@ public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple3<K, K,
* Edge value types and Vertex values types will be set to NullValue.
*
* @param edges a DataSet of Tuple2.
* @param context the flink execution environment.
* @param context the Flink execution environment.
* @return the newly created graph.
*/
public static <K> Graph<K, NullValue, NullValue> fromTuple2DataSet(DataSet<Tuple2<K, K>> edges,
Expand All @@ -361,7 +361,7 @@ public static <K> Graph<K, NullValue, NullValue> fromTuple2DataSet(DataSet<Tuple
* and the second field corresponds to the target ID.
* @param vertexValueInitializer the mapper function that initializes the vertex values.
* It allows to apply a map transformation on the vertex ID to produce an initial vertex value.
* @param context the flink execution environment.
* @param context the Flink execution environment.
* @return the newly created graph.
*/
public static <K, VV> Graph<K, VV, NullValue> fromTuple2DataSet(DataSet<Tuple2<K, K>> edges,
Expand Down Expand Up @@ -431,7 +431,7 @@ public static <K, VV> GraphCsvReader fromCsvReader(String edgesPath,
}

/**
* @return the flink execution environment.
* @return the Flink execution environment.
*/
public ExecutionEnvironment getContext() {
return this.context;
@@ -37,7 +37,6 @@
* such as whether to skip the initial line as the header.
* The configuration is done using the functions provided in the {@link org.apache.flink.api.java.io.CsvReader} class.
*/

public class GraphCsvReader {

@SuppressWarnings("unused")
@@ -280,8 +279,8 @@ public GraphCsvReader parseQuotedStringsEdges(char quoteCharacter) {
* @return The GraphCSVReader instance itself, to allow for fluent function chaining.
*/
public GraphCsvReader parseQuotedStringsVertices(char quoteCharacter) {
if(this.vertexReader != null) {
this.vertexReader.parseQuotedStrings(quoteCharacter);
if (vertexReader != null) {
vertexReader.parseQuotedStrings(quoteCharacter);
}
return this;
}
@@ -295,8 +294,8 @@ public GraphCsvReader parseQuotedStringsVertices(char quoteCharacter) {
* @return The GraphCSVReader instance itself, to allow for fluent function chaining.
*/
public GraphCsvReader ignoreCommentsVertices(String commentPrefix) {
if(this.vertexReader != null) {
this.vertexReader.ignoreComments(commentPrefix);
if (vertexReader != null) {
vertexReader.ignoreComments(commentPrefix);
}
return this;
}
@@ -327,8 +326,8 @@ public GraphCsvReader ignoreCommentsEdges(String commentPrefix) {
* @return The GraphCSVReader instance itself, to allow for fluent function chaining.
*/
public GraphCsvReader includeFieldsVertices(boolean ... vertexFields) {
if(this.vertexReader != null) {
this.vertexReader.includeFields(vertexFields);
if (vertexReader != null) {
vertexReader.includeFields(vertexFields);
}
return this;
}
@@ -364,8 +363,8 @@ public GraphCsvReader includeFieldsEdges(boolean ... edgeFields) {
* @return The GraphCSVReader instance itself, to allow for fluent function chaining.
*/
public GraphCsvReader includeFieldsVertices(String mask) {
if(this.vertexReader != null) {
this.vertexReader.includeFields(mask);
if (vertexReader != null) {
vertexReader.includeFields(mask);
}
return this;
}
@@ -408,8 +407,8 @@ public GraphCsvReader includeFieldsEdges(String mask) {
* @return The GraphCSVReader instance itself, to allow for fluent function chaining.
*/
public GraphCsvReader includeFieldsVertices(long mask) {
if(this.vertexReader != null) {
this.vertexReader.includeFields(mask);
if (vertexReader != null) {
vertexReader.includeFields(mask);
}
return this;
}
@@ -454,8 +453,8 @@ public GraphCsvReader ignoreFirstLineEdges() {
* @return The GraphCSVReader instance itself, to allow for fluent function chaining.
*/
public GraphCsvReader ignoreFirstLineVertices() {
if(this.vertexReader != null) {
this.vertexReader.ignoreFirstLine();
if (vertexReader != null) {
vertexReader.ignoreFirstLine();
}
return this;
}
@@ -478,9 +477,10 @@ public GraphCsvReader ignoreInvalidLinesEdges() {
* @return The GraphCSVReader instance itself, to allow for fluent function chaining.
*/
public GraphCsvReader ignoreInvalidLinesVertices() {
if(this.vertexReader != null) {
this.vertexReader.ignoreInvalidLines();
if (vertexReader != null) {
vertexReader.ignoreInvalidLines();
}
return this;
}

}