Permalink
Browse files

ZGuide chapter 2 start

Add more specific error conditions
  • Loading branch information...
1 parent 537f4e6 commit 5904107326ff2d8ca56f36ed3074a33836850e21 Michael Compton committed Jul 27, 2012
Showing with 154 additions and 16 deletions.
  1. +4 −2 little-zmq.asd
  2. +22 −5 src/bindings.lisp
  3. +1 −0 src/grovel.spec.lisp
  4. +5 −2 src/little-zmq.lisp
  5. +2 −1 src/zmq-bindings-grovel.lisp
  6. +68 −6 zguide/chapter-1.lisp
  7. +52 −0 zguide/chapter-2.lisp
View
@@ -21,8 +21,10 @@
:components ((:file "devices/standard-devices")))
(asdf:defsystem #:little-zmq.zguide-examples
- :depends-on (#:bordeaux-threads #:little-zmq)
- :components ((:file "zguide/chapter-1")))
+ :depends-on (#:bordeaux-threads #:little-zmq #:cl-ppcre)
+ :serial t
+ :components ((:file "zguide/chapter-1")
+ (:file "zguide/chapter-2")))
(asdf:defsystem #:little-zmq.tests
:serial t
View
@@ -7,7 +7,10 @@
(:shadow #:close)
(:export
#:with-eintr-retry
- #:version))
+ #:version
+ #:error-number
+ #:eagain))
+
(in-package #:zmq-bindings)
@@ -21,6 +24,8 @@
(declaim (optimize (speed 3)))
+;;;; ZMQ error conditions
+
(define-condition zmq-error
(error)
((error-number :initarg :error-number
@@ -29,7 +34,20 @@
(declare (cl:type stream stream))
(format stream "An error was raised on a ZMQ funcall.~%")
(format stream "Error string: ~a~%"
- (strerror (error-number condition))))))
+ (strerror (error-number condition)))))
+ (:documentation "Parent-type condition for all ZMQ error"))
+
+(define-condition eagain
+ (zmq-error)
+ ()
+ (:documentation "EAGAIN condition"))
+
+(defun raise-error (errno)
+ (declare (inline raise-error)
+ (type fixnum errno))
+ (cond
+ ((eq +eagain+ errno) (error 'eagain :error-number errno))
+ (t (error 'zmq-error :error-number errno))))
;;; EINTR Retry
@@ -108,8 +126,7 @@
(when ,(if (eq return-type :pointer)
'(cffi:null-pointer-p ret)
'(< ret 0))
- (error 'zmq-error
- :error-number (errno)))
+ (raise-error (errno)))
ret)))))
(defmacro defcfun+ (name-and-options return-type &body args)
@@ -136,7 +153,7 @@
(declare (type fixnum err))
(if (and eintr-retry (= +eintr+ err))
(go retry)
- (error 'zmq-error :error-number err)))))
+ (raise-error err)))))
ret)))))
(defun version ()
View
@@ -9,5 +9,6 @@
(constant (+emfile+ "EMFILE"))
(constant (+efault+ "EFAULT"))
(constant (+enodev+ "ENODEV"))
+ (constant (+eagain+ "EAGAIN"))
(ctype size-t "size_t"))
View
@@ -3,7 +3,7 @@
(:use #:cl #:message #:poll #:socket)
(:nicknames #:zmq)
(:shadowing-import-from #:socket #:push #:identity)
- (:import-from #:%zmq #:version)
+ (:import-from #:%zmq #:version #:eagain #:error-number)
(:export
#:with-context
#:with-eintr-retry
@@ -24,13 +24,16 @@
#:poll
#:has-events-p
#:with-socket
+ #:with-sockets
#:size
#:msg-t
#:msg-t-ptr
#:version
#:subscribe
#:data
- #:string-message))
+ #:string-message
+ #:error-number
+ #:eagain))
(in-package #:little-zmq)
@@ -6,4 +6,5 @@
#:+einval+
#:+emfile+
#:+efault+
- #:+enodev+))
+ #:+enodev+
+ #:+eagain+))
View
@@ -59,16 +59,78 @@
(zmq:with-context (ctx)
(zmq:with-socket (subscriber ctx :sub :connect "tcp://localhost:5556")
(format t "Collecting updates from weather server...~%")
- (setf (zmq:subscribe subscriber) "10001")
- (zmq:with-message (msg)
+ (let ((filter "10001"))
+ (setf (zmq:subscribe subscriber) filter)
(loop
- :for update-number :below 100 :do
- (zmq:recvmsg subscriber msg :as 'zmq:string-message)
- (format t "~S~%" (zmq:data msg)))))))
+ :for update-number :below 100
+ :with total-temp = 0
+ :do
+ (destructuring-bind (zipcode temperature relhumidity)
+ (ppcre:split " " (zmq:recvmsg subscriber :string))
+ (declare (ignore zipcode relhumidity))
+ (incf total-temp (read-from-string temperature)))
+ :finally
+ (format t "Average temperature for zipcode ~D was ~D"
+ filter (/ total-temp update-number)))))))
(defun run-weather-server ()
(let ((server (bt:make-thread #'weather-server
:name "Weather Server")))
(weather-client)
- (bt:destroy-thread server)))
+ (bt:destroy-thread server)))
+
+(let ((out *standard-output*))
+ (defun taskwork ()
+ (zmq:with-context (ctx)
+ (zmq:with-sockets ((receiver ctx :pull :connect "tcp://localhost:5557")
+ (sender ctx :push :connect "tcp://localhost:5558"))
+ (loop
+ (let ((string (zmq:recvmsg receiver :string)))
+ (format out "~A." string)
+ (sleep (/ (read-from-string string) 1000.0))
+ (zmq:sendmsg sender "")))))))
+
+(let ((out *standard-output*))
+ (defun tasksink ()
+ (zmq:with-context (ctx)
+ (zmq:with-socket (receiver ctx :pull :bind "tcp://*:5558")
+ (zmq:recvmsg receiver :string)
+ (loop
+ :for task-number :below 100
+ :with start-time = (get-internal-real-time)
+ :do
+ (zmq:recvmsg receiver :string)
+ (if (zerop (mod task-number 10))
+ (format out ":")
+ (format out "."))
+ :finally (format out "Total elapsed time: ~D msec~%"
+ (- (get-internal-real-time) start-time)))))))
+
+(defun ventilator ()
+ (zmq:with-context (ctx)
+ (zmq:with-sockets ((sender ctx :push :bind "tcp://*:5557")
+ (sink ctx :push :connect "tcp://localhost:5558"))
+ (format t "Press any key when the workers are ready: ~%")
+ (read-char)
+ (format t "Sending tasks to workers...~%")
+ (zmq:sendmsg sink "0")
+ (loop
+ :for task-number :below 100
+ :with total-msec = 0 :do
+ (let ((workload (1+ (random 100))))
+ (incf total-msec workload)
+ (zmq:sendmsg sender (format nil "~D" workload)))
+ :finally (format t "Total expected cost: ~D msec~%" total-msec))
+ (sleep 1))))
+
+(defun run-parallel-pipeline (&optional (worker-count 1))
+ (let ((workers (loop
+ :for count :below worker-count
+ :collect (bt:make-thread #'taskwork
+ :name (format nil "Worker-~D"
+ (1+ count)))))
+ (sink (bt:make-thread #'tasksink :name "Sink")))
+ (ventilator)
+ (bt:join-thread sink)
+ (map nil #'bt:destroy-thread workers)))
View
@@ -0,0 +1,52 @@
+
+(defpackage #:zguide.chapter-2
+ (:nicknames #:chapter-2)
+ (:use #:common-lisp)
+ (:export))
+
+(in-package zguide.chapter-2)
+
+(defun msreader ()
+ (zmq:with-context (ctx)
+ (zmq:with-sockets ((receiver ctx :pull :connect "tcp://localhost:5557")
+ (subscriber ctx :sub :connect "tcp://localhost:5556"))
+ (setf (zmq:subscribe subscriber) "10001 ")
+ (zmq:with-message (msg)
+ (loop
+ (loop
+ (restart-case
+ (handler-bind ((zmq:eagain
+ #'(lambda (condition)
+ (declare (ignore condition))
+ (invoke-restart 'stop-processing))))
+ (zmq:recvmsg receiver msg :blocking nil))
+ (stop-processing () (return))))
+ (loop
+ (restart-case
+ (handler-bind ((zmq:eagain
+ #'(lambda (condition)
+ (declare (ignore condition))
+ (invoke-restart 'stop-processing))))
+ (zmq:recvmsg subscriber msg :blocking nil))
+ (stop-processing () (return))))
+ (sleep 1))))))
+
+(defun run-msreader ()
+ (let ((weather-server (bt:make-thread #'chapter-1::weather-server
+ :name "Weather Server")))
+ (msreader)))
+
+(defun mspoller ()
+ (zmq:with-context (ctx)
+ (zmq:with-sockets ((receiver ctx :pull :connect "tcp://localhost:5557")
+ (subscriber ctx :sub :connect "tcp://localhost:5556"))
+ (setf (zmq:subscribe subscriber) "10001 ")
+ (zmq:with-poll-list (poll-list (recv-item receiver :pollin)
+ (sub-item subscriber :pollin))
+ (zmq:with-message (msg)
+ (loop
+ (when (zmq:poll poll-list)
+ (when (zmq:has-events-p recv-item)
+ (zmq:recvmsg receiver msg))
+ (when (zmq:has-events-p sub-item)
+ (zmq:recvmsg subscriber msg)))))))))

0 comments on commit 5904107

Please sign in to comment.