Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-2785] [gelly] implements fromCsvReader for gelly-scala #1205

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion docs/libs/gelly_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ Graph<String, Long, Double> graph = Graph.fromCsvReader("path/to/vertex/input",
.types(String.class, Long.class, Double.class);


// create a Graph with no Vertex or Edge values
// create a Graph with neither Vertex nor Edge values
Graph<Long, NullValue, NullValue> simpleGraph = Graph.fromCsvReader("path/to/edge/input", env).keyType(Long.class);
{% endhighlight %}
</div>
Expand All @@ -193,6 +193,28 @@ val edgeTuples = env.readCsvFile[String, String, Double]("path/to/edge/input")

val graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env)
{% endhighlight %}

* from a CSV file of Edge data and an optional CSV file of Vertex data.
In this case, Gelly will convert each row from the Edge CSV file to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field (if present) will be the edge value. If the edges have no associated value, set the `hasEdgeValues` parameter to `false`. The parameter `readVertices` defines whether vertex data are provided. If `readVertices` is set to `true`, then `pathVertices` must be specified. In this case, each row from the Vertex CSV file will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value. If `readVertices` is set to false, then Vertex data will be ignored and vertices will be automatically created from the edges input.

{% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment

// create a Graph with String Vertex IDs, Long Vertex values and Double Edge values
val graph = Graph.fromCsvReader[String, Long, Double](
readVertices = true,
pathVertices = "path/to/vertex/input",
pathEdges = "path/to/edge/input",
env = env)


// create a Graph with neither Vertex nor Edge values
val simpleGraph = Graph.fromCsvReader[Long, NullValue, NullVale](
readVertices = false,
pathEdges = "path/to/edge/input",
hasEdgeValues = false,
env = env)
{% endhighlight %}
</div>
</div>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,133 @@ object Graph {
wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, mapper, env.getJavaEnv))
}

/**
* Creates a Graph with from a CSV file of vertices and a CSV file of edges
*
* @param The Execution Environment.
* @param pathEdges The file path containing the edges.
* @param readVertices Defines whether the vertices have associated values.
* If set to false, the vertex input is ignored and vertices are created from the edges file.
* @param pathVertices The file path containing the vertices.
* @param hasEdgeValues Defines whether the edges have associated values. True by default.
* @param lineDelimiterVertices The string that separates lines in the vertices file.
* It defaults to newline.
* @param fieldDelimiterVertices The string that separates vertex Ids from vertex values
* in the vertices file.
* @param quoteCharacterVertices The character to use for quoted String parsing
* in the vertices file. Disabled by default.
* @param ignoreFirstLineVertices Whether the first line in the vertices file should be ignored.
* @param ignoreCommentsVertices Lines that start with the given String in the vertices file
* are ignored, disabled by default.
* @param lenientVertices Whether the parser should silently ignore malformed lines in the
* vertices file.
* @param includedFieldsVertices The fields in the vertices file that should be read.
* By default all fields are read.
* @param lineDelimiterEdges The string that separates lines in the edges file.
* It defaults to newline.
* @param fieldDelimiterEdges The string that separates fields in the edges file.
* @param quoteCharacterEdges The character to use for quoted String parsing
* in the edges file. Disabled by default.
* @param ignoreFirstLineEdges Whether the first line in the vertices file should be ignored.
* @param ignoreCommentsEdges Lines that start with the given String in the edges file
* are ignored, disabled by default.
* @param lenientEdges Whether the parser should silently ignore malformed lines in the
* edges file.
* @param includedFieldsEdges The fields in the edges file that should be read.
* By default all fields are read.
* @param mapper If no vertex values are provided, this mapper can be used to initialize them.
*
*/
// scalastyle:off
// This method exceeds the max allowed number of parameters -->
def fromCsvReader[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
EV: TypeInformation : ClassTag](
env: ExecutionEnvironment,
pathEdges: String,
readVertices: Boolean,
pathVertices: String = null,
hasEdgeValues: Boolean = true,
lineDelimiterVertices: String = "\n",
fieldDelimiterVertices: String = ",",
quoteCharacterVertices: Character = null,
ignoreFirstLineVertices: Boolean = false,
ignoreCommentsVertices: String = null,
lenientVertices: Boolean = false,
includedFieldsVertices: Array[Int] = null,
lineDelimiterEdges: String = "\n",
fieldDelimiterEdges: String = ",",
quoteCharacterEdges: Character = null,
ignoreFirstLineEdges: Boolean = false,
ignoreCommentsEdges: String = null,
lenientEdges: Boolean = false,
includedFieldsEdges: Array[Int] = null,
mapper: MapFunction[K, VV] = null) = {

// with vertex and edge values
if (readVertices && hasEdgeValues) {
if (pathVertices.equals(null)) {
throw new IllegalArgumentException(
"The vertices file path must be specified when readVertices is true.")
} else {
val vertices = env.readCsvFile[(K, VV)](pathVertices, lineDelimiterVertices,
fieldDelimiterVertices, quoteCharacterVertices, ignoreFirstLineVertices,
ignoreCommentsVertices, lenientVertices, includedFieldsVertices)

val edges = env.readCsvFile[(K, K, EV)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges, lenientEdges,
includedFieldsEdges)

fromTupleDataSet[K, VV, EV](vertices, edges, env)
}
}
// with vertex value and no edge value
else if (readVertices && (!hasEdgeValues)) {
if (pathVertices.equals(null)) {
throw new IllegalArgumentException(
"The vertices file path must be specified when readVertices is true.")
} else {
val vertices = env.readCsvFile[(K, VV)](pathVertices, lineDelimiterVertices,
fieldDelimiterVertices, quoteCharacterVertices, ignoreFirstLineVertices,
ignoreCommentsVertices, lenientVertices, includedFieldsVertices)

val edges = env.readCsvFile[(K, K)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges, lenientEdges,
includedFieldsEdges).map(edge => (edge._1, edge._2, NullValue.getInstance))

fromTupleDataSet[K, VV, NullValue](vertices, edges, env)
}
}
// with edge value and no vertex value
else if ((!readVertices) && hasEdgeValues) {
val edges = env.readCsvFile[(K, K, EV)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges, lenientEdges,
includedFieldsEdges)

// initializer provided
if (mapper != null) {
fromTupleDataSet[K, VV, EV](edges, env, mapper)
}
else {
fromTupleDataSet[K, EV](edges, env)
}
}
// with no edge value and no vertex value
else {
val edges = env.readCsvFile[(K, K)](pathEdges, lineDelimiterEdges, fieldDelimiterEdges,
quoteCharacterEdges, ignoreFirstLineEdges, ignoreCommentsEdges,
lenientEdges, includedFieldsEdges).map(edge => (edge._1, edge._2, NullValue.getInstance))

// no initializer provided
if (mapper != null) {
fromTupleDataSet[K, VV, NullValue](edges, env, mapper)
}
else {
fromTupleDataSet[K, NullValue](edges, env)
}
}
}
// scalastyle:on

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ class GellyScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {

override def isExcludedByName(method: Method): Boolean = {
val name = method.getDeclaringClass.getName + "." + method.getName
val excludedNames = Seq("org.apache.flink.graph.Graph.getContext",
// NOTE: until fromCsvReader() is added to to the Scala API Graph
"org.apache.flink.graph.Graph.fromCsvReader")
val excludedNames = Seq("org.apache.flink.graph.Graph.getContext")
excludedNames.contains(name)
}

Expand Down
Loading