Skip to content

Commit

Permalink
[FLINK-1587][gelly] Added additional check for edge src/trg id validity
Browse files Browse the repository at this point in the history
This closes #440
  • Loading branch information
andralungu authored and vasia committed Mar 7, 2015
1 parent 92b1eb7 commit fb62f6b
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 8 deletions.
Expand Up @@ -19,10 +19,11 @@
package org.apache.flink.graph;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.List;
import java.util.Arrays;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FilterFunction;
Expand Down Expand Up @@ -633,7 +634,14 @@ public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Edge<K, EV>> outEdg
for (Edge<K, EV> edge : outEdges) {
count++;
}
out.collect(new Tuple2<K, Long>(vertex.iterator().next().f0, count));

Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();

if(vertexIterator.hasNext()) {
out.collect(new Tuple2<K, Long>(vertexIterator.next().f0, count));
} else {
throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
}
}
}

Expand Down
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.test;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.junit.Assert;
import org.junit.Test;

import java.util.NoSuchElementException;

import static org.junit.Assert.fail;

public class DegreesWithExceptionITCase {

@Test
public void testOutDegreesInvalidEdgeSrcId() throws Exception {
/*
* Test outDegrees() with an edge having a srcId that does not exist in the vertex DataSet
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);

try {
graph.outDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
env.execute();

fail("graph.outDegrees() did not throw NoSuchElementException");
} catch (Exception e) {
Assert.assertEquals("The edge src/trg id could not be found within the vertexIds", e.getCause().getMessage());
Assert.assertTrue(e.getCause() instanceof NoSuchElementException);
}
}

@Test
public void testInDegreesInvalidEdgeTrgId() throws Exception {
/*
* Test inDegrees() with an edge having a trgId that does not exist in the vertex DataSet
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);

try {
graph.inDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
env.execute();

fail("graph.inDegrees() did not throw NoSuchElementException");
} catch (Exception e) {
Assert.assertEquals("The edge src/trg id could not be found within the vertexIds", e.getCause().getMessage());
Assert.assertTrue(e.getCause() instanceof NoSuchElementException);
}
}

@Test
public void testGetDegreesInvalidEdgeTrgId() throws Exception {
/*
* Test getDegrees() with an edge having a trgId that does not exist in the vertex DataSet
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);

try {
graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
env.execute();

fail("graph.getDegrees() did not throw NoSuchElementException");
} catch (Exception e) {
Assert.assertEquals("The edge src/trg id could not be found within the vertexIds", e.getCause().getMessage());
Assert.assertTrue(e.getCause() instanceof NoSuchElementException);
}
}

@Test
public void testGetDegreesInvalidEdgeSrcId() throws Exception {
/*
* Test getDegrees() with an edge having a srcId that does not exist in the vertex DataSet
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);

try {
graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
env.execute();

fail("graph.getDegrees() did not throw NoSuchElementException");
} catch (Exception e) {
Assert.assertEquals("The edge src/trg id could not be found within the vertexIds", e.getCause().getMessage());
Assert.assertTrue(e.getCause() instanceof NoSuchElementException);
}
}

@Test
public void testGetDegreesInvalidEdgeSrcTrgId() throws Exception {
/*
* Test getDegrees() with an edge having a srcId and a trgId that does not exist in the vertex DataSet
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeInvalidSrcTrgData(env), env);

try {
graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
env.execute();

fail("graph.getDegrees() did not throw NoSuchElementException");
} catch (Exception e) {
Assert.assertEquals("The edge src/trg id could not be found within the vertexIds", e.getCause().getMessage());
Assert.assertTrue(e.getCause() instanceof NoSuchElementException);
}
}
}
Expand Up @@ -58,25 +58,23 @@ public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidTrgData(
List<Edge<Long, Long>> edges = getLongLongEdges();

edges.remove(0);
edges.add(new Edge<Long, Long>(13L, 3L, 13L));
edges.add(new Edge<Long, Long>(3L, 13L, 13L));

return env.fromCollection(edges);
}

public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcTrgData(
ExecutionEnvironment env) {
List<Edge<Long, Long>> edges = getLongLongEdges();

List<Edge<Long, Long>> edges = getLongLongEdges();
edges.remove(0);
edges.remove(1);
edges.remove(2);
edges.add(new Edge<Long, Long>(13L, 3L, 13L));
edges.add(new Edge<Long, Long>(1L, 12L, 12L));
edges.add(new Edge<Long, Long>(13L, 33L, 13L));

return env.fromCollection(edges);
}

public static final DataSet<Edge<String, Long>> getStringLongEdgeData(
ExecutionEnvironment env) {
List<Edge<String, Long>> edges = new ArrayList<Edge<String, Long>>();
Expand All @@ -87,7 +85,6 @@ public static final DataSet<Edge<String, Long>> getStringLongEdgeData(
edges.add(new Edge<String, Long>("3", "5", 35L));
edges.add(new Edge<String, Long>("4", "5", 45L));
edges.add(new Edge<String, Long>("5", "1", 51L));

return env.fromCollection(edges);
}

Expand Down

0 comments on commit fb62f6b

Please sign in to comment.