From 2933193529b73fb318d9647859c0dc9c89e8f9dc Mon Sep 17 00:00:00 2001 From: Shivani Date: Wed, 17 Jun 2015 15:37:36 +0200 Subject: [PATCH 1/7] [FLINK-1520]Read edges and vertices from CSV files --- docs/libs/gelly_guide.md | 9 + .../java/org/apache/flink/graph/Graph.java | 52 +++ .../apache/flink/graph/GraphCsvReader.java | 388 ++++++++++++++++++ .../test/operations/GraphCreationITCase.java | 183 ++++++--- .../GraphCreationWithMapperITCase.java | 107 +++-- 5 files changed, 659 insertions(+), 80 deletions(-) create mode 100644 flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md index 1e9bf4831e69b..73e9b566b1d4b 100644 --- a/docs/libs/gelly_guide.md +++ b/docs/libs/gelly_guide.md @@ -104,6 +104,15 @@ DataSet> edgeTuples = env.readCsvFile("path/to/ed Graph graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env); {% endhighlight %} +* from a CSV file with three fields and an optional CSV file with 2 fields. In this case, Gelly will convert each row from the first CSV file to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field will be the edge value. Equivalently, each row from the second CSV file will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value. 
A types() method is called on the GraphCsvReader object returned by fromCsvReader() to inform the CsvReader of the types of the fields : + +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +Graph graph = Graph.fromCsvReader("path/to/vertex/input","path/to/edge/input",env).types(String.class, Long.class, Double.class); +{% endhighlight %} + + * from a `Collection` of edges and an optional `Collection` of vertices: {% highlight java %} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index ff279491d9711..2ea22b803dbe8 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -281,6 +281,58 @@ public static Graph fromTupleDataSet(DataSet Graph types(Class type0, Class type1, Class type2) + { + DataSet> edges = this.EdgeReader.types(type0,type0,type2); + if(path1!=null) + { + DataSet> vertices = this.VertexReader.types(type0,type1); + return Graph.fromTupleDataSet(vertices,edges,executionContext); + } + else + { + return Graph.fromTupleDataSet(edges,mapper,executionContext); + } + + + } + /** + * Specifies the types for the Graph fields and returns a Graph with those field types + * + * This method is overloaded for the case in which Vertices don't have a value + * + * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph. + * @param type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. + * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. 
+ */ + public Graph types(Class type0, Class type1) + { + DataSet> edges = this.EdgeReader.types(type0,type0,type1); + return Graph.fromTupleDataSet(edges,executionContext); + } + + /** + *Configures the Delimiter that separates rows for the CSV readers used to read the edges and vertices + * ({@code '\n'}) is used by default. + * + *@param delimiter The delimiter that separates the rows. + * @return The GraphCsv reader instance itself, to allow for fluent function chaining. + */ + public GraphCsvReader lineDelimiter(String delimiter) + { + this.EdgeReader.lineDelimiter(delimiter); + this.VertexReader.lineDelimiter(delimiter); + return this; + } + + /** + *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices + * ({@code ','}) is used by default. + * + * @param delimiter The delimiter that separates the fields in a row. + * @return The GraphCsv reader instance itself, to allow for fluent function chaining. + */ + @Deprecated + public GraphCsvReader fieldDelimiter(char delimiter) + { + this.EdgeReader.fieldDelimiter(delimiter); + this.VertexReader.fieldDelimiter(delimiter); + return this; + } + + + /** + *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices + * ({@code ','}) is used by default. + * + * @param delimiter The delimiter that separates the fields in a row. + * @return The GraphCsv reader instance itself, to allow for fluent function chaining. + */ + public GraphCsvReader fieldDelimiter(String delimiter) + { + this.EdgeReader.fieldDelimiter(delimiter); + this.VertexReader.fieldDelimiter(delimiter); + return this; + } + + /** + * Enables quoted String parsing. Field delimiters in quoted Strings are ignored. + * A String is parsed as quoted if it starts and ends with a quoting character and as unquoted otherwise. + * Leading or tailing whitespaces are not allowed. + * + * @param quoteCharacter The character which is used as quoting character. 
+ * @return The Graph Csv reader instance itself, to allow for fluent function chaining. + */ + + public GraphCsvReader parseQuotedStrings(char quoteCharacter) { + this.EdgeReader.parseQuotedStrings(quoteCharacter); + this.VertexReader.parseQuotedStrings(quoteCharacter); + return this; + } + + /** + * Configures the string that starts comments. + * By default comments will be treated as invalid lines. + * This function only recognizes comments which start at the beginning of the line! + * + * @param commentPrefix The string that starts the comments. + * @return The Graph csv reader instance itself, to allow for fluent function chaining. + */ + + + public GraphCsvReader ignoreComments(String commentPrefix) { + this.EdgeReader.ignoreComments(commentPrefix); + this.VertexReader.ignoreComments(commentPrefix); + return this; + } + + + /** + * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The + * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean + * array. The parser will skip over all fields where the boolean value at the corresponding position + * in the array is {@code false}. The result contains the fields where the corresponding position in + * the boolean array is {@code true}. + * The number of fields in the result is consequently equal to the number of times that {@code true} + * occurs in the fields array. + * + * @param vertexFields The array of flags that describes which fields are to be included from the CSV file for vertices. + * @return The CSV reader instance itself, to allow for fluent function chaining. + */ + public GraphCsvReader includeVertexFields(boolean ... vertexFields) { + this.VertexReader.includeFields(vertexFields); + return this; + } + + /** + * Configures which fields of the CSV file containing edges data should be included and which should be skipped. 
The + * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean + * array. The parser will skip over all fields where the boolean value at the corresponding position + * in the array is {@code false}. The result contains the fields where the corresponding position in + * the boolean array is {@code true}. + * The number of fields in the result is consequently equal to the number of times that {@code true} + * occurs in the fields array. + * + * @param edgeFields The array of flags that describes which fields are to be included from the CSV file for edges. + * @return The CSV reader instance itself, to allow for fluent function chaining. + */ + + public GraphCsvReader includeEdgeFields(boolean ... edgeFields) { + this.EdgeReader.includeFields(edgeFields); + return this; + } + + /** + * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The + * positions in the string (read from position 0 to its length) define whether the field at + * the corresponding position in the CSV schema should be included. + * parser will look at the first {@code n} fields, where {@code n} is the length of the mask string + * The parser will skip over all fields where the character at the corresponding position + * in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value + * {@code false}). The result contains the fields where the corresponding position in + * the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}). + * + * @param mask The string mask defining which fields to include and which to skip. + * @return The Graph Csv reader instance itself, to allow for fluent function chaining. 
+ */ + + + public GraphCsvReader includeVertexFields(String mask) { + this.VertexReader.includeFields(mask); + return this; + } + + /** + * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The + * positions in the string (read from position 0 to its length) define whether the field at + * the corresponding position in the CSV schema should be included. + * parser will look at the first {@code n} fields, where {@code n} is the length of the mask string + * The parser will skip over all fields where the character at the corresponding position + * in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value + * {@code false}). The result contains the fields where the corresponding position in + * the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}). + * + * @param mask The string mask defining which fields to include and which to skip. + * @return The Graph Csv reader instance itself, to allow for fluent function chaining. + */ + + + public GraphCsvReader includeEdgeFields(String mask) { + this.VertexReader.includeFields(mask); + return this; + } + + /** + * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The + * bits in the value (read from least significant to most significant) define whether the field at + * the corresponding position in the CSV schema should be included. + * parser will look at the first {@code n} fields, where {@code n} is the position of the most significant + * non-zero bit. + * The parser will skip over all fields where the character at the corresponding bit is zero, and + * include the fields where the corresponding bit is one. + *

+ * Examples: + *

    + *
  • A mask of {@code 0x7} would include the first three fields.
  • + *
  • A mask of {@code 0x26} (binary {@code 100110}) would skip the first field, include fields + * two and three, skip fields four and five, and include field six.
  • + *
+ * + * @param mask The bit mask defining which fields to include and which to skip. + * @return The Graph CSV reader instance itself, to allow for fluent function chaining. + */ + + + public GraphCsvReader includeVertexFields(long mask) { + this.VertexReader.includeFields(mask); + return this; + } + + /** + * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The + * bits in the value (read from least significant to most significant) define whether the field at + * the corresponding position in the CSV schema should be included. + * parser will look at the first {@code n} fields, where {@code n} is the position of the most significant + * non-zero bit. + * The parser will skip over all fields where the character at the corresponding bit is zero, and + * include the fields where the corresponding bit is one. + *

+ * Examples: + *

    + *
  • A mask of {@code 0x7} would include the first three fields.
  • + *
  • A mask of {@code 0x26} (binary {@code 100110}) would skip the first field, include fields + * two and three, skip fields four and five, and include field six.
  • + *
+ * + * @param mask The bit mask defining which fields to include and which to skip. + * @return The Graph CSV reader instance itself, to allow for fluent function chaining. + */ + + + public GraphCsvReader includeEdgeFields(long mask) { + this.VertexReader.includeFields(mask); + return this; + } + + + + /** + * Sets the CSV readers for the Edges and Vertices files to ignore the first line. This is useful for files that contain a header line. + * + * @return The Graph CSV reader instance itself, to allow for fluent function chaining. + */ + public GraphCsvReader ignoreFirstLine() { + this.EdgeReader.ignoreFirstLine(); + this.VertexReader.ignoreFirstLine(); + return this; + } + + /** + * Sets the CSV readers for the Edges and Vertices files to ignore any invalid lines. + * This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise. + * + * @return The CSV reader instance itself, to allow for fluent function chaining. 
+ */ + public GraphCsvReader ignoreInvalidLines(){ + this.EdgeReader.ignoreInvalidLines(); + this.VertexReader.ignoreInvalidLines(); + return this; + } + + + + +} diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java index 22a51510e511f..77b68a8ed391f 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java @@ -18,12 +18,12 @@ package org.apache.flink.graph.test.operations; -import java.util.LinkedList; -import java.util.List; - +import com.google.common.base.Charsets; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; @@ -32,9 +32,17 @@ import org.apache.flink.graph.validation.InvalidVertexIdsValidator; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.types.NullValue; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; @RunWith(Parameterized.class) public class GraphCreationITCase extends MultipleProgramsTestBase { @@ -43,8 +51,21 @@ public GraphCreationITCase(TestExecutionMode mode){ super(mode); } + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new 
TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } - private String expectedResult; + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } @Test public void testCreateWithoutVertexValues() throws Exception { @@ -54,16 +75,13 @@ public void testCreateWithoutVertexValues() throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env); - DataSet> data = graph.getVertices(); - List> result= data.collect(); - + graph.getVertices().writeAsCsv(resultPath); + env.execute(); expectedResult = "1,(null)\n" + - "2,(null)\n" + - "3,(null)\n" + - "4,(null)\n" + - "5,(null)\n"; - - compareResultAsTuples(result, expectedResult); + "2,(null)\n" + + "3,(null)\n" + + "4,(null)\n" + + "5,(null)\n"; } @Test @@ -75,16 +93,13 @@ public void testCreateWithMapper() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), new AssignIdAsValueMapper(), env); - DataSet> data = graph.getVertices(); - List> result= data.collect(); - + graph.getVertices().writeAsCsv(resultPath); + env.execute(); expectedResult = "1,1\n" + - "2,2\n" + - "3,3\n" + - "4,4\n" + - "5,5\n"; - - compareResultAsTuples(result, expectedResult); + "2,2\n" + + "3,3\n" + + "4,4\n" + + "5,5\n"; } @Test @@ -96,16 +111,86 @@ public void testCreateWithCustomVertexValue() throws Exception { Graph, Long> graph = Graph.fromDataSet( TestGraphUtils.getLongLongEdgeData(env), new AssignCustomVertexValueMapper(), env); - DataSet>> data = graph.getVertices(); - List>> result= data.collect(); - + graph.getVertices().writeAsCsv(resultPath); + env.execute(); expectedResult = "1,(2.0,0)\n" + "2,(4.0,1)\n" + "3,(6.0,2)\n" + "4,(8.0,3)\n" + "5,(10.0,4)\n"; - - compareResultAsTuples(result, expectedResult); + } + + @Test + public void testCreateWithCsvFile() 
throws Exception { + /* + * Test with two Csv files one with Vertex Data and one with Edges data + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final String fileContent = "1,1\n"+ + "2,2\n"+ + "3,3\n"; + final FileInputSplit split = createTempFile(fileContent); + final String fileContent2 = "1,2,ot\n"+ + "3,2,tt\n"+ + "3,1,to\n"; + final FileInputSplit split2 = createTempFile(fileContent2); + Graph graph= Graph.fromCsvReader(split.getPath().toString(),split2.getPath().toString(),env). + types(Long.class,Long.class,String.class); + graph.getTriplets().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2,1,2,ot\n" + + "3,2,3,2,tt\n" + + "3,1,3,1,to\n"; + } + + @Test + public void testCreateWithOnlyEdgesCsvFile() throws Exception { + /* + * Test with one Csv file one with Edges data. Also tests the configuration method ignoreFistLineEdges() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final String fileContent2 = "header\n1,2,ot\n"+ + "3,2,tt\n"+ + "3,1,to\n"; + final FileInputSplit split2 = createTempFile(fileContent2); + Graph graph= Graph.fromCsvReader(split2.getPath().toString(), env).ignoreFirstLineEdges() + .ignoreCommentsVertices("hi").typesVertexValueNull(Long.class, String.class); + graph.getTriplets().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2,(null),(null),ot\n" + + "3,2,(null),(null),tt\n" + + "3,1,(null),(null),to\n"; + } + + @Test + public void testCreateCsvFileDelimiterConfiguration() throws Exception { + /* + * Test with an Edge and Vertex csv file. 
Tests the configuration methods FieldDelimiterEdges and + * FieldDelimiterVertices + * Also tests the configuration methods LineDelimiterEdges and LineDelimiterVertices + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final String fileContent = "header\n1;1\n"+ + "2;2\n"+ + "3;3\n"; + final FileInputSplit split = createTempFile(fileContent); + final String fileContent2 = "header|1:2:ot|"+ + "3:2:tt|"+ + "3:1:to|"; + final FileInputSplit split2 = createTempFile(fileContent2); + Graph graph= Graph.fromCsvReader(split.getPath().toString(),split2.getPath().toString(),env). + ignoreFirstLineEdges().ignoreFirstLineVertices(). + fieldDelimiterEdges(":").fieldDelimiterVertices(";"). + lineDelimiterEdges("|"). + types(Long.class, Long.class, String.class); + graph.getTriplets().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2,1,2,ot\n" + + "3,2,3,2,tt\n" + + "3,1,3,1,to\n"; + + } @Test @@ -118,16 +203,12 @@ public void testValidate() throws Exception { DataSet> edges = TestGraphUtils.getLongLongEdgeData(env); Graph graph = Graph.fromDataSet(vertices, edges, env); - Boolean valid = graph.validate(new InvalidVertexIdsValidator()); - - //env.fromElements(result).writeAsText(resultPath); - - String res= valid.toString();//env.fromElements(valid); - List result= new LinkedList(); - result.add(res); - expectedResult = "true"; - - compareResultAsText(result, expectedResult); + Boolean result = graph.validate(new InvalidVertexIdsValidator()); + + env.fromElements(result).writeAsText(resultPath); + env.execute(); + + expectedResult = "true\n"; } @Test @@ -140,15 +221,11 @@ public void testValidateWithInvalidIds() throws Exception { DataSet> edges = TestGraphUtils.getLongLongEdgeData(env); Graph graph = Graph.fromDataSet(vertices, edges, env); - Boolean valid = graph.validate(new InvalidVertexIdsValidator()); - - String res= valid.toString();//env.fromElements(valid); - List result= new LinkedList(); - result.add(res); + 
Boolean result = graph.validate(new InvalidVertexIdsValidator()); + env.fromElements(result).writeAsText(resultPath); + env.execute(); expectedResult = "false\n"; - - compareResultAsText(result, expectedResult); } @SuppressWarnings("serial") @@ -160,7 +237,7 @@ public Long map(Long vertexId) { @SuppressWarnings("serial") private static final class AssignCustomVertexValueMapper implements - MapFunction> { + MapFunction> { DummyCustomParameterizedType dummyValue = new DummyCustomParameterizedType(); @@ -171,4 +248,18 @@ public DummyCustomParameterizedType map(Long vertexId) { return dummyValue; } } -} \ No newline at end of file + + private FileInputSplit createTempFile(String content) throws IOException { + File tempFile = File.createTempFile("test_contents", "tmp"); + tempFile.deleteOnExit(); + + OutputStreamWriter wrt = new OutputStreamWriter( + new FileOutputStream(tempFile), Charsets.UTF_8 + ); + wrt.write(content); + wrt.close(); + + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), + new String[] {"localhost"}); + } +} diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java index 20cbca567c65e..fb29914a7b151 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java @@ -18,20 +18,27 @@ package org.apache.flink.graph.test.operations; -import java.util.List; - +import com.google.common.base.Charsets; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FileInputSplit; +import 
org.apache.flink.core.fs.Path; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; @RunWith(Parameterized.class) public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase { @@ -40,8 +47,21 @@ public GraphCreationWithMapperITCase(TestExecutionMode mode){ super(mode); } - private String expectedResult; + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } @Test public void testWithDoubleValueMapper() throws Exception { @@ -52,16 +72,13 @@ public void testWithDoubleValueMapper() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), new AssignDoubleValueMapper(), env); - DataSet> data = graph.getVertices(); - List> result= data.collect(); - + graph.getVertices().writeAsCsv(resultPath); + env.execute(); expectedResult = "1,0.1\n" + "2,0.1\n" + "3,0.1\n" + "4,0.1\n" + "5,0.1\n"; - - compareResultAsTuples(result, expectedResult); } @Test @@ -73,16 +90,13 @@ public void testWithTuple2ValueMapper() throws Exception { Graph, Long> graph = Graph.fromDataSet( TestGraphUtils.getLongLongEdgeData(env), new AssignTuple2ValueMapper(), env); - DataSet>> data = graph.getVertices(); - List>> result= 
data.collect(); - + graph.getVertices().writeAsCsv(resultPath); + env.execute(); expectedResult = "1,(2,42)\n" + "2,(4,42)\n" + "3,(6,42)\n" + "4,(8,42)\n" + "5,(10,42)\n"; - - compareResultAsTuples(result, expectedResult); } @Test @@ -91,20 +105,17 @@ public void testWithConstantValueMapper() throws Exception { * Test create() with edge dataset with String key type * and a mapper that assigns a double constant as value */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env), - new AssignDoubleConstantMapper(), env); - - DataSet> data = graph.getVertices(); - List> result= data.collect(); - - expectedResult = "1,0.1\n" + - "2,0.1\n" + - "3,0.1\n" + - "4,0.1\n" + - "5,0.1\n"; - - compareResultAsTuples(result, expectedResult); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env), + new AssignDoubleConstantMapper(), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,0.1\n" + + "2,0.1\n" + + "3,0.1\n" + + "4,0.1\n" + + "5,0.1\n"; } @Test @@ -116,16 +127,31 @@ public void testWithDCustomValueMapper() throws Exception { Graph graph = Graph.fromDataSet( TestGraphUtils.getLongLongEdgeData(env), new AssignCustomValueMapper(), env); - DataSet> data = graph.getVertices(); - List> result= data.collect(); - + graph.getVertices().writeAsCsv(resultPath); + env.execute(); expectedResult = "1,(F,0)\n" + "2,(F,1)\n" + "3,(F,2)\n" + "4,(F,3)\n" + "5,(F,4)\n"; - - compareResultAsTuples(result, expectedResult); + } + + @Test + public void testCsvWithConstantValueMapper() throws Exception { + /* + *Test fromCsvReader with edge path and a mapper that assigns a Double constant as value + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final String fileContent = "1,2,ot\n"+ + "3,2,tt\n"+ + "3,1,to\n"; 
+ final FileInputSplit split = createTempFile(fileContent); + Graph graph = Graph.fromCsvReader(split.getPath().toString(),new AssignDoubleValueMapper(),env).types(Long.class,Double.class,String.class); + graph.getTriplets().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2,0.1,0.1,ot\n"+ + "3,2,0.1,0.1,tt\n"+ + "3,1,0.1,0.1,to\n"; } @SuppressWarnings("serial") @@ -155,4 +181,17 @@ public DummyCustomType map(Long vertexId) { return new DummyCustomType(vertexId.intValue()-1, false); } } + + private FileInputSplit createTempFile(String content) throws IOException { + File tempFile = File.createTempFile("test_contents", "tmp"); + tempFile.deleteOnExit(); + + OutputStreamWriter wrt = new OutputStreamWriter( + new FileOutputStream(tempFile), Charsets.UTF_8 + ); + wrt.write(content); + wrt.close(); + + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"}); + } } From 0094100f0af225fe2791d205ede236a901dcff7b Mon Sep 17 00:00:00 2001 From: Shivani Date: Thu, 18 Jun 2015 15:54:16 +0200 Subject: [PATCH 2/7] [FLINK-1520]Read edges and vertices from CSV files --- docs/libs/gelly_guide.md | 2 +- .../java/org/apache/flink/graph/Graph.java | 24 +-- .../apache/flink/graph/GraphCsvReader.java | 190 ++++++++++++------ .../test/operations/GraphCreationITCase.java | 18 ++ 4 files changed, 161 insertions(+), 73 deletions(-) diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md index 73e9b566b1d4b..8257d91b62122 100644 --- a/docs/libs/gelly_guide.md +++ b/docs/libs/gelly_guide.md @@ -104,7 +104,7 @@ DataSet> edgeTuples = env.readCsvFile("path/to/ed Graph graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env); {% endhighlight %} -* from a CSV file with three fields and an optional CSV file with 2 fields. 
In this case, Gelly will convert each row from the first CSV file to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field will be the edge value. Equivalently, each row from the second CSV file will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value. A types() method is called on the GraphCsvReader object returned by fromCsvReader() to inform the CsvReader of the types of the fields : +* from a CSV file with three fields and an optional CSV file with 2 fields. In this case, Gelly will convert each row from the CSV file containing edges data to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field will be the edge value. Equivalently, each row from the optional CSV file containing vertices will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value. 
A types() method is called on the GraphCsvReader object returned by fromCsvReader() to inform the CsvReader of the types of the fields : {% highlight java %} ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 2ea22b803dbe8..fb763f3a8f69c 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -288,15 +288,14 @@ public static Graph fromTupleDataSet(DataSet{ private final Path path1,path2; private final ExecutionEnvironment executionContext; - - private Path edgePath; - private Path vertexPath; protected CsvReader EdgeReader; protected CsvReader VertexReader; - protected MapFunction mapper; + protected MapFunction mapper; //-------------------------------------------------------------------------------------------------------------------- @@ -73,7 +63,7 @@ public GraphCsvReader(Path path2, ExecutionEnvironment context) this.executionContext=context; } - public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context) + public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context) { this.path1=null; this.path2 = path2; @@ -95,7 +85,7 @@ public GraphCsvReader(String path1, String path2, ExecutionEnvironment context) } - public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context) + public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context) { this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context); @@ -124,17 +114,17 @@ public CsvReader getVertexReader() * @param type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. 
* @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data. */ - public Graph types(Class type0, Class type1, Class type2) + public Graph types(Class type0, Class type1, Class type2) { - DataSet> edges = this.EdgeReader.types(type0,type0,type2); + DataSet> edges = this.EdgeReader.types(type0, type0, type2); if(path1!=null) { - DataSet> vertices = this.VertexReader.types(type0,type1); - return Graph.fromTupleDataSet(vertices,edges,executionContext); + DataSet> vertices = this.VertexReader.types(type0, type1); + return Graph.fromTupleDataSet(vertices, edges, executionContext); } else { - return Graph.fromTupleDataSet(edges,mapper,executionContext); + return Graph.fromTupleDataSet(edges, this.mapper, executionContext); } @@ -148,58 +138,74 @@ public Graph types(Class type0, Class type1, Class * @param type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. */ - public Graph types(Class type0, Class type1) + public Graph types(Class type0, Class type1) { - DataSet> edges = this.EdgeReader.types(type0,type0,type1); - return Graph.fromTupleDataSet(edges,executionContext); + DataSet> edges = this.EdgeReader.types(type0, type0, type1); + return Graph.fromTupleDataSet(edges, executionContext); } /** - *Configures the Delimiter that separates rows for the CSV readers used to read the edges and vertices + *Configures the Delimiter that separates rows for the CSV reader used to read the edges * ({@code '\n'}) is used by default. * *@param delimiter The delimiter that separates the rows. * @return The GraphCsv reader instance itself, to allow for fluent function chaining. 
*/ - public GraphCsvReader lineDelimiter(String delimiter) + public GraphCsvReader lineDelimiterEdges(String delimiter) { this.EdgeReader.lineDelimiter(delimiter); - this.VertexReader.lineDelimiter(delimiter); return this; } /** - *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices + *Configures the Delimiter that separates rows for the CSV reader used to read the vertices + * ({@code '\n'}) is used by default. + * + *@param delimiter The delimiter that separates the rows. + * @return The GraphCsv reader instance itself, to allow for fluent function chaining. + */ + public GraphCsvReader lineDelimiterVertices(String delimiter) + { + if(this.VertexReader !=null) + { + this.VertexReader.lineDelimiter(delimiter); + } + return this; + } + + + /** + *Configures the Delimiter that separates fields in a row for the CSV reader used to read the vertices * ({@code ','}) is used by default. * * @param delimiter The delimiter that separates the fields in a row. * @return The GraphCsv reader instance itself, to allow for fluent function chaining. */ - @Deprecated - public GraphCsvReader fieldDelimiter(char delimiter) + public GraphCsvReader fieldDelimiterVertices(String delimiter) { - this.EdgeReader.fieldDelimiter(delimiter); - this.VertexReader.fieldDelimiter(delimiter); + if(this.VertexReader !=null) + { + this.VertexReader.fieldDelimiter(delimiter); + } return this; } - /** - *Configures the Delimiter that separates fields in a row for the CSV readers used to read the edges and vertices + *Configures the Delimiter that separates fields in a row for the CSV reader used to read the edges * ({@code ','}) is used by default. * * @param delimiter The delimiter that separates the fields in a row. * @return The GraphCsv reader instance itself, to allow for fluent function chaining. 
*/ - public GraphCsvReader fieldDelimiter(String delimiter) + public GraphCsvReader fieldDelimiterEdges(String delimiter) { this.EdgeReader.fieldDelimiter(delimiter); - this.VertexReader.fieldDelimiter(delimiter); return this; } + /** - * Enables quoted String parsing. Field delimiters in quoted Strings are ignored. + * Enables quoted String parsing for Edge Csv Reader. Field delimiters in quoted Strings are ignored. * A String is parsed as quoted if it starts and ends with a quoting character and as unquoted otherwise. * Leading or tailing whitespaces are not allowed. * @@ -207,14 +213,49 @@ public GraphCsvReader fieldDelimiter(String delimiter) * @return The Graph Csv reader instance itself, to allow for fluent function chaining. */ - public GraphCsvReader parseQuotedStrings(char quoteCharacter) { + public GraphCsvReader parseQuotedStringsEdges(char quoteCharacter) { this.EdgeReader.parseQuotedStrings(quoteCharacter); - this.VertexReader.parseQuotedStrings(quoteCharacter); return this; } /** - * Configures the string that starts comments. + * Enables quoted String parsing for Vertex Csv Reader. Field delimiters in quoted Strings are ignored. + * A String is parsed as quoted if it starts and ends with a quoting character and as unquoted otherwise. + * Leading or tailing whitespaces are not allowed. + * + * @param quoteCharacter The character which is used as quoting character. + * @return The Graph Csv reader instance itself, to allow for fluent function chaining. + */ + + public GraphCsvReader parseQuotedStringsVertices(char quoteCharacter) { + if(this.VertexReader !=null) + { + this.VertexReader.parseQuotedStrings(quoteCharacter); + } + return this; + } + + /** + * Configures the string that starts comments for the Vertex Csv Reader. + * By default comments will be treated as invalid lines. + * This function only recognizes comments which start at the beginning of the line! + * + * @param commentPrefix The string that starts the comments. 
+ * @return The Graph csv reader instance itself, to allow for fluent function chaining. + */ + + + public GraphCsvReader ignoreCommentsVertices(String commentPrefix) { + if(this.VertexReader !=null) + { + this.VertexReader.ignoreComments(commentPrefix); + } + return this; + } + + + /** + * Configures the string that starts comments for the Edge Csv Reader. * By default comments will be treated as invalid lines. * This function only recognizes comments which start at the beginning of the line! * @@ -223,9 +264,8 @@ public GraphCsvReader parseQuotedStrings(char quoteCharacter) { */ - public GraphCsvReader ignoreComments(String commentPrefix) { + public GraphCsvReader ignoreCommentsEdges(String commentPrefix) { this.EdgeReader.ignoreComments(commentPrefix); - this.VertexReader.ignoreComments(commentPrefix); return this; } @@ -242,8 +282,11 @@ public GraphCsvReader ignoreComments(String commentPrefix) { * @param vertexFields The array of flags that describes which fields are to be included from the CSV file for vertices. * @return The CSV reader instance itself, to allow for fluent function chaining. */ - public GraphCsvReader includeVertexFields(boolean ... vertexFields) { - this.VertexReader.includeFields(vertexFields); + public GraphCsvReader includeFieldsVertices(boolean ... vertexFields) { + if(this.VertexReader !=null) + { + this.VertexReader.includeFields(vertexFields); + } return this; } @@ -260,7 +303,7 @@ public GraphCsvReader includeVertexFields(boolean ... vertexFields) { * @return The CSV reader instance itself, to allow for fluent function chaining. */ - public GraphCsvReader includeEdgeFields(boolean ... edgeFields) { + public GraphCsvReader includeFieldsEdges(boolean ... edgeFields) { this.EdgeReader.includeFields(edgeFields); return this; } @@ -280,8 +323,11 @@ public GraphCsvReader includeEdgeFields(boolean ... 
edgeFields) { */ - public GraphCsvReader includeVertexFields(String mask) { - this.VertexReader.includeFields(mask); + public GraphCsvReader includeFieldsVertices(String mask) { + if(this.VertexReader !=null) + { + this.VertexReader.includeFields(mask); + } return this; } @@ -300,8 +346,8 @@ public GraphCsvReader includeVertexFields(String mask) { */ - public GraphCsvReader includeEdgeFields(String mask) { - this.VertexReader.includeFields(mask); + public GraphCsvReader includeFieldsEdges(String mask) { + this.EdgeReader.includeFields(mask); return this; } @@ -326,8 +372,11 @@ public GraphCsvReader includeEdgeFields(String mask) { */ - public GraphCsvReader includeVertexFields(long mask) { - this.VertexReader.includeFields(mask); + public GraphCsvReader includeFieldsVertices(long mask) { + if(this.VertexReader !=null) + { + this.VertexReader.includeFields(mask); + } return this; } @@ -352,33 +401,58 @@ public GraphCsvReader includeVertexFields(long mask) { */ - public GraphCsvReader includeEdgeFields(long mask) { - this.VertexReader.includeFields(mask); + public GraphCsvReader includeFieldsEdges(long mask) { + this.EdgeReader.includeFields(mask); + return this; + } + + /** + * Sets the CSV reader for the Edges file to ignore the first line. This is useful for files that contain a header line. + * + * @return The Graph CSV reader instance itself, to allow for fluent function chaining. + */ + public GraphCsvReader ignoreFirstLineEdges() { + this.EdgeReader.ignoreFirstLine(); return this; } /** - * Sets the CSV readers for the Edges and Vertices files to ignore the first line. This is useful for files that contain a header line. + * Sets the CSV reader for the Vertices file to ignore the first line. This is useful for files that contain a header line. * * @return The Graph CSV reader instance itself, to allow for fluent function chaining. 
*/ - public GraphCsvReader ignoreFirstLine() { - this.EdgeReader.ignoreFirstLine(); - this.VertexReader.ignoreFirstLine(); + public GraphCsvReader ignoreFirstLineVertices() { + if(this.VertexReader !=null) + { + this.VertexReader.ignoreFirstLine(); + } return this; } /** - * Sets the CSV readers for the Edges and Vertices files to ignore any invalid lines. + * Sets the CSV reader for the Edges file to ignore any invalid lines. * This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise. * * @return The CSV reader instance itself, to allow for fluent function chaining. */ - public GraphCsvReader ignoreInvalidLines(){ + public GraphCsvReader ignoreInvalidLinesEdges() { this.EdgeReader.ignoreInvalidLines(); - this.VertexReader.ignoreInvalidLines(); + return this; + } + + /** + * Sets the CSV reader Vertices file to ignore any invalid lines. + * This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise. + * + * @return The CSV reader instance itself, to allow for fluent function chaining. 
+ */ + public GraphCsvReader ignoreInvalidLinesVertices() { + if(this.VertexReader !=null) + { + this.VertexReader.ignoreInvalidLines(); + } return this; } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java index 77b68a8ed391f..6289f8e3c85a4 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java @@ -193,6 +193,24 @@ public void testCreateCsvFileDelimiterConfiguration() throws Exception { } + @Test + public void testCreateWithOnlyEdgesCsvFile() throws Exception { + /* + * Test with one Csv file one with Edges data. Also tests the configuration method ignoreFistLineEdges() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final String fileContent2 = "header\n1,2,ot\n"+ + "3,2,tt\n"+ + "3,1,to\n"; + final FileInputSplit split2 = createTempFile(fileContent2); + Graph graph= Graph.fromCsvReader(split2.getPath().toString(), env).ignoreFirstLineEdges().ignoreCommentsVertices("hi").types(Long.class, String.class); + graph.getTriplets().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2,(null),(null),ot\n" + + "3,2,(null),(null),tt\n" + + "3,1,(null),(null),to\n"; + } + @Test public void testValidate() throws Exception { /* From d4a19d578d3008505347d51fb505c30157e805bf Mon Sep 17 00:00:00 2001 From: Shivani Date: Wed, 24 Jun 2015 17:27:35 +0200 Subject: [PATCH 3/7] [FLINK-1520][gelly]Updated Examples --- .../java/org/apache/flink/graph/Graph.java | 24 +++--- .../apache/flink/graph/GraphCsvReader.java | 70 +++++++++++++---- .../graph/example/CommunityDetection.java | 49 +++++++----- .../graph/example/ConnectedComponents.java | 51 +++++++------ 
.../example/GSASingleSourceShortestPaths.java | 18 ++--- .../flink/graph/example/GraphMetrics.java | 26 +++---- .../flink/graph/example/IncrementalSSSP.java | 75 +++++++------------ .../flink/graph/example/LabelPropagation.java | 38 +++++----- .../example/SingleSourceShortestPaths.java | 40 +++++----- .../test/operations/GraphCreationITCase.java | 18 ----- 10 files changed, 205 insertions(+), 204 deletions(-) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index fb763f3a8f69c..2ea22b803dbe8 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -288,14 +288,15 @@ public static Graph fromTupleDataSet(DataSet types(Class type0, Class type1, Class type2) + public Graph types(Class type0, Class type1, Class type2) { + /* If both Vertex value and Edge values are present */ + DataSet> edges = this.EdgeReader.types(type0, type0, type2); + if(path1!=null) + { + DataSet> vertices = this.VertexReader.types(type0, type1); + return Graph.fromTupleDataSet(vertices, edges, executionContext); + } + return Graph.fromTupleDataSet(edges, this.mapper, executionContext); + } + + /** + * Specifies the types for the Graph fields and returns a Graph with those field types + *NullValue for vertices + * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph. + * @param type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. + * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. 
+ */ + public Graph typesVertexValueNull(Class type0, Class type1) { - DataSet> edges = this.EdgeReader.types(type0, type0, type2); + DataSet> edges = this.EdgeReader.types(type0, type0, type1); + return Graph.fromTupleDataSet(edges, executionContext); + } + + /** + * Specifies the types for the Graph fields and returns a Graph with those field types and a NullValue for + * EdgeValue + * + * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph. + * @param type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. + * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. + */ + public Graph typesEdgeValueNull(Class type0, Class type1) + { + DataSet> edges = this.EdgeReader.types(type0, type0) + .map(new MapFunction, Tuple3>() { + @Override + public Tuple3 map(Tuple2 tuple2) throws Exception { + return new Tuple3(tuple2.f0, tuple2.f1, NullValue.getInstance()); + } + }); + if(path1!=null) { DataSet> vertices = this.VertexReader.types(type0, type1); @@ -126,24 +165,29 @@ public Graph types(Class type0, Class type1, Class type2) { return Graph.fromTupleDataSet(edges, this.mapper, executionContext); } - - } + /** - * Specifies the types for the Graph fields and returns a Graph with those field types - * - * This method is overloaded for the case in which Vertices don't have a value + * Specifies the types for the Graph fields and returns a Graph with those field types and a NullValue for + * EdgeValue and VertexValue * * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph. - * @param type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. 
* @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. */ - public Graph types(Class type0, Class type1) + public Graph types(Class type0) { - DataSet> edges = this.EdgeReader.types(type0, type0, type1); - return Graph.fromTupleDataSet(edges, executionContext); + DataSet> edges = this.EdgeReader.types(type0, type0). + map(new MapFunction, Tuple3>() { + @Override + public Tuple3 map(Tuple2 tuple2) throws Exception { + return new Tuple3(tuple2.f0, tuple2.f1, NullValue.getInstance()); + } + }); + return Graph.fromTupleDataSet(edges, executionContext); } + + /** *Configures the Delimiter that separates rows for the CSV reader used to read the edges * ({@code '\n'}) is used by default. @@ -455,8 +499,4 @@ public GraphCsvReader ignoreInvalidLinesVertices() { } return this; } - - - - } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java index e44e5bd7920dc..8240ed7950eb4 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java @@ -27,8 +27,6 @@ import org.apache.flink.graph.Vertex; import org.apache.flink.graph.example.utils.CommunityDetectionData; import org.apache.flink.graph.library.CommunityDetectionAlgorithm; -import org.apache.flink.graph.utils.Tuple3ToEdgeMap; - /** * This example shows how to use the {@link org.apache.flink.graph.library.CommunityDetectionAlgorithm} * library method: @@ -61,15 +59,8 @@ public static void main(String [] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // set up the graph - DataSet> edges = getEdgesDataSet(env); - Graph graph = Graph.fromDataSet(edges, - new MapFunction() { - - public Long 
map(Long label) { - return label; - } - }, env); + Graph graph = CommunityDetection.getGraph(env); // the result is in the form of , where the communityId is the label // which the vertex converged to DataSet> communityVertices = @@ -126,17 +117,35 @@ private static boolean parseParameters(String [] args) { return true; } - private static DataSet> getEdgesDataSet(ExecutionEnvironment env) { - if(fileOutput) { - return env.readCsvFile(edgeInputPath) - .ignoreComments("#") - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class, Double.class) - .map(new Tuple3ToEdgeMap()); - } else { - return CommunityDetectionData.getDefaultEdgeDataSet(env); + private static Graph getGraph(ExecutionEnvironment env) + { + Graph graph; + if(!fileOutput) + { + DataSet> edges = CommunityDetectionData.getDefaultEdgeDataSet(env); + graph = Graph.fromDataSet(edges, + new MapFunction() { + + public Long map(Long label) { + return label; + } + }, env); } + else + { + graph = Graph.fromCsvReader(edgeInputPath,new MapFunction() { + public Long map(Long label) { + return label; + } + }, env).ignoreCommentsEdges("#") + .fieldDelimiterEdges("\t") + .lineDelimiterEdges("\n") + .types(Long.class, Long.class, Double.class); + + } + return graph; } + + } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java index 3443a553a5e0d..2ced1ff8fdb72 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import 
org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; @@ -60,14 +59,9 @@ public static void main(String [] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet> edges = getEdgesDataSet(env); + //util method getGraph is used + Graph graph = ConnectedComponents.getGraph(env); - Graph graph = Graph.fromDataSet(edges, new MapFunction() { - @Override - public Long map(Long value) throws Exception { - return value; - } - }, env); DataSet> verticesWithMinIds = graph .run(new ConnectedComponentsAlgorithm(maxIterations)).getVertices(); @@ -122,23 +116,32 @@ private static boolean parseParameters(String [] args) { return true; } - @SuppressWarnings("serial") - private static DataSet> getEdgesDataSet(ExecutionEnvironment env) { - - if(fileOutput) { - return env.readCsvFile(edgeInputPath) - .ignoreComments("#") - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class) - .map(new MapFunction, Edge>() { - @Override - public Edge map(Tuple2 value) throws Exception { - return new Edge(value.f0, value.f1, NullValue.getInstance()); + private static Graph getGraph(ExecutionEnvironment env) + { + Graph graph; + if(!fileOutput) + { + DataSet> edges = ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env); + graph = Graph.fromDataSet(edges, + new MapFunction() { + + public Long map(Long label) { + return label; } - }); - } else { - return ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env); + }, env); + } + else + { + graph = Graph.fromCsvReader(edgeInputPath,new MapFunction() { + public Long map(Long label) { + return label; + } + }, env).ignoreCommentsEdges("#") + .fieldDelimiterEdges("\t") + .lineDelimiterEdges("\n") + .typesEdgeValueNull(Long.class, Long.class); + } + return graph; } } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java index b01aa234d9135..5d57723592ce6 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData; @@ -30,7 +29,6 @@ import org.apache.flink.graph.gsa.GatherFunction; import org.apache.flink.graph.gsa.SumFunction; import org.apache.flink.graph.gsa.Neighbor; -import org.apache.flink.graph.utils.Tuple3ToEdgeMap; /** * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration @@ -49,9 +47,7 @@ public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet> edges = getEdgeDataSet(env); - - Graph graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env); + Graph graph = GSASingleSourceShortestPaths.getGraph(env); // Execute the GSA iteration Graph result = graph @@ -161,15 +157,13 @@ private static boolean parseParameters(String[] args) { return true; } - private static DataSet> getEdgeDataSet(ExecutionEnvironment env) { + private static Graph getGraph(ExecutionEnvironment env) { if (fileOutput) { - return env.readCsvFile(edgesInputPath) - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class, Double.class) - .map(new Tuple3ToEdgeMap()); + return Graph.fromCsvReader(edgesInputPath, new InitVertices(srcVertexId), env).fieldDelimiterEdges("\t") + .lineDelimiterEdges("\n") + 
.types(Long.class, Long.class, Double.class); } else { - return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env); + return Graph.fromDataSet(SingleSourceShortestPathsData.getDefaultEdgeDataSet(env), new InitVertices(srcVertexId), env); } } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java index c6a776dfbdfaf..958ab1ec0b8a7 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java @@ -24,7 +24,6 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.example.utils.ExampleUtils; import org.apache.flink.types.NullValue; @@ -56,7 +55,7 @@ public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); /** create the graph **/ - Graph graph = Graph.fromDataSet(getEdgesDataSet(env), env); + Graph graph = GraphMetrics.getGraph(env); /** get the number of vertices **/ long numVertices = graph.numberOfVertices(); @@ -150,20 +149,15 @@ private static boolean parseParameters(String[] args) { } @SuppressWarnings("serial") - private static DataSet> getEdgesDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(edgesInputPath) - .lineDelimiter("\n").fieldDelimiter("\t") - .types(Long.class, Long.class).map( - new MapFunction, Edge>() { - - public Edge map(Tuple2 value) { - return new Edge(value.f0, value.f1, - NullValue.getInstance()); - } - }); - } else { - return ExampleUtils.getRandomEdges(env, NUM_VERTICES); + private static Graph getGraph(ExecutionEnvironment env) { + if(fileOutput) { + 
return Graph.fromCsvReader(edgesInputPath, env).lineDelimiterEdges("\n").fieldDelimiterEdges("\t") + .types(Long.class); + + } + else + { + return Graph.fromDataSet(ExampleUtils.getRandomEdges(env, NUM_VERTICES), env); } } } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java index 8d94cbc165065..d9cfc47f8f87b 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java @@ -31,8 +31,6 @@ import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.VertexCentricConfiguration; import org.apache.flink.graph.spargel.VertexUpdateFunction; -import org.apache.flink.graph.utils.Tuple2ToVertexMap; -import org.apache.flink.graph.utils.Tuple3ToEdgeMap; /** * Incremental Single Sink Shortest Paths Example. 
Shortest Paths are incrementally updated @@ -77,18 +75,12 @@ public static void main(String [] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet> vertices = getVerticesDataSet(env); - - DataSet> edges = getEdgesDataSet(env); - - DataSet> edgesInSSSP = getEdgesinSSSPDataSet(env); - Edge edgeToBeRemoved = getEdgeToBeRemoved(); - Graph graph = Graph.fromDataSet(vertices, edges, env); + Graph graph = IncrementalSSSP.getGraph(env); // Assumption: all minimum weight paths are kept - Graph ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env); + Graph ssspGraph = IncrementalSSSP.getSSSPGraph(env); // remove the edge graph.removeEdge(edgeToBeRemoved); @@ -96,7 +88,7 @@ public static void main(String [] args) throws Exception { // configure the iteration VertexCentricConfiguration parameters = new VertexCentricConfiguration(); - if(isInSSSP(edgeToBeRemoved, edgesInSSSP)) { + if(isInSSSP(edgeToBeRemoved, ssspGraph.getEdges())) { parameters.setDirection(EdgeDirection.IN); parameters.setOptDegrees(true); @@ -110,24 +102,20 @@ public static void main(String [] args) throws Exception { // Emit results if(fileOutput) { resultedVertices.writeAsCsv(outputPath, "\n", ","); - - // since file sinks are lazy, we trigger the execution explicitly - env.execute("Incremental SSSP Example"); } else { resultedVertices.print(); } + env.execute("Incremental SSSP Example"); } else { // print the vertices if(fileOutput) { - vertices.writeAsCsv(outputPath, "\n", ","); - - // since file sinks are lazy, we trigger the execution explicitly - env.execute("Incremental SSSP Example"); + graph.getVertices().writeAsCsv(outputPath, "\n", ","); } else { - vertices.print(); + graph.getVertices().print(); } + env.execute("Incremental SSSP Example"); } } @@ -251,48 +239,39 @@ private static boolean parseParameters(String[] args) { return true; } - private static DataSet> getVerticesDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return 
env.readCsvFile(verticesInputPath) - .lineDelimiter("\n") - .types(Long.class, Double.class) - .map(new Tuple2ToVertexMap()); - } else { - System.err.println("Usage: IncrementalSSSP " + - " " + - " "); - return IncrementalSSSPData.getDefaultVertexDataSet(env); - } - } - private static DataSet> getEdgesDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(edgesInputPath) - .lineDelimiter("\n") - .types(Long.class, Long.class, Double.class) - .map(new Tuple3ToEdgeMap()); - } else { + private static Graph getGraph(ExecutionEnvironment env) + { + if(fileOutput) { + return Graph.fromCsvReader(verticesInputPath, edgesInputPath, env).lineDelimiterEdges("\n") + .types(Long.class, Double.class, Double.class); + } + else + { System.err.println("Usage: IncrementalSSSP " + " " + " "); - return IncrementalSSSPData.getDefaultEdgeDataSet(env); + return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), IncrementalSSSPData.getDefaultEdgeDataSet(env), env); } } - private static DataSet> getEdgesinSSSPDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(edgesInSSSPInputPath) - .lineDelimiter("\n") - .types(Long.class, Long.class, Double.class) - .map(new Tuple3ToEdgeMap()); - } else { + private static Graph getSSSPGraph(ExecutionEnvironment env) + { + if(fileOutput) { + return Graph.fromCsvReader(verticesInputPath, edgesInSSSPInputPath, env).lineDelimiterEdges("\n") + .types(Long.class, Double.class, Double.class); + } + else + { System.err.println("Usage: IncrementalSSSP " + " " + " "); - return IncrementalSSSPData.getDefaultEdgesInSSSP(env); + return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), IncrementalSSSPData.getDefaultEdgesInSSSP(env), env); } } + + private static Edge getEdgeToBeRemoved() { if (fileOutput) { return new Edge(srcEdgeToBeRemoved, trgEdgeToBeRemoved, valEdgeToBeRemoved); diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java index bee5af378f474..82e1bfa6af920 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; @@ -60,10 +59,8 @@ public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // Set up the graph - DataSet> vertices = getVertexDataSet(env); - DataSet> edges = getEdgeDataSet(env); - Graph graph = Graph.fromDataSet(vertices, edges, env); + Graph graph = LabelPropagation.getGraph(env); // Set up the program DataSet> verticesWithCommunity = graph.run( @@ -134,26 +131,25 @@ public Vertex map(Long l) throws Exception { } @SuppressWarnings("serial") - private static DataSet> getEdgeDataSet(ExecutionEnvironment env) { - - if (fileOutput) { - return env.readCsvFile(edgeInputPath) - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class) - .map(new MapFunction, Edge>() { - @Override - public Edge map(Tuple2 value) throws Exception { - return new Edge(value.f0, value.f1, NullValue.getInstance()); - } - }); + private static Graph getGraph(ExecutionEnvironment env) + { + if(fileOutput) { + return Graph.fromCsvReader(vertexInputPath, edgeInputPath, env).fieldDelimiterEdges("\t") + .fieldDelimiterVertices("\t") + .lineDelimiterEdges("\n") + .lineDelimiterVertices("\n").typesEdgeValueNull(Long.class, Long.class); } - - return 
env.generateSequence(1, numVertices).flatMap( + return Graph.fromDataSet(env. + generateSequence(1, numVertices).map(new MapFunction>() { + public Vertex map(Long l) throws Exception { + return new Vertex(l, l); + } + }), + env.generateSequence(1, numVertices).flatMap( new FlatMapFunction>() { @Override public void flatMap(Long key, - Collector> out) { + Collector> out) { int numOutEdges = (int) (Math.random() * (numVertices / 2)); for (int i = 0; i < numOutEdges; i++) { long target = (long) (Math.random() * numVertices) + 1; @@ -161,7 +157,7 @@ public void flatMap(Long key, NullValue.getInstance())); } } - }); + }), env); } @Override diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java index 9d7d2c20974e6..ca5535edb7b5e 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java @@ -22,12 +22,10 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData; import org.apache.flink.graph.library.SingleSourceShortestPathsAlgorithm; -import org.apache.flink.graph.utils.Tuple3ToEdgeMap; /** * This example implements the Single Source Shortest Paths algorithm, @@ -51,15 +49,7 @@ public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet> edges = getEdgesDataSet(env); - - Graph graph = Graph.fromDataSet(edges, - new MapFunction() { - - public Double map(Long value) { - return 
Double.MAX_VALUE; - } - }, env); + Graph graph = SingleSourceShortestPaths.getGraph(env); DataSet> singleSourceShortestPaths = graph .run(new SingleSourceShortestPathsAlgorithm(srcVertexId, maxIterations)) @@ -121,15 +111,25 @@ private static boolean parseParameters(String[] args) { return true; } - private static DataSet> getEdgesDataSet(ExecutionEnvironment env) { + private static Graph getGraph(ExecutionEnvironment env) { if (fileOutput) { - return env.readCsvFile(edgesInputPath) - .lineDelimiter("\n") - .fieldDelimiter("\t") - .types(Long.class, Long.class, Double.class) - .map(new Tuple3ToEdgeMap()); - } else { - return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env); + return Graph.fromCsvReader(edgesInputPath, new MapFunction() { + @Override + public Double map(Long value) throws Exception { + return Double.MAX_VALUE; + } + }, env).lineDelimiterEdges("\n") + .fieldDelimiterEdges("\t") + .types(Long.class, Double.class, Double.class); + + } + else { + return Graph.fromDataSet(SingleSourceShortestPathsData.getDefaultEdgeDataSet(env), new MapFunction() { + @Override + public Double map(Long value) throws Exception { + return Double.MAX_VALUE; + } + }, env); } } -} +} \ No newline at end of file diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java index 6289f8e3c85a4..77b68a8ed391f 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java @@ -193,24 +193,6 @@ public void testCreateCsvFileDelimiterConfiguration() throws Exception { } - @Test - public void testCreateWithOnlyEdgesCsvFile() throws Exception { - /* - * Test with one Csv file one with Edges data. 
Also tests the configuration method ignoreFistLineEdges() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - final String fileContent2 = "header\n1,2,ot\n"+ - "3,2,tt\n"+ - "3,1,to\n"; - final FileInputSplit split2 = createTempFile(fileContent2); - Graph graph= Graph.fromCsvReader(split2.getPath().toString(), env).ignoreFirstLineEdges().ignoreCommentsVertices("hi").types(Long.class, String.class); - graph.getTriplets().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2,(null),(null),ot\n" + - "3,2,(null),(null),tt\n" + - "3,1,(null),(null),to\n"; - } - @Test public void testValidate() throws Exception { /* From 08b19894709eadfb342270b957444fdb159f8cc1 Mon Sep 17 00:00:00 2001 From: Shivani Date: Wed, 24 Jun 2015 18:13:50 +0200 Subject: [PATCH 4/7] [FLINK-1520][gelly] Updated Examples --- .../java/org/apache/flink/graph/Graph.java | 24 ++++------ .../apache/flink/graph/GraphCsvReader.java | 47 ++++++++++--------- 2 files changed, 35 insertions(+), 36 deletions(-) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 2ea22b803dbe8..fb763f3a8f69c 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -288,15 +288,14 @@ public static Graph fromTupleDataSet(DataSet types(Class type0, Class type1, Class type2) { - /* If both Vertex value and Edge values are present */ - DataSet> edges = this.EdgeReader.types(type0, type0, type2); - if(path1!=null) - { - DataSet> vertices = this.VertexReader.types(type0, type1); - return Graph.fromTupleDataSet(vertices, edges, executionContext); - } - return Graph.fromTupleDataSet(edges, this.mapper, executionContext); - } + public Graph types(Class type0, Class type1, Class type2) + { + DataSet> edges = this.EdgeReader.types(type0, type0, type2); + 
if(path1!=null) + { + DataSet> vertices = this.VertexReader.types(type0, type1); + return Graph.fromTupleDataSet(vertices, edges, executionContext); + } + else + { + return Graph.fromTupleDataSet(edges, this.mapper, executionContext); + } + + } /** - * Specifies the types for the Graph fields and returns a Graph with those field types - *NullValue for vertices + * Specifies the types for the Graph fields and returns a Graph with those field types in the special case + * where Vertices don't have a value * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph. * @param type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. @@ -139,9 +143,8 @@ public Graph typesVertexValueNull(Class type0, Class ty } /** - * Specifies the types for the Graph fields and returns a Graph with those field types and a NullValue for - * EdgeValue - * + * Specifies the types for the Graph fields and returns a Graph with those field types ain the special case + * where Edges don't have a value * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph. * @param type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. 
@@ -168,9 +171,8 @@ public Tuple3 map(Tuple2 tuple2) throws Exception { } /** - * Specifies the types for the Graph fields and returns a Graph with those field types and a NullValue for - * EdgeValue and VertexValue - * + * Specifies the types for the Graph fields and returns a Graph with those field types. + * This method is overloaded for the case in which Vertices and Edges don't have a value * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph. * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. */ @@ -183,11 +185,8 @@ public Tuple3 map(Tuple2 tuple2) throws Exception { return new Tuple3(tuple2.f0, tuple2.f1, NullValue.getInstance()); } }); - return Graph.fromTupleDataSet(edges, executionContext); + return Graph.fromTupleDataSet(edges, executionContext); } - - - /** *Configures the Delimiter that separates rows for the CSV reader used to read the edges * ({@code '\n'}) is used by default. 
@@ -499,4 +498,8 @@ public GraphCsvReader ignoreInvalidLinesVertices() { } return this; } + + + + } From cd823022c094cf01046b0dcd88c30aaa062d4ea1 Mon Sep 17 00:00:00 2001 From: Shivani Date: Wed, 24 Jun 2015 20:35:12 +0200 Subject: [PATCH 5/7] [FLINK-1520][gelly] Removed Lines --- .../src/main/java/org/apache/flink/graph/GraphCsvReader.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java index 9adf66fad6d08..88e5be54d0097 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java @@ -87,10 +87,7 @@ public GraphCsvReader(String path1, String path2, ExecutionEnvironment context) public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context) { - this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context); - - } public CsvReader getEdgeReader() From 8cae5e977f620feaf9b561255b359f9375a57fad Mon Sep 17 00:00:00 2001 From: Shivani Date: Wed, 1 Jul 2015 11:36:52 +0200 Subject: [PATCH 6/7] [FLINK-1520][gelly] Made corrections in coding style --- .../java/org/apache/flink/graph/Graph.java | 10 +- .../apache/flink/graph/GraphCsvReader.java | 119 +++++++----------- .../graph/example/ConnectedComponents.java | 13 +- .../flink/graph/example/IncrementalSSSP.java | 3 +- 4 files changed, 53 insertions(+), 92 deletions(-) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index fb763f3a8f69c..496309f05b24f 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -286,18 +286,16 @@ public static Graph 
fromTupleDataSet(DataSet{ +public class GraphCsvReader { - private final Path path1,path2; + private final Path vertexPath,edgePath; private final ExecutionEnvironment executionContext; protected CsvReader EdgeReader; protected CsvReader VertexReader; @@ -43,60 +43,52 @@ public class GraphCsvReader{ //-------------------------------------------------------------------------------------------------------------------- - public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context) - { - this.path1 = path1; - this.path2 = path2; - this.VertexReader = new CsvReader(path1,context); - this.EdgeReader = new CsvReader(path2,context); + public GraphCsvReader(Path vertexPath,Path edgePath, ExecutionEnvironment context) { + this.vertexPath = vertexPath; + this.edgePath = edgePath; + this.VertexReader = new CsvReader(vertexPath,context); + this.EdgeReader = new CsvReader(edgePath,context); this.mapper=null; this.executionContext=context; } - public GraphCsvReader(Path path2, ExecutionEnvironment context) - { - this.path1=null; - this.path2 = path2; - this.EdgeReader = new CsvReader(path2,context); + public GraphCsvReader(Path edgePath, ExecutionEnvironment context) { + this.vertexPath = null; + this.edgePath = edgePath; + this.EdgeReader = new CsvReader(edgePath,context); this.VertexReader = null; this.mapper = null; this.executionContext=context; } - public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context) - { - this.path1=null; - this.path2 = path2; - this.EdgeReader = new CsvReader(path2,context); + public GraphCsvReader(Path edgePath,final MapFunction mapper, ExecutionEnvironment context) { + this.vertexPath = null; + this.edgePath = edgePath; + this.EdgeReader = new CsvReader(edgePath,context); this.VertexReader = null; this.mapper = mapper; this.executionContext=context; } - public GraphCsvReader (String path2,ExecutionEnvironment context) - { - this(new Path(Preconditions.checkNotNull(path2, "The file path may not be 
null.")), context); + public GraphCsvReader (String edgePath,ExecutionEnvironment context) { + this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context); } - public GraphCsvReader(String path1, String path2, ExecutionEnvironment context) - { - this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context); + public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) { + this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context); } - public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context) - { - this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context); + public GraphCsvReader (String edgePath, final MapFunction mapper, ExecutionEnvironment context) { + this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")),mapper, context); } - public CsvReader getEdgeReader() - { + public CsvReader getEdgeReader() { return this.EdgeReader; } - public CsvReader getVertexReader() - { + public CsvReader getVertexReader() { return this.VertexReader; } //-------------------------------------------------------------------------------------------------------------------- @@ -111,16 +103,12 @@ public CsvReader getVertexReader() * @param type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data. 
*/ - public Graph types(Class type0, Class type1, Class type2) - { + public Graph types(Class type0, Class type1, Class type2) { DataSet> edges = this.EdgeReader.types(type0, type0, type2); - if(path1!=null) - { + if(vertexPath!=null) { DataSet> vertices = this.VertexReader.types(type0, type1); return Graph.fromTupleDataSet(vertices, edges, executionContext); - } - else - { + } else { return Graph.fromTupleDataSet(edges, this.mapper, executionContext); } @@ -133,8 +121,7 @@ public Graph types(Class type0, Class type1, Class type2) * @param type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. */ - public Graph typesVertexValueNull(Class type0, Class type1) - { + public Graph typesVertexValueNull(Class type0, Class type1) { DataSet> edges = this.EdgeReader.types(type0, type0, type1); return Graph.fromTupleDataSet(edges, executionContext); } @@ -146,8 +133,7 @@ public Graph typesVertexValueNull(Class type0, Class ty * @param type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. 
*/ - public Graph typesEdgeValueNull(Class type0, Class type1) - { + public Graph typesEdgeValueNull(Class type0, Class type1) { DataSet> edges = this.EdgeReader.types(type0, type0) .map(new MapFunction, Tuple3>() { @Override @@ -156,13 +142,10 @@ public Tuple3 map(Tuple2 tuple2) throws Exception { } }); - if(path1!=null) - { + if(vertexPath!=null) { DataSet> vertices = this.VertexReader.types(type0, type1); return Graph.fromTupleDataSet(vertices, edges, executionContext); - } - else - { + } else { return Graph.fromTupleDataSet(edges, this.mapper, executionContext); } } @@ -173,8 +156,7 @@ public Tuple3 map(Tuple2 tuple2) throws Exception { * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph. * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. */ - public Graph types(Class type0) - { + public Graph types(Class type0) { DataSet> edges = this.EdgeReader.types(type0, type0). map(new MapFunction, Tuple3>() { @Override @@ -191,8 +173,7 @@ public Tuple3 map(Tuple2 tuple2) throws Exception { *@param delimiter The delimiter that separates the rows. * @return The GraphCsv reader instance itself, to allow for fluent function chaining. */ - public GraphCsvReader lineDelimiterEdges(String delimiter) - { + public GraphCsvReader lineDelimiterEdges(String delimiter) { this.EdgeReader.lineDelimiter(delimiter); return this; } @@ -204,10 +185,8 @@ public GraphCsvReader lineDelimiterEdges(String delimiter) *@param delimiter The delimiter that separates the rows. * @return The GraphCsv reader instance itself, to allow for fluent function chaining. 
*/ - public GraphCsvReader lineDelimiterVertices(String delimiter) - { - if(this.VertexReader !=null) - { + public GraphCsvReader lineDelimiterVertices(String delimiter) { + if(this.VertexReader !=null) { this.VertexReader.lineDelimiter(delimiter); } return this; @@ -221,10 +200,8 @@ public GraphCsvReader lineDelimiterVertices(String delimiter) * @param delimiter The delimiter that separates the fields in a row. * @return The GraphCsv reader instance itself, to allow for fluent function chaining. */ - public GraphCsvReader fieldDelimiterVertices(String delimiter) - { - if(this.VertexReader !=null) - { + public GraphCsvReader fieldDelimiterVertices(String delimiter) { + if(this.VertexReader !=null) { this.VertexReader.fieldDelimiter(delimiter); } return this; @@ -237,8 +214,7 @@ public GraphCsvReader fieldDelimiterVertices(String delimiter) * @param delimiter The delimiter that separates the fields in a row. * @return The GraphCsv reader instance itself, to allow for fluent function chaining. */ - public GraphCsvReader fieldDelimiterEdges(String delimiter) - { + public GraphCsvReader fieldDelimiterEdges(String delimiter) { this.EdgeReader.fieldDelimiter(delimiter); return this; } @@ -268,8 +244,7 @@ public GraphCsvReader parseQuotedStringsEdges(char quoteCharacter) { */ public GraphCsvReader parseQuotedStringsVertices(char quoteCharacter) { - if(this.VertexReader !=null) - { + if(this.VertexReader !=null) { this.VertexReader.parseQuotedStrings(quoteCharacter); } return this; @@ -286,8 +261,7 @@ public GraphCsvReader parseQuotedStringsVertices(char quoteCharacter) { public GraphCsvReader ignoreCommentsVertices(String commentPrefix) { - if(this.VertexReader !=null) - { + if(this.VertexReader !=null) { this.VertexReader.ignoreComments(commentPrefix); } return this; @@ -323,8 +297,7 @@ public GraphCsvReader ignoreCommentsEdges(String commentPrefix) { * @return The CSV reader instance itself, to allow for fluent function chaining. 
*/ public GraphCsvReader includeFieldsVertices(boolean ... vertexFields) { - if(this.VertexReader !=null) - { + if(this.VertexReader !=null) { this.VertexReader.includeFields(vertexFields); } return this; @@ -364,8 +337,7 @@ public GraphCsvReader includeFieldsEdges(boolean ... edgeFields) { public GraphCsvReader includeFieldsVertices(String mask) { - if(this.VertexReader !=null) - { + if(this.VertexReader !=null) { this.VertexReader.includeFields(mask); } return this; @@ -413,8 +385,7 @@ public GraphCsvReader includeFieldsEdges(String mask) { public GraphCsvReader includeFieldsVertices(long mask) { - if(this.VertexReader !=null) - { + if(this.VertexReader !=null) { this.VertexReader.includeFields(mask); } return this; @@ -464,8 +435,7 @@ public GraphCsvReader ignoreFirstLineEdges() { * @return The Graph CSV reader instance itself, to allow for fluent function chaining. */ public GraphCsvReader ignoreFirstLineVertices() { - if(this.VertexReader !=null) - { + if(this.VertexReader !=null) { this.VertexReader.ignoreFirstLine(); } return this; @@ -489,8 +459,7 @@ public GraphCsvReader ignoreInvalidLinesEdges() { * @return The CSV reader instance itself, to allow for fluent function chaining. 
*/ public GraphCsvReader ignoreInvalidLinesVertices() { - if(this.VertexReader !=null) - { + if(this.VertexReader !=null) { this.VertexReader.ignoreInvalidLines(); } return this; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java index 2ced1ff8fdb72..afb289173d9df 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData; @@ -116,13 +115,10 @@ private static boolean parseParameters(String [] args) { return true; } - private static Graph getGraph(ExecutionEnvironment env) - { + private static Graph getGraph(ExecutionEnvironment env) { Graph graph; - if(!fileOutput) - { - DataSet> edges = ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env); - graph = Graph.fromDataSet(edges, + if(!fileOutput) { + graph = Graph.fromDataSet(ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env), new MapFunction() { public Long map(Long label) { @@ -130,8 +126,7 @@ public Long map(Long label) { } }, env); } - else - { + else { graph = Graph.fromCsvReader(edgeInputPath,new MapFunction() { public Long map(Long label) { return label; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java index d9cfc47f8f87b..42cc49c1de1c9 100644 --- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java @@ -261,8 +261,7 @@ private static Graph getSSSPGraph(ExecutionEnvironment env return Graph.fromCsvReader(verticesInputPath, edgesInSSSPInputPath, env).lineDelimiterEdges("\n") .types(Long.class, Double.class, Double.class); } - else - { + else { System.err.println("Usage: IncrementalSSSP " + " " + " "); From 58ff13c827a55a623382d8f2d82b38106052bd42 Mon Sep 17 00:00:00 2001 From: Shivani Date: Mon, 6 Jul 2015 15:41:59 +0200 Subject: [PATCH 7/7] [FLINK-1520][gelly]Changed the methods for specifying types. Created a new file for tests. Made appropriate changes in gelly_guide.md --- docs/libs/gelly_guide.md | 12 +- .../java/org/apache/flink/graph/Graph.java | 8 +- .../apache/flink/graph/GraphCsvReader.java | 153 ++++++------- .../graph/example/CommunityDetection.java | 16 +- .../graph/example/ConnectedComponents.java | 9 +- .../example/GSASingleSourceShortestPaths.java | 8 +- .../flink/graph/example/GraphMetrics.java | 13 +- .../flink/graph/example/IncrementalSSSP.java | 24 +- .../flink/graph/example/LabelPropagation.java | 30 +-- .../example/SingleSourceShortestPaths.java | 8 +- .../test/operations/GraphCreationITCase.java | 183 ++++----------- .../GraphCreationWithCsvITCase.java | 214 ++++++++++++++++++ .../GraphCreationWithMapperITCase.java | 107 +++------ 13 files changed, 421 insertions(+), 364 deletions(-) create mode 100644 flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md index 8257d91b62122..167d593f16031 100644 --- a/docs/libs/gelly_guide.md +++ b/docs/libs/gelly_guide.md @@ -104,12 +104,20 @@ DataSet> edgeTuples = env.readCsvFile("path/to/ed Graph graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env); {% endhighlight %} -* from a CSV 
file with three fields and an optional CSV file with 2 fields. In this case, Gelly will convert each row from the CSV file containing edges data to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field will be the edge value. Equivalently, each row from the optional CSV file containing vertices will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value. A types() method is called on the GraphCsvReader object returned by fromCsvReader() to inform the CsvReader of the types of the fields : +* from a CSV file with three fields and an optional CSV file with 2 fields. In this case, Gelly will convert each row from the CSV file containing edges data to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field will be the edge value. Equivalently, each row from the optional CSV file containing vertices will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value. A `typesEdges()` method is called on the GraphCsvReader object returned by `fromCsvReader()` to inform the CsvReader of the types of the fields for Edges. If Edge doesn't have a value only type of Vertex Key is passed. `typesEdges()` method returns a GraphCsvReader on calling calling `typesVertices()` or `typesVerticesNullEdge()` returns the instance of Graph: {% highlight java %} ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); -Graph graph = Graph.fromCsvReader("path/to/vertex/input","path/to/edge/input",env).types(String.class, Long.class, Double.class); +Graph graph = Graph.fromCsvReader("path/to/vertex/input", "path/to/edge/input", env).typesEdges(String.class).typesVerticesNullEdge(String.class, Long.class); +{% endhighlight %} + +If Vertices don't have a value, overloaded `typesVerticesNullEdge()` or `typesVertices()` Method should be used. 
+ +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +Graph graph = Graph.fromCsvReader("path/to/vertex/input", "path/to/edge/input", env).typesEdges(String.class, Long.class).typesVerticesNullEdge(String.class); {% endhighlight %} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 496309f05b24f..bb59143e91881 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -289,8 +289,8 @@ public static Graph fromTupleDataSet(DataSet { private final Path vertexPath,edgePath; @@ -40,9 +41,11 @@ public class GraphCsvReader { protected CsvReader EdgeReader; protected CsvReader VertexReader; protected MapFunction mapper; + protected Class vertexKey; + protected Class vertexValue; + protected Class edgeValue; //-------------------------------------------------------------------------------------------------------------------- - public GraphCsvReader(Path vertexPath,Path edgePath, ExecutionEnvironment context) { this.vertexPath = vertexPath; this.edgePath = edgePath; @@ -76,7 +79,8 @@ public GraphCsvReader (String edgePath,ExecutionEnvironment context) { } public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) { - this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context); + this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")), + new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context); } @@ -84,88 +88,100 @@ public GraphCsvReader (String edgePath, final MapFunction mapper, Executi this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")),mapper, context); } 
- public CsvReader getEdgeReader() { - return this.EdgeReader; + //-------------------------------------------------------------------------------------------------------------------- + /** + * Specifies the types for the edges fields and returns this instance of GraphCsvReader + * + * @param vertexKey The type of Vetex ID in the Graph. + * @param edgeValue The type of Edge Value in the returned Graph. + * @return The {@link org.apache.flink.graph.GraphCsvReader} + */ + public GraphCsvReader typesEdges(Class vertexKey, Class edgeValue) { + this.vertexKey = vertexKey; + this.edgeValue = edgeValue; + return this; } - public CsvReader getVertexReader() { - return this.VertexReader; + /** + * Specifies the types for the edges fields and returns this instance of GraphCsvReader + * This method is overloaded for the case when the type of EdgeValue is NullValue + * @param vertexKey The type of Vetex ID in the Graph. + * @return The {@link org.apache.flink.graph.GraphCsvReader} + */ + public GraphCsvReader typesEdges(Class vertexKey) { + this.vertexKey = vertexKey; + this.edgeValue = null; + return this; } - //-------------------------------------------------------------------------------------------------------------------- /** - * Specifies the types for the Graph fields and returns a Graph with those field types - * - * This method is overloaded for the case in which Vertices don't have a value - * - * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data, the type of CSV field 0 for the CSV reader used for reading Vertex data and the type of Vetex ID in the returned Graph. - * @param type1 The type of CSV field 1 for the CSV reader used for reading Vertex data and the type of Vertex Value in the returned Graph. - * @param type2 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. 
- * @return The {@link org.apache.flink.graph.Graph} with Edges and Vertices extracted from the parsed CSV data. + * Specifies the types for the vertices fields and returns an instance of Graph + * @param vertexKey The type of Vertex ID in the Graph. + * @param vertexValue The type of Vertex Value in the Graph. + * @return The {@link org.apache.flink.graph.Graph} */ - public Graph types(Class type0, Class type1, Class type2) { - DataSet> edges = this.EdgeReader.types(type0, type0, type2); - if(vertexPath!=null) { - DataSet> vertices = this.VertexReader.types(type0, type1); - return Graph.fromTupleDataSet(vertices, edges, executionContext); + public Graph typesVertices(Class vertexKey, Class vertexValue) { + DataSet> edges = this.EdgeReader.types(this.vertexKey,this.vertexKey, this.edgeValue); + if(mapper == null && this.VertexReader != null) { + DataSet> vertices = this.VertexReader.types(vertexKey, vertexValue); + return Graph.fromTupleDataSet(vertices, edges, executionContext); + } else if(this.mapper != null) { + return Graph.fromTupleDataSet(edges, this.mapper, executionContext); } else { - return Graph.fromTupleDataSet(edges, this.mapper, executionContext); + return null; } - - } + /** - * Specifies the types for the Graph fields and returns a Graph with those field types in the special case - * where Vertices don't have a value - * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph. - * @param type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. - * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. + * Specifies the types for the vertices fields and returns and instance of Graph + * This method is overloaded for the case when vertices don't have a value + * @param vertexKey The type of Vertex ID in the Graph. 
+ * @return The {@link org.apache.flink.graph.Graph} */ - public Graph typesVertexValueNull(Class type0, Class type1) { - DataSet> edges = this.EdgeReader.types(type0, type0, type1); + public Graph typesVertices(Class vertexKey) { + DataSet> edges = this.EdgeReader.types(this.vertexKey, this.vertexKey, this.edgeValue); return Graph.fromTupleDataSet(edges, executionContext); } /** - * Specifies the types for the Graph fields and returns a Graph with those field types ain the special case - * where Edges don't have a value - * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph. - * @param type1 The type of CSV field 2 for the CSV reader used for reading Edge data and the type of Edge Value in the returned Graph. - * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. + * Specifies the types for the vertices fields and returns an instance of Graph when Edges don't have a value + * @param vertexKey The type of Vertex ID in the Graph. + * @param vertexValue The type of Vertex Value in the Graph. 
+ * @return The {@link org.apache.flink.graph.Graph} */ - public Graph typesEdgeValueNull(Class type0, Class type1) { - DataSet> edges = this.EdgeReader.types(type0, type0) + public Graph typesVerticesNullEdge(Class vertexKey, Class vertexValue) { + DataSet> edges= this.EdgeReader.types(this.vertexKey, this.vertexKey) .map(new MapFunction, Tuple3>() { - @Override - public Tuple3 map(Tuple2 tuple2) throws Exception { - return new Tuple3(tuple2.f0, tuple2.f1, NullValue.getInstance()); + public Tuple3 map(Tuple2 value) { + return new Tuple3(value.f0, value.f1, NullValue.getInstance()); } }); - - if(vertexPath!=null) { - DataSet> vertices = this.VertexReader.types(type0, type1); - return Graph.fromTupleDataSet(vertices, edges, executionContext); + if(this.mapper == null && this.VertexReader != null) { + DataSet> vertices = this.VertexReader.types(vertexKey, vertexValue); + return Graph.fromTupleDataSet(vertices, edges, executionContext); + } else if (this.mapper != null) { + return Graph.fromTupleDataSet(edges, mapper, executionContext); } else { - return Graph.fromTupleDataSet(edges, this.mapper, executionContext); + return null; } } /** - * Specifies the types for the Graph fields and returns a Graph with those field types. - * This method is overloaded for the case in which Vertices and Edges don't have a value - * @param type0 The type of CSV field 0 and 1 for the CSV reader used for reading Edge data and the type of Vetex ID in the returned Graph. - * @return The {@link org.apache.flink.graph.Graph} with Edge extracted from the parsed CSV data and Vertices mapped from Edges with null Value. + * Specifies the types for the vertices fields and returns an instance of Graph when Edges don't have a value + * This method is overloaded for the case when vertices don't have a value + * @param vertexKey The type of Vertex ID in the Graph. 
+ * @return The {@link org.apache.flink.graph.Graph} */ - public Graph types(Class type0) { - DataSet> edges = this.EdgeReader.types(type0, type0). - map(new MapFunction, Tuple3>() { - @Override - public Tuple3 map(Tuple2 tuple2) throws Exception { - return new Tuple3(tuple2.f0, tuple2.f1, NullValue.getInstance()); + public Graph typesVerticesNullEdge(Class vertexKey) { + DataSet> edges= this.EdgeReader.types(this.vertexKey, this.vertexKey) + .map(new MapFunction, Tuple3>() { + public Tuple3 map(Tuple2 value) { + return new Tuple3(value.f0, value.f1, NullValue.getInstance()); } }); return Graph.fromTupleDataSet(edges, executionContext); } + /** *Configures the Delimiter that separates rows for the CSV reader used to read the edges * ({@code '\n'}) is used by default. @@ -192,7 +208,6 @@ public GraphCsvReader lineDelimiterVertices(String delimiter) { return this; } - /** *Configures the Delimiter that separates fields in a row for the CSV reader used to read the vertices * ({@code ','}) is used by default. @@ -219,7 +234,6 @@ public GraphCsvReader fieldDelimiterEdges(String delimiter) { return this; } - /** * Enables quoted String parsing for Edge Csv Reader. Field delimiters in quoted Strings are ignored. * A String is parsed as quoted if it starts and ends with a quoting character and as unquoted otherwise. @@ -228,7 +242,6 @@ public GraphCsvReader fieldDelimiterEdges(String delimiter) { * @param quoteCharacter The character which is used as quoting character. * @return The Graph Csv reader instance itself, to allow for fluent function chaining. */ - public GraphCsvReader parseQuotedStringsEdges(char quoteCharacter) { this.EdgeReader.parseQuotedStrings(quoteCharacter); return this; @@ -242,7 +255,6 @@ public GraphCsvReader parseQuotedStringsEdges(char quoteCharacter) { * @param quoteCharacter The character which is used as quoting character. * @return The Graph Csv reader instance itself, to allow for fluent function chaining. 
*/ - public GraphCsvReader parseQuotedStringsVertices(char quoteCharacter) { if(this.VertexReader !=null) { this.VertexReader.parseQuotedStrings(quoteCharacter); @@ -258,8 +270,6 @@ public GraphCsvReader parseQuotedStringsVertices(char quoteCharacter) { * @param commentPrefix The string that starts the comments. * @return The Graph csv reader instance itself, to allow for fluent function chaining. */ - - public GraphCsvReader ignoreCommentsVertices(String commentPrefix) { if(this.VertexReader !=null) { this.VertexReader.ignoreComments(commentPrefix); @@ -267,7 +277,6 @@ public GraphCsvReader ignoreCommentsVertices(String commentPrefix) { return this; } - /** * Configures the string that starts comments for the Edge Csv Reader. * By default comments will be treated as invalid lines. @@ -276,14 +285,11 @@ public GraphCsvReader ignoreCommentsVertices(String commentPrefix) { * @param commentPrefix The string that starts the comments. * @return The Graph csv reader instance itself, to allow for fluent function chaining. */ - - public GraphCsvReader ignoreCommentsEdges(String commentPrefix) { this.EdgeReader.ignoreComments(commentPrefix); return this; } - /** * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean @@ -315,7 +321,6 @@ public GraphCsvReader includeFieldsVertices(boolean ... vertexFields) { * @param edgeFields The array of flags that describes which fields are to be included from the CSV file for edges. * @return The CSV reader instance itself, to allow for fluent function chaining. */ - public GraphCsvReader includeFieldsEdges(boolean ... edgeFields) { this.EdgeReader.includeFields(edgeFields); return this; @@ -334,8 +339,6 @@ public GraphCsvReader includeFieldsEdges(boolean ... edgeFields) { * @param mask The string mask defining which fields to include and which to skip. 
* @return The Graph Csv reader instance itself, to allow for fluent function chaining. */ - - public GraphCsvReader includeFieldsVertices(String mask) { if(this.VertexReader !=null) { this.VertexReader.includeFields(mask); @@ -356,8 +359,6 @@ public GraphCsvReader includeFieldsVertices(String mask) { * @param mask The string mask defining which fields to include and which to skip. * @return The Graph Csv reader instance itself, to allow for fluent function chaining. */ - - public GraphCsvReader includeFieldsEdges(String mask) { this.EdgeReader.includeFields(mask); return this; @@ -382,8 +383,6 @@ public GraphCsvReader includeFieldsEdges(String mask) { * @param mask The bit mask defining which fields to include and which to skip. * @return The Graph CSV reader instance itself, to allow for fluent function chaining. */ - - public GraphCsvReader includeFieldsVertices(long mask) { if(this.VertexReader !=null) { this.VertexReader.includeFields(mask); @@ -410,8 +409,6 @@ public GraphCsvReader includeFieldsVertices(long mask) { * @param mask The bit mask defining which fields to include and which to skip. * @return The Graph CSV reader instance itself, to allow for fluent function chaining. */ - - public GraphCsvReader includeFieldsEdges(long mask) { this.EdgeReader.includeFields(mask); return this; @@ -427,8 +424,6 @@ public GraphCsvReader ignoreFirstLineEdges() { return this; } - - /** * Sets the CSV reader for the Vertices file to ignore the first line. This is useful for files that contain a header line. 
* @@ -464,8 +459,4 @@ public GraphCsvReader ignoreInvalidLinesVertices() { } return this; } - - - - -} +} \ No newline at end of file diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java index 8240ed7950eb4..aa57afd8ca709 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java @@ -117,12 +117,10 @@ private static boolean parseParameters(String [] args) { return true; } - - private static Graph getGraph(ExecutionEnvironment env) - { + @SuppressWarnings("unchecked") + private static Graph getGraph(ExecutionEnvironment env) { Graph graph; - if(!fileOutput) - { + if(!fileOutput) { DataSet> edges = CommunityDetectionData.getDefaultEdgeDataSet(env); graph = Graph.fromDataSet(edges, new MapFunction() { @@ -131,9 +129,7 @@ public Long map(Long label) { return label; } }, env); - } - else - { + } else { graph = Graph.fromCsvReader(edgeInputPath,new MapFunction() { public Long map(Long label) { return label; @@ -141,8 +137,8 @@ public Long map(Long label) { }, env).ignoreCommentsEdges("#") .fieldDelimiterEdges("\t") .lineDelimiterEdges("\n") - .types(Long.class, Long.class, Double.class); - + .typesEdges(Long.class, Double.class) + .typesVertices(Long.class, Long.class); } return graph; } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java index afb289173d9df..864c0c4d2c081 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java @@ -61,7 +61,6 @@ public static void 
main(String [] args) throws Exception { //util method getGraph is used Graph graph = ConnectedComponents.getGraph(env); - DataSet> verticesWithMinIds = graph .run(new ConnectedComponentsAlgorithm(maxIterations)).getVertices(); @@ -114,7 +113,7 @@ private static boolean parseParameters(String [] args) { return true; } - +@SuppressWarnings("unchecked") private static Graph getGraph(ExecutionEnvironment env) { Graph graph; if(!fileOutput) { @@ -125,8 +124,7 @@ public Long map(Long label) { return label; } }, env); - } - else { + } else { graph = Graph.fromCsvReader(edgeInputPath,new MapFunction() { public Long map(Long label) { return label; @@ -134,7 +132,8 @@ public Long map(Long label) { }, env).ignoreCommentsEdges("#") .fieldDelimiterEdges("\t") .lineDelimiterEdges("\n") - .typesEdgeValueNull(Long.class, Long.class); + .typesEdges(Long.class) + .typesVerticesNullEdge(Long.class, Long.class); } return graph; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java index 5d57723592ce6..2b541305da01c 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java @@ -98,7 +98,7 @@ private static final class CalculateDistances extends GatherFunction neighbor) { return neighbor.getNeighborValue() + neighbor.getEdgeValue(); } - }; + } @SuppressWarnings("serial") private static final class ChooseMinDistance extends SumFunction { @@ -106,7 +106,7 @@ private static final class ChooseMinDistance extends SumFunction { @@ -157,11 +157,13 @@ private static boolean parseParameters(String[] args) { return true; } + @SuppressWarnings("unchecked") private static Graph getGraph(ExecutionEnvironment env) { if (fileOutput) { return 
Graph.fromCsvReader(edgesInputPath, new InitVertices(srcVertexId), env).fieldDelimiterEdges("\t") .lineDelimiterEdges("\n") - .types(Long.class, Long.class, Double.class); + .typesEdges(Long.class, Double.class) + .typesVertices(Long.class, Double.class); } else { return Graph.fromDataSet(SingleSourceShortestPathsData.getDefaultEdgeDataSet(env), new InitVertices(srcVertexId), env); } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java index 958ab1ec0b8a7..e28f28e129ee4 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java @@ -148,15 +148,16 @@ private static boolean parseParameters(String[] args) { return true; } - @SuppressWarnings("serial") + @SuppressWarnings({"serial", "unchecked"}) private static Graph getGraph(ExecutionEnvironment env) { if(fileOutput) { - return Graph.fromCsvReader(edgesInputPath, env).lineDelimiterEdges("\n").fieldDelimiterEdges("\t") - .types(Long.class); + return Graph.fromCsvReader(edgesInputPath, env) + .lineDelimiterEdges("\n") + .fieldDelimiterEdges("\t") + .typesEdges(Long.class) + .typesVerticesNullEdge(Long.class); - } - else - { + } else { return Graph.fromDataSet(ExampleUtils.getRandomEdges(env, NUM_VERTICES), env); } } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java index 42cc49c1de1c9..cf0d9f31cd751 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java @@ -239,15 +239,13 @@ private static boolean parseParameters(String[] args) { return true; } - - private 
static Graph getGraph(ExecutionEnvironment env) - { + @SuppressWarnings("unchecked") + private static Graph getGraph(ExecutionEnvironment env) { if(fileOutput) { return Graph.fromCsvReader(verticesInputPath, edgesInputPath, env).lineDelimiterEdges("\n") - .types(Long.class, Double.class, Double.class); - } - else - { + .typesEdges(Long.class, Double.class) + .typesVertices(Long.class, Double.class); + } else { System.err.println("Usage: IncrementalSSSP " + " " + " "); @@ -255,13 +253,13 @@ private static Graph getGraph(ExecutionEnvironment env) } } - private static Graph getSSSPGraph(ExecutionEnvironment env) - { + @SuppressWarnings("unchecked") + private static Graph getSSSPGraph(ExecutionEnvironment env) { if(fileOutput) { return Graph.fromCsvReader(verticesInputPath, edgesInSSSPInputPath, env).lineDelimiterEdges("\n") - .types(Long.class, Double.class, Double.class); - } - else { + .typesEdges(Long.class, Double.class) + .typesVertices(Long.class, Double.class); + } else { System.err.println("Usage: IncrementalSSSP " + " " + " "); @@ -269,8 +267,6 @@ private static Graph getSSSPGraph(ExecutionEnvironment env } } - - private static Edge getEdgeToBeRemoved() { if (fileOutput) { return new Edge(srcEdgeToBeRemoved, trgEdgeToBeRemoved, valEdgeToBeRemoved); diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java index 82e1bfa6af920..b72ebed1bd1c3 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java @@ -27,7 +27,6 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.library.LabelPropagationAlgorithm; -import org.apache.flink.graph.utils.Tuple2ToVertexMap; import org.apache.flink.types.NullValue; import 
org.apache.flink.util.Collector; @@ -59,7 +58,6 @@ public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // Set up the graph - Graph graph = LabelPropagation.getGraph(env); // Set up the program @@ -111,33 +109,15 @@ private static boolean parseParameters(String[] args) { return true; } - @SuppressWarnings("serial") - private static DataSet> getVertexDataSet(ExecutionEnvironment env) { - - if (fileOutput) { - return env.readCsvFile(vertexInputPath) - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class) - .map(new Tuple2ToVertexMap()); - } - - return env.generateSequence(1, numVertices).map( - new MapFunction>() { - public Vertex map(Long l) throws Exception { - return new Vertex(l, l); - } - }); - } - - @SuppressWarnings("serial") - private static Graph getGraph(ExecutionEnvironment env) - { + @SuppressWarnings({"serial" , "unchecked"}) + private static Graph getGraph(ExecutionEnvironment env) { if(fileOutput) { return Graph.fromCsvReader(vertexInputPath, edgeInputPath, env).fieldDelimiterEdges("\t") .fieldDelimiterVertices("\t") .lineDelimiterEdges("\n") - .lineDelimiterVertices("\n").typesEdgeValueNull(Long.class, Long.class); + .lineDelimiterVertices("\n") + .typesEdges(Long.class) + .typesVerticesNullEdge(Long.class, Long.class); } return Graph.fromDataSet(env. 
generateSequence(1, numVertices).map(new MapFunction>() { diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java index ca5535edb7b5e..83f86e33fd6f1 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java @@ -111,6 +111,7 @@ private static boolean parseParameters(String[] args) { return true; } + @SuppressWarnings("unchecked") private static Graph getGraph(ExecutionEnvironment env) { if (fileOutput) { return Graph.fromCsvReader(edgesInputPath, new MapFunction() { @@ -120,10 +121,9 @@ public Double map(Long value) throws Exception { } }, env).lineDelimiterEdges("\n") .fieldDelimiterEdges("\t") - .types(Long.class, Double.class, Double.class); - - } - else { + .typesEdges(Long.class, Double.class) + .typesVertices(Long.class, Double.class); + } else { return Graph.fromDataSet(SingleSourceShortestPathsData.getDefaultEdgeDataSet(env), new MapFunction() { @Override public Double map(Long value) throws Exception { diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java index 77b68a8ed391f..22a51510e511f 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java @@ -18,12 +18,12 @@ package org.apache.flink.graph.test.operations; -import com.google.common.base.Charsets; +import java.util.LinkedList; +import java.util.List; + import org.apache.flink.api.common.functions.MapFunction; import 
org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; @@ -32,17 +32,9 @@ import org.apache.flink.graph.validation.InvalidVertexIdsValidator; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.types.NullValue; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; @RunWith(Parameterized.class) public class GraphCreationITCase extends MultipleProgramsTestBase { @@ -51,21 +43,8 @@ public GraphCreationITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } + private String expectedResult; @Test public void testCreateWithoutVertexValues() throws Exception { @@ -75,13 +54,16 @@ public void testCreateWithoutVertexValues() throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getVertices(); + List> result= data.collect(); + expectedResult = "1,(null)\n" + - "2,(null)\n" + - "3,(null)\n" + - "4,(null)\n" + - "5,(null)\n"; + "2,(null)\n" + + "3,(null)\n" + + "4,(null)\n" + + 
"5,(null)\n"; + + compareResultAsTuples(result, expectedResult); } @Test @@ -93,13 +75,16 @@ public void testCreateWithMapper() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), new AssignIdAsValueMapper(), env); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getVertices(); + List> result= data.collect(); + expectedResult = "1,1\n" + - "2,2\n" + - "3,3\n" + - "4,4\n" + - "5,5\n"; + "2,2\n" + + "3,3\n" + + "4,4\n" + + "5,5\n"; + + compareResultAsTuples(result, expectedResult); } @Test @@ -111,86 +96,16 @@ public void testCreateWithCustomVertexValue() throws Exception { Graph, Long> graph = Graph.fromDataSet( TestGraphUtils.getLongLongEdgeData(env), new AssignCustomVertexValueMapper(), env); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet>> data = graph.getVertices(); + List>> result= data.collect(); + expectedResult = "1,(2.0,0)\n" + "2,(4.0,1)\n" + "3,(6.0,2)\n" + "4,(8.0,3)\n" + "5,(10.0,4)\n"; - } - - @Test - public void testCreateWithCsvFile() throws Exception { - /* - * Test with two Csv files one with Vertex Data and one with Edges data - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - final String fileContent = "1,1\n"+ - "2,2\n"+ - "3,3\n"; - final FileInputSplit split = createTempFile(fileContent); - final String fileContent2 = "1,2,ot\n"+ - "3,2,tt\n"+ - "3,1,to\n"; - final FileInputSplit split2 = createTempFile(fileContent2); - Graph graph= Graph.fromCsvReader(split.getPath().toString(),split2.getPath().toString(),env). - types(Long.class,Long.class,String.class); - graph.getTriplets().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2,1,2,ot\n" + - "3,2,3,2,tt\n" + - "3,1,3,1,to\n"; - } - - @Test - public void testCreateWithOnlyEdgesCsvFile() throws Exception { - /* - * Test with one Csv file one with Edges data. 
Also tests the configuration method ignoreFistLineEdges() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - final String fileContent2 = "header\n1,2,ot\n"+ - "3,2,tt\n"+ - "3,1,to\n"; - final FileInputSplit split2 = createTempFile(fileContent2); - Graph graph= Graph.fromCsvReader(split2.getPath().toString(), env).ignoreFirstLineEdges() - .ignoreCommentsVertices("hi").typesVertexValueNull(Long.class, String.class); - graph.getTriplets().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2,(null),(null),ot\n" + - "3,2,(null),(null),tt\n" + - "3,1,(null),(null),to\n"; - } - - @Test - public void testCreateCsvFileDelimiterConfiguration() throws Exception { - /* - * Test with an Edge and Vertex csv file. Tests the configuration methods FieldDelimiterEdges and - * FieldDelimiterVertices - * Also tests the configuration methods LineDelimiterEdges and LineDelimiterVertices - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - final String fileContent = "header\n1;1\n"+ - "2;2\n"+ - "3;3\n"; - final FileInputSplit split = createTempFile(fileContent); - final String fileContent2 = "header|1:2:ot|"+ - "3:2:tt|"+ - "3:1:to|"; - final FileInputSplit split2 = createTempFile(fileContent2); - Graph graph= Graph.fromCsvReader(split.getPath().toString(),split2.getPath().toString(),env). - ignoreFirstLineEdges().ignoreFirstLineVertices(). - fieldDelimiterEdges(":").fieldDelimiterVertices(";"). - lineDelimiterEdges("|"). 
- types(Long.class, Long.class, String.class); - graph.getTriplets().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2,1,2,ot\n" + - "3,2,3,2,tt\n" + - "3,1,3,1,to\n"; - - + + compareResultAsTuples(result, expectedResult); } @Test @@ -203,12 +118,16 @@ public void testValidate() throws Exception { DataSet> edges = TestGraphUtils.getLongLongEdgeData(env); Graph graph = Graph.fromDataSet(vertices, edges, env); - Boolean result = graph.validate(new InvalidVertexIdsValidator()); - - env.fromElements(result).writeAsText(resultPath); - env.execute(); - - expectedResult = "true\n"; + Boolean valid = graph.validate(new InvalidVertexIdsValidator()); + + //env.fromElements(result).writeAsText(resultPath); + + String res= valid.toString();//env.fromElements(valid); + List result= new LinkedList(); + result.add(res); + expectedResult = "true"; + + compareResultAsText(result, expectedResult); } @Test @@ -221,11 +140,15 @@ public void testValidateWithInvalidIds() throws Exception { DataSet> edges = TestGraphUtils.getLongLongEdgeData(env); Graph graph = Graph.fromDataSet(vertices, edges, env); - Boolean result = graph.validate(new InvalidVertexIdsValidator()); - env.fromElements(result).writeAsText(resultPath); - env.execute(); + Boolean valid = graph.validate(new InvalidVertexIdsValidator()); + + String res= valid.toString();//env.fromElements(valid); + List result= new LinkedList(); + result.add(res); expectedResult = "false\n"; + + compareResultAsText(result, expectedResult); } @SuppressWarnings("serial") @@ -237,7 +160,7 @@ public Long map(Long vertexId) { @SuppressWarnings("serial") private static final class AssignCustomVertexValueMapper implements - MapFunction> { + MapFunction> { DummyCustomParameterizedType dummyValue = new DummyCustomParameterizedType(); @@ -248,18 +171,4 @@ public DummyCustomParameterizedType map(Long vertexId) { return dummyValue; } } - - private FileInputSplit createTempFile(String content) throws IOException { - File tempFile = 
File.createTempFile("test_contents", "tmp"); - tempFile.deleteOnExit(); - - OutputStreamWriter wrt = new OutputStreamWriter( - new FileOutputStream(tempFile), Charsets.UTF_8 - ); - wrt.write(content); - wrt.close(); - - return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), - new String[] {"localhost"}); - } -} +} \ No newline at end of file diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java new file mode 100644 index 0000000000000..2b78d32793598 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.test.operations; + +import com.google.common.base.Charsets; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Triplet; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.NullValue; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.List; + +@RunWith(Parameterized.class) +public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase { + + public GraphCreationWithCsvITCase(TestExecutionMode mode) { + super(mode); + } + + private String expectedResult; + + @Test + @SuppressWarnings("unchecked") + public void testCreateWithCsvFile() throws Exception { + /* + * Test with two Csv files one with Vertex Data and one with Edges data + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final String fileContent = "1,1\n"+ + "2,2\n"+ + "3,3\n"; + final FileInputSplit split = createTempFile(fileContent); + final String fileContent2 = "1,2,ot\n"+ + "3,2,tt\n"+ + "3,1,to\n"; + final FileInputSplit split2 = createTempFile(fileContent2); + + Graph graph= Graph.fromCsvReader(split.getPath().toString(),split2.getPath().toString(),env) + .typesEdges(Long.class, String.class) + .typesVertices(Long.class, Long.class); + + List> result = graph.getTriplets().collect(); + + expectedResult = "1,2,1,2,ot\n" + + "3,2,3,2,tt\n" + + "3,1,3,1,to\n"; + + compareResultAsTuples(result, expectedResult); + } + + @Test + @SuppressWarnings("unchecked") + public void testCsvWithNullEdge() throws Exception { + /* + Test fromCsvReader with edge and vertex path and nullvalue for 
edge + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final String vertexFileContent = "1,one\n"+ + "2,two\n"+ + "3,three\n"; + final String edgeFileContent = "1,2\n"+ + "3,2\n"+ + "3,1\n"; + final FileInputSplit split = createTempFile(vertexFileContent); + final FileInputSplit edgeSplit = createTempFile(edgeFileContent); + + Graph graph= Graph.fromCsvReader(split.getPath().toString(), edgeSplit.getPath().toString(), + env) + .typesEdges(Long.class) + .typesVerticesNullEdge(Long.class, String.class); + + List> result = graph.getTriplets().collect(); + + expectedResult = "1,2,one,two,(null)\n"+ + "3,2,three,two,(null)\n"+ + "3,1,three,one,(null)\n"; + + compareResultAsTuples(result, expectedResult); + } + + @Test + @SuppressWarnings("unchecked") + public void testCsvWithConstantValueMapper() throws Exception { + /* + *Test fromCsvReader with edge path and a mapper that assigns a Double constant as value + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final String fileContent = "1,2,ot\n"+ + "3,2,tt\n"+ + "3,1,to\n"; + final FileInputSplit split = createTempFile(fileContent); + + Graph graph = Graph.fromCsvReader(split.getPath().toString(), + new AssignDoubleValueMapper(), env).typesEdges(Long.class, String.class) + .typesVertices(Long.class, Double.class); + List> result = graph.getTriplets().collect(); + //graph.getTriplets().writeAsCsv(resultPath); + expectedResult = "1,2,0.1,0.1,ot\n" + "3,1,0.1,0.1,to\n" + "3,2,0.1,0.1,tt\n"; + compareResultAsTuples(result, expectedResult); + } + + @Test + @SuppressWarnings("unchecked") + public void testCreateWithOnlyEdgesCsvFile() throws Exception { + /* + * Test with one Csv file one with Edges data. 
Also tests the configuration method ignoreFistLineEdges() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final String fileContent2 = "header\n1,2,ot\n"+ + "3,2,tt\n"+ + "3,1,to\n"; + + final FileInputSplit split2 = createTempFile(fileContent2); + Graph graph= Graph.fromCsvReader(split2.getPath().toString(), env) + .ignoreFirstLineEdges() + .ignoreCommentsVertices("hi") + .typesEdges(Long.class, String.class) + .typesVertices(Long.class); + + List> result = graph.getTriplets().collect(); + expectedResult = "1,2,(null),(null),ot\n" + + "3,2,(null),(null),tt\n" + + "3,1,(null),(null),to\n"; + + compareResultAsTuples(result, expectedResult); + } + + @Test + @SuppressWarnings("unchecked") + public void testCreateCsvFileDelimiterConfiguration() throws Exception { + /* + * Test with an Edge and Vertex csv file. Tests the configuration methods FieldDelimiterEdges and + * FieldDelimiterVertices + * Also tests the configuration methods LineDelimiterEdges and LineDelimiterVertices + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + final String fileContent = "header\n1;1\n"+ + "2;2\n"+ + "3;3\n"; + + final FileInputSplit split = createTempFile(fileContent); + + final String fileContent2 = "header|1:2:ot|"+ + "3:2:tt|"+ + "3:1:to|"; + + final FileInputSplit split2 = createTempFile(fileContent2); + + Graph graph= Graph.fromCsvReader(split.getPath().toString(),split2.getPath().toString(),env). + ignoreFirstLineEdges().ignoreFirstLineVertices(). + fieldDelimiterEdges(":").fieldDelimiterVertices(";"). + lineDelimiterEdges("|"). 
+ typesEdges(Long.class, String.class) + .typesVertices(Long.class, Long.class); + + List> result = graph.getTriplets().collect(); + + expectedResult = "1,2,1,2,ot\n" + + "3,2,3,2,tt\n" + + "3,1,3,1,to\n"; + + compareResultAsTuples(result, expectedResult); + + } + + /*----------------------------------------------------------------------------------------------------------------*/ + @SuppressWarnings("serial") + private static final class AssignDoubleValueMapper implements MapFunction { + public Double map(Long value) { + return 0.1d; + } + } + + private FileInputSplit createTempFile(String content) throws IOException { + File tempFile = File.createTempFile("test_contents", "tmp"); + tempFile.deleteOnExit(); + + OutputStreamWriter wrt = new OutputStreamWriter( + new FileOutputStream(tempFile), Charsets.UTF_8 + ); + wrt.write(content); + wrt.close(); + + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, + tempFile.length(), new String[] {"localhost"}); + } +} diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java index fb29914a7b151..20cbca567c65e 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java @@ -18,27 +18,20 @@ package org.apache.flink.graph.test.operations; -import com.google.common.base.Charsets; +import java.util.List; + import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; import org.apache.flink.graph.Graph; +import 
org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; @RunWith(Parameterized.class) public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase { @@ -47,21 +40,8 @@ public GraphCreationWithMapperITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } + private String expectedResult; - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } @Test public void testWithDoubleValueMapper() throws Exception { @@ -72,13 +52,16 @@ public void testWithDoubleValueMapper() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), new AssignDoubleValueMapper(), env); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getVertices(); + List> result= data.collect(); + expectedResult = "1,0.1\n" + "2,0.1\n" + "3,0.1\n" + "4,0.1\n" + "5,0.1\n"; + + compareResultAsTuples(result, expectedResult); } @Test @@ -90,13 +73,16 @@ public void testWithTuple2ValueMapper() throws Exception { Graph, Long> graph = Graph.fromDataSet( TestGraphUtils.getLongLongEdgeData(env), new AssignTuple2ValueMapper(), env); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet>> data = graph.getVertices(); + List>> result= data.collect(); + 
expectedResult = "1,(2,42)\n" + "2,(4,42)\n" + "3,(6,42)\n" + "4,(8,42)\n" + "5,(10,42)\n"; + + compareResultAsTuples(result, expectedResult); } @Test @@ -105,17 +91,20 @@ public void testWithConstantValueMapper() throws Exception { * Test create() with edge dataset with String key type * and a mapper that assigns a double constant as value */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env), - new AssignDoubleConstantMapper(), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,0.1\n" + - "2,0.1\n" + - "3,0.1\n" + - "4,0.1\n" + - "5,0.1\n"; + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env), + new AssignDoubleConstantMapper(), env); + + DataSet> data = graph.getVertices(); + List> result= data.collect(); + + expectedResult = "1,0.1\n" + + "2,0.1\n" + + "3,0.1\n" + + "4,0.1\n" + + "5,0.1\n"; + + compareResultAsTuples(result, expectedResult); } @Test @@ -127,31 +116,16 @@ public void testWithDCustomValueMapper() throws Exception { Graph graph = Graph.fromDataSet( TestGraphUtils.getLongLongEdgeData(env), new AssignCustomValueMapper(), env); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getVertices(); + List> result= data.collect(); + expectedResult = "1,(F,0)\n" + "2,(F,1)\n" + "3,(F,2)\n" + "4,(F,3)\n" + "5,(F,4)\n"; - } - - @Test - public void testCsvWithConstantValueMapper() throws Exception { - /* - *Test fromCsvReader with edge path and a mapper that assigns a Double constant as value - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - final String fileContent = "1,2,ot\n"+ - "3,2,tt\n"+ - "3,1,to\n"; - final FileInputSplit split = createTempFile(fileContent); - Graph graph = Graph.fromCsvReader(split.getPath().toString(),new 
AssignDoubleValueMapper(),env).types(Long.class,Double.class,String.class); - graph.getTriplets().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2,0.1,0.1,ot\n"+ - "3,2,0.1,0.1,tt\n"+ - "3,1,0.1,0.1,to\n"; + + compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") @@ -181,17 +155,4 @@ public DummyCustomType map(Long vertexId) { return new DummyCustomType(vertexId.intValue()-1, false); } } - - private FileInputSplit createTempFile(String content) throws IOException { - File tempFile = File.createTempFile("test_contents", "tmp"); - tempFile.deleteOnExit(); - - OutputStreamWriter wrt = new OutputStreamWriter( - new FileOutputStream(tempFile), Charsets.UTF_8 - ); - wrt.write(content); - wrt.close(); - - return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"}); - } }