Permalink
Browse files

Merge pull request #90 from mjakl/master

taskvent sends start-batch to sink; Java example sleeps the correct amount of time
  • Loading branch information...
hintjens committed Jul 29, 2011
2 parents 040b60c + 2739e40 commit 43de2967d744e7566e93b43757d4fc22fed12a4d
Showing with 22 additions and 4 deletions.
  1. +6 −1 examples/C/taskvent.c
  2. +2 −0 examples/Java/tasksink.java
  3. +8 −1 examples/Java/taskvent.java
  4. +6 −2 examples/Java/taskwork.java
View
@@ -13,12 +13,16 @@ int main (void)
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");
+ // Socket to send start of batch message on
+ void *sink = zmq_socket (context, ZMQ_PUSH);
+ zmq_connect (sink, "tcp://localhost:5558");
+
printf ("Press Enter when the workers are ready: ");
getchar ();
printf ("Sending tasks to workers...\n");
// The first message is "0" and signals start of batch
- s_send (sender, "0");
+ s_send (sink, "0");
// Initialize random number generator
srandom ((unsigned) time (NULL));
@@ -38,6 +42,7 @@ int main (void)
printf ("Total expected cost: %d msec\n", total_msec);
sleep (1); // Give 0MQ time to deliver
+ zmq_close (sink);
zmq_close (sender);
zmq_term (context);
return 0;
@@ -38,5 +38,7 @@ public static void main(String[] args) throws Exception {
long tend = System.currentTimeMillis(), tdiff;
System.out.println("Total elapsed time: " + (tend - tstart) + " msec");
+ receiver.close();
+ context.term();
}
}
@@ -17,12 +17,16 @@ public static void main(String[] args) throws Exception {
ZMQ.Socket sender = context.socket(ZMQ.PUSH);
sender.bind("tcp://*:5557");
+ // Socket to send messages on
+ ZMQ.Socket sink = context.socket(ZMQ.PUSH);
+ sink.connect("tcp://localhost:5558");
+
System.out.println("Press Enter when the workers are ready: ");
System.in.read();
System.out.println("Sending tasks to workers...\n");
// The first message is "0" and signals start of batch
- sender.send("0\u0000".getBytes(), 0);
+ sink.send("0\u0000".getBytes(), 0);
// Initialize random number generator
Random srandom = new Random(System.currentTimeMillis());
@@ -41,6 +45,9 @@ public static void main(String[] args) throws Exception {
}
System.out.println("Total expected cost: " + total_msec + " msec");
+ sink.close();
+ sender.close();
+ context.term();
Thread.sleep(1000); // Give 0MQ time to deliver
}
}
@@ -25,16 +25,20 @@ public static void main(String[] args) throws Exception {
// Process tasks forever
while (true) {
String string = new String(receiver.recv(0)).trim();
- long nsec = Long.parseLong(string) * 1000000;
+ long msec = Long.parseLong(string);
// Simple progress indicator for the viewer
System.out.flush();
System.out.print(string + '.');
// Do the work
- Thread.sleep(nsec);
+ Thread.sleep(msec);
// Send results to sink
sender.send("".getBytes(), 0);
}
+ // If the code was reachable, this is how you could close the sockets and the context.
+ //sender.close();
+ //receiver.close();
+ //context.term();
}
}

0 comments on commit 43de296

Please sign in to comment.