diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala index e51453eacee10..11ee7cd62d92f 100644 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala +++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala @@ -170,7 +170,7 @@ object Graph { /** * Creates a Graph with from a CSV file of vertices and a CSV file of edges * - * @param The Execution Environment. + * @param env Execution Environment. * @param pathEdges The file path containing the edges. * @param readVertices Defines whether the vertices have associated values. * If set to false, the vertex input is ignored and vertices are created from the edges file. @@ -868,7 +868,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { * Adds the list of vertices, passed as input, to the graph. * If the vertices already exist in the graph, they will not be added once more. * - * @param verticesToAdd the list of vertices to add + * @param vertices the list of vertices to add * @return the new graph containing the existing and newly added vertices */ def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = { @@ -881,7 +881,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { * When adding an edge for a non-existing set of vertices, * the edge is considered invalid and ignored. * - * @param newEdges the data set of edges to be added + * @param edges the data set of edges to be added * @return a new graph containing the existing edges plus the newly added edges. */ def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = { @@ -916,7 +916,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { /** * Removes the given vertex and its edges from the graph. * - * @param vertex the vertex to remove + * @param vertices list of vertices to remove * @return the new graph containing the existing vertices and edges without * the removed vertex and its edges */ @@ -938,7 +938,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { /** * Removes all the edges that match the edges in the given data set from the graph. * - * @param edgesToBeRemoved the list of edges to be removed + * @param edges the list of edges to be removed * @return a new graph where the edges have been removed and in which the vertices remained intact */ def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = { @@ -993,7 +993,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { * The {@link ReduceNeighborsFunction} combines a pair of neighbor vertex values * into one new value of the same type. * - * @param reduceNeighborsFunction the reduce function to apply to the neighbors of each vertex. + * @param reduceEdgesFunction the reduce function to apply to the edges of each vertex. * @param direction the edge direction (in-, out-, all-) * @return a Dataset of Tuple2, with one tuple per vertex. * The first field of the Tuple2 is the vertex ID and the second field diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala index b3da52018e864..75b793eaad061 100644 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala +++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.graph.scala.example; +package org.apache.flink.graph.scala.example import org.apache.flink.api.scala._ import org.apache.flink.graph.scala._ @@ -32,7 +32,7 @@ import java.lang.Long * You can find all available library methods in [[org.apache.flink.graph.library]]. * * In particular, this example uses the - * [[org.apache.flink.graph.library.ConnectedComponentsAlgorithm.GSAConnectedComponents]] + * [[org.apache.flink.graph.library.GSAConnectedComponents]] * library method to compute the connected components of the input graph. * * The input file is a plain text file and must be formatted as follows: @@ -70,7 +70,7 @@ object ConnectedComponents { } private final class InitVertices extends MapFunction[Long, Long] { - override def map(id: Long) = {id} + override def map(id: Long) = id } // *********************************************************************** @@ -87,19 +87,18 @@ object ConnectedComponents { if(args.length != 3) { System.err.println("Usage ConnectedComponents " + "") - false } fileOutput = true edgesInputPath = args(0) outputPath = args(1) - maxIterations = (2).toInt + maxIterations = 2 } else { System.out.println("Executing ConnectedComponents example with default parameters" + " and built-in default data.") System.out.println(" Provide parameters to read input data from files.") System.out.println(" See the documentation for the correct format of input files.") System.out.println("Usage ConnectedComponents " + - ""); + "") } true } diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala index 2dc272c1c52b0..68435bad95dd8 100644 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala +++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala @@ -16,20 +16,15 @@ * limitations under the License. */ -package org.apache.flink.graph.scala.example; +package org.apache.flink.graph.scala.example +import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.scala._ -import org.apache.flink.graph.scala._ -import org.apache.flink.types.NullValue import org.apache.flink.graph.Edge -import org.apache.flink.api.common.functions.MapFunction -import scala.collection.JavaConversions._ -import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData -import org.apache.flink.graph.gsa.GatherFunction -import org.apache.flink.graph.gsa.Neighbor -import org.apache.flink.graph.gsa.SumFunction -import org.apache.flink.graph.gsa.ApplyFunction +import org.apache.flink.graph.gsa.{ApplyFunction, GatherFunction, Neighbor, SumFunction} +import org.apache.flink.graph.scala._ +import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap /** * This example shows how to use Gelly's gather-sum-apply iterations. @@ -121,20 +116,19 @@ object GSASingleSourceShortestPaths { if(args.length != 4) { System.err.println("Usage: SingleSourceShortestPaths " + " ") - false } fileOutput = true srcVertexId = args(0).toLong edgesInputPath = args(1) outputPath = args(2) - maxIterations = (3).toInt + maxIterations = 3 } else { System.out.println("Executing Single Source Shortest Paths example " + "with default parameters and built-in default data.") System.out.println(" Provide parameters to read input data from files.") System.out.println(" See the documentation for the correct format of input files.") System.out.println("Usage: SingleSourceShortestPaths " + - " "); + " ") } true } diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala index 4eed824287ac1..1c3fcdd6d9662 100644 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala +++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala @@ -53,13 +53,13 @@ object GraphMetrics { val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env) /** get the number of vertices **/ - val numVertices = graph.numberOfVertices; + val numVertices = graph.numberOfVertices /** get the number of edges **/ - val numEdges = graph.numberOfEdges; + val numEdges = graph.numberOfEdges /** compute the average node degree **/ - val verticesWithDegrees = graph.getDegrees; + val verticesWithDegrees = graph.getDegrees val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble) /** find the vertex with the maximum in-degree **/ @@ -114,7 +114,7 @@ object GraphMetrics { (key: Long, out: Collector[Edge[Long, NullValue]]) => { val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt for ( i <- 0 to numOutEdges ) { - var target: Long = ((Math.random() * numVertices) + 1).toLong + val target: Long = ((Math.random() * numVertices) + 1).toLong new Edge[Long, NullValue](key, target, NullValue.getInstance()) } }) diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala index 65a8e7f133cc8..7fc23c469f602 100644 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala +++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala @@ -16,11 +16,10 @@ * limitations under the License. */ -package org.apache.flink.graph.scala.example; +package org.apache.flink.graph.scala.example import org.apache.flink.api.scala._ import org.apache.flink.graph.scala._ -import org.apache.flink.types.NullValue import org.apache.flink.graph.Edge import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.graph.spargel.VertexUpdateFunction @@ -95,7 +94,7 @@ object SingleSourceShortestPaths { override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) { var minDistance = Double.MaxValue while (inMessages.hasNext) { - var msg = inMessages.next + val msg = inMessages.next if (msg < minDistance) { minDistance = msg } @@ -115,7 +114,7 @@ object SingleSourceShortestPaths { override def sendMessages(vertex: Vertex[Long, Double]) { for (edge: Edge[Long, Double] <- getEdges) { - sendMessageTo(edge.getTarget(), vertex.getValue + edge.getValue) + sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue) } } } @@ -135,20 +134,19 @@ object SingleSourceShortestPaths { if(args.length != 4) { System.err.println("Usage: SingleSourceShortestPaths " + " ") - false } fileOutput = true srcVertexId = args(0).toLong edgesInputPath = args(1) outputPath = args(2) - maxIterations = (3).toInt + maxIterations = 3 } else { System.out.println("Executing Single Source Shortest Paths example " + "with default parameters and built-in default data.") System.out.println(" Provide parameters to read input data from files.") System.out.println(" See the documentation for the correct format of input files.") System.out.println("Usage: SingleSourceShortestPaths " + - " "); + " ") } true } diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala index 55faee32d93d9..d7ab1dd44093a 100644 --- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala +++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.api.scala +package org.apache.flink.graph.scala.test import java.lang.reflect.Method import org.apache.flink.graph.scala._ diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala index 1c2cf54d33a56..2fedfc7b7b02f 100644 --- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala +++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala @@ -24,11 +24,11 @@ import org.apache.flink.graph.{Edge, Vertex} object TestGraphUtils { def getLongLongVertexData(env: ExecutionEnvironment): DataSet[Vertex[Long, Long]] = { - return env.fromCollection(getLongLongVertices) + env.fromCollection(getLongLongVertices) } def getLongLongEdgeData(env: ExecutionEnvironment): DataSet[Edge[Long, Long]] = { - return env.fromCollection(getLongLongEdges) + env.fromCollection(getLongLongEdges) } def getLongLongVertices: List[Vertex[Long, Long]] = { diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala index b34704957f6db..2a2b34edf9f0d 100644 --- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala +++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala @@ -22,10 +22,10 @@ import org.apache.flink.api.scala._ import org.apache.flink.graph.scala._ import org.apache.flink.graph.scala.test.TestGraphUtils import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit.rules.TemporaryFolder +import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} + import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) @@ -36,33 +36,33 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testInDegrees { + def testInDegrees() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.inDegrees.collect().toList + val res = graph.inDegrees().collect().toList expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,2)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @throws(classOf[Exception]) - def testOutDegrees { + def testOutDegrees() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.outDegrees.collect().toList + val res = graph.outDegrees().collect().toList expectedResult = "(1,2)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,1)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @throws(classOf[Exception]) - def testGetDegrees { + def testGetDegrees() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.getDegrees.collect().toList + val res = graph.getDegrees().collect().toList expectedResult = "(1,3)\n" + "(2,2)\n" + "(3,4)\n" + "(4,2)\n" + "(5,3)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala index a963845fa6e34..253040b7a2c44 100644 --- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala +++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala @@ -18,25 +18,20 @@ package org.apache.flink.graph.scala.test.operations +import java.io.{File, FileOutputStream, IOException, OutputStreamWriter} + +import com.google.common.base.Charsets +import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.scala._ +import org.apache.flink.core.fs.{FileInputSplit, Path} import org.apache.flink.graph.scala._ -import org.apache.flink.graph.scala.test.TestGraphUtils import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit.rules.TemporaryFolder +import org.apache.flink.types.NullValue +import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} + import _root_.scala.collection.JavaConverters._ -import java.io.IOException -import org.apache.flink.core.fs.FileInputSplit -import java.io.File -import java.io.OutputStreamWriter -import java.io.FileOutputStream -import java.io.FileOutputStream -import com.google.common.base.Charsets -import org.apache.flink.core.fs.Path -import org.apache.flink.types.NullValue -import org.apache.flink.api.common.functions.MapFunction @RunWith(classOf[Parameterized]) class GraphCreationWithCsvITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends @@ -46,7 +41,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testCsvWithValues { + def testCsvWithValues() { /* * Test with two Csv files, both vertices and edges have values */ @@ -61,14 +56,14 @@ MultipleProgramsTestBase(mode) { pathEdges = edgesSplit.getPath.toString, env = env) - val result = graph.getTriplets.collect() + val result = graph.getTriplets().collect() expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n" - TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult); + TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult) } @Test @throws(classOf[Exception]) - def testCsvNoEdgeValues { + def testCsvNoEdgeValues() { /* * Test with two Csv files; edges have no values */ @@ -84,14 +79,14 @@ MultipleProgramsTestBase(mode) { hasEdgeValues = false, env = env) - val result = graph.getTriplets.collect() + val result = graph.getTriplets().collect() expectedResult = "1,2,one,two,(null)\n3,2,three,two,(null)\n3,1,three,one,(null)\n" - TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult); + TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult) } @Test @throws(classOf[Exception]) - def testCsvWithMapperValues { + def testCsvWithMapperValues() { /* * Test with edges Csv file and vertex mapper initializer */ @@ -104,14 +99,14 @@ MultipleProgramsTestBase(mode) { vertexValueInitializer = new VertexDoubleIdAssigner(), env = env) - val result = graph.getTriplets.collect() + val result = graph.getTriplets().collect() expectedResult = "1,2,1.0,2.0,12\n3,2,3.0,2.0,32\n3,1,3.0,1.0,31\n" - TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult); + TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult) } @Test @throws(classOf[Exception]) - def testCsvNoVertexValues { + def testCsvNoVertexValues() { /* * Test with edges Csv file: no vertex values */ @@ -123,15 +118,15 @@ MultipleProgramsTestBase(mode) { pathEdges = edgesSplit.getPath.toString, env = env) - val result = graph.getTriplets.collect() + val result = graph.getTriplets().collect() expectedResult = "1,2,(null),(null),12\n3,2,(null),(null),32\n" + "3,1,(null),(null),31\n" - TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult); + TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult) } @Test @throws(classOf[Exception]) - def testCsvNoValues { + def testCsvNoValues() { /* * Test with edges Csv file: neither vertex nor edge values */ @@ -144,15 +139,15 @@ MultipleProgramsTestBase(mode) { hasEdgeValues = false, env = env) - val result = graph.getTriplets.collect() + val result = graph.getTriplets().collect() expectedResult = "1,2,(null),(null),(null)\n" + "3,2,(null),(null),(null)\n3,1,(null),(null),(null)\n" - TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult); + TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult) } @Test @throws(classOf[Exception]) - def testCsvOptionsVertices { + def testCsvOptionsVertices() { /* * Test the options for vertices: delimiters, comments, ignore first line. */ @@ -172,14 +167,14 @@ MultipleProgramsTestBase(mode) { pathEdges = edgesSplit.getPath.toString, env = env) - val result = graph.getTriplets.collect() + val result = graph.getTriplets().collect() expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n" - TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult); + TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult) } @Test @throws(classOf[Exception]) - def testCsvOptionsEdges { + def testCsvOptionsEdges() { /* * Test the options for edges: delimiters, comments, ignore first line. */ @@ -199,9 +194,9 @@ MultipleProgramsTestBase(mode) { pathEdges = edgesSplit.getPath.toString, env = env) - val result = graph.getTriplets.collect() + val result = graph.getTriplets().collect() expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n" - TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult); + TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult) } @throws(classOf[IOException]) @@ -214,7 +209,7 @@ MultipleProgramsTestBase(mode) { wrt.close() new FileInputSplit(0, new Path(tempFile.toURI.toString), 0, tempFile.length, - Array("localhost")); + Array("localhost")) } final class VertexDoubleIdAssigner extends MapFunction[Long, Double] { diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala index 4b776e2d6fa77..f6acdc1576c04 100644 --- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala +++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala @@ -23,10 +23,10 @@ import org.apache.flink.graph.scala._ import org.apache.flink.graph.scala.test.TestGraphUtils import org.apache.flink.graph.{Edge, Vertex} import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} +import org.junit.Test + import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) @@ -37,7 +37,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testAddVertex { + def testAddVertex() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -50,7 +50,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testAddVertexExisting { + def testAddVertexExisting() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -62,7 +62,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testAddVertexNoEdges { + def testAddVertexNoEdges() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -74,7 +74,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testAddVertices { + def testAddVertices() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -88,7 +88,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testAddVerticesExisting { + def testAddVerticesExisting() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -102,7 +102,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testRemoveVertex { + def testRemoveVertex() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -114,7 +114,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testRemoveInvalidVertex { + def testRemoveInvalidVertex() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -127,7 +127,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testRemoveVertices { + def testRemoveVertices() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -140,7 +140,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testRemoveValidAndInvalidVertex { + def testRemoveValidAndInvalidVertex() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -153,7 +153,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testAddEdge { + def testAddEdge() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -167,7 +167,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testAddEdges { + def testAddEdges() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -181,7 +181,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testAddEdgesInvalidVertices { + def testAddEdgesInvalidVertices() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -195,7 +195,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testAddExistingEdge { + def testAddExistingEdge() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -209,7 +209,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testRemoveEdge { + def testRemoveEdge() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -221,7 +221,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testRemoveInvalidEdge { + def testRemoveInvalidEdge() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -234,7 +234,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testRemoveEdges { + def testRemoveEdges() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -247,7 +247,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testRemoveSameEdgeTwiceEdges { + def testRemoveSameEdgeTwiceEdges() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala index 7f7ebc0bdc75d..9d77e6861c1a0 100644 --- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala +++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala @@ -23,10 +23,9 @@ import org.apache.flink.graph.scala._ import org.apache.flink.graph.scala.test.TestGraphUtils import org.apache.flink.graph.{Edge, Vertex} import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} +import org.junit.Test import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) @@ -37,11 +36,11 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testUndirected { + def testUndirected() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.getUndirected.getEdges.collect().toList; + val res = graph.getUndirected().getEdges.collect().toList expectedResult = "1,2,12\n" + "2,1,12\n" + "1,3,13\n" + "3,1,13\n" + "2,3,23\n" + "3,2," + "23\n" + "3,4,34\n" + "4,3,34\n" + "3,5,35\n" + "5,3,35\n" + "4,5,45\n" + "5,4,45\n" + @@ -51,11 +50,11 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testReverse { + def testReverse() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.reverse().getEdges.collect().toList; + val res = graph.reverse().getEdges.collect().toList expectedResult = "2,1,12\n" + "3,1,13\n" + "3,2,23\n" + "4,3,34\n" + "5,3,35\n" + "5,4," + "45\n" + "1,5,51\n" @@ -64,22 +63,22 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testSubGraph { + def testSubGraph() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val res = graph.subgraph(new FilterFunction[Vertex[Long, Long]] { @throws(classOf[Exception]) def filter(vertex: Vertex[Long, Long]): Boolean = { - return (vertex.getValue > 2) + vertex.getValue > 2 } }, new FilterFunction[Edge[Long, Long]] { @throws(classOf[Exception]) override def filter(edge: Edge[Long, Long]): Boolean = { - return (edge.getValue > 34) + edge.getValue > 34 } - }).getEdges.collect().toList; + }).getEdges.collect().toList expectedResult = "3,5,35\n" + "4,5,45\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -87,14 +86,14 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testSubGraphSugar { + def testSubGraphSugar() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val res = graph.subgraph( vertex => vertex.getValue > 2, edge => edge.getValue > 34 - ).getEdges.collect().toList; + ).getEdges.collect().toList expectedResult = "3,5,35\n" + "4,5,45\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -102,7 +101,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testFilterOnVertices { + def testFilterOnVertices() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -111,7 +110,7 @@ MultipleProgramsTestBase(mode) { def filter(vertex: Vertex[Long, Long]): Boolean = { vertex.getValue > 2 } - }).getEdges.collect().toList; + }).getEdges.collect().toList expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -119,13 +118,13 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testFilterOnVerticesSugar { + def testFilterOnVerticesSugar() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val res = graph.filterOnVertices( vertex => vertex.getValue > 2 - ).getEdges.collect().toList; + ).getEdges.collect().toList expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -133,7 +132,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testFilterOnEdges { + def testFilterOnEdges() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -142,7 +141,7 @@ MultipleProgramsTestBase(mode) { def filter(edge: Edge[Long, Long]): Boolean = { edge.getValue > 34 } - }).getEdges.collect().toList; + }).getEdges.collect().toList expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -150,13 +149,13 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testFilterOnEdgesSugar { + def testFilterOnEdgesSugar() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val res = graph.filterOnEdges( edge => edge.getValue > 34 - ).getEdges.collect().toList; + ).getEdges.collect().toList expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -164,44 +163,44 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testNumberOfVertices { + def testNumberOfVertices() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = env.fromElements(graph.numberOfVertices).collect().toList + val res = env.fromElements(graph.numberOfVertices()).collect().toList expectedResult = "5" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @throws(classOf[Exception]) - def testNumberOfEdges { + def testNumberOfEdges() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = env.fromElements(graph.numberOfEdges).collect().toList + val res = env.fromElements(graph.numberOfEdges()).collect().toList expectedResult = "7" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @throws(classOf[Exception]) - def testVertexIds { + def testVertexIds() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.getVertexIds.collect().toList + val res = graph.getVertexIds().collect().toList expectedResult = "1\n2\n3\n4\n5\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @throws(classOf[Exception]) - def testEdgesIds { + def testEdgesIds() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.getEdgeIds.collect().toList + val res = graph.getEdgeIds().collect().toList expectedResult = "(1,2)\n" + "(1,3)\n" + "(2,3)\n" + "(3,4)\n" + "(3,5)\n" + "(4,5)\n" + "(5,1)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) @@ -209,7 +208,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testUnion { + def testUnion() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -229,7 +228,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testDifference { + def testDifference() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -250,7 +249,7 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testDifferenceNoCommonVertices { + def testDifferenceNoCommonVertices() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) @@ -270,11 +269,11 @@ MultipleProgramsTestBase(mode) { @Test @throws(classOf[Exception]) - def testTriplets { + def testTriplets() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.getTriplets.collect().toList + val res = graph.getTriplets().collect().toList expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" + "2,3,2,3,23\n" + "3,4,3,4,34\n" + "3,5,3,5,35\n" + "4,5,4,5,45\n" + "5,1,5,1,51\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala index 83fa61b7d008b..0a7f1b9487da0 100644 --- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala +++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala @@ -20,17 +20,16 @@ package org.apache.flink.graph.scala.test.operations import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.scala._ -import org.apache.flink.graph.Edge +import org.apache.flink.graph.{Edge, EdgeJoinFunction} import org.apache.flink.graph.scala._ import org.apache.flink.graph.scala.test.TestGraphUtils import org.apache.flink.graph.scala.utils.EdgeToTuple3Map import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit.rules.TemporaryFolder +import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} + import _root_.scala.collection.JavaConverters._ -import org.apache.flink.graph.EdgeJoinFunction @RunWith(classOf[Parameterized]) class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala index f2beb7b0817fe..5998270fcf155 100644 --- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala +++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala @@ -18,18 +18,17 @@ package org.apache.flink.graph.scala.test.operations -import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.scala._ +import org.apache.flink.graph.VertexJoinFunction import org.apache.flink.graph.scala._ import org.apache.flink.graph.scala.test.TestGraphUtils import org.apache.flink.graph.scala.utils.VertexToTuple2Map import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit.rules.TemporaryFolder +import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} + import _root_.scala.collection.JavaConverters._ -import org.apache.flink.graph.VertexJoinFunction @RunWith(classOf[Parameterized]) class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala index bdfd569c1af2a..4c1d1f09e5c9b 100644 --- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala +++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala @@ -24,10 +24,10 @@ import org.apache.flink.graph.Edge import org.apache.flink.graph.scala._ import org.apache.flink.graph.scala.test.TestGraphUtils import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit.rules.TemporaryFolder +import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} + import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala index 2e51d90cd834a..a27b42c8e94f2 100644 --- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala +++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala @@ -24,10 +24,10 @@ import org.apache.flink.graph.Vertex import org.apache.flink.graph.scala._ import org.apache.flink.graph.scala.test.TestGraphUtils import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit.rules.TemporaryFolder +import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} + import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) @@ -47,7 +47,7 @@ MultipleProgramsTestBase(mode) { "2,3\n" + "3,4\n" + "4,5\n" + - "5,6\n"; + "5,6\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @@ -62,7 +62,7 @@ MultipleProgramsTestBase(mode) { "2,3\n" + "3,4\n" + "4,5\n" + - "5,6\n"; + "5,6\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala index dcd1deb0985a0..6dda54779ed0e 100644 --- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala +++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala @@ -24,10 +24,10 @@ import org.apache.flink.graph.scala.test.TestGraphUtils import org.apache.flink.graph.scala.{EdgesFunction, EdgesFunctionWithVertexValue, Graph} import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} import org.apache.flink.util.Collector -import org.junit.rules.TemporaryFolder +import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} + import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala index aef5493aea93b..67e9b9a7ae02b 100644 --- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala +++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala @@ -24,10 +24,10 @@ import org.apache.flink.graph.scala.{NeighborsFunctionWithVertexValue, _} import org.apache.flink.graph.{Edge, EdgeDirection, ReduceNeighborsFunction, Vertex} import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} import org.apache.flink.util.Collector -import org.junit.rules.TemporaryFolder +import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} + import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java index 947f34314fe1a..f45437646dc1f 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java @@ -135,7 +135,7 @@ public String getDescription() { * * @param edgeToBeRemoved * @param edgesInSSSP - * @return + * @return true or false */ public static boolean isInSSSP(final Edge edgeToBeRemoved, DataSet> edgesInSSSP) throws Exception { @@ -154,9 +154,7 @@ public void updateVertex(Vertex vertex, MessageIterator in if (inMessages.hasNext()) { Long outDegree = getOutDegree() - 1; // check if the vertex has another SP-Edge - if (outDegree > 0) { - // there is another shortest path from the source to this vertex - } else { + if (outDegree <= 0) { // set own value to infinity setNewVertexValue(Double.MAX_VALUE); } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java index 2ad203f2c839c..cd677b6bdb9be 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java @@ -56,7 +56,7 @@ public void testGSACompiler() { env.setParallelism(DEFAULT_PARALLELISM); // compose test program { - DataSet> edges = env.fromElements(new Tuple3( + DataSet> edges = env.fromElements(new Tuple3<>( 1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap()); Graph graph = Graph.fromDataSet(edges, new InitVertices(), env); @@ -124,7 +124,7 @@ private static final class GatherNeighborIds extends GatherFunction neighbor) { return neighbor.getNeighborValue(); } - }; + } @SuppressWarnings("serial") private static final class SelectMinId extends SumFunction { @@ -132,7 +132,7 @@ private static final class SelectMinId extends SumFunction { diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java index ced75084dafb5..2deebcbe26a6f 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java @@ -71,7 +71,7 @@ public void testTranslation() { // ------------ construct the test program ------------------ { - DataSet> edges = env.fromElements(new Tuple3( + DataSet> edges = env.fromElements(new Tuple3<>( 1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap()); Graph graph = Graph.fromDataSet(edges, new InitVertices(), env); @@ -98,7 +98,7 @@ public void testTranslation() { assertTrue(result instanceof DeltaIterationResultSet); DeltaIterationResultSet resultSet = (DeltaIterationResultSet) result; - DeltaIteration iteration = (DeltaIteration) resultSet.getIterationHead(); + DeltaIteration iteration = resultSet.getIterationHead(); // check the basic iteration properties assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations()); @@ -142,7 +142,7 @@ private static final class GatherNeighborIds extends GatherFunction neighbor) { return neighbor.getNeighborValue(); } - }; + } @SuppressWarnings("serial") private static final class SelectMinId extends SumFunction { @@ -150,7 +150,7 @@ private static final class SelectMinId extends SumFunction { diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java index 7a8143a9f694b..335481fd59570 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java @@ -61,14 +61,14 @@ public void testSpargelCompiler() { { DataSet> initialVertices = env.fromElements( - new Tuple2(1L, 1L), new Tuple2(2L, 2L)) + new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L)) .map(new Tuple2ToVertexMap()); - DataSet> edges = env.fromElements(new Tuple2(1L, 2L)) + DataSet> edges = env.fromElements(new Tuple2<>(1L, 2L)) .map(new MapFunction, Edge>() { public Edge map(Tuple2 edge) { - return new Edge(edge.f0, edge.f1, NullValue.getInstance()); + return new Edge<>(edge.f0, edge.f1, NullValue.getInstance()); } }); @@ -143,14 +143,14 @@ public void testSpargelCompilerWithBroadcastVariable() { DataSet bcVar = env.fromElements(1L); DataSet> initialVertices = env.fromElements( - new Tuple2(1L, 1L), new Tuple2(2L, 2L)) + new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L)) .map(new Tuple2ToVertexMap()); - DataSet> edges = env.fromElements(new Tuple2(1L, 2L)) + DataSet> edges = env.fromElements(new Tuple2<>(1L, 2L)) .map(new MapFunction, Edge>() { public Edge map(Tuple2 edge) { - return new Edge(edge.f0, edge.f1, NullValue.getInstance()); + return new Edge<>(edge.f0, edge.f1, NullValue.getInstance()); } }); diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java index bb3a131658a8e..69aa99c8633b4 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java @@ -52,7 +52,7 @@ public void testTranslationPlainEdges() { final String BC_SET_MESSAGES_NAME = "borat messages"; final String BC_SET_UPDATES_NAME = "borat updates"; - ; + final int NUM_ITERATIONS = 13; final int ITERATION_parallelism = 77; @@ -68,16 +68,16 @@ public void testTranslationPlainEdges() { // ------------ construct the test program ------------------ { - DataSet> initialVertices = env.fromElements(new Tuple2("abc", 3.44)); + DataSet> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44)); - DataSet> edges = env.fromElements(new Tuple2("a", "c")); + DataSet> edges = env.fromElements(new Tuple2<>("a", "c")); Graph graph = Graph.fromTupleDataSet(initialVertices, edges.map(new MapFunction, Tuple3>() { public Tuple3 map( Tuple2 edge) { - return new Tuple3(edge.f0, edge.f1, NullValue.getInstance()); + return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance()); } }), env); @@ -101,7 +101,7 @@ public Tuple3 map( assertTrue(result instanceof DeltaIterationResultSet); DeltaIterationResultSet resultSet = (DeltaIterationResultSet) result; - DeltaIteration iteration = (DeltaIteration) resultSet.getIterationHead(); + DeltaIteration iteration = resultSet.getIterationHead(); // check the basic iteration properties assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations()); @@ -139,7 +139,7 @@ public void testTranslationPlainEdgesWithForkedBroadcastVariable() { final String BC_SET_MESSAGES_NAME = "borat messages"; final String BC_SET_UPDATES_NAME = "borat updates"; - ; + final int NUM_ITERATIONS = 13; final int ITERATION_parallelism = 77; @@ -154,16 +154,16 @@ public void testTranslationPlainEdgesWithForkedBroadcastVariable() { // ------------ construct the test program ------------------ { - DataSet> initialVertices = env.fromElements(new Tuple2("abc", 3.44)); + DataSet> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44)); - DataSet> edges = env.fromElements(new Tuple2("a", "c")); + DataSet> edges = env.fromElements(new Tuple2<>("a", "c")); Graph graph = Graph.fromTupleDataSet(initialVertices, edges.map(new MapFunction, Tuple3>() { public Tuple3 map( Tuple2 edge) { - return new Tuple3(edge.f0, edge.f1, NullValue.getInstance()); + return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance()); } }), env); @@ -187,7 +187,7 @@ public Tuple3 map( assertTrue(result instanceof DeltaIterationResultSet); DeltaIterationResultSet resultSet = (DeltaIterationResultSet) result; - DeltaIteration iteration = (DeltaIteration) resultSet.getIterationHead(); + DeltaIteration iteration = resultSet.getIterationHead(); // check the basic iteration properties assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations()); diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java index 3fbd0bc2cf702..61fe0c2b47c8b 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java @@ -78,7 +78,7 @@ public void sendMessages(Vertex vertex) { public static final class AssignOneMapper implements MapFunction, Long> { public Long map(Vertex value) { - return 1l; + return 1L; } } } \ No newline at end of file diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java index 67d32a8ccc308..1e44d5b46dec0 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java @@ -382,11 +382,10 @@ public HashSet gather(Neighbor, Long> neighbor) { private static final class FindAllReachableVertices extends SumFunction, Long, HashSet> { @Override public HashSet sum(HashSet newSet, HashSet currentSet) { - HashSet set = currentSet; for(Long l : newSet) { - set.add(l); + currentSet.add(l); } - return set; + return currentSet; } } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java index 0213f024b0464..039a05cf53043 100755 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java @@ -79,7 +79,7 @@ public void testSingleSourceShortestPaths() throws Exception { new InitMapperSSSP(), env); List> result = inputGraph.run( - new GSASingleSourceShortestPaths(1l, 16)).collect(); + new GSASingleSourceShortestPaths<>(1L, 16)).collect(); expectedResult = "1,0.0\n" + "2,12.0\n" + diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java index 294926f914877..85373d39c1085 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java @@ -32,176 +32,156 @@ public class TestGraphUtils { - public static final DataSet> getLongLongVertexData( + public static DataSet> getLongLongVertexData( ExecutionEnvironment env) { return env.fromCollection(getLongLongVertices()); } - public static final DataSet> getLongLongEdgeData( + public static DataSet> getLongLongEdgeData( ExecutionEnvironment env) { return env.fromCollection(getLongLongEdges()); } - public static final DataSet> getLongLongEdgeInvalidSrcData( + public static DataSet> getLongLongEdgeInvalidSrcData( ExecutionEnvironment env) { List> edges = getLongLongEdges(); edges.remove(1); - edges.add(new Edge(13L, 3L, 13L)); + edges.add(new Edge<>(13L, 3L, 13L)); return env.fromCollection(edges); } - public static final DataSet> getLongLongEdgeInvalidTrgData( + public static DataSet> getLongLongEdgeInvalidTrgData( ExecutionEnvironment env) { List> edges = getLongLongEdges(); edges.remove(0); - edges.add(new Edge(3L, 13L, 13L)); + edges.add(new Edge<>(3L, 13L, 13L)); return env.fromCollection(edges); } - public static final DataSet> getLongLongEdgeInvalidSrcTrgData( + public static DataSet> getLongLongEdgeInvalidSrcTrgData( ExecutionEnvironment env) { List> edges = getLongLongEdges(); edges.remove(0); edges.remove(1); edges.remove(2); - edges.add(new Edge(13L, 3L, 13L)); - edges.add(new Edge(1L, 12L, 12L)); - edges.add(new Edge(13L, 33L, 13L)); + edges.add(new Edge<>(13L, 3L, 13L)); + edges.add(new Edge<>(1L, 12L, 12L)); + edges.add(new Edge<>(13L, 33L, 13L)); return env.fromCollection(edges); } - public static final DataSet> getStringLongEdgeData( + public static DataSet> getStringLongEdgeData( ExecutionEnvironment env) { - List> edges = new ArrayList>(); - edges.add(new Edge("1", "2", 12L)); - edges.add(new Edge("1", "3", 13L)); - edges.add(new Edge("2", "3", 23L)); - edges.add(new Edge("3", "4", 34L)); - edges.add(new Edge("3", "5", 35L)); - edges.add(new Edge("4", "5", 45L)); - edges.add(new Edge("5", "1", 51L)); + List> edges = new ArrayList<>(); + edges.add(new Edge<>("1", "2", 12L)); + edges.add(new Edge<>("1", "3", 13L)); + edges.add(new Edge<>("2", "3", 23L)); + edges.add(new Edge<>("3", "4", 34L)); + edges.add(new Edge<>("3", "5", 35L)); + edges.add(new Edge<>("4", "5", 45L)); + edges.add(new Edge<>("5", "1", 51L)); return env.fromCollection(edges); } - public static final DataSet> getLongLongTuple2Data( + public static DataSet> getLongLongTuple2Data( ExecutionEnvironment env) { - List> tuples = new ArrayList>(); - tuples.add(new Tuple2(1L, 10L)); - tuples.add(new Tuple2(2L, 20L)); - tuples.add(new Tuple2(3L, 30L)); - tuples.add(new Tuple2(4L, 40L)); - tuples.add(new Tuple2(6L, 60L)); + List> tuples = new ArrayList<>(); + tuples.add(new Tuple2<>(1L, 10L)); + tuples.add(new Tuple2<>(2L, 20L)); + tuples.add(new Tuple2<>(3L, 30L)); + tuples.add(new Tuple2<>(4L, 40L)); + tuples.add(new Tuple2<>(6L, 60L)); return env.fromCollection(tuples); } - public static final DataSet> getLongLongTuple2SourceData( + public static DataSet> getLongLongTuple2SourceData( ExecutionEnvironment env) { - List> tuples = new ArrayList>(); - tuples.add(new Tuple2(1L, 10L)); - tuples.add(new Tuple2(1L, 20L)); - tuples.add(new Tuple2(2L, 30L)); - tuples.add(new Tuple2(3L, 40L)); - tuples.add(new Tuple2(3L, 50L)); - tuples.add(new Tuple2(4L, 60L)); - tuples.add(new Tuple2(6L, 70L)); + List> tuples = new ArrayList<>(); + tuples.add(new Tuple2<>(1L, 10L)); + tuples.add(new Tuple2<>(1L, 20L)); + tuples.add(new Tuple2<>(2L, 30L)); + tuples.add(new Tuple2<>(3L, 40L)); + tuples.add(new Tuple2<>(3L, 50L)); + tuples.add(new Tuple2<>(4L, 60L)); + tuples.add(new Tuple2<>(6L, 70L)); return env.fromCollection(tuples); } - public static final DataSet> getLongLongTuple2TargetData( + public static DataSet> getLongLongTuple2TargetData( ExecutionEnvironment env) { - List> tuples = new ArrayList>(); - tuples.add(new Tuple2(2L, 10L)); - tuples.add(new Tuple2(3L, 20L)); - tuples.add(new Tuple2(3L, 30L)); - tuples.add(new Tuple2(4L, 40L)); - tuples.add(new Tuple2(6L, 50L)); - tuples.add(new Tuple2(6L, 60L)); - tuples.add(new Tuple2(1L, 70L)); + List> tuples = new ArrayList<>(); + tuples.add(new Tuple2<>(2L, 10L)); + tuples.add(new Tuple2<>(3L, 20L)); + tuples.add(new Tuple2<>(3L, 30L)); + tuples.add(new Tuple2<>(4L, 40L)); + tuples.add(new Tuple2<>(6L, 50L)); + tuples.add(new Tuple2<>(6L, 60L)); + tuples.add(new Tuple2<>(1L, 70L)); return env.fromCollection(tuples); } - public static final DataSet> getLongLongLongTuple3Data( + public static DataSet> getLongLongLongTuple3Data( ExecutionEnvironment env) { - List> tuples = new ArrayList>(); - tuples.add(new Tuple3(1L, 2L, 12L)); - tuples.add(new Tuple3(1L, 3L, 13L)); - tuples.add(new Tuple3(2L, 3L, 23L)); - tuples.add(new Tuple3(3L, 4L, 34L)); - tuples.add(new Tuple3(3L, 6L, 36L)); - tuples.add(new Tuple3(4L, 6L, 46L)); - tuples.add(new Tuple3(6L, 1L, 61L)); + List> tuples = new ArrayList<>(); + tuples.add(new Tuple3<>(1L, 2L, 12L)); + tuples.add(new Tuple3<>(1L, 3L, 13L)); + tuples.add(new Tuple3<>(2L, 3L, 23L)); + tuples.add(new Tuple3<>(3L, 4L, 34L)); + tuples.add(new Tuple3<>(3L, 6L, 36L)); + tuples.add(new Tuple3<>(4L, 6L, 46L)); + tuples.add(new Tuple3<>(6L, 1L, 61L)); return env.fromCollection(tuples); } - public static final DataSet>> getLongCustomTuple2Data( + public static DataSet>> getLongCustomTuple2Data( ExecutionEnvironment env) { - List>> tuples = new ArrayList>>(); - tuples.add(new Tuple2>(1L, - new DummyCustomParameterizedType(10, 10f))); - tuples.add(new Tuple2>(2L, - new DummyCustomParameterizedType(20, 20f))); - tuples.add(new Tuple2>(3L, - new DummyCustomParameterizedType(30, 30f))); - tuples.add(new Tuple2>(4L, - new DummyCustomParameterizedType(40, 40f))); + List>> tuples = new ArrayList<>(); + tuples.add(new Tuple2<>(1L, new DummyCustomParameterizedType<>(10, 10f))); + tuples.add(new Tuple2<>(2L, new DummyCustomParameterizedType<>(20, 20f))); + tuples.add(new Tuple2<>(3L, new DummyCustomParameterizedType<>(30, 30f))); + tuples.add(new Tuple2<>(4L, new DummyCustomParameterizedType<>(40, 40f))); return env.fromCollection(tuples); } - public static final DataSet>> getLongCustomTuple2SourceData( + public static DataSet>> getLongCustomTuple2SourceData( ExecutionEnvironment env) { - List>> tuples = new ArrayList>>(); - tuples.add(new Tuple2>(1L, - new DummyCustomParameterizedType(10, 10f))); - tuples.add(new Tuple2>(1L, - new DummyCustomParameterizedType(20, 20f))); - tuples.add(new Tuple2>(2L, - new DummyCustomParameterizedType(30, 30f))); - tuples.add(new Tuple2>(3L, - new DummyCustomParameterizedType(40, 40f))); + List>> tuples = new ArrayList<>(); + tuples.add(new Tuple2<>(1L, new DummyCustomParameterizedType<>(10, 10f))); + tuples.add(new Tuple2<>(1L, new DummyCustomParameterizedType<>(20, 20f))); + tuples.add(new Tuple2<>(2L, new DummyCustomParameterizedType<>(30, 30f))); + tuples.add(new Tuple2<>(3L, new DummyCustomParameterizedType<>(40, 40f))); return env.fromCollection(tuples); } - public static final DataSet>> getLongCustomTuple2TargetData( + public static DataSet>> getLongCustomTuple2TargetData( ExecutionEnvironment env) { - List>> tuples = new ArrayList>>(); - tuples.add(new Tuple2>(2L, - new DummyCustomParameterizedType(10, 10f))); - tuples.add(new Tuple2>(3L, - new DummyCustomParameterizedType(20, 20f))); - tuples.add(new Tuple2>(3L, - new DummyCustomParameterizedType(30, 30f))); - tuples.add(new Tuple2>(4L, - new DummyCustomParameterizedType(40, 40f))); + List>> tuples = new ArrayList<>(); + tuples.add(new Tuple2<>(2L, new DummyCustomParameterizedType<>(10, 10f))); + tuples.add(new Tuple2<>(3L, new DummyCustomParameterizedType<>(20, 20f))); + tuples.add(new Tuple2<>(3L, new DummyCustomParameterizedType<>(30, 30f))); + tuples.add(new Tuple2<>(4L, new DummyCustomParameterizedType<>(40, 40f))); return env.fromCollection(tuples); } - public static final DataSet>> getLongLongCustomTuple3Data( + public static DataSet>> getLongLongCustomTuple3Data( ExecutionEnvironment env) { - List>> tuples = - new ArrayList>>(); - tuples.add(new Tuple3>(1L, 2L, - new DummyCustomParameterizedType(10, 10f))); - tuples.add(new Tuple3>(1L, 3L, - new DummyCustomParameterizedType(20, 20f))); - tuples.add(new Tuple3>(2L, 3L, - new DummyCustomParameterizedType(30, 30f))); - tuples.add(new Tuple3>(3L, 4L, - new DummyCustomParameterizedType(40, 40f))); + List>> tuples = new ArrayList<>(); + tuples.add(new Tuple3<>(1L, 2L, new DummyCustomParameterizedType<>(10, 10f))); + tuples.add(new Tuple3<>(1L, 3L, new DummyCustomParameterizedType<>(20, 20f))); + tuples.add(new Tuple3<>(2L, 3L, new DummyCustomParameterizedType<>(30, 30f))); + tuples.add(new Tuple3<>(3L, 4L, new DummyCustomParameterizedType<>(40, 40f))); return env.fromCollection(tuples); } @@ -209,12 +189,12 @@ public static final DataSet> getLongLongInvalidVertexData( + public static DataSet> getLongLongInvalidVertexData( ExecutionEnvironment env) { List> vertices = getLongLongVertices(); vertices.remove(0); - vertices.add(new Vertex(15L, 1L)); + vertices.add(new Vertex<>(15L, 1L)); return env.fromCollection(vertices); } @@ -222,15 +202,15 @@ public static final DataSet> getLongLongInvalidVertexData( /** * A graph that has at least one vertex with no ingoing/outgoing edges */ - public static final DataSet> getLongLongEdgeDataWithZeroDegree( + public static DataSet> getLongLongEdgeDataWithZeroDegree( ExecutionEnvironment env) { - List> edges = new ArrayList>(); - edges.add(new Edge(1L, 2L, 12L)); - edges.add(new Edge(1L, 4L, 14L)); - edges.add(new Edge(1L, 5L, 15L)); - edges.add(new Edge(2L, 3L, 23L)); - edges.add(new Edge(3L, 5L, 35L)); - edges.add(new Edge(4L, 5L, 45L)); + List> edges = new ArrayList<>(); + edges.add(new Edge<>(1L, 2L, 12L)); + edges.add(new Edge<>(1L, 4L, 14L)); + edges.add(new Edge<>(1L, 5L, 15L)); + edges.add(new Edge<>(2L, 3L, 23L)); + edges.add(new Edge<>(3L, 5L, 35L)); + edges.add(new Edge<>(4L, 5L, 45L)); return env.fromCollection(edges); } @@ -238,35 +218,34 @@ public static final DataSet> getLongLongEdgeDataWithZeroDegree( /** * Function that produces an ArrayList of vertices */ - public static final List> getLongLongVertices() { - List> vertices = new ArrayList>(); - vertices.add(new Vertex(1L, 1L)); - vertices.add(new Vertex(2L, 2L)); - vertices.add(new Vertex(3L, 3L)); - vertices.add(new Vertex(4L, 4L)); - vertices.add(new Vertex(5L, 5L)); + public static List> getLongLongVertices() { + List> vertices = new ArrayList<>(); + vertices.add(new Vertex<>(1L, 1L)); + vertices.add(new Vertex<>(2L, 2L)); + vertices.add(new Vertex<>(3L, 3L)); + vertices.add(new Vertex<>(4L, 4L)); + vertices.add(new Vertex<>(5L, 5L)); return vertices; } - public static final List> getLongBooleanVertices() { - List> vertices = new ArrayList>(); - vertices.add(new Vertex(1L, true)); - vertices.add(new Vertex(2L, true)); - vertices.add(new Vertex(3L, true)); - vertices.add(new Vertex(4L, true)); - vertices.add(new Vertex(5L, true)); + public static List> getLongBooleanVertices() { + List> vertices = new ArrayList<>(); + vertices.add(new Vertex<>(1L, true)); + vertices.add(new Vertex<>(2L, true)); + vertices.add(new Vertex<>(3L, true)); + vertices.add(new Vertex<>(4L, true)); + vertices.add(new Vertex<>(5L, true)); return vertices; } - public static final DataSet> getDisconnectedLongLongEdgeData( - ExecutionEnvironment env) { - List> edges = new ArrayList>(); - edges.add(new Edge(1L, 2L, 12L)); - edges.add(new Edge(1L, 3L, 13L)); - edges.add(new Edge(2L, 3L, 23L)); - edges.add(new Edge(4L, 5L, 45L)); + public static DataSet> getDisconnectedLongLongEdgeData(ExecutionEnvironment env) { + List> edges = new ArrayList<>(); + edges.add(new Edge<>(1L, 2L, 12L)); + edges.add(new Edge<>(1L, 3L, 13L)); + edges.add(new Edge<>(2L, 3L, 23L)); + edges.add(new Edge<>(4L, 5L, 45L)); return env.fromCollection(edges); } @@ -274,15 +253,15 @@ public static final DataSet> getDisconnectedLongLongEdgeData( /** * Function that produces an ArrayList of edges */ - public static final List> getLongLongEdges() { - List> edges = new ArrayList>(); - edges.add(new Edge(1L, 2L, 12L)); - edges.add(new Edge(1L, 3L, 13L)); - edges.add(new Edge(2L, 3L, 23L)); - edges.add(new Edge(3L, 4L, 34L)); - edges.add(new Edge(3L, 5L, 35L)); - edges.add(new Edge(4L, 5L, 45L)); - edges.add(new Edge(5L, 1L, 51L)); + public static List> getLongLongEdges() { + List> edges = new ArrayList<>(); + edges.add(new Edge<>(1L, 2L, 12L)); + edges.add(new Edge<>(1L, 3L, 13L)); + edges.add(new Edge<>(2L, 3L, 23L)); + edges.add(new Edge<>(3L, 4L, 34L)); + edges.add(new Edge<>(3L, 5L, 35L)); + edges.add(new Edge<>(4L, 5L, 45L)); + edges.add(new Edge<>(5L, 1L, 51L)); return edges; } @@ -373,45 +352,41 @@ public void write(int b){} /** * utils for getting the second graph for the test of method difference(); - * @param env + * @param env - ExecutionEnvironment */ - public static final DataSet> getLongLongEdgeDataDifference( - ExecutionEnvironment env){ + public static DataSet> getLongLongEdgeDataDifference(ExecutionEnvironment env) { return env.fromCollection(getLongLongEdgesForDifference()); } - public static final DataSet> getLongLongEdgeDataDifference2( - ExecutionEnvironment env){ + public static DataSet> getLongLongEdgeDataDifference2(ExecutionEnvironment env) { return env.fromCollection(getLongLongEdgesForDifference2()); } - public static final DataSet> getLongLongVertexDataDifference( - ExecutionEnvironment env) - { + public static DataSet> getLongLongVertexDataDifference(ExecutionEnvironment env) { return env.fromCollection(getVerticesForDifference()); } - public static final List> getVerticesForDifference(){ - List> vertices = new ArrayList>(); - vertices.add(new Vertex(1L, 1L)); - vertices.add(new Vertex(3L, 3L)); - vertices.add(new Vertex(6L, 6L)); + public static List> getVerticesForDifference(){ + List> vertices = new ArrayList<>(); + vertices.add(new Vertex<>(1L, 1L)); + vertices.add(new Vertex<>(3L, 3L)); + vertices.add(new Vertex<>(6L, 6L)); return vertices; } - public static final List> getLongLongEdgesForDifference() { - List> edges = new ArrayList>(); - edges.add(new Edge(1L, 3L, 13L)); - edges.add(new Edge(1L, 6L, 26L)); - edges.add(new Edge(6L, 3L, 63L)); + public static List> getLongLongEdgesForDifference() { + List> edges = new ArrayList<>(); + edges.add(new Edge<>(1L, 3L, 13L)); + edges.add(new Edge<>(1L, 6L, 26L)); + edges.add(new Edge<>(6L, 3L, 63L)); return edges; } - public static final List> getLongLongEdgesForDifference2() { - List> edges = new ArrayList>(); - edges.add(new Edge(6L, 6L, 66L)); + public static List> getLongLongEdgesForDifference2() { + List> edges = new ArrayList<>(); + edges.add(new Edge<>(6L, 6L, 66L)); return edges; } } \ No newline at end of file diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java index 567b194872650..0820e6a7f9d92 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java @@ -75,7 +75,7 @@ public void testRunWithConfiguration() throws Exception { new UpdateFunction(), new MessageFunction(), 10, parameters); DataSet> data = res.getVertices(); - List> result= data.collect(); + List> result= data.collect(); expectedResult = "1,11\n" + "2,11\n" + @@ -137,7 +137,7 @@ public void testDefaultConfiguration() throws Exception { DataSet> data = res.getVertices().map(new VertexToTuple2Map()); - List> result= data.collect(); + List> result= data.collect(); expectedResult = "1,6\n" + "2,6\n" + @@ -231,7 +231,7 @@ public void testIterationALLDirection() throws Exception { .runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerAll(), 5, parameters) .getVertices(); - List>> result= resultedVertices.collect(); + List>> result= resultedVertices.collect(); expectedResult = "1,[2, 3, 5]\n" + "2,[1, 3]\n" + @@ -264,7 +264,7 @@ public void testSendToAllDirectionIN() throws Exception { .runVertexCentricIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters) .getVertices(); - List>> result = resultedVertices.collect(); + List>> result = resultedVertices.collect(); expectedResult = "1,[2, 3]\n" + "2,[3]\n" + @@ -297,7 +297,7 @@ public void testSendToAllDirectionOUT() throws Exception { .runVertexCentricIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters) .getVertices(); - List>> result = resultedVertices.collect(); + List>> result = resultedVertices.collect(); expectedResult = "1,[5]\n" + "2,[1]\n" + @@ -330,7 +330,7 @@ public void testSendToAllDirectionALL() throws Exception { .runVertexCentricIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters) .getVertices(); - List>> result = resultedVertices.collect(); + List>> result = resultedVertices.collect(); expectedResult = "1,[2, 3, 5]\n" + "2,[1, 3]\n" + @@ -356,7 +356,7 @@ public void testNumVerticesNotSet() throws Exception { DataSet> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(), new DummyMessageFunction(), 2).getVertices(); - List> result= verticesWithNumVertices.collect(); + List> result= verticesWithNumVertices.collect(); expectedResult = "1,-1\n" + "2,-1\n" + @@ -388,7 +388,7 @@ public void testInDegreesSet() throws Exception { DataSet> verticesWithDegrees = graph.runVertexCentricIteration( new UpdateFunctionInDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices(); - List> result= verticesWithDegrees.collect(); + List> result= verticesWithDegrees.collect(); expectedResult = "1,1\n" + "2,1\n" + @@ -413,7 +413,7 @@ public void testInDegreesNotSet() throws Exception { DataSet> verticesWithDegrees = graph.runVertexCentricIteration( new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices(); - List> result= verticesWithDegrees.collect(); + List> result= verticesWithDegrees.collect(); expectedResult = "1,-1\n" + "2,-1\n" + @@ -445,7 +445,7 @@ public void testOutDegreesSet() throws Exception { DataSet> verticesWithDegrees = graph.runVertexCentricIteration( new UpdateFunctionOutDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices(); - List> result= verticesWithDegrees.collect(); + List> result= verticesWithDegrees.collect(); expectedResult = "1,2\n" + "2,1\n" + @@ -470,7 +470,7 @@ public void testOutDegreesNotSet() throws Exception { DataSet> verticesWithDegrees = graph.runVertexCentricIteration( new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices(); - List> result= verticesWithDegrees.collect(); + List> result= verticesWithDegrees.collect(); expectedResult = "1,-1\n" + "2,-1\n" + @@ -502,7 +502,7 @@ public void testDirectionALLAndDegrees() throws Exception { DataSet> verticesWithNumNeighbors = graph.runVertexCentricIteration( new VertexUpdateNumNeighbors(), new IdMessenger(), 1, parameters).getVertices(); - List> result= verticesWithNumNeighbors.collect(); + List> result= verticesWithNumNeighbors.collect(); expectedResult = "1,true\n" + "2,true\n" + @@ -548,8 +548,6 @@ public void updateVertex(Vertex vertex, MessageIterator inMess @SuppressWarnings("serial") public static final class UpdateFunctionDefault extends VertexUpdateFunction { - LongSumAggregator aggregator = new LongSumAggregator(); - @Override public void updateVertex(Vertex vertex, MessageIterator inMessages) { @@ -643,11 +641,11 @@ public static final class DegreesMessageFunction extends MessagingFunction vertex) { - if (vertex.getId().equals(1)) { + if (vertex.getId() == 1) { Assert.assertEquals(2, getOutDegree()); Assert.assertEquals(1, getInDegree()); } - else if(vertex.getId().equals(3)) { + else if(vertex.getId() == 3) { Assert.assertEquals(2, getOutDegree()); Assert.assertEquals(2, getInDegree()); } @@ -735,7 +733,7 @@ public static final class IdMessengerAll extends MessagingFunction> vertex) throws Exception { for (Edge edge : getEdges()) { - if(edge.getSource() != vertex.getId()) { + if(!edge.getSource().equals(vertex.getId())) { sendMessageTo(edge.getSource(), vertex.getId()); } else { sendMessageTo(edge.getTarget(), vertex.getId()); @@ -759,7 +757,7 @@ public static final class IdMessenger extends MessagingFunction vertex) throws Exception { for (Edge edge : getEdges()) { - if(edge.getSource() != vertex.getId()) { + if(!edge.getSource().equals(vertex.getId())) { sendMessageTo(edge.getSource(), vertex.getId()); } else { sendMessageTo(edge.getTarget(), vertex.getId()); @@ -783,7 +781,7 @@ public void sendMessages(Vertex> vertex) throws Exception { public static final class AssignOneMapper implements MapFunction, Long> { public Long map(Vertex value) { - return 1l; + return 1L; } } @@ -792,7 +790,7 @@ public static final class InitialiseHashSetMapper implements MapFunction map(Vertex value) throws Exception { - return new HashSet(); + return new HashSet<>(); } } } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java index c19411b9980f5..a23bb61e6aafb 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java @@ -92,7 +92,7 @@ public void testIncrementalSSSPNonSPEdge() throws Exception { DataSet> edges = IncrementalSSSPData.getDefaultEdgeDataSet(env); DataSet> edgesInSSSP = IncrementalSSSPData.getDefaultEdgesInSSSP(env); // the edge to be removed is a non-SP edge - Edge edgeToBeRemoved = new Edge(3L, 5L, 5.0); + Edge edgeToBeRemoved = new Edge<>(3L, 5L, 5.0); Graph graph = Graph.fromDataSet(vertices, edges, env); // Assumption: all minimum weight paths are kept diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java index 5aa9f26f3785a..8152885f10342 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java @@ -82,7 +82,7 @@ public void testMusicProfilesExample() throws Exception { public void after() throws Exception { compareResultsByLinesInMemory(expectedTopSongs, topSongsResultPath); - ArrayList list = new ArrayList(); + ArrayList list = new ArrayList<>(); readAllResultLines(list, communitiesResultPath, new String[]{}, false); String[] result = list.toArray(new String[list.size()]); diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java index 9eb7387ad8896..c8d85f0a58263 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java @@ -73,7 +73,7 @@ protected void testProgram() throws Exception { public static final class IdAssigner implements MapFunction> { @Override public Vertex map(Long value) { - return new Vertex(value, value); + return new Vertex<>(value, value); } } @@ -87,7 +87,7 @@ protected void postSubmit() throws Exception { public static final class EdgeParser extends RichMapFunction> { public Edge map(String value) { String[] nums = value.split(" "); - return new Edge(Long.parseLong(nums[0]), Long.parseLong(nums[1]), + return new Edge<>(Long.parseLong(nums[0]), Long.parseLong(nums[1]), NullValue.getInstance()); } } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java index 94c77131767a4..431ab7028198f 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java @@ -41,8 +41,6 @@ public PageRankITCase(TestExecutionMode mode){ super(mode); } - private String expectedResult; - @Test public void testPageRankWithThreeIterations() throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -53,7 +51,7 @@ public void testPageRankWithThreeIterations() throws Exception { List> result = inputGraph.run(new PageRank(0.85, 3)) .collect(); - compareWithDelta(result, expectedResult, 0.01); + compareWithDelta(result, 0.01); } @Test @@ -66,7 +64,7 @@ public void testGSAPageRankWithThreeIterations() throws Exception { List> result = inputGraph.run(new GSAPageRank(0.85, 3)) .collect(); - compareWithDelta(result, expectedResult, 0.01); + compareWithDelta(result, 0.01); } @Test @@ -79,7 +77,7 @@ public void testPageRankWithThreeIterationsAndNumOfVertices() throws Exception { List> result = inputGraph.run(new PageRank(0.85, 5, 3)) .collect(); - compareWithDelta(result, expectedResult, 0.01); + compareWithDelta(result, 0.01); } @Test @@ -92,18 +90,18 @@ public void testGSAPageRankWithThreeIterationsAndNumOfVertices() throws Exceptio List> result = inputGraph.run(new GSAPageRank(0.85, 5, 3)) .collect(); - compareWithDelta(result, expectedResult, 0.01); + compareWithDelta(result, 0.01); } private void compareWithDelta(List> result, - String expectedResult, double delta) { + double delta) { String resultString = ""; for (Vertex v : result) { resultString += v.f0.toString() + "," + v.f1.toString() +"\n"; } - - expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS; + + String expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS; String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n"); String[] resultArray = resultString.isEmpty() ? new String[0] : resultString.split("\n"); diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java index 1d9ab9f234cf3..15f59fe568ae3 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java @@ -34,8 +34,6 @@ @RunWith(Parameterized.class) public class TriangleCountITCase extends MultipleProgramsTestBase { - private String expectedResult; - public TriangleCountITCase(TestExecutionMode mode) { super(mode); } @@ -49,7 +47,7 @@ public void testGSATriangleCount() throws Exception { env).getUndirected(); List numberOfTriangles = graph.run(new GSATriangleCount()).collect(); - expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES; + String expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES; Assert.assertEquals(numberOfTriangles.get(0).intValue(), Integer.parseInt(expectedResult)); } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java index 20f4454b5c74c..9995bad84439a 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java @@ -124,7 +124,7 @@ public void testValidate() throws Exception { //env.fromElements(result).writeAsText(resultPath); String res= valid.toString();//env.fromElements(valid); - List result= new LinkedList(); + List result= new LinkedList<>(); result.add(res); expectedResult = "true"; @@ -144,7 +144,7 @@ public void testValidateWithInvalidIds() throws Exception { Boolean valid = graph.validate(new InvalidVertexIdsValidator()); String res= valid.toString();//env.fromElements(valid); - List result= new LinkedList(); + List result= new LinkedList<>(); result.add(res); expectedResult = "false\n"; @@ -216,8 +216,7 @@ public Long map(Long vertexId) { private static final class AssignCustomVertexValueMapper implements MapFunction> { - DummyCustomParameterizedType dummyValue = - new DummyCustomParameterizedType(); + DummyCustomParameterizedType dummyValue = new DummyCustomParameterizedType<>(); public DummyCustomParameterizedType map(Long vertexId) { dummyValue.setIntField(vertexId.intValue()-1); diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java index 20cbca567c65e..c78e6ba3481a5 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java @@ -138,7 +138,7 @@ public Double map(Long value) { @SuppressWarnings("serial") private static final class AssignTuple2ValueMapper implements MapFunction> { public Tuple2 map(Long vertexId) { - return new Tuple2(vertexId*2, 42l); + return new Tuple2<>(vertexId*2, 42L); } } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java index d6e5a9cd2727a..704a913d2fab3 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java @@ -39,8 +39,7 @@ public GraphMutationsITCase(TestExecutionMode mode){ super(mode); } - private String expectedResult; - + private String expectedResult; @Test public void testAddVertex() throws Exception { @@ -53,10 +52,10 @@ public void testAddVertex() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.addVertex(new Vertex(6L, 6L)); - + graph = graph.addVertex(new Vertex<>(6L, 6L)); + DataSet> data = graph.getVertices(); - List> result= data.collect(); + List> result = data.collect(); expectedResult = "1,1\n" + "2,2\n" + @@ -64,7 +63,7 @@ public void testAddVertex() throws Exception { "4,4\n" + "5,5\n" + "6,6\n"; - + compareResultAsTuples(result, expectedResult); } @@ -79,14 +78,14 @@ public void testAddVertices() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> vertices = new ArrayList>(); - vertices.add(new Vertex(6L, 6L)); - vertices.add(new Vertex(7L, 7L)); + List> vertices = new ArrayList<>(); + vertices.add(new Vertex<>(6L, 6L)); + vertices.add(new Vertex<>(7L, 7L)); graph = graph.addVertices(vertices); DataSet> data = graph.getVertices(); - List> result= data.collect(); + List> result= data.collect(); expectedResult = "1,1\n" + "2,2\n" + @@ -95,7 +94,7 @@ public void testAddVertices() throws Exception { "5,5\n" + "6,6\n" + "7,7\n"; - + compareResultAsTuples(result, expectedResult); } @@ -109,17 +108,17 @@ public void testAddVertexExisting() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.addVertex(new Vertex(1L, 1L)); - + graph = graph.addVertex(new Vertex<>(1L, 1L)); + DataSet> data = graph.getVertices(); - List> result= data.collect(); + List> result= data.collect(); expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"; - + compareResultAsTuples(result, expectedResult); } @@ -134,21 +133,21 @@ public void testAddVerticesBothExisting() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> vertices = new ArrayList>(); - vertices.add(new Vertex(1L, 1L)); - vertices.add(new Vertex(3L, 3L)); + List> vertices = new ArrayList<>(); + vertices.add(new Vertex<>(1L, 1L)); + vertices.add(new Vertex<>(3L, 3L)); graph = graph.addVertices(vertices); DataSet> data = graph.getVertices(); - List> result= data.collect(); + List> result= data.collect(); expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"; - + compareResultAsTuples(result, expectedResult); } @@ -163,14 +162,14 @@ public void testAddVerticesOneExisting() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> vertices = new ArrayList>(); - vertices.add(new Vertex(1L, 1L)); - vertices.add(new Vertex(6L, 6L)); + List> vertices = new ArrayList<>(); + vertices.add(new Vertex<>(1L, 1L)); + vertices.add(new Vertex<>(6L, 6L)); graph = graph.addVertices(vertices); DataSet> data = graph.getVertices(); - List> result= data.collect(); + List> result= data.collect(); expectedResult = "1,1\n" + "2,2\n" + @@ -178,7 +177,7 @@ public void testAddVerticesOneExisting() throws Exception { "4,4\n" + "5,5\n" + "6,6\n"; - + compareResultAsTuples(result, expectedResult); } @@ -192,16 +191,16 @@ public void testRemoveVertex() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.removeVertex(new Vertex(5L, 5L)); + graph = graph.removeVertex(new Vertex<>(5L, 5L)); - DataSet> data = graph.getEdges(); - List> result= data.collect(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n"; - + compareResultAsTuples(result, expectedResult); } @@ -216,19 +215,19 @@ public void testRemoveVertices() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> verticesToBeRemoved = new ArrayList>(); - verticesToBeRemoved.add(new Vertex(1L, 1L)); - verticesToBeRemoved.add(new Vertex(2L, 2L)); + List> verticesToBeRemoved = new ArrayList<>(); + verticesToBeRemoved.add(new Vertex<>(1L, 1L)); + verticesToBeRemoved.add(new Vertex<>(2L, 2L)); graph = graph.removeVertices(verticesToBeRemoved); - DataSet> data = graph.getEdges(); - List> result= data.collect(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"; - + compareResultAsTuples(result, expectedResult); } @@ -236,16 +235,16 @@ public void testRemoveVertices() throws Exception { public void testRemoveInvalidVertex() throws Exception { /* * Test removeVertex() -- remove an invalid vertex - */ - + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.removeVertex(new Vertex(6L, 6L)); + graph = graph.removeVertex(new Vertex<>(6L, 6L)); - DataSet> data = graph.getEdges(); - List> result= data.collect(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -254,7 +253,7 @@ public void testRemoveInvalidVertex() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; - + compareResultAsTuples(result, expectedResult); } @@ -268,20 +267,20 @@ public void testRemoveOneValidOneInvalidVertex() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> verticesToBeRemoved = new ArrayList>(); - verticesToBeRemoved.add(new Vertex(1L, 1L)); - verticesToBeRemoved.add(new Vertex(7L, 7L)); + List> verticesToBeRemoved = new ArrayList<>(); + verticesToBeRemoved.add(new Vertex<>(1L, 1L)); + verticesToBeRemoved.add(new Vertex<>(7L, 7L)); graph = graph.removeVertices(verticesToBeRemoved); - DataSet> data = graph.getEdges(); - List> result= data.collect(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"; - + compareResultAsTuples(result, expectedResult); } @@ -295,14 +294,14 @@ public void testRemoveBothInvalidVertices() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> verticesToBeRemoved = new ArrayList>(); - verticesToBeRemoved.add(new Vertex(6L, 6L)); - verticesToBeRemoved.add(new Vertex(7L, 7L)); + List> verticesToBeRemoved = new ArrayList<>(); + verticesToBeRemoved.add(new Vertex<>(6L, 6L)); + verticesToBeRemoved.add(new Vertex<>(7L, 7L)); graph = graph.removeVertices(verticesToBeRemoved); - DataSet> data = graph.getEdges(); - List> result= data.collect(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -311,7 +310,7 @@ public void testRemoveBothInvalidVertices() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; - + compareResultAsTuples(result, expectedResult); } @@ -325,39 +324,38 @@ public void testRemoveBothInvalidVerticesVertexResult() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> verticesToBeRemoved = new ArrayList>(); - verticesToBeRemoved.add(new Vertex(6L, 6L)); - verticesToBeRemoved.add(new Vertex(7L, 7L)); + List> verticesToBeRemoved = new ArrayList<>(); + verticesToBeRemoved.add(new Vertex<>(6L, 6L)); + verticesToBeRemoved.add(new Vertex<>(7L, 7L)); graph = graph.removeVertices(verticesToBeRemoved); - DataSet> data = graph.getVertices(); - List> result= data.collect(); + DataSet> data = graph.getVertices(); + List> result= data.collect(); expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"; - + compareResultAsTuples(result, expectedResult); } - + @Test public void testAddEdge() throws Exception { /* * Test addEdge() -- simple case */ - + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.addEdge(new Vertex(6L, 6L), new Vertex(1L, 1L), - 61L); + graph = graph.addEdge(new Vertex<>(6L, 6L), new Vertex<>(1L, 1L), 61L); - DataSet> data = graph.getEdges(); - List> result= data.collect(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -366,8 +364,8 @@ public void testAddEdge() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n" + - "6,1,61\n"; - + "6,1,61\n"; + compareResultAsTuples(result, expectedResult); } @@ -382,14 +380,14 @@ public void testAddEdges() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> edgesToBeAdded = new ArrayList>(); - edgesToBeAdded.add(new Edge(2L, 4L, 24L)); - edgesToBeAdded.add(new Edge(4L, 1L, 41L)); + List> edgesToBeAdded = new ArrayList<>(); + edgesToBeAdded.add(new Edge<>(2L, 4L, 24L)); + edgesToBeAdded.add(new Edge<>(4L, 1L, 41L)); graph = graph.addEdges(edgesToBeAdded); - DataSet> data = graph.getEdges(); - List> result= data.collect(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -400,7 +398,7 @@ public void testAddEdges() throws Exception { "4,1,41\n" + "4,5,45\n" + "5,1,51\n"; - + compareResultAsTuples(result, expectedResult); } @@ -415,14 +413,14 @@ public void testAddEdgesInvalidVertices() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> edgesToBeAdded = new ArrayList>(); - edgesToBeAdded.add(new Edge(6L, 1L, 61L)); - edgesToBeAdded.add(new Edge(7L, 1L, 71L)); + List> edgesToBeAdded = new ArrayList<>(); + edgesToBeAdded.add(new Edge<>(6L, 1L, 61L)); + edgesToBeAdded.add(new Edge<>(7L, 1L, 71L)); graph = graph.addEdges(edgesToBeAdded); - DataSet> data = graph.getEdges(); - List> result= data.collect(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -431,7 +429,7 @@ public void testAddEdgesInvalidVertices() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; - + compareResultAsTuples(result, expectedResult); } @@ -445,11 +443,11 @@ public void testAddExistingEdge() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.addEdge(new Vertex(1L, 1L), new Vertex(2L, 2L), + graph = graph.addEdge(new Vertex<>(1L, 1L), new Vertex<>(2L, 2L), 12L); - DataSet> data = graph.getEdges(); - List> result= data.collect(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,2,12\n" + @@ -458,8 +456,8 @@ public void testAddExistingEdge() throws Exception { "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + - "5,1,51\n"; - + "5,1,51\n"; + compareResultAsTuples(result, expectedResult); } @@ -473,10 +471,10 @@ public void testRemoveEdge() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.removeEdge(new Edge(5L, 1L, 51L)); + graph = graph.removeEdge(new Edge<>(5L, 1L, 51L)); - DataSet> data = graph.getEdges(); - List> result= data.collect(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -484,7 +482,7 @@ public void testRemoveEdge() throws Exception { "3,4,34\n" + "3,5,35\n" + "4,5,45\n"; - + compareResultAsTuples(result, expectedResult); } @@ -498,21 +496,21 @@ public void testRemoveEdges() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> edgesToBeRemoved = new ArrayList>(); - edgesToBeRemoved.add(new Edge(5L, 1L, 51L)); - edgesToBeRemoved.add(new Edge(2L, 3L, 23L)); + List> edgesToBeRemoved = new ArrayList<>(); + edgesToBeRemoved.add(new Edge<>(5L, 1L, 51L)); + edgesToBeRemoved.add(new Edge<>(2L, 3L, 23L)); graph = graph.removeEdges(edgesToBeRemoved); - DataSet> data = graph.getEdges(); - List> result= data.collect(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"; - + compareResultAsTuples(result, expectedResult); } @@ -526,14 +524,14 @@ public void testRemoveSameEdgeTwice() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> edgesToBeRemoved = new ArrayList>(); - edgesToBeRemoved.add(new Edge(5L, 1L, 51L)); - edgesToBeRemoved.add(new Edge(5L, 1L, 51L)); + List> edgesToBeRemoved = new ArrayList<>(); + edgesToBeRemoved.add(new Edge<>(5L, 1L, 51L)); + edgesToBeRemoved.add(new Edge<>(5L, 1L, 51L)); graph = graph.removeEdges(edgesToBeRemoved); - DataSet> data = graph.getEdges(); - List> result= data.collect(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -541,7 +539,7 @@ public void testRemoveSameEdgeTwice() throws Exception { "3,4,34\n" + "3,5,35\n" + "4,5,45\n"; - + compareResultAsTuples(result, expectedResult); } @@ -552,13 +550,13 @@ public void testRemoveInvalidEdge() throws Exception { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.removeEdge(new Edge(6L, 1L, 61L)); + graph = graph.removeEdge(new Edge<>(6L, 1L, 61L)); - DataSet> data = graph.getEdges(); - List> result= data.collect(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -567,7 +565,7 @@ public void testRemoveInvalidEdge() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; - + compareResultAsTuples(result, expectedResult); } @@ -581,14 +579,14 @@ public void testRemoveOneValidOneInvalidEdge() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> edgesToBeRemoved = new ArrayList>(); - edgesToBeRemoved.add(new Edge(1L, 1L, 51L)); - edgesToBeRemoved.add(new Edge(6L, 1L, 61L)); + List> edgesToBeRemoved = new ArrayList<>(); + edgesToBeRemoved.add(new Edge<>(1L, 1L, 51L)); + edgesToBeRemoved.add(new Edge<>(6L, 1L, 61L)); graph = graph.removeEdges(edgesToBeRemoved); - DataSet> data = graph.getEdges(); - List> result= data.collect(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java index ffc9da94d2885..df37248d46aac 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java @@ -259,11 +259,11 @@ public void testUnion() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> vertices = new ArrayList>(); - List> edges = new ArrayList>(); + List> vertices = new ArrayList<>(); + List> edges = new ArrayList<>(); - vertices.add(new Vertex(6L, 6L)); - edges.add(new Edge(6L, 1L, 61L)); + vertices.add(new Vertex<>(6L, 6L)); + edges.add(new Edge<>(6L, 1L, 61L)); graph = graph.union(Graph.fromCollection(vertices, edges, env)); @@ -336,7 +336,7 @@ public void testDifference2() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - DataSet> vertex = env.fromElements(new Vertex(6L, 6L)); + DataSet> vertex = env.fromElements(new Vertex<>(6L, 6L)); Graph graph2 = Graph.fromDataSet(vertex,TestGraphUtils.getLongLongEdgeDataDifference2(env),env); diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java index e0bc35a7b1d4d..2fa3b8c597eaf 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java @@ -473,8 +473,7 @@ public Long edgeJoin(Long edgeValue, Long inputValue) throws Exception { @SuppressWarnings("serial") private static final class BooleanEdgeValueMapper implements MapFunction, Tuple3> { public Tuple3 map(Edge edge) throws Exception { - return new Tuple3(edge.getSource(), - edge.getTarget(), true); + return new Tuple3<>(edge.getSource(), edge.getTarget(), true); } } @@ -512,28 +511,28 @@ public Long edgeJoin(Long edgeValue, @SuppressWarnings("serial") private static final class ProjectSourceAndValueMapper implements MapFunction, Tuple2> { public Tuple2 map(Edge edge) throws Exception { - return new Tuple2(edge.getSource(), edge.getValue()); + return new Tuple2<>(edge.getSource(), edge.getValue()); } } @SuppressWarnings("serial") private static final class ProjectSourceWithTrueMapper implements MapFunction, Tuple2> { public Tuple2 map(Edge edge) throws Exception { - return new Tuple2(edge.getSource(), true); + return new Tuple2<>(edge.getSource(), true); } } @SuppressWarnings("serial") private static final class ProjectTargetAndValueMapper implements MapFunction, Tuple2> { public Tuple2 map(Edge edge) throws Exception { - return new Tuple2(edge.getTarget(), edge.getValue()); + return new Tuple2<>(edge.getTarget(), edge.getValue()); } } @SuppressWarnings("serial") private static final class ProjectTargetWithTrueMapper implements MapFunction, Tuple2> { public Tuple2 map(Edge edge) throws Exception { - return new Tuple2(edge.getTarget(), true); + return new Tuple2<>(edge.getTarget(), true); } } } \ No newline at end of file diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java index 7a25788700e2c..5b77101c15da7 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java @@ -184,7 +184,7 @@ public Long vertexJoin(Long vertexValue, Long inputValue) { @SuppressWarnings("serial") private static final class ProjectIdWithTrue implements MapFunction, Tuple2> { public Tuple2 map(Vertex vertex) throws Exception { - return new Tuple2(vertex.getId(), true); + return new Tuple2<>(vertex.getId(), true); } } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java index 35f7b0e3179ed..5e751a58c6851 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java @@ -181,7 +181,7 @@ public String map(Edge edge) throws Exception { @SuppressWarnings("serial") private static final class ToTuple1Mapper implements MapFunction, Tuple1> { public Tuple1 map(Edge edge) throws Exception { - Tuple1 tupleValue = new Tuple1(); + Tuple1 tupleValue = new Tuple1<>(); tupleValue.setFields(edge.getValue()); return tupleValue; } @@ -201,7 +201,7 @@ private static final class ToCustomParametrizedTypeMapper implements MapFunction DummyCustomParameterizedType> { public DummyCustomParameterizedType map(Edge edge) throws Exception { - DummyCustomParameterizedType dummyValue = new DummyCustomParameterizedType(); + DummyCustomParameterizedType dummyValue = new DummyCustomParameterizedType<>(); dummyValue.setIntField(edge.getValue().intValue()); dummyValue.setTField(new Double(edge.getValue())); return dummyValue; diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java index 677a03cafa9dd..108be3ecd9fe6 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java @@ -190,7 +190,7 @@ else if (vertex.getValue() == 5) { @SuppressWarnings("serial") private static final class ToTuple1Mapper implements MapFunction, Tuple1> { public Tuple1 map(Vertex vertex) throws Exception { - Tuple1 tupleValue = new Tuple1(); + Tuple1 tupleValue = new Tuple1<>(); tupleValue.setFields(vertex.getValue()); return tupleValue; } @@ -210,7 +210,7 @@ private static final class ToCustomParametrizedTypeMapper implements MapFunction DummyCustomParameterizedType> { public DummyCustomParameterizedType map(Vertex vertex) throws Exception { - DummyCustomParameterizedType dummyValue = new DummyCustomParameterizedType(); + DummyCustomParameterizedType dummyValue = new DummyCustomParameterizedType<>(); dummyValue.setIntField(vertex.getValue().intValue()); dummyValue.setTField(new Double(vertex.getValue())); return dummyValue; diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java index 3bb19fa5ba9d7..3978d3644060a 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.test.operations; import java.util.List; +import java.util.Objects; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -414,7 +415,7 @@ public void iterateEdges(Vertex v, minNeighborId = edge.getTarget(); } } - out.collect(new Tuple2(v.getId(), minNeighborId)); + out.collect(new Tuple2<>(v.getId(), minNeighborId)); } } @@ -432,7 +433,7 @@ public void iterateEdges(Vertex v, weight = edge.getValue(); } } - out.collect(new Tuple2(v.getId(), weight)); + out.collect(new Tuple2<>(v.getId(), weight)); } } @@ -470,7 +471,7 @@ public void iterateEdges(Vertex v, minNeighborId = edge.getSource(); } } - out.collect(new Tuple2(v.getId(), minNeighborId)); + out.collect(new Tuple2<>(v.getId(), minNeighborId)); } } @@ -482,7 +483,7 @@ public void iterateEdges(Iterable>> edges, Collector> out) throws Exception { for(Tuple2> edge : edges) { - out.collect(new Tuple2(edge.f0, edge.f1.getTarget())); + out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget())); } } } @@ -496,7 +497,7 @@ public void iterateEdges(Iterable>> edges, for(Tuple2> edge : edges) { if(edge.f0 != 5) { - out.collect(new Tuple2(edge.f0, edge.f1.getTarget())); + out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget())); } } } @@ -511,7 +512,7 @@ public void iterateEdges(Vertex v, Iterable> edges, Collector> out) throws Exception { for (Edge edge: edges) { if(v.getValue() > 2) { - out.collect(new Tuple2(v.getId(), edge.getTarget())); + out.collect(new Tuple2<>(v.getId(), edge.getTarget())); } } } @@ -525,7 +526,7 @@ public void iterateEdges(Iterable>> edges, Collector> out) throws Exception { for(Tuple2> edge : edges) { - out.collect(new Tuple2(edge.f0, edge.f1.getSource())); + out.collect(new Tuple2<>(edge.f0, edge.f1.getSource())); } } } @@ -539,7 +540,7 @@ public void iterateEdges(Iterable>> edges, for(Tuple2> edge : edges) { if(edge.f0 != 5) { - out.collect(new Tuple2(edge.f0, edge.f1.getSource())); + out.collect(new Tuple2<>(edge.f0, edge.f1.getSource())); } } } @@ -554,7 +555,7 @@ public void iterateEdges(Vertex v, Iterable> edges, Collector> out) throws Exception { for (Edge edge: edges) { if(v.getValue() > 2) { - out.collect(new Tuple2(v.getId(), edge.getSource())); + out.collect(new Tuple2<>(v.getId(), edge.getSource())); } } } @@ -567,10 +568,10 @@ private static final class SelectNeighbors implements EdgesFunction>> edges, Collector> out) throws Exception { for (Tuple2> edge : edges) { - if (edge.f0 == edge.f1.getTarget()) { - out.collect(new Tuple2(edge.f0, edge.f1.getSource())); + if (Objects.equals(edge.f0, edge.f1.getTarget())) { + out.collect(new Tuple2<>(edge.f0, edge.f1.getSource())); } else { - out.collect(new Tuple2(edge.f0, edge.f1.getTarget())); + out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget())); } } } @@ -584,10 +585,10 @@ public void iterateEdges(Iterable>> edges, Collector> out) throws Exception { for (Tuple2> edge : edges) { if(edge.f0 != 5 && edge.f0 != 2) { - if (edge.f0 == edge.f1.getTarget()) { - out.collect(new Tuple2(edge.f0, edge.f1.getSource())); + if (Objects.equals(edge.f0, edge.f1.getTarget())) { + out.collect(new Tuple2<>(edge.f0, edge.f1.getSource())); } else { - out.collect(new Tuple2(edge.f0, edge.f1.getTarget())); + out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget())); } } } @@ -604,9 +605,9 @@ public void iterateEdges(Vertex v, Iterable> edges, for(Edge edge : edges) { if(v.getValue() > 4) { if(v.getId().equals(edge.getTarget())) { - out.collect(new Tuple2(v.getId(), edge.getSource())); + out.collect(new Tuple2<>(v.getId(), edge.getSource())); } else { - out.collect(new Tuple2(v.getId(), edge.getTarget())); + out.collect(new Tuple2<>(v.getId(), edge.getTarget())); } } } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java index ab10947456cf9..61ef446d1b677 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java @@ -131,9 +131,9 @@ public void iterateEdges(Vertex v, Iterable> edges, for(Edge edge : edges) { if(v.getValue() > 4) { if(v.getId().equals(edge.getTarget())) { - out.collect(new Tuple2(v.getId(), edge.getSource())); + out.collect(new Tuple2<>(v.getId(), edge.getSource())); } else { - out.collect(new Tuple2(v.getId(), edge.getTarget())); + out.collect(new Tuple2<>(v.getId(), edge.getTarget())); } } } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java index 7553b325833ac..90e3342ff11f9 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java @@ -420,7 +420,7 @@ public void iterateNeighbors(Vertex vertex, for (Tuple2, Vertex> neighbor : neighbors) { sum += neighbor.f1.getValue(); } - out.collect(new Tuple2(vertex.getId(), sum)); + out.collect(new Tuple2<>(vertex.getId(), sum)); } } @@ -437,7 +437,7 @@ public void iterateNeighbors(Vertex vertex, for (Tuple2, Vertex> neighbor : neighbors) { sum += neighbor.f0.getValue() * neighbor.f1.getValue(); } - out.collect(new Tuple2(vertex.getId(), sum)); + out.collect(new Tuple2<>(vertex.getId(), sum)); } } @@ -454,7 +454,7 @@ public void iterateNeighbors(Vertex vertex, for (Tuple2, Vertex> neighbor : neighbors) { sum += neighbor.f1.getValue(); } - out.collect(new Tuple2(vertex.getId(), sum + vertex.getValue())); + out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue())); } } @@ -472,7 +472,7 @@ public void iterateNeighbors(Vertex vertex, sum += neighbor.f1.getValue(); } if(vertex.getId() > 3) { - out.collect(new Tuple2(vertex.getId(), sum)); + out.collect(new Tuple2<>(vertex.getId(), sum)); } } } @@ -491,7 +491,7 @@ public void iterateNeighbors(Vertex vertex, sum += neighbor.f0.getValue() * neighbor.f1.getValue(); } if(vertex.getId() > 3) { - out.collect(new Tuple2(vertex.getId(), sum)); + out.collect(new Tuple2<>(vertex.getId(), sum)); } } } @@ -510,7 +510,7 @@ public void iterateNeighbors(Vertex vertex, sum += neighbor.f1.getValue(); } if(vertex.getId() > 3) { - out.collect(new Tuple2(vertex.getId(), sum + vertex.getValue())); + out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue())); } } } @@ -533,13 +533,11 @@ public void iterateNeighbors(Iterable, Vertex> out) throws Exception { long sum = 0; Tuple3, Vertex> next = null; - Iterator, Vertex>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); + for (Tuple3, Vertex> neighbor : neighbors) { + next = neighbor; sum += next.f2.getValue() * next.f1.getValue(); } - out.collect(new Tuple2(next.f0, sum)); + out.collect(new Tuple2<>(next.f0, sum)); } } @@ -553,15 +551,13 @@ public void iterateNeighbors(Iterable, Vertex, Vertex> next = null; - Iterator, Vertex>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); + for (Tuple3, Vertex> neighbor : neighbors) { + next = neighbor; sum += next.f2.getValue(); } if(next.f0 > 2) { - out.collect(new Tuple2(next.f0, sum)); - out.collect(new Tuple2(next.f0, sum * 2)); + out.collect(new Tuple2<>(next.f0, sum)); + out.collect(new Tuple2<>(next.f0, sum * 2)); } } } @@ -576,15 +572,13 @@ public void iterateNeighbors(Iterable, Vertex, Vertex> next = null; - Iterator, Vertex>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); + for (Tuple3, Vertex> neighbor : neighbors) { + next = neighbor; sum += next.f2.getValue() * next.f1.getValue(); } if(next.f0 > 2) { - out.collect(new Tuple2(next.f0, sum)); - out.collect(new Tuple2(next.f0, sum * 2)); + out.collect(new Tuple2<>(next.f0, sum)); + out.collect(new Tuple2<>(next.f0, sum * 2)); } } } @@ -599,15 +593,13 @@ public void iterateNeighbors(Iterable, Vertex, Vertex> next = null; - Iterator, Vertex>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); + for (Tuple3, Vertex> neighbor : neighbors) { + next = neighbor; sum += next.f2.getValue(); } if(next.f0 > 2) { - out.collect(new Tuple2(next.f0, sum)); - out.collect(new Tuple2(next.f0, sum * 2)); + out.collect(new Tuple2<>(next.f0, sum)); + out.collect(new Tuple2<>(next.f0, sum * 2)); } } } @@ -625,8 +617,8 @@ public void iterateNeighbors(Vertex vertex, for (Tuple2, Vertex> neighbor : neighbors) { sum += neighbor.f1.getValue(); } - out.collect(new Tuple2(vertex.getId(), sum)); - out.collect(new Tuple2(vertex.getId(), sum * 2)); + out.collect(new Tuple2<>(vertex.getId(), sum)); + out.collect(new Tuple2<>(vertex.getId(), sum * 2)); } } @@ -643,8 +635,8 @@ public void iterateNeighbors(Vertex vertex, for (Tuple2, Vertex> neighbor : neighbors) { sum += neighbor.f0.getValue() * neighbor.f1.getValue(); } - out.collect(new Tuple2(vertex.getId(), sum)); - out.collect(new Tuple2(vertex.getId(), sum - 1)); + out.collect(new Tuple2<>(vertex.getId(), sum)); + out.collect(new Tuple2<>(vertex.getId(), sum - 1)); } } @@ -661,8 +653,8 @@ public void iterateNeighbors(Vertex vertex, for (Tuple2, Vertex> neighbor : neighbors) { sum += neighbor.f1.getValue(); } - out.collect(new Tuple2(vertex.getId(), sum + vertex.getValue())); - out.collect(new Tuple2(vertex.getId(), sum + vertex.getValue() + 5)); + out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue())); + out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue() + 5)); } } } \ No newline at end of file diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java index b32abebfe7cbf..6cc0b6a02eb29 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java @@ -188,7 +188,7 @@ public void iterateNeighbors(Vertex vertex, for (Tuple2, Vertex> neighbor : neighbors) { sum += neighbor.f1.getValue(); } - out.collect(new Tuple2(vertex.getId(), sum + vertex.getValue())); + out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue())); } } diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index b2bfd6bfc733c..608000dde3c60 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -162,8 +162,8 @@ public static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration if (executor.running()) { List tms = executor.getTaskManagersAsJava(); - List> bcVariableManagerResponseFutures = new ArrayList>(); - List> numActiveConnectionsResponseFutures = new ArrayList>(); + List> bcVariableManagerResponseFutures = new ArrayList<>(); + List> numActiveConnectionsResponseFutures = new ArrayList<>(); for (ActorRef tm : tms) { bcVariableManagerResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages @@ -297,7 +297,7 @@ public static void compareResultsByLinesInMemory(String expectedResultStr, Strin public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath, String[] excludePrefixes) throws Exception { - ArrayList list = new ArrayList(); + ArrayList list = new ArrayList<>(); readAllResultLines(list, resultPath, excludePrefixes, false); String[] result = list.toArray(new String[list.size()]); @@ -317,7 +317,7 @@ public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedR public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath, String[] excludePrefixes) throws Exception { - ArrayList list = new ArrayList(); + ArrayList list = new ArrayList<>(); readAllResultLines(list, resultPath, excludePrefixes, true); String[] result = list.toArray(new String[list.size()]); @@ -332,7 +332,7 @@ public static void checkLinesAgainstRegexp(String resultPath, String regexp){ Pattern pattern = Pattern.compile(regexp); Matcher matcher = pattern.matcher(""); - ArrayList list = new ArrayList(); + ArrayList list = new ArrayList<>(); try { readAllResultLines(list, resultPath, new String[]{}, false); } catch (IOException e1) { @@ -355,7 +355,7 @@ public static void compareKeyValuePairsWithDelta(String expectedLines, String re public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String[] excludePrefixes, String delimiter, double maxDelta) throws Exception { - ArrayList list = new ArrayList(); + ArrayList list = new ArrayList<>(); readAllResultLines(list, resultPath, excludePrefixes, false); String[] result = list.toArray(new String[list.size()]); @@ -421,9 +421,7 @@ protected static File asFile(String path) { } else { throw new IllegalArgumentException("This path does not denote a local file."); } - } catch (URISyntaxException e) { - throw new IllegalArgumentException("This path does not describe a valid local file URI."); - } catch (NullPointerException e) { + } catch (URISyntaxException | NullPointerException e) { throw new IllegalArgumentException("This path does not describe a valid local file URI."); } } @@ -441,7 +439,7 @@ public static void compareResultAsText(List result, String expected) { } private static void compareResult(List result, String expected, boolean asTuples) { - String[] extectedStrings = expected.split("\n"); + String[] expectedStrings = expected.split("\n"); String[] resultStrings = new String[result.size()]; for (int i = 0; i < resultStrings.length; i++) { @@ -467,13 +465,13 @@ private static void compareResult(List result, String expected, boolean a } } - assertEquals("Wrong number of elements result", extectedStrings.length, resultStrings.length); + assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length); - Arrays.sort(extectedStrings); + Arrays.sort(expectedStrings); Arrays.sort(resultStrings); - for (int i = 0; i < extectedStrings.length; i++) { - assertEquals(extectedStrings[i], resultStrings[i]); + for (int i = 0; i < expectedStrings.length; i++) { + assertEquals(expectedStrings[i], resultStrings[i]); } } @@ -510,7 +508,7 @@ public static void containsResultAsText(List result, String expected) { // -------------------------------------------------------------------------------------------- protected static Collection toParameterList(Configuration ... testConfigs) { - ArrayList configs = new ArrayList(); + ArrayList configs = new ArrayList<>(); for (Configuration testConfig : testConfigs) { Object[] c = { testConfig }; configs.add(c); @@ -519,7 +517,7 @@ protected static Collection toParameterList(Configuration ... testConf } protected static Collection toParameterList(List testConfigs) { - LinkedList configs = new LinkedList(); + LinkedList configs = new LinkedList<>(); for (Configuration testConfig : testConfigs) { Object[] c = { testConfig }; configs.add(c);