[FLINK-1520] [gelly] add types methods and make formatting changes to the graph csv reader

This squashes the following commits:

[FLINK-1520] [gelly] add named types methods for reading a Graph from CSV input,
with and without vertex/edge values. Change the examples and the tests accordingly.

[FLINK-1520] [gelly] corrections in Javadocs; updated documentation

This closes #1149
vasia committed Sep 24, 2015
1 parent 702277f commit d01d369
Showing 10 changed files with 298 additions and 273 deletions.
20 changes: 12 additions & 8 deletions docs/libs/gelly_guide.md
@@ -104,20 +104,24 @@ DataSet<Tuple3<String, String, Double>> edgeTuples = env.readCsvFile("path/to/ed
Graph<String, Long, Double> graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env);
{% endhighlight %}

* from a CSV file with three fields and an optional CSV file with 2 fields. In this case, Gelly will convert each row from the CSV file containing edges data to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field will be the edge value. Equivalently, each row from the optional CSV file containing vertices will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value. A `typesEdges()` method is called on the GraphCsvReader object returned by `fromCsvReader()` to inform the CsvReader of the types of the fields for Edges. If Edge doesn't have a value only type of Vertex Key is passed. `typesEdges()` method returns a GraphCsvReader on calling calling `typesVertices()` or `typesVerticesNullEdge()` returns the instance of Graph:
* from a CSV file of Edge data and an optional CSV file of Vertex data. In this case, Gelly will convert each row from the Edge CSV file to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field (if present) will be the edge value. Equivalently, each row from the optional Vertex CSV file will be converted to a `Vertex`, where the first field will be the vertex ID and the second field (if present) will be the vertex value. In order to get a `Graph` from a `GraphCsvReader` one has to specify the types, using one of the following methods:

{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Graph<String, Long, NullValue> graph = Graph.fromCsvReader("path/to/vertex/input", "path/to/edge/input", env).typesEdges(String.class).typesVerticesNullEdge(String.class, Long.class);
{% endhighlight %}
- `types(Class<K> vertexKey, Class<VV> vertexValue,Class<EV> edgeValue)`: both vertex and edge values are present.
- `edgeTypes(Class<K> vertexKey, Class<EV> edgeValue)`: the Graph has edge values, but no vertex values.
- `vertexTypes(Class<K> vertexKey, Class<VV> vertexValue)`: the Graph has vertex values, but no edge values.
- `keyType(Class<K> vertexKey)`: the Graph has no vertex values and no edge values.

If Vertices don't have a value, overloaded `typesVerticesNullEdge()` or `typesVertices()` Method should be used.

{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Graph<String, NullValue, Long> graph = Graph.fromCsvReader("path/to/vertex/input", "path/to/edge/input", env).typesEdges(String.class, Long.class).typesVerticesNullEdge(String.class);
// create a Graph with String Vertex IDs, Long Vertex values and Double Edge values
Graph<String, Long, Double> graph = Graph.fromCsvReader("path/to/vertex/input", "path/to/edge/input", env)
.types(String.class, Long.class, Double.class);


// create a Graph with no Vertex or Edge values
Graph<Long, NullValue, NullValue> simpleGraph = Graph.fromCsvReader("path/to/edge/input", env).keyType(Long.class);
{% endhighlight %}
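
To make the row-to-`Edge` mapping described in the guide text above more concrete, here is a minimal plain-Java sketch that does not depend on Flink. It is not Gelly's implementation: the class `CsvEdgeSketch`, the `SimpleEdge` type, the comma delimiter and the use of `Double` edge values are all assumptions made for illustration. It only shows the convention that the first field is the source ID, the second is the target ID, and the third field, if present, is the edge value.

```java
import java.util.ArrayList;
import java.util.List;

public class CsvEdgeSketch {

    /** Hypothetical stand-in for an edge; value is null when the row has no third field. */
    static final class SimpleEdge {
        final String source;
        final String target;
        final Double value;

        SimpleEdge(String source, String target, Double value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }
    }

    /** Converts one comma-separated row into an edge: source ID, target ID, optional value. */
    static SimpleEdge parseEdge(String row) {
        String[] fields = row.split(",");
        if (fields.length < 2 || fields.length > 3) {
            throw new IllegalArgumentException("expected 2 or 3 fields: " + row);
        }
        // The third field is optional; without it the edge carries no value.
        Double value = fields.length == 3 ? Double.valueOf(fields[2].trim()) : null;
        return new SimpleEdge(fields[0].trim(), fields[1].trim(), value);
    }

    /** Converts every row of an in-memory "file" into an edge, preserving order. */
    static List<SimpleEdge> parseEdges(List<String> rows) {
        List<SimpleEdge> edges = new ArrayList<>();
        for (String row : rows) {
            edges.add(parseEdge(row));
        }
        return edges;
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        rows.add("a,b,1.5"); // edge with a value
        rows.add("b,c");     // edge without a value
        for (SimpleEdge e : parseEdges(rows)) {
            System.out.println(e.source + " -> " + e.target + " : " + e.value);
        }
    }
}
```

In this sketch a missing edge value is represented by `null`; in the Gelly API shown above the corresponding type parameter is `NullValue`, selected through `keyType()` or `vertexTypes()`.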


@@ -137,7 +137,7 @@ public static <K, EV> Graph<K, NullValue, EV> fromCollection(Collection<Edge<K,
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Edge<K, EV>> edges,
final MapFunction<K, VV> mapper,ExecutionEnvironment context) {
final MapFunction<K, VV> mapper, ExecutionEnvironment context) {

return fromDataSet(context.fromCollection(edges), mapper, context);
}
@@ -282,48 +282,57 @@ public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple3<K, K,
}

/**
* Creates a graph from CSV files.
*
* Vertices with value are created from a CSV file with 2 fields
* Edges with value are created from a CSV file with 3 fields
* @param verticesPath path to a CSV file with the Vertices data.
* @param edgesPath path to a CSV file with the Edges data
* @param context the flink execution environment.
* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , on which calling typesEdges() and typesVertices() methods to specify types of the
*Vertex ID, Vertex Value and Edge value returns a Graph
* Creates a Graph from a CSV file of vertices and a CSV file of edges.
*
* @param verticesPath path to a CSV file with the Vertex data.
* @param edgesPath path to a CSV file with the Edge data
* @param context the Flink execution environment.
* @return An instance of {@link org.apache.flink.graph.GraphCsvReader},
* on which calling methods to specify types of the Vertex ID, Vertex value and Edge value returns a Graph.
*
* @see {@link org.apache.flink.graph.GraphCsvReader#types(Class, Class, Class)},
* {@link org.apache.flink.graph.GraphCsvReader#vertexTypes(Class, Class)},
* {@link org.apache.flink.graph.GraphCsvReader#edgeTypes(Class, Class)} and
* {@link org.apache.flink.graph.GraphCsvReader#keyType(Class)}.
*/
public static GraphCsvReader fromCsvReader(String verticesPath, String edgesPath, ExecutionEnvironment context) {
public static GraphCsvReader fromCsvReader(String verticesPath, String edgesPath, ExecutionEnvironment context) {
return new GraphCsvReader(verticesPath, edgesPath, context);
}
/** Creates a graph from a CSV file for Edges.Vertices are
* induced from the edges.
*
* Edges with value are created from a CSV file with 3 fields. Vertices are created
* automatically and their values are set to NullValue.

/**
* Creates a graph from a CSV file of edges. Vertices will be created automatically.
*
* @param edgesPath a path to a CSV file with the Edges data
* @param context the flink execution environment.
* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , on which calling typesEdges() and typesVertices() methods to specify types of the
* Vertex ID, Vertex Value and Edge value returns a Graph
* @param context the execution environment.
* @return An instance of {@link org.apache.flink.graph.GraphCsvReader},
* on which calling methods to specify types of the Vertex ID, Vertex value and Edge value returns a Graph.
*
* @see {@link org.apache.flink.graph.GraphCsvReader#types(Class, Class, Class)},
* {@link org.apache.flink.graph.GraphCsvReader#vertexTypes(Class, Class)},
* {@link org.apache.flink.graph.GraphCsvReader#edgeTypes(Class, Class)} and
* {@link org.apache.flink.graph.GraphCsvReader#keyType(Class)}.
*/
public static GraphCsvReader fromCsvReader(String edgesPath, ExecutionEnvironment context) {
return new GraphCsvReader(edgesPath, context);
}

/**
*Creates a graph from a CSV file for Edges., Vertices are
* induced from the edges and vertex values are calculated by a mapper
* function. Edges with value are created from a CSV file with 3 fields.
* Vertices are created automatically and their values are set by applying the provided map
* function to the vertex ids.
/**
* Creates a graph from a CSV file of edges. Vertices will be created automatically and
* Vertex values are set by the provided mapper.
*
* @param edgesPath a path to a CSV file with the Edges data
* @param edgesPath a path to a CSV file with the Edge data
* @param mapper the mapper function.
* @param context the flink execution environment.
* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} ,on which calling typesEdges() and typesVertices() methods to specify types of the
* Vertex ID, Vertex Value and Edge value returns a Graph
* @param context the execution environment.
* @return An instance of {@link org.apache.flink.graph.GraphCsvReader},
* on which calling methods to specify types of the Vertex ID, Vertex Value and Edge value returns a Graph.
*
* @see {@link org.apache.flink.graph.GraphCsvReader#types(Class, Class, Class)},
* {@link org.apache.flink.graph.GraphCsvReader#vertexTypes(Class, Class)},
* {@link org.apache.flink.graph.GraphCsvReader#edgeTypes(Class, Class)} and
* {@link org.apache.flink.graph.GraphCsvReader#keyType(Class)}.
*/
public static GraphCsvReader fromCsvReader(String edgesPath, final MapFunction mapper, ExecutionEnvironment context) {
public static <K, VV> GraphCsvReader fromCsvReader(String edgesPath,
final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
return new GraphCsvReader(edgesPath, mapper, context);
}
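
The Javadoc for the mapper variant says that vertices are created automatically from the edges and that vertex values are set by the provided mapper. The following plain-Java sketch illustrates that idea without Flink. It is an assumption-laden illustration, not the code behind `GraphCsvReader`: the class `InducedVerticesSketch`, the `long[]` edge representation and the use of `java.util.function.Function` in place of Flink's `MapFunction` are all hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class InducedVerticesSketch {

    /**
     * Collects every ID that appears as an edge endpoint and assigns it the value
     * produced by the mapper. Each edge is a two-element array {sourceId, targetId}.
     */
    static <VV> Map<Long, VV> induceVertices(List<long[]> edges, Function<Long, VV> mapper) {
        Map<Long, VV> vertices = new LinkedHashMap<>();
        for (long[] edge : edges) {
            for (long id : edge) {
                // Each vertex ID is mapped once, the first time it is seen.
                if (!vertices.containsKey(id)) {
                    vertices.put(id, mapper.apply(id));
                }
            }
        }
        return vertices;
    }

    public static void main(String[] args) {
        List<long[]> edges = Arrays.asList(new long[] {1L, 2L}, new long[] {2L, 3L});
        // Hypothetical mapper: the vertex value is ten times the vertex ID.
        Map<Long, Long> vertices = induceVertices(edges, id -> id * 10);
        System.out.println(vertices); // prints {1=10, 2=20, 3=30}
    }
}
```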

@@ -412,7 +421,6 @@ public void join(Tuple4<K, K, VV, EV> tripletWithSrcValSet,
}
}


/**
* Apply a function to the attribute of each vertex in the graph.
*
@@ -1898,7 +1906,7 @@ public Tuple2<K, VV> reduce(Tuple2<K, VV> first, Tuple2<K, VV> second) throws Ex
* the function to apply to the neighborhood
* @param direction
* the edge direction (in-, out-, all-)
* @return a Dataset containing one value per vertex(vertex key, aggegate edge value)
* @return a Dataset containing one value per vertex(vertex key, aggregate edge value)
* @throws IllegalArgumentException
*/
public DataSet<Tuple2<K, EV>> reduceOnEdges(ReduceEdgesFunction<EV> reduceEdgesFunction,