/
taskwork2.c
60 lines (51 loc) · 1.7 KB
/
taskwork2.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//
// Task worker - design 2
// Adds pub-sub flow to receive and respond to kill signal
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Socket to receive messages on
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// Socket to send messages to
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");
// Socket for control input
void *controller = zmq_socket (context, ZMQ_SUB);
zmq_connect (controller, "tcp://localhost:5559");
zmq_setsockopt (controller, ZMQ_SUBSCRIBE, "", 0);
// Process messages from receiver and controller
zmq_pollitem_t items [] = {
{ receiver, 0, ZMQ_POLLIN, 0 },
{ controller, 0, ZMQ_POLLIN, 0 }
};
// Process messages from both sockets
while (1) {
zmq_msg_t message;
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
zmq_msg_init (&message);
zmq_recv (receiver, &message, 0);
// Do the work
s_sleep (atoi ((char *) zmq_msg_data (&message)));
// Send results to sink
zmq_msg_init (&message);
zmq_send (sender, &message, 0);
// Simple progress indicator for the viewer
printf (".");
fflush (stdout);
zmq_msg_close (&message);
}
// Any waiting controller command acts as 'KILL'
if (items [1].revents & ZMQ_POLLIN)
break; // Exit loop
}
// Finished
zmq_close (receiver);
zmq_close (sender);
zmq_close (controller);
zmq_term (context);
return 0;
}