Skip to content
This repository has been archived by the owner on Aug 18, 2021. It is now read-only.

Commit

Permalink
generate vertices on their host machine
Browse files Browse the repository at this point in the history
  • Loading branch information
claudiugh committed Mar 3, 2011
1 parent 5fdd6ae commit 2667497
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/nl/vu/cs/amstel/Worker.java
Expand Up @@ -230,7 +230,7 @@ public void run() throws Exception, InterruptedException {
loadReceivedInput();
// initialize vertex arrays
initInboxes();

state.activeVertices = vertices.size();
state.msg = messageFactory.create();
state.edgeIterator = new ArrayOutEdgeIterator<E>();
Expand Down
33 changes: 21 additions & 12 deletions src/nl/vu/cs/amstel/graph/io/WheelGraphGenerator.java
Expand Up @@ -22,21 +22,18 @@ public class WheelGraphGenerator implements Reader {
private int edges;
private int maxVertexValue;
private int maxEdgeValue;
private int fromVertex;
private int toVertex;

private int workerIndex;
private int workers;
private int crtVertex;

private class Partitioner implements InputPartitioner {

@Override
public InputPartition[] getPartitions(int workers) throws IOException {
int perWorker = vertices / workers;
InputPartition[] partitions = new WheelGraphPartition[workers];
for (int i = 0; i < workers; i++) {
int toVertex = (i < workers - 1) ?
(i + 1) * perWorker : vertices;
partitions[i] = new WheelGraphPartition(i * perWorker, toVertex,
partitions[i] = new WheelGraphPartition(workers, i,
vertices, edges, maxVertexValue, maxEdgeValue);
}
return partitions;
Expand All @@ -63,13 +60,13 @@ public WheelGraphGenerator(String filename) throws IOException {
public void init(InputPartition inputPartition) throws Exception {
if (inputPartition instanceof WheelGraphPartition) {
WheelGraphPartition partition = (WheelGraphPartition) inputPartition;
this.fromVertex = partition.fromVertex;
this.toVertex = partition.toVertex;
this.workers = partition.workers;
this.workerIndex = partition.workerIndex;
this.vertices = partition.vertices;
this.edges = partition.edges;
this.maxVertexValue = partition.maxVertexValue;
this.maxEdgeValue = partition.maxEdgeValue;
crtVertex = fromVertex;
crtVertex = 0;
} else {
throw new Exception("Input partition not valid");
}
Expand All @@ -87,14 +84,26 @@ public InputPartitioner getPartitioner() {

@Override
public boolean hasNext() throws IOException {
return crtVertex < toVertex;
return vertices - crtVertex >= workers;
}

@SuppressWarnings("unchecked")
@Override
public <V extends Value, E extends Value> VertexState<V, E> nextVertex(
VertexFactory<V, E> factory) throws IOException {
String vid = "V" + crtVertex;
String vid = "";
while (crtVertex < vertices) {
vid = "V" + crtVertex;
int code = vid.hashCode();
if (code < 0) {
code = -code;
}
if (code % workers == workerIndex) {
break;
}
crtVertex++;
}

String[] edgeTargets = new String[edges];
V value = factory.createValue(vertexRand.nextInt(maxVertexValue));
E[] edgeValues = null;
Expand Down
10 changes: 5 additions & 5 deletions src/nl/vu/cs/amstel/graph/io/WheelGraphPartition.java
Expand Up @@ -4,17 +4,17 @@ public class WheelGraphPartition implements InputPartition {

private static final long serialVersionUID = 3855311417929126951L;

int fromVertex;
int toVertex;
int vertices;
int edges;
int maxVertexValue;
int maxEdgeValue;
int workers;
int workerIndex;

public WheelGraphPartition(int fromVertex, int toVertex, int vertices,
public WheelGraphPartition(int workers, int workerIndex, int vertices,
int edges, int maxVertexValue, int maxEdgeValue) {
this.fromVertex = fromVertex;
this.toVertex = toVertex;
this.workers = workers;
this.workerIndex = workerIndex;
this.vertices = vertices;
this.edges = edges;
this.maxVertexValue = maxVertexValue;
Expand Down
15 changes: 10 additions & 5 deletions src/nl/vu/cs/amstel/msg/MessageRouter.java
@@ -1,6 +1,5 @@
package nl.vu.cs.amstel.msg;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -102,12 +101,18 @@ private OutgoingQueue<M> createOutgoingQueue() {
return new CombinedOutgoingQueue<M>(msgFactory);
}

private OutgoingQueue<M> getQueue(IbisIdentifier worker) {
if (!outQueues.containsKey(worker)) {
outQueues.put(worker, createOutgoingQueue());
}
return outQueues.get(worker);
}

public synchronized SendPort getSender(IbisIdentifier worker) throws IOException {
if (!senders.containsKey(worker)) {
SendPort sender = ibis.createSendPort(Node.W2W_PORT);
sender.connect(worker, "worker");
senders.put(worker, sender);
outQueues.put(worker, createOutgoingQueue());
}
return senders.get(worker);
}
Expand Down Expand Up @@ -176,7 +181,7 @@ public void send(String toVertex, M msg) throws IOException {
inbox.deliverLocally(vertices.get(toVertex).getIndex(), msg);
} else {
// enqueue for sending
OutgoingQueue<M> outQueue = outQueues.get(owner);
OutgoingQueue<M> outQueue = getQueue(owner);
outQueue.add(toVertex, msg);
if (outQueue.reachedThreshold()) {
sendQueue(owner, outQueue, false);
Expand All @@ -199,8 +204,8 @@ private void sendFlush(IbisIdentifier worker) throws IOException {
* @throws IOException
*/
public void flush() throws IOException {
for (IbisIdentifier worker : senders.keySet()) {
OutgoingQueue<M> outQueue = outQueues.get(worker);
for (IbisIdentifier worker : outQueues.keySet()) {
OutgoingQueue<M> outQueue = getQueue(worker);
if (!outQueue.isEmpty()) {
sendQueue(worker, outQueue, true);
}
Expand Down

0 comments on commit 2667497

Please sign in to comment.