From 65f869d3a386bb1a7370c320f8092a54c3b0379c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Bali?= Date: Thu, 19 Feb 2015 14:58:09 +0100 Subject: [PATCH 1/5] [FLINK-1528] Added Local Clustering Coefficient Example --- .../LocalClusteringCoefficientExample.java | 52 +++++++ .../library/LocalClusteringCoefficient.java | 142 ++++++++++++++++++ 2 files changed, 194 insertions(+) create mode 100755 flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java create mode 100755 flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LocalClusteringCoefficient.java diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java new file mode 100755 index 0000000000000..2eca30d89fc9a --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.example.utils.ExampleUtils; +import org.apache.flink.graph.library.LocalClusteringCoefficient; + +public class LocalClusteringCoefficientExample implements ProgramDescription { + + public static void main (String [] args) throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> vertices = ExampleUtils.getLongDoubleVertexData(env); + DataSet> edges = ExampleUtils.getLongDoubleEdgeData(env); + Graph graph = Graph.fromDataSet(vertices, edges, env); + + DataSet> clusteringCoefficients = + graph.run(new LocalClusteringCoefficient()).getVertices(); + + clusteringCoefficients.print(); + + env.execute(); + } + + @Override + public String getDescription() { + return "Clustering Coefficient"; + } +} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LocalClusteringCoefficient.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LocalClusteringCoefficient.java new file mode 100755 index 0000000000000..5a997bac50b1c --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LocalClusteringCoefficient.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.EdgesFunction; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.NeighborsFunctionWithVertexValue; +import org.apache.flink.graph.Vertex; + +import java.io.Serializable; +import java.util.HashSet; + +@SuppressWarnings("serial") +public class LocalClusteringCoefficient & Serializable> + implements GraphAlgorithm { + + @Override + public Graph run(Graph input) { + + // Get the neighbors of each vertex in a HashSet + DataSet>> neighborhoods = input + .reduceOnEdges(new NeighborhoodEdgesFunction(), EdgeDirection.OUT); + + // Construct a new graph where the neighborhood is the vertex value + Graph, Double> newGraph = input + .mapVertices(new EmptyVertexMapFunction()) + .joinWithVertices(neighborhoods, new NeighborhoodVertexMapFunction()); + + // Calculate clustering coefficient + DataSet> clusteringCoefficients = newGraph + .reduceOnNeighbors(new ClusteringCoefficientNeighborsFunction(), EdgeDirection.OUT); + + // Construct a new graph where the clustering coefficient is the vertex value + Graph result = input + .joinWithVertices(clusteringCoefficients, new ClusteringCoefficientVertexMapFunction()); + + return result; + } + + private static final class NeighborhoodEdgesFunction & Serializable> + implements EdgesFunction>> { + + @Override + public Tuple2> iterateEdges( + Iterable>> edges) throws Exception { + + K vertexId = null; + HashSet neighbors = new HashSet(); + + for (Tuple2> edge : edges) { + vertexId = edge.f0; + neighbors.add(edge.f1.f1); + } + + return new Tuple2>(vertexId, neighbors); + } + } + + private static final class EmptyVertexMapFunction & Serializable> + implements MapFunction, HashSet> { + + @Override + public HashSet map(Vertex arg0) throws Exception { + return new HashSet(); + } + } + + private static final class NeighborhoodVertexMapFunction & Serializable> + implements MapFunction, HashSet>, HashSet> { + + @Override + public HashSet map(Tuple2, HashSet> arg) throws Exception { + return arg.f1; + } + } + + private static final class ClusteringCoefficientNeighborsFunction & Serializable> + implements NeighborsFunctionWithVertexValue, Double, Tuple2> { + + @Override + public Tuple2 iterateNeighbors(Vertex> vertex, + Iterable, Vertex>>> neighbors) throws Exception { + + int deg = vertex.getValue().size(); + int e = 0; + + // Calculate common neighbor count (e) + for (Tuple2, Vertex>> neighbor : neighbors) { + // Iterate neighbor's neighbors + for (K nn : neighbor.f1.f1) { + if (vertex.getValue().contains(nn)) { + e++; + } + } + } + + // Calculate clustering coefficient + double cc; + + if (deg > 1) { + cc = (double) e / (double) (deg * (deg - 1)); + } else { + cc = 0.0; + } + + return new Tuple2(vertex.getId(), cc); + } + } + + private static final class ClusteringCoefficientVertexMapFunction & Serializable> + implements MapFunction, Double> { + + @Override + public Double map(Tuple2 arg) throws Exception { + return arg.f1; + } + + + } +} From 06ded160ddffdebe31645ac27c917b0cc612e68f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Bali?= Date: Fri, 27 Feb 2015 13:38:02 +0100 Subject: [PATCH 2/5] [FLINK-1528] Removed library method and added example instead --- .../LocalClusteringCoefficientExample.java | 190 +++++++++++++++++- .../library/LocalClusteringCoefficient.java | 142 ------------- ...calClusteringCoefficientExampleITCase.java | 104 ++++++++++ 3 files changed, 285 insertions(+), 151 deletions(-) delete mode 100755 flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LocalClusteringCoefficient.java create mode 100755 flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LocalClusteringCoefficientExampleITCase.java diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java index 2eca30d89fc9a..e14f5f8243009 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java @@ -19,34 +19,206 @@ package org.apache.flink.graph.example; import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.EdgesFunction; import org.apache.flink.graph.Graph; +import org.apache.flink.graph.NeighborsFunctionWithVertexValue; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.example.utils.ExampleUtils; -import org.apache.flink.graph.library.LocalClusteringCoefficient; + +import java.util.HashSet; public class LocalClusteringCoefficientExample implements ProgramDescription { - + + // -------------------------------------------------------------------------------------------- + // Program + // -------------------------------------------------------------------------------------------- + public static void main (String [] args) throws Exception { + if(!parseParameters(args)) { + return; + } + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet> vertices = ExampleUtils.getLongDoubleVertexData(env); - DataSet> edges = ExampleUtils.getLongDoubleEdgeData(env); + DataSet> vertices = getVertexDataSet(env); + DataSet> edges = getEdgeDataSet(env); Graph graph = Graph.fromDataSet(vertices, edges, env); - DataSet> clusteringCoefficients = - graph.run(new LocalClusteringCoefficient()).getVertices(); + // Get the neighbors of each vertex in a HashSet + DataSet>> neighborhoods = graph + .reduceOnEdges(new NeighborhoodEdgesFunction(), EdgeDirection.OUT); + + // Construct a new graph where the neighborhood is the vertex value + Graph, Double> newGraph = graph + .mapVertices(new EmptyVertexMapFunction()) + .joinWithVertices(neighborhoods, new NeighborhoodVertexMapFunction()); + + // Calculate clustering coefficient + DataSet> clusteringCoefficients = newGraph + .reduceOnNeighbors(new ClusteringCoefficientNeighborsFunction(), EdgeDirection.OUT); + + // Emit results + if(fileOutput) { + clusteringCoefficients.writeAsCsv(outputPath, "\n", ","); + } else { + clusteringCoefficients.print(); + } + + env.execute("Local Clustering Coefficient Example"); + } + + // -------------------------------------------------------------------------------------------- + // Clustering Coefficient Functions + // -------------------------------------------------------------------------------------------- + + private static final class NeighborhoodEdgesFunction + implements EdgesFunction>> { + + @Override + public Tuple2> iterateEdges( + Iterable>> edges) throws Exception { + + Long vertexId = null; + HashSet neighbors = new HashSet(); + + for (Tuple2> edge : edges) { + vertexId = edge.f0; + neighbors.add(edge.f1.f1); + } + + return new Tuple2>(vertexId, neighbors); + } + } + + private static final class EmptyVertexMapFunction + implements MapFunction, HashSet> { + + @Override + public HashSet map(Vertex arg) throws Exception { + return new HashSet(); + } + } + + private static final class NeighborhoodVertexMapFunction + implements MapFunction, HashSet>, HashSet> { + + @Override + public HashSet map(Tuple2, HashSet> arg) throws Exception { + return arg.f1; + } + } + + private static final class ClusteringCoefficientNeighborsFunction + implements NeighborsFunctionWithVertexValue, Double, Tuple2> { + + @Override + public Tuple2 iterateNeighbors(Vertex> vertex, + Iterable, Vertex>>> neighbors) throws Exception { + + int deg = vertex.getValue().size(); + int e = 0; + + // Calculate common neighbor count (e) + for (Tuple2, Vertex>> neighbor : neighbors) { + // Iterate neighbor's neighbors + for (Long nn : neighbor.f1.f1) { + if (vertex.getValue().contains(nn)) { + e++; + } + } + } + + // Calculate clustering coefficient + double cc; + + if (deg > 1) { + cc = (double) e / (double) (deg * (deg - 1)); + } else { + cc = 0.0; + } + + return new Tuple2(vertex.getId(), cc); + } + } + + // -------------------------------------------------------------------------------------------- + // Util Methods + // -------------------------------------------------------------------------------------------- + + private static boolean fileOutput = false; + private static String vertexInputPath = null; + private static String edgeInputPath = null; + private static String outputPath = null; + + private static boolean parseParameters(String[] args) { + + if(args.length > 0) { + if(args.length != 3) { + System.err.println("Usage: LocalClusteringCoefficient "); + return false; + } + + fileOutput = true; + vertexInputPath = args[0]; + edgeInputPath = args[1]; + outputPath = args[2]; + } else { + System.out.println("Executing LocalClusteringCoefficient example with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println(" Usage: LocalClusteringCoefficient "); + } + return true; + } + + @SuppressWarnings("serial") + private static DataSet> getVertexDataSet(ExecutionEnvironment env) { + + if (fileOutput) { + return env.readCsvFile(vertexInputPath) + .fieldDelimiter(" ") + .lineDelimiter("\n") + .types(Long.class, Double.class) + .map(new MapFunction, Vertex>() { + @Override + public Vertex map(Tuple2 value) throws Exception { + return new Vertex(value.f0, value.f1); + } + }); + } + + return ExampleUtils.getLongDoubleVertexData(env); + } + + @SuppressWarnings("serial") + private static DataSet> getEdgeDataSet(ExecutionEnvironment env) { - clusteringCoefficients.print(); + if (fileOutput) { + return env.readCsvFile(edgeInputPath) + .fieldDelimiter(" ") + .lineDelimiter("\n") + .types(Long.class, Long.class, Double.class) + .map(new MapFunction, Edge>() { + @Override + public Edge map(Tuple3 value) throws Exception { + return new Edge(value.f0, value.f1, value.f2); + } + }); + } - env.execute(); + return ExampleUtils.getLongDoubleEdgeData(env); } @Override public String getDescription() { - return "Clustering Coefficient"; + return "Local Clustering Coefficient Example"; } } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LocalClusteringCoefficient.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LocalClusteringCoefficient.java deleted file mode 100755 index 5a997bac50b1c..0000000000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LocalClusteringCoefficient.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.library; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.EdgesFunction; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; -import org.apache.flink.graph.NeighborsFunctionWithVertexValue; -import org.apache.flink.graph.Vertex; - -import java.io.Serializable; -import java.util.HashSet; - -@SuppressWarnings("serial") -public class LocalClusteringCoefficient & Serializable> - implements GraphAlgorithm { - - @Override - public Graph run(Graph input) { - - // Get the neighbors of each vertex in a HashSet - DataSet>> neighborhoods = input - .reduceOnEdges(new NeighborhoodEdgesFunction(), EdgeDirection.OUT); - - // Construct a new graph where the neighborhood is the vertex value - Graph, Double> newGraph = input - .mapVertices(new EmptyVertexMapFunction()) - .joinWithVertices(neighborhoods, new NeighborhoodVertexMapFunction()); - - // Calculate clustering coefficient - DataSet> clusteringCoefficients = newGraph - .reduceOnNeighbors(new ClusteringCoefficientNeighborsFunction(), EdgeDirection.OUT); - - // Construct a new graph where the clustering coefficient is the vertex value - Graph result = input - .joinWithVertices(clusteringCoefficients, new ClusteringCoefficientVertexMapFunction()); - - return result; - } - - private static final class NeighborhoodEdgesFunction & Serializable> - implements EdgesFunction>> { - - @Override - public Tuple2> iterateEdges( - Iterable>> edges) throws Exception { - - K vertexId = null; - HashSet neighbors = new HashSet(); - - for (Tuple2> edge : edges) { - vertexId = edge.f0; - neighbors.add(edge.f1.f1); - } - - return new Tuple2>(vertexId, neighbors); - } - } - - private static final class EmptyVertexMapFunction & Serializable> - implements MapFunction, HashSet> { - - @Override - public HashSet map(Vertex arg0) throws Exception { - return new HashSet(); - } - } - - private static final class NeighborhoodVertexMapFunction & Serializable> - implements MapFunction, HashSet>, HashSet> { - - @Override - public HashSet map(Tuple2, HashSet> arg) throws Exception { - return arg.f1; - } - } - - private static final class ClusteringCoefficientNeighborsFunction & Serializable> - implements NeighborsFunctionWithVertexValue, Double, Tuple2> { - - @Override - public Tuple2 iterateNeighbors(Vertex> vertex, - Iterable, Vertex>>> neighbors) throws Exception { - - int deg = vertex.getValue().size(); - int e = 0; - - // Calculate common neighbor count (e) - for (Tuple2, Vertex>> neighbor : neighbors) { - // Iterate neighbor's neighbors - for (K nn : neighbor.f1.f1) { - if (vertex.getValue().contains(nn)) { - e++; - } - } - } - - // Calculate clustering coefficient - double cc; - - if (deg > 1) { - cc = (double) e / (double) (deg * (deg - 1)); - } else { - cc = 0.0; - } - - return new Tuple2(vertex.getId(), cc); - } - } - - private static final class ClusteringCoefficientVertexMapFunction & Serializable> - implements MapFunction, Double> { - - @Override - public Double map(Tuple2 arg) throws Exception { - return arg.f1; - } - - - } -} diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LocalClusteringCoefficientExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LocalClusteringCoefficientExampleITCase.java new file mode 100755 index 0000000000000..a08f362521d38 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LocalClusteringCoefficientExampleITCase.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.example.LocalClusteringCoefficientExample; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; + +@RunWith(Parameterized.class) +public class LocalClusteringCoefficientExampleITCase extends MultipleProgramsTestBase { + + public LocalClusteringCoefficientExampleITCase(ExecutionMode mode){ + super(mode); + } + + private String verticesPath; + private String edgesPath; + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + File verticesFile = tempFolder.newFile(); + Files.write(LocalClusteringCoefficientExampleITCase.VERTICES, verticesFile, Charsets.UTF_8); + + File edgesFile = tempFolder.newFile(); + Files.write(LocalClusteringCoefficientExampleITCase.EDGES, edgesFile, Charsets.UTF_8); + + verticesPath = verticesFile.toURI().toString(); + edgesPath = edgesFile.toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testLocalClusteringCoefficientExample() throws Exception { + LocalClusteringCoefficientExample.main(new String[] {verticesPath, edgesPath, resultPath}); + + expectedResult = "1,0.5\n" + + "2,0.0\n" + + "3,0.5\n" + + "4,0.0\n" + + "5,0.0\n"; + } + + // -------------------------------------------------------------------------------------------- + // Sample data + // -------------------------------------------------------------------------------------------- + + private static final String VERTICES = "1 1.0\n" + + "2 2.0\n" + + "3 3.0\n" + + "4 4.0\n" + + "5 5.0\n"; + + private static final String EDGES = "1 2 12.0\n" + + "1 3 13.0\n" + + "2 3 23.0\n" + + "3 4 34.0\n" + + "3 5 35.0\n" + + "4 5 45.0\n" + + "5 1 51.0\n"; +} From 6b2a3966ab7a9ef494f5bce1a166f1b2b6cd79ab Mon Sep 17 00:00:00 2001 From: balidani Date: Thu, 5 Mar 2015 14:44:19 +0100 Subject: [PATCH 3/5] [FLINK-1528] [gelly] Moved, updated and added some tests --- .../LocalClusteringCoefficientExample.java | 77 +++---- .../utils/LocalClusteringCoefficientData.java | 68 ++++++ ...calClusteringCoefficientExampleITCase.java | 104 --------- ...calClusteringCoefficientExampleITCase.java | 197 ++++++++++++++++++ 4 files changed, 294 insertions(+), 152 deletions(-) create mode 100755 flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LocalClusteringCoefficientData.java delete mode 100755 flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LocalClusteringCoefficientExampleITCase.java create mode 100755 flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LocalClusteringCoefficientExampleITCase.java diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java index e14f5f8243009..051a51acc580b 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java @@ -23,14 +23,14 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeDirection; import org.apache.flink.graph.EdgesFunction; import org.apache.flink.graph.Graph; import org.apache.flink.graph.NeighborsFunctionWithVertexValue; import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.ExampleUtils; +import org.apache.flink.graph.example.utils.LocalClusteringCoefficientData; +import org.apache.flink.types.NullValue; import java.util.HashSet; @@ -48,16 +48,15 @@ public static void main (String [] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet> vertices = getVertexDataSet(env); - DataSet> edges = getEdgeDataSet(env); - Graph graph = Graph.fromDataSet(vertices, edges, env); + DataSet> edges = getEdgeDataSet(env); + Graph graph = Graph.fromDataSet(edges, env); // Get the neighbors of each vertex in a HashSet DataSet>> neighborhoods = graph .reduceOnEdges(new NeighborhoodEdgesFunction(), EdgeDirection.OUT); // Construct a new graph where the neighborhood is the vertex value - Graph, Double> newGraph = graph + Graph, NullValue> newGraph = graph .mapVertices(new EmptyVertexMapFunction()) .joinWithVertices(neighborhoods, new NeighborhoodVertexMapFunction()); @@ -80,18 +79,18 @@ public static void main (String [] args) throws Exception { // -------------------------------------------------------------------------------------------- private static final class NeighborhoodEdgesFunction - implements EdgesFunction>> { + implements EdgesFunction>> { @Override public Tuple2> iterateEdges( - Iterable>> edges) throws Exception { + Iterable>> edges) throws Exception { Long vertexId = null; HashSet neighbors = new HashSet(); - for (Tuple2> edge : edges) { + for (Tuple2> edge : edges) { vertexId = edge.f0; - neighbors.add(edge.f1.f1); + neighbors.add(edge.f1.getTarget()); } return new Tuple2>(vertexId, neighbors); @@ -99,10 +98,10 @@ public Tuple2> iterateEdges( } private static final class EmptyVertexMapFunction - implements MapFunction, HashSet> { + implements MapFunction, HashSet> { @Override - public HashSet map(Vertex arg) throws Exception { + public HashSet map(Vertex arg) throws Exception { return new HashSet(); } } @@ -117,25 +116,28 @@ public HashSet map(Tuple2, HashSet> arg) throws Except } private static final class ClusteringCoefficientNeighborsFunction - implements NeighborsFunctionWithVertexValue, Double, Tuple2> { + implements NeighborsFunctionWithVertexValue, NullValue, Tuple2> { @Override public Tuple2 iterateNeighbors(Vertex> vertex, - Iterable, Vertex>>> neighbors) throws Exception { + Iterable, Vertex>>> neighbors) throws Exception { int deg = vertex.getValue().size(); int e = 0; // Calculate common neighbor count (e) - for (Tuple2, Vertex>> neighbor : neighbors) { + for (Tuple2, Vertex>> neighbor : neighbors) { // Iterate neighbor's neighbors - for (Long nn : neighbor.f1.f1) { + for (Long nn : neighbor.f1.getValue()) { if (vertex.getValue().contains(nn)) { e++; } } } + // We assume an undirected graph, so we need to divide e here + e /= 2; + // Calculate clustering coefficient double cc; @@ -154,67 +156,46 @@ public Tuple2 iterateNeighbors(Vertex> vertex, // -------------------------------------------------------------------------------------------- private static boolean fileOutput = false; - private static String vertexInputPath = null; private static String edgeInputPath = null; private static String outputPath = null; private static boolean parseParameters(String[] args) { if(args.length > 0) { - if(args.length != 3) { - System.err.println("Usage: LocalClusteringCoefficient "); + if(args.length != 2) { + System.err.println("Usage: LocalClusteringCoefficient "); return false; } fileOutput = true; - vertexInputPath = args[0]; - edgeInputPath = args[1]; - outputPath = args[2]; + edgeInputPath = args[0]; + outputPath = args[1]; } else { System.out.println("Executing LocalClusteringCoefficient example with default parameters and built-in default data."); System.out.println(" Provide parameters to read input data from files."); System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" Usage: LocalClusteringCoefficient "); + System.out.println(" Usage: LocalClusteringCoefficient "); } return true; } @SuppressWarnings("serial") - private static DataSet> getVertexDataSet(ExecutionEnvironment env) { - - if (fileOutput) { - return env.readCsvFile(vertexInputPath) - .fieldDelimiter(" ") - .lineDelimiter("\n") - .types(Long.class, Double.class) - .map(new MapFunction, Vertex>() { - @Override - public Vertex map(Tuple2 value) throws Exception { - return new Vertex(value.f0, value.f1); - } - }); - } - - return ExampleUtils.getLongDoubleVertexData(env); - } - - @SuppressWarnings("serial") - private static DataSet> getEdgeDataSet(ExecutionEnvironment env) { + private static DataSet> getEdgeDataSet(ExecutionEnvironment env) { if (fileOutput) { return env.readCsvFile(edgeInputPath) .fieldDelimiter(" ") .lineDelimiter("\n") - .types(Long.class, Long.class, Double.class) - .map(new MapFunction, Edge>() { + .types(Long.class, Long.class) + .map(new MapFunction, Edge>() { @Override - public Edge map(Tuple3 value) throws Exception { - return new Edge(value.f0, value.f1, value.f2); + public Edge map(Tuple2 value) throws Exception { + return new Edge(value.f0, value.f1, NullValue.getInstance()); } }); } - return ExampleUtils.getLongDoubleEdgeData(env); + return LocalClusteringCoefficientData.getDefaultEdgeDataSet(env); } @Override diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LocalClusteringCoefficientData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LocalClusteringCoefficientData.java new file mode 100755 index 0000000000000..7d8f99db5271a --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LocalClusteringCoefficientData.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example.utils; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.types.NullValue; + +import java.util.ArrayList; +import java.util.List; + +public class LocalClusteringCoefficientData { + + public static final String EDGES = "1 2\n" + + "2 1\n" + + "1 3\n" + + "3 1\n" + + "2 3\n" + + "3 2\n" + + "3 4\n" + + "4 3\n" + + "3 5\n" + + "5 3\n" + + "4 5\n" + + "5 4\n" + + "5 1\n" + + "1 5\n"; + + public static final DataSet> getDefaultEdgeDataSet(ExecutionEnvironment env) { + + List> edges = new ArrayList>(); + edges.add(new Edge(1L, 2L, NullValue.getInstance())); + edges.add(new Edge(2L, 1L, NullValue.getInstance())); + edges.add(new Edge(1L, 3L, NullValue.getInstance())); + edges.add(new Edge(3L, 1L, NullValue.getInstance())); + edges.add(new Edge(2L, 3L, NullValue.getInstance())); + edges.add(new Edge(3L, 2L, NullValue.getInstance())); + edges.add(new Edge(3L, 4L, NullValue.getInstance())); + edges.add(new Edge(4L, 3L, NullValue.getInstance())); + edges.add(new Edge(3L, 5L, NullValue.getInstance())); + edges.add(new Edge(5L, 3L, NullValue.getInstance())); + edges.add(new Edge(4L, 5L, NullValue.getInstance())); + edges.add(new Edge(5L, 4L, NullValue.getInstance())); + edges.add(new Edge(5L, 1L, NullValue.getInstance())); + edges.add(new Edge(1L, 5L, NullValue.getInstance())); + + return env.fromCollection(edges); + } + + private LocalClusteringCoefficientData() {} +} diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LocalClusteringCoefficientExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LocalClusteringCoefficientExampleITCase.java deleted file mode 100755 index a08f362521d38..0000000000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LocalClusteringCoefficientExampleITCase.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import com.google.common.base.Charsets; -import com.google.common.io.Files; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.LocalClusteringCoefficientExample; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; - -@RunWith(Parameterized.class) -public class LocalClusteringCoefficientExampleITCase extends MultipleProgramsTestBase { - - public LocalClusteringCoefficientExampleITCase(ExecutionMode mode){ - super(mode); - } - - private String verticesPath; - private String edgesPath; - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - File verticesFile = tempFolder.newFile(); - Files.write(LocalClusteringCoefficientExampleITCase.VERTICES, verticesFile, Charsets.UTF_8); - - File edgesFile = tempFolder.newFile(); - Files.write(LocalClusteringCoefficientExampleITCase.EDGES, edgesFile, Charsets.UTF_8); - - verticesPath = verticesFile.toURI().toString(); - edgesPath = edgesFile.toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testLocalClusteringCoefficientExample() throws Exception { - LocalClusteringCoefficientExample.main(new String[] {verticesPath, edgesPath, resultPath}); - - expectedResult = "1,0.5\n" + - "2,0.0\n" + - "3,0.5\n" + - "4,0.0\n" + - "5,0.0\n"; - } - - // -------------------------------------------------------------------------------------------- - // Sample data - // -------------------------------------------------------------------------------------------- - - private static final String VERTICES = "1 1.0\n" + - "2 2.0\n" + - "3 3.0\n" + - "4 4.0\n" + - "5 5.0\n"; - - private static final String EDGES = "1 2 12.0\n" + - "1 3 13.0\n" + - "2 3 23.0\n" + - "3 4 34.0\n" + - "3 5 35.0\n" + - "4 5 45.0\n" + - "5 1 51.0\n"; -} diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LocalClusteringCoefficientExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LocalClusteringCoefficientExampleITCase.java new file mode 100755 index 0000000000000..ba5b643b24e23 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LocalClusteringCoefficientExampleITCase.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.example; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.flink.graph.example.LocalClusteringCoefficientExample; +import org.apache.flink.graph.example.utils.LocalClusteringCoefficientData; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; + +@RunWith(Parameterized.class) +public class LocalClusteringCoefficientExampleITCase extends MultipleProgramsTestBase { + + public LocalClusteringCoefficientExampleITCase(ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testSimpleGraph() throws Exception { + /* + * Test Local Clustering Coefficient on the default data + */ + + String edgesPath = createTempFile(LocalClusteringCoefficientData.EDGES); + + LocalClusteringCoefficientExample.main(new String[] {edgesPath, resultPath}); + + expectedResult = "1,0.5\n" + + "2,0.0\n" + + "3,0.5\n" + + "4,0.0\n" + + "5,0.0\n"; + } + + @Test + public void testNoEdgesBetweenNeighbors() throws Exception { + /* + * Test Local Clustering Coefficient where there are no edges between the neighbors of a node + */ + + // Generate a 9x9 lattice + String edges = ""; + expectedResult = ""; + + for (int i = 1; i <= 9; ++i) { + for (int j = 1; j <= 9; ++j) { + String vertex = Integer.toString(i) + Integer.toString(j); + expectedResult += vertex + ",0.0\n"; + + if (i > 1) { + String topNeighbor = Integer.toString(i-1) + Integer.toString(j); + edges += vertex + " " + topNeighbor + "\n"; + } + if (i < 9) { + String bottomNeighbor = Integer.toString(i+1) + Integer.toString(j); + edges += vertex + " " + bottomNeighbor + "\n"; + } + if (j > 1) { + String leftNeighbor = Integer.toString(i) + Integer.toString(j-1); + edges += vertex + " " + leftNeighbor + "\n"; + } + if (j < 9) { + String rightNeighbor = Integer.toString(i) + Integer.toString(j+1); + edges += vertex + " " + rightNeighbor + "\n"; + } + } + } + + String edgesPath = createTempFile(edges); + + LocalClusteringCoefficientExample.main(new String[] {edgesPath, resultPath}); + } + + @Test + public void testAllEdgesBetweenNeighbors() throws Exception { + /* + * Test Local Clustering Coefficient where all edges between the neighbors are present + */ + + // Generate 3 disjoint K5 graphs + String edges = ""; + expectedResult = ""; + + for (int k = 1; k <= 3; ++k) { + String component = Integer.toString(k); + + for (int i = 1; i <= 5; ++i) { + String vertexA = component + Integer.toString(i); + expectedResult += vertexA + ",1.0\n"; + + for (int j = 1; j <= 5; ++j) { + if (i == j) { + continue; + } + + String vertexB = component + Integer.toString(j); + + edges += vertexA + " " + vertexB + "\n"; + edges += vertexB + " " + vertexA + "\n"; + } + } + } + + String edgesPath = createTempFile(edges); + + LocalClusteringCoefficientExample.main(new String[] {edgesPath, resultPath}); + } + + @Test + public void testCircleWithCentralVertex() throws Exception { + /* + * Test Local Clustering Coefficient where a vertex in a circle is connected to all vertices of a circle + */ + + // Generate circle of length 9 + String edges = ""; + expectedResult = ""; + + for (int i = 1; i <= 9; ++i) { + String vertex = Integer.toString(i); + expectedResult += vertex + "," + Double.toString(2.0/3.0) + "\n"; + + // Connect to neighbors + if (i > 1) { + String leftNeighbor = Integer.toString(i - 1); + edges += vertex + " " + leftNeighbor + "\n"; + } + if (i < 9) { + String rightNeighbor = Integer.toString(i + 1); + edges += vertex + " " + rightNeighbor + "\n"; + } + + // Connect to the central node + String centralVertex = "10"; + edges += vertex + " 10\n"; + edges += "10 " + vertex + "\n"; + } + + // Expected result for the central node + expectedResult += "10,0.25\n"; + + String edgesPath = createTempFile(edges); + + LocalClusteringCoefficientExample.main(new String[] {edgesPath, resultPath}); + } + + // ------------------------------------------------------------------------- + // Util methods + // ------------------------------------------------------------------------- + + private String createTempFile(final String rows) throws Exception { + File tempFile = tempFolder.newFile(); + Files.write(rows, tempFile, Charsets.UTF_8); + return tempFile.toURI().toString(); + } +} From a3a282d37dae5d618b51fdf25ff5bc6c0ece4759 Mon Sep 17 00:00:00 2001 From: balidani Date: Thu, 5 Mar 2015 15:40:51 +0100 Subject: [PATCH 4/5] [FLINK-1528] [gelly] Fixed test parameters --- .../utils/LocalClusteringCoefficientData.java | 12 +++++------- .../LocalClusteringCoefficientExampleITCase.java | 14 +++++++++----- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LocalClusteringCoefficientData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LocalClusteringCoefficientData.java index 7d8f99db5271a..905a1c70d10c4 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LocalClusteringCoefficientData.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LocalClusteringCoefficientData.java @@ -32,16 +32,14 @@ public class LocalClusteringCoefficientData { "2 1\n" + "1 3\n" + "3 1\n" + + "1 4\n" + + "4 1\n" + "2 3\n" + "3 2\n" + + "2 4\n" + + "4 2\n" + "3 4\n" + - "4 3\n" + - "3 5\n" + - "5 3\n" + - "4 5\n" + - "5 4\n" + - "5 1\n" + - "1 5\n"; + "4 3\n"; public static final DataSet> getDefaultEdgeDataSet(ExecutionEnvironment env) { diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LocalClusteringCoefficientExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LocalClusteringCoefficientExampleITCase.java index ba5b643b24e23..47933ad5955d8 100755 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LocalClusteringCoefficientExampleITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LocalClusteringCoefficientExampleITCase.java @@ -67,10 +67,9 @@ public void testSimpleGraph() throws Exception { LocalClusteringCoefficientExample.main(new String[] {edgesPath, resultPath}); expectedResult = "1,0.5\n" + - "2,0.0\n" + + "2,0.5\n" + "3,0.5\n" + - "4,0.0\n" + - "5,0.0\n"; + "4,0.5\n"; } @Test @@ -159,16 +158,21 @@ public void testCircleWithCentralVertex() throws Exception { for (int i = 1; i <= 9; ++i) { String vertex = Integer.toString(i); - expectedResult += vertex + "," + Double.toString(2.0/3.0) + "\n"; + expectedResult += vertex + "," + Double.toString(1.0/3.0) + "\n"; // Connect to neighbors if (i > 1) { String leftNeighbor = Integer.toString(i - 1); edges += vertex + " " + leftNeighbor + "\n"; + } else { + edges += vertex + " 9\n"; } + if (i < 9) { String rightNeighbor = Integer.toString(i + 1); edges += vertex + " " + rightNeighbor + "\n"; + } else { + edges += vertex + " 1\n"; } // Connect to the central node @@ -178,7 +182,7 @@ public void testCircleWithCentralVertex() throws Exception { } // Expected result for the central node - expectedResult += "10,0.25\n"; + expectedResult += "10,0.125\n"; String edgesPath = createTempFile(edges); From 9554598a61fb118b6198d363629edb6e414654ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Bali?= Date: Fri, 20 Mar 2015 17:53:37 +0100 Subject: [PATCH 5/5] [FLINK-1528] [gelly] Changed the algorithm to work on undirected graphs --- .../LocalClusteringCoefficientExample.java | 18 ++++++++++++++---- .../utils/LocalClusteringCoefficientData.java | 18 +++--------------- ...ocalClusteringCoefficientExampleITCase.java | 16 +++++++--------- 3 files changed, 24 insertions(+), 28 deletions(-) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java index 051a51acc580b..d20a7a1e143f2 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LocalClusteringCoefficientExample.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.example; import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -31,6 +32,7 @@ import org.apache.flink.graph.Vertex; import org.apache.flink.graph.example.utils.LocalClusteringCoefficientData; import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; import java.util.HashSet; @@ -48,7 +50,18 @@ public static void main (String [] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet> edges = getEdgeDataSet(env); + // Assume an undirected graph without multiple edges, so we create all reverse edges and call distinct + DataSet> edges = getEdgeDataSet(env) + .flatMap(new FlatMapFunction, Edge>() { + @Override + public void flatMap(Edge edge, Collector> out) + throws Exception { + out.collect(edge); + out.collect(edge.reverse()); + } + }) + .distinct(); + Graph graph = Graph.fromDataSet(edges, env); // Get the neighbors of each vertex in a HashSet @@ -135,9 +148,6 @@ public Tuple2 iterateNeighbors(Vertex> vertex, } } - // We assume an undirected graph, so we need to divide e here - e /= 2; - // Calculate clustering coefficient double cc; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LocalClusteringCoefficientData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LocalClusteringCoefficientData.java index 905a1c70d10c4..3762eefa6c54e 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LocalClusteringCoefficientData.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LocalClusteringCoefficientData.java @@ -29,35 +29,23 @@ public class LocalClusteringCoefficientData { public static final String EDGES = "1 2\n" + - "2 1\n" + "1 3\n" + - "3 1\n" + - "1 4\n" + - "4 1\n" + "2 3\n" + - "3 2\n" + - "2 4\n" + - "4 2\n" + "3 4\n" + - "4 3\n"; + "3 5\n" + + "4 5\n" + + "5 1\n"; public static final DataSet> getDefaultEdgeDataSet(ExecutionEnvironment env) { List> edges = new ArrayList>(); edges.add(new Edge(1L, 2L, NullValue.getInstance())); - edges.add(new Edge(2L, 1L, NullValue.getInstance())); edges.add(new Edge(1L, 3L, NullValue.getInstance())); - edges.add(new Edge(3L, 1L, NullValue.getInstance())); edges.add(new Edge(2L, 3L, NullValue.getInstance())); - edges.add(new Edge(3L, 2L, NullValue.getInstance())); edges.add(new Edge(3L, 4L, NullValue.getInstance())); - edges.add(new Edge(4L, 3L, NullValue.getInstance())); edges.add(new Edge(3L, 5L, NullValue.getInstance())); - edges.add(new Edge(5L, 3L, NullValue.getInstance())); edges.add(new Edge(4L, 5L, NullValue.getInstance())); - edges.add(new Edge(5L, 4L, NullValue.getInstance())); edges.add(new Edge(5L, 1L, NullValue.getInstance())); - edges.add(new Edge(1L, 5L, NullValue.getInstance())); return env.fromCollection(edges); } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LocalClusteringCoefficientExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LocalClusteringCoefficientExampleITCase.java index 47933ad5955d8..af474ef80ecce 100755 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LocalClusteringCoefficientExampleITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LocalClusteringCoefficientExampleITCase.java @@ -36,7 +36,7 @@ @RunWith(Parameterized.class) public class LocalClusteringCoefficientExampleITCase extends MultipleProgramsTestBase { - public LocalClusteringCoefficientExampleITCase(ExecutionMode mode){ + public LocalClusteringCoefficientExampleITCase(TestExecutionMode mode){ super(mode); } @@ -66,10 +66,11 @@ public void testSimpleGraph() throws Exception { LocalClusteringCoefficientExample.main(new String[] {edgesPath, resultPath}); - expectedResult = "1,0.5\n" + - "2,0.5\n" + + expectedResult = "1," + (2/3.0) + "\n" + + "2,1.0\n" + "3,0.5\n" + - "4,0.5\n"; + "4,1.0\n" + + "5," + (2/3.0) + "\n"; } @Test @@ -136,7 +137,6 @@ public void testAllEdgesBetweenNeighbors() throws Exception { String vertexB = component + Integer.toString(j); edges += vertexA + " " + vertexB + "\n"; - edges += vertexB + " " + vertexA + "\n"; } } } @@ -158,7 +158,7 @@ public void testCircleWithCentralVertex() throws Exception { for (int i = 1; i <= 9; ++i) { String vertex = Integer.toString(i); - expectedResult += vertex + "," + Double.toString(1.0/3.0) + "\n"; + expectedResult += vertex + "," + Double.toString(2/3.0) + "\n"; // Connect to neighbors if (i > 1) { @@ -176,13 +176,11 @@ public void testCircleWithCentralVertex() throws Exception { } // Connect to the central node - String centralVertex = "10"; edges += vertex + " 10\n"; - edges += "10 " + vertex + "\n"; } // Expected result for the central node - expectedResult += "10,0.125\n"; + expectedResult += "10,0.25\n"; String edgesPath = createTempFile(edges);