From 0a09e85a1bf4319852296a3ba64ebb2af0a361bf Mon Sep 17 00:00:00 2001 From: andralungu Date: Sat, 6 Jun 2015 13:02:16 +0200 Subject: [PATCH] [FLINK-2178][gelly] Fixed groupReduceOnNeighbors bug [FLINK-2178][gelly] Threw exception when iterator did not have next [FLINK-2178][gelly] Added further checks and tests --- .../java/org/apache/flink/graph/Graph.java | 25 ++- .../ReduceOnEdgesWithExceptionITCase.java | 141 ++++++++++++ .../ReduceOnNeighborsWithExceptionITCase.java | 202 ++++++++++++++++++ 3 files changed, 365 insertions(+), 3 deletions(-) create mode 100644 flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java create mode 100644 flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index dab1a8f69771c..5e13ce17e1ca6 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -872,7 +872,14 @@ public ApplyCoGroupFunction(EdgesFunctionWithVertexValue fun) { public void coGroup(Iterable> vertex, Iterable> edges, Collector out) throws Exception { - function.iterateEdges(vertex.iterator().next(), edges, out); + + Iterator> vertexIterator = vertex.iterator(); + + if(vertexIterator.hasNext()) { + function.iterateEdges(vertexIterator.next(), edges, out); + } else { + throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds"); + } } @Override @@ -920,7 +927,13 @@ public Iterator> iterator() { } }; - function.iterateEdges(vertex.iterator().next(), edgesIterable, out); + Iterator> vertexIterator = vertex.iterator(); + + if(vertexIterator.hasNext()) { + function.iterateEdges(vertexIterator.next(), edgesIterable, out); + } else { + throw 
new NoSuchElementException("The edge src/trg id could not be found within the vertexIds"); + } } @Override @@ -1554,7 +1567,13 @@ public Iterator, Vertex>> iterator() { } }; - function.iterateNeighbors(vertex.iterator().next(), neighborsIterable, out); + Iterator> vertexIterator = vertex.iterator(); + + if (vertexIterator.hasNext()) { + function.iterateNeighbors(vertexIterator.next(), neighborsIterable, out); + } else { + throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds"); + } } @Override diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java new file mode 100644 index 0000000000000..c53227b1dc13c --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.test.operations; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.EdgesFunctionWithVertexValue; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.util.Collector; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.fail; + +public class ReduceOnEdgesWithExceptionITCase { + + private static final int PARALLELISM = 4; + + private static ForkableFlinkMiniCluster cluster; + + + @BeforeClass + public static void setupCluster() { + try { + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM); + cluster = new ForkableFlinkMiniCluster(config, false); + } + catch (Exception e) { + e.printStackTrace(); + fail("Error starting test cluster: " + e.getMessage()); + } + } + + @AfterClass + public static void tearDownCluster() { + try { + cluster.stop(); + } + catch (Throwable t) { + t.printStackTrace(); + fail("Cluster shutdown caused an exception: " + t.getMessage()); + } + } + + /** + * Test groupReduceOnEdges() with an edge having a srcId that does not exist in the vertex DataSet + */ + @Test + public void testGroupReduceOnEdgesInvalidEdgeSrcId() throws Exception { + + final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRPCPort()); + env.setParallelism(PARALLELISM); + 
env.getConfig().disableSysoutLogging(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env); + + try { + DataSet> verticesWithAllNeighbors = + graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL); + + verticesWithAllNeighbors.output(new DiscardingOutputFormat>()); + env.execute(); + } catch (Exception e) { + // We expect the job to fail with an exception + } + } + + /** + * Test groupReduceOnEdges() with an edge having a trgId that does not exist in the vertex DataSet + */ + @Test + public void testGroupReduceOnEdgesInvalidEdgeTrgId() throws Exception { + + final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRPCPort()); + env.setParallelism(PARALLELISM); + env.getConfig().disableSysoutLogging(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env); + + try { + DataSet> verticesWithAllNeighbors = + graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL); + + verticesWithAllNeighbors.output(new DiscardingOutputFormat>()); + env.execute(); + } catch (Exception e) { + // We expect the job to fail with an exception + } + } + + + @SuppressWarnings("serial") + private static final class SelectNeighborsValueGreaterThanFour implements + EdgesFunctionWithVertexValue> { + + @Override + public void iterateEdges(Vertex v, Iterable> edges, + Collector> out) throws Exception { + for(Edge edge : edges) { + if(v.getValue() > 4) { + if(v.getId().equals(edge.getTarget())) { + out.collect(new Tuple2(v.getId(), edge.getSource())); + } else { + out.collect(new Tuple2(v.getId(), edge.getTarget())); + } + } + } + } + } +} diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java new file mode 100644 index 0000000000000..21799c729fe37 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.test.operations; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.NeighborsFunctionWithVertexValue; +import org.apache.flink.graph.ReduceNeighborsFunction; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.util.Collector; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.fail; + +public class ReduceOnNeighborsWithExceptionITCase { + + private static final int PARALLELISM = 4; + + private static ForkableFlinkMiniCluster cluster; + + + @BeforeClass + public static void setupCluster() { + try { + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM); + cluster = new ForkableFlinkMiniCluster(config, false); + } + catch (Exception e) { + e.printStackTrace(); + fail("Error starting test cluster: " + e.getMessage()); + } + } + + @AfterClass + public static void tearDownCluster() { + try { + cluster.stop(); + } + catch (Throwable t) { + t.printStackTrace(); + fail("Cluster shutdown caused an exception: " + t.getMessage()); + } + } + + /** + * Test groupReduceOnNeighbors() -NeighborsFunctionWithVertexValue- + * with an edge having a srcId that does not exist in the vertex DataSet + */ + @Test + public void testGroupReduceOnNeighborsWithVVInvalidEdgeSrcId() throws Exception { + + final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + 
"localhost", cluster.getJobManagerRPCPort()); + env.setParallelism(PARALLELISM); + env.getConfig().disableSysoutLogging(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env); + + try { + DataSet> verticesWithSumOfOutNeighborValues = + graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL); + + verticesWithSumOfOutNeighborValues.output(new DiscardingOutputFormat>()); + env.execute(); + } catch (Exception e) { + // We expect the job to fail with an exception + } + } + + /** + * Test groupReduceOnNeighbors() -NeighborsFunctionWithVertexValue- + * with an edge having a trgId that does not exist in the vertex DataSet + */ + @Test + public void testGroupReduceOnNeighborsWithVVInvalidEdgeTrgId() throws Exception { + + final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRPCPort()); + env.setParallelism(PARALLELISM); + env.getConfig().disableSysoutLogging(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env); + + try { + DataSet> verticesWithSumOfOutNeighborValues = + graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL); + + verticesWithSumOfOutNeighborValues.output(new DiscardingOutputFormat>()); + env.execute(); + } catch (Exception e) { + // We expect the job to fail with an exception + } + } + + /** + * Test groupReduceOnNeighbors() -NeighborsFunction- + * with an edge having a srcId that does not exist in the vertex DataSet + */ + @Test + public void testGroupReduceOnNeighborsInvalidEdgeSrcId() throws Exception { + + final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRPCPort()); + env.setParallelism(PARALLELISM); + env.getConfig().disableSysoutLogging(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + 
TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env); + + try { + DataSet> verticesWithSumOfAllNeighborValues = + graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL); + + verticesWithSumOfAllNeighborValues.output(new DiscardingOutputFormat>()); + env.execute(); + } catch (Exception e) { + // We expect the job to fail with an exception + } + } + + /** + * Test groupReduceOnNeighbors() -NeighborsFunction- + * with an edge having a trgId that does not exist in the vertex DataSet + */ + @Test + public void testGroupReduceOnNeighborsInvalidEdgeTrgId() throws Exception { + + final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRPCPort()); + env.setParallelism(PARALLELISM); + env.getConfig().disableSysoutLogging(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env); + + try { + DataSet> verticesWithSumOfAllNeighborValues = + graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL); + + verticesWithSumOfAllNeighborValues.output(new DiscardingOutputFormat>()); + env.execute(); + } catch (Exception e) { + // We expect the job to fail with an exception + } + } + + @SuppressWarnings("serial") + private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue> { + + @Override + public void iterateNeighbors(Vertex vertex, + Iterable, Vertex>> neighbors, + Collector> out) throws Exception { + + long sum = 0; + for (Tuple2, Vertex> neighbor : neighbors) { + sum += neighbor.f1.getValue(); + } + out.collect(new Tuple2(vertex.getId(), sum + vertex.getValue())); + } + } + + @SuppressWarnings("serial") + private static final class SumNeighbors implements ReduceNeighborsFunction { + + @Override + public Long reduceNeighbors(Long firstNeighbor, Long secondNeighbor) { + return firstNeighbor + secondNeighbor; + } + } +}