Skip to content

Commit

Permalink
[FLINK-1523] [gelly] Added VertexWithDegrees as a subclass of Vertex
Browse files Browse the repository at this point in the history
  • Loading branch information
andralungu authored and vasia committed May 19, 2015
1 parent 1dfbcba commit e172067
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 44 deletions.
Expand Up @@ -31,19 +31,11 @@ public class Vertex<K, V> extends Tuple2<K, V> {

private static final long serialVersionUID = 1L;

private Long inDegree;
private Long outDegree;

public Vertex(){
inDegree = -1L;
outDegree = -1L;
}
public Vertex(){}

public Vertex(K k, V val) {
this.f0 = k;
this.f1 = val;
inDegree = 0L;
outDegree = 0L;
}

public K getId() {
Expand All @@ -61,28 +53,4 @@ public void setId(K id) {
public void setValue(V val) {
this.f1 = val;
}

public Long getInDegree() throws Exception{
if(inDegree == -1) {
throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " +
"call iterationConfiguration.setOptDegrees(true).");
}
return inDegree;
}

public void setInDegree(Long inDegree) {
this.inDegree = inDegree;
}

public Long getOutDegree() throws Exception{
if(outDegree == -1) {
throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " +
"call iterationConfiguration.setOptDegrees(true).");
}
return outDegree;
}

public void setOutDegree(Long outDegree) {
this.outDegree = outDegree;
}
}
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph;

import java.io.Serializable;

/**
* Represents the graph's nodes. It carries an ID and a value as well as the vertex inDegree and outDegree.
* For vertices with no value, use {@link org.apache.flink.types.NullValue} as the value type.
*
* @param <K>
* @param <V>
*/
public class VertexWithDegrees<K extends Comparable<K> & Serializable, V extends Serializable>
extends Vertex<K, V> {

private long inDegree;

private long outDegree;

public VertexWithDegrees() {
super();
inDegree = -1l;
outDegree = -1l;
}

public VertexWithDegrees(K k, V v) {
super(k,v);
inDegree = 0l;
outDegree = 0l;
}

public Long getInDegree() throws Exception{
if(inDegree == -1) {
throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " +
"call iterationConfiguration.setOptDegrees(true).");
}
return inDegree;
}

public void setInDegree(Long inDegree) {
this.inDegree = inDegree;
}

public Long getOutDegree() throws Exception{
if(outDegree == -1) {
throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " +
"call iterationConfiguration.setOptDegrees(true).");
}
return outDegree;
}

public void setOutDegree(Long outDegree) {
this.outDegree = outDegree;
}
}
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexWithDegrees;
import org.apache.flink.graph.example.utils.IncrementalSSSPData;
import org.apache.flink.graph.spargel.IterationConfiguration;
import org.apache.flink.graph.spargel.MessageIterator;
Expand Down Expand Up @@ -159,7 +160,7 @@ public static final class VertexDistanceUpdater extends VertexUpdateFunction<Lon
@Override
public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception {
if (inMessages.hasNext()) {
Long outDegree = vertex.getOutDegree() - 1;
Long outDegree = ((VertexWithDegrees)vertex).getOutDegree() - 1;
// check if the vertex has another SP-Edge
if (outDegree > 0) {
// there is another shortest path from the source to this vertex
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.InaccessibleMethodException;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexWithDegrees;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;

Expand Down Expand Up @@ -281,7 +282,7 @@ public Iterator<Edge<VertexKey, EdgeValue>> iterator() {
*/
void sendMessagesFromVertexCentricIteration(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> newVertexState)
throws Exception {
Vertex<VertexKey, VertexValue> vertex = new Vertex<VertexKey, VertexValue>(newVertexState.getId(),
VertexWithDegrees<VertexKey, VertexValue> vertex = new VertexWithDegrees<VertexKey, VertexValue>(newVertexState.getId(),
newVertexState.getValue().f0);
vertex.setInDegree(newVertexState.getValue().f1);
vertex.setOutDegree(newVertexState.getValue().f2);
Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexWithDegrees;
import org.apache.flink.util.Collector;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -630,7 +631,7 @@ public void join(Vertex<VertexKey, VertexValue> vertex,
Tuple3<VertexKey, Long, Long> degrees,
Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> out) throws Exception {

out.collect(new Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>(vertex.getId(),
out.collect(new VertexWithDegrees<VertexKey, Tuple3<VertexValue, Long, Long>>(vertex.getId(),
new Tuple3<VertexValue, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
}
});
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.InaccessibleMethodException;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexWithDegrees;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;

Expand Down Expand Up @@ -205,7 +206,7 @@ void setOutput(Vertex<VertexKey, VertexValue> outVal, Collector<Vertex<VertexKey
*/
void updateVertexFromVertexCentricIteration(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertexState,
MessageIterator<Message> inMessages) throws Exception {
Vertex<VertexKey, VertexValue> vertex = new Vertex<VertexKey, VertexValue>(vertexState.getId(),
VertexWithDegrees<VertexKey, VertexValue> vertex = new VertexWithDegrees<VertexKey, VertexValue>(vertexState.getId(),
vertexState.getValue().f0);
vertex.setInDegree(vertexState.getValue().f1);
vertex.setOutDegree(vertexState.getValue().f2);
Expand Down
Expand Up @@ -442,7 +442,7 @@ public static final class UpdateFunctionInDegree extends VertexUpdateFunction<Lo
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
try {
setNewVertexValue(vertex.getInDegree());
setNewVertexValue(((VertexWithDegrees) vertex).getInDegree());
} catch (Exception e) {
e.printStackTrace();
}
Expand All @@ -455,7 +455,7 @@ public static final class UpdateFunctionOutDegree extends VertexUpdateFunction<L
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
try {
setNewVertexValue(vertex.getOutDegree());
setNewVertexValue(((VertexWithDegrees) vertex).getOutDegree());
} catch (Exception e) {
e.printStackTrace();
}
Expand Down Expand Up @@ -523,7 +523,7 @@ public void updateVertex(Vertex<Long, Boolean> vertex, MessageIterator<Long> mes
count++;
}

setNewVertexValue(count == (vertex.getInDegree() + vertex.getOutDegree()));
setNewVertexValue(count == (((VertexWithDegrees)vertex).getInDegree() + ((VertexWithDegrees)vertex).getOutDegree()));
}
}

Expand All @@ -533,8 +533,8 @@ public static final class UpdateFunctionDegrees extends VertexUpdateFunction<Lon
@Override
public void updateVertex(Vertex<Long, Tuple3<Long, Long, Boolean>> vertex, MessageIterator<Long> inMessages) {
try {
setNewVertexValue(new Tuple3(vertex.getValue().f0, vertex.getValue().f1, (vertex.getInDegree() == vertex.getValue().f0)
&& (vertex.getOutDegree() == vertex.getValue().f1) && vertex.getValue().f2));
setNewVertexValue(new Tuple3(vertex.getValue().f0, vertex.getValue().f1, (((VertexWithDegrees)vertex).getInDegree() == vertex.getValue().f0)
&& (((VertexWithDegrees)vertex).getOutDegree() == vertex.getValue().f1) && vertex.getValue().f2));
} catch (Exception e) {
e.printStackTrace();
}
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexWithDegrees;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
Expand Down Expand Up @@ -155,7 +156,7 @@ public static final class UpdateFunctionInDegree extends VertexUpdateFunction<Lo

@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {
setNewVertexValue(vertex.getInDegree());
setNewVertexValue(((VertexWithDegrees) vertex).getInDegree());
}
}

Expand All @@ -164,7 +165,7 @@ public static final class UpdateFunctionOutDegree extends VertexUpdateFunction<L

@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {
setNewVertexValue(vertex.getOutDegree());
setNewVertexValue(((VertexWithDegrees)vertex).getOutDegree());
}
}

Expand Down

0 comments on commit e172067

Please sign in to comment.