Permalink
Browse files

Add tasksink2 and taskwork2 from guide

  • Loading branch information...
1 parent 78c0de9 commit 3f0f50f519817671646715f7317d6df032e2f5d0 @doublec committed Nov 5, 2011
Showing with 184 additions and 1 deletion.
  1. +11 −1 TEST/Makefile
  2. +77 −0 TEST/tasksink2.dats
  3. +96 −0 TEST/taskwork2.dats
View
12 TEST/Makefile
@@ -22,7 +22,9 @@ compall: \
taskwork \
tasksink \
msreader \
- mspoller
+ mspoller \
+ taskwork2 \
+ tasksink2
######
@@ -56,6 +58,12 @@ msreader: msreader.dats
mspoller: mspoller.dats
$(ATSCC) $(CFLAGS) -o $@ $< $(ATSCTRB) $(LIBS)
+taskwork2: taskwork2.dats
+ $(ATSCC) $(CFLAGS) -o $@ $< $(ATSCTRB) $(LIBS)
+
+tasksink2: tasksink2.dats
+ $(ATSCC) $(CFLAGS) -o $@ $< $(ATSCTRB) $(LIBS)
+
######
clean:
@@ -72,5 +80,7 @@ cleanall: clean
rm -f tasksink
rm -f msreader
rm -f mspoller
+ rm -f taskwork2
+ rm -f tasksink2
###### end of [Makefile] ######
View
77 TEST/tasksink2.dats
@@ -0,0 +1,77 @@
+(*
+ Task sink - design 2
+ Adds pub-sub flow to send kill signal to workers
+*)
+staload "contrib/libzmq/SATS/libzmq.sats"
+staload "libc/sys/SATS/time.sats"
+staload "libc/sys/SATS/types.sats"
+staload "libc/SATS/stdio.sats"
+
+fun s_clock (): double = let
+ var tv: timeval?
+ val r = gettimeofday (tv)
+ val () = assertloc (r = 0)
+ prval () = opt_unsome {timeval} (tv)
+in
+ double_of (lint_of (tv.tv_sec) * 1000L + lint_of(tv.tv_usec) / 1000L)
+end
+
+implement main () = {
+ (* Prepare our context and socket *)
+ val context = zmq_init (1)
+ val () = assertloc (~context)
+
+ (* Socket to receive messages on *)
+ val receiver = zmq_socket (context, ZMQ_PULL)
+ val () = assertloc (~receiver)
+
+ val r = zmq_bind (receiver, "tcp://*:5558")
+ val () = assertloc (r = 0)
+
+ (* Socket for worker control *)
+ val controller = zmq_socket (context, ZMQ_PUB)
+ val () = assertloc (~controller)
+
+ val r = zmq_bind (controller, "tcp://*:5559")
+ val () = assertloc (r = 0)
+
+ (* Wait for start of batch *)
+ val str = s_recv (receiver)
+ val () = strptr_free (str)
+
+ (* Start our clock now *)
+ val start_time = s_clock ()
+
+ (* Process 100 confirmations *)
+ val () = loop (receiver, 0, 100) where {
+ fun loop {l:agz} {n,m:nat | n <= m} .< m-n >. (receiver: !zmqsocket l, n: int n, max: int m): void =
+ if n = max then
+ ()
+ else let
+ val str = s_recv (receiver)
+ val () = strptr_free (str)
+
+ val () = print_string (if n mod 10 = 0 then ":" else ".")
+ val () = fflush_stdout ()
+ in
+ loop (receiver, n + 1, max)
+ end
+ }
+
+ (* Calculate and report duration of batch *)
+ val end_time = s_clock ()
+ val () = printf ("Total elapsed time: %f msec\n", @(end_time - start_time));
+
+ (* Send kill signal to workers *)
+ val r = s_send (controller, "KILL")
+ val () = assertloc (r = 0)
+
+ val r = zmq_close (receiver)
+ val () = assertloc (r = 0)
+
+ val r = zmq_close (controller)
+ val () = assertloc (r = 0)
+
+ val r = zmq_term (context)
+ val () = assertloc (r = 0)
+}
View
96 TEST/taskwork2.dats
@@ -0,0 +1,96 @@
+(*
+ Task worker - design 2
+ Adds pub-sub flow to receive and respond to kill signal
+*)
+staload "contrib/libzmq/SATS/libzmq.sats"
+staload "libc/SATS/unistd.sats"
+staload "libc/SATS/stdio.sats"
+staload "prelude/SATS/unsafe.sats"
+
+fun check_revents (item: &zmq_pollitem_t, flag: int16): bool = let
+ val r = uint_of_int (int_of_int16 (item.revents))
+ val f = uint_of_int (int_of_int16 (flag))
+in
+ (r land f) <> uint_of_int (0)
+end
+
+implement main () = {
+ val context = zmq_init (1)
+ val () = assertloc (~context)
+
+ (* Socket to receive messages on *)
+ val receiver = zmq_socket (context, ZMQ_PULL)
+ val () = assertloc (~receiver)
+
+ val r = zmq_connect (receiver, "tcp://localhost:5557")
+ val () = assertloc (r = 0)
+
+ (* Socket to send messages to *)
+ val sender = zmq_socket (context, ZMQ_PUSH)
+ val () = assertloc (~sender)
+
+ val r = zmq_connect (sender, "tcp://localhost:5558")
+ val () = assertloc (r = 0)
+
+ (* Socket for control input *)
+ val controller = zmq_socket (context, ZMQ_SUB)
+ val () = assertloc (~controller)
+
+ val r = zmq_connect (controller, "tcp://localhost:5559")
+ val () = assertloc (r = 0)
+
+ val filter = ""
+ val r = zmq_setsockopt_string (controller, ZMQ_SUBSCRIBE, filter, string_length (filter))
+ val () = assertloc (r = 0)
+
+ (* Process messages from receiver and controller *)
+ val zero = int16_of_int 0
+ var !p_items = @[zmq_pollitem_t](@{socket= ptr_of_zmqsocket (receiver), fd= 0, events= ZMQ_POLLIN, revents= zero},
+ @{socket= ptr_of_zmqsocket (controller), fd= 0, events= ZMQ_POLLIN, revents= zero})
+
+ (* Process messages from both sockets *)
+ val () = loop (sender, !p_items) where {
+ fun loop {l:agz} (sender: !zmqsocket l, items: &(@[zmq_pollitem_t][2])): void = {
+ val r = zmq_poll (view@ items | &items, 2, lint_of_int (~1))
+ val () = assertloc (r >= 0)
+
+ val () = if check_revents (items.[0], ZMQ_POLLIN) then {
+ val (pff_s | s) = zmqsocket_of_ptr (items.[0].socket)
+ val () = assertloc (~s)
+
+ val str = s_recv (s)
+ prval () = pff_s (s)
+
+ (* Do the work *)
+ val ms = int1_of (castvwtp1 {string} (str)) * 1000
+ val ms = max(0, min(MILLION, ms))
+ val () = usleep (ms)
+ val () = strptr_free (str)
+
+ (* Send results to sink *)
+ val r = s_send (sender, "")
+ val () = assertloc (r = 0)
+
+ (* Simple progress indicator for the viewer *)
+ val () = print (".")
+ val () = fflush_stdout ()
+ }
+
+ (* Any waiting controller command acts as 'KILL' *)
+ val () = if check_revents (items.[1], ZMQ_POLLIN) then () else loop (sender, items)
+ }
+ }
+
+ (* Finished *)
+ val r = zmq_close (receiver)
+ val () = assertloc (r = 0)
+
+ val r = zmq_close (sender)
+ val () = assertloc (r = 0)
+
+ val r = zmq_close (controller)
+ val () = assertloc (r = 0)
+
+ val r = zmq_term (context)
+ val () = assertloc (r = 0)
+}

0 comments on commit 3f0f50f

Please sign in to comment.