Browse files

Added more chapter 2 examples (zguide)

Corrected poll socket symbol
  • Loading branch information...
1 parent 5904107 commit 767ba8101f41c9ef3036ea71b2e8c6f33d0f38fb Michael Compton committed Jul 29, 2012
Showing with 67 additions and 5 deletions.
  1. +3 −3 src/poll.lisp
  2. +64 −2 zguide/chapter-2.lisp
View
6 src/poll.lisp
@@ -1,6 +1,6 @@
(defpackage #:poll
(:documentation "Socket Poll API")
- (:use #:common-lisp #:socket)
+ (:use #:common-lisp)
(:export
#:poll
#:has-events-p
@@ -10,11 +10,11 @@
(defgeneric set-events (socket poll-item-ptr &rest events))
-(defmethod set-events ((socket socket) poll-item-ptr &rest events)
+(defmethod set-events ((socket socket:socket) poll-item-ptr &rest events)
(cffi:with-foreign-slots ((%zmq::socket %zmq::events)
poll-item-ptr
%zmq::pollitem-t)
- (setf socket (slot-value socket '%zmq::ptr)
+ (setf %zmq::socket (slot-value socket 'socket:ptr)
%zmq::events (+ (if (member :pollin events) %zmq::+pollin+ 0)
(if (member :pollout events) %zmq::+pollout+ 0)
(if (member :pollerr events) %zmq::+pollerr+ 0)))))
View
66 zguide/chapter-2.lisp
@@ -34,7 +34,7 @@
(defun run-msreader ()
(let ((weather-server (bt:make-thread #'chapter-1::weather-server
:name "Weather Server")))
- (msreader)))
+ (mspoller)))
(defun mspoller ()
(zmq:with-context (ctx)
@@ -49,4 +49,66 @@
(when (zmq:has-events-p recv-item)
(zmq:recvmsg receiver msg))
(when (zmq:has-events-p sub-item)
- (zmq:recvmsg subscriber msg)))))))))
+ (zmq:recvmsg subscriber msg)))))))))
+
+
+(let ((out *standard-output*))
+ (defun taskwork2 ()
+ (zmq:with-context (ctx)
+ (zmq:with-sockets ((receiver ctx :pull :connect "tcp://localhost:5557")
+ (sender ctx :push :connect "tcp://localhost:5558")
+ (controller ctx :sub :connect "tcp://localhost:5559"
+ :subscribe ""))
+ (zmq:with-message (msg)
+ (zmq:with-poll-list (poll-list (recv receiver :pollin)
+ (control controller :pollin))
+ (loop
+ (when (zmq:poll poll-list)
+ (when (zmq:has-events-p recv)
+ (zmq:recvmsg receiver msg :as 'zmq:string-message)
+ (sleep (/ (read-from-string (zmq:data msg)) 1000.0))
+ (zmq:sendmsg sender msg))
+ (when (zmq:has-events-p control)
+ (return))))))))))
+
+(let ((out *standard-output*))
+ (defun tasksink2 ()
+ (zmq:with-context (ctx)
+ (zmq:with-sockets ((receiver ctx :pull :bind "tcp://*:5558")
+ (controller ctx :pub :bind "tcp://*:5559"))
+ (zmq:with-message (msg)
+ (zmq:recvmsg receiver msg)
+ (loop
+ :for task-number :below 100
+ :with start-time = (get-internal-real-time)
+ :do
+ (zmq:recvmsg receiver msg)
+ (if (zerop (mod task-number 10))
+ (format out ":")
+ (format out "."))
+ :finally (format out "Total elapsed time: ~d msec~%"
+ (- (get-internal-real-time) start-time))))
+ (zmq:sendmsg controller "KILL")))))
+
+
+(defun run-parallel-pipeline (&optional (worker-count 1))
+ (let ((workers (loop
+ :for count :below worker-count
+ :collect (bt:make-thread #'taskwork2
+ :name (format nil "Worker-~D"
+ (1+ count)))))
+ (sink (bt:make-thread #'tasksink2 :name "Sink")))
+ (chapter-1::ventilator)
+ (bt:join-thread sink)))
+
+(defun wuproxy ()
+ (zmq:with-context (ctx)
+ (zmq:with-sockets ((frontend ctx :sub :connect "tcp://192.168.55.210:5556"
+ :subscribe "")
+ (backend ctx :pub :bind "tcp://10.1.1.0:0100"))
+ (zmq:with-message (msg)
+ (loop
+ (loop
+ :do
+ (zmq:recvmsg frontend msg)
+ (zmq:sendmsg backend msg :send-more (zmq:rcvmore frontend))))))))

0 comments on commit 767ba81

Please sign in to comment.