Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
  • 2 commits
  • 7 files changed
  • 0 commit comments
  • 1 contributor
Showing with 144 additions and 26 deletions.
  1. +6 −0 Changelog
  2. +3 −1 src/packages.lisp
  3. +64 −19 src/zmq.lisp
  4. +60 −0 test/multithreading.lisp
  5. +2 −1 test/suites.lisp
  6. +3 −1 zmq-test.asd
  7. +6 −4 zmq.asd
View
6 Changelog
@@ -1,4 +1,10 @@
+Version 1.3.0 (2012-03-01)
+
+- Make socket optionally thread-safe.
+- Add POLL-ITEM-FD.
+
+
Version 1.2.0 (2012-02-03)
- Fix DO-POLL-ITEMS to avoid infinite loop (thanks to Yaroslav Shirokov).
View
4 src/packages.lisp
@@ -5,6 +5,8 @@
:init :term :with-context
+ :socket-%socket :socket-lock :with-socket-locked
+
:socket :close :with-socket :with-sockets
:bind :connect
@@ -19,7 +21,7 @@
:send :recv
:with-poll-items
- :poll-item-socket
+ :poll-item-socket :poll-item-fd
:poll-items-aref :do-poll-items :poll-item-events-signaled-p
:poll
View
83 src/zmq.lisp
@@ -94,19 +94,48 @@ with IO-THREADS threads."
(progn ,@body)
(term ,var))))
-(defun socket (context type)
- "Create and return a new socket."
- (call-ffi (null-pointer)
- '%socket context (foreign-enum-value 'socket-type type)))
+(defclass socket ()
+ ((%socket
+ :accessor socket-%socket
+ :initarg :%socket
+ :documentation "A foreign pointer to the underlying zeromq socket.")
+ (lock
+ :accessor socket-lock
+ :initarg :lock
+ :initform nil
+ :documentation "A lock used for thread-safe sockets, or NIL if the socket
+ isn't thread-safe."))
+ (:documentation "A zeromq socket."))
+
+(defun socket (context type &key thread-safe)
+ "Create and return a new socket. If THREAD-SAFE is not NIL, the socket will
+be protected against concurrent access."
+ (make-instance 'socket
+ :%socket (call-ffi (null-pointer)
+ '%socket context
+ (foreign-enum-value 'socket-type type))
+ :lock (when thread-safe
+ (bordeaux-threads:make-recursive-lock))))
+
+(defmacro with-socket-locked ((socket) &body body)
+ "Evaluate BODY in an environment where SOCKET is protected against
+ concurrent access."
+ `(if (socket-lock socket)
+ (bordeaux-threads:with-recursive-lock-held ((socket-lock ,socket))
+ ,@body)
+ (progn
+ ,@body)))
(defun close (socket)
"Close and release a socket."
- (call-ffi -1 '%close socket))
+ (with-socket-locked (socket)
+ (call-ffi -1 '%close (socket-%socket socket))))
-(defmacro with-socket ((var context type) &body body)
+(defmacro with-socket ((var context type &key thread-safe) &body body)
"Evaluate BODY in an environment where VAR is bound to a socket created in
-context CONTEXT with type TYPE."
- `(let ((,var (socket ,context ,type)))
+context CONTEXT with type TYPE. Key arguments are the same as the arguments of
+SOCKET."
+ `(let ((,var (socket ,context ,type :thread-safe ,thread-safe)))
(unwind-protect
(progn ,@body)
(close ,var))))
@@ -121,12 +150,14 @@ context CONTEXT with type TYPE."
(defun bind (socket endpoint)
"Bind SOCKET to the address ENDPOINT."
(with-foreign-string (%endpoint endpoint)
- (call-ffi -1 '%bind socket %endpoint)))
+ (with-socket-locked (socket)
+ (call-ffi -1 '%bind (socket-%socket socket) %endpoint))))
(defun connect (socket endpoint)
"Connect SOCKET to the address ENDPOINT."
(with-foreign-string (%endpoint endpoint)
- (call-ffi -1 '%connect socket %endpoint)))
+ (with-socket-locked (socket)
+ (call-ffi -1 '%connect (socket-%socket socket) %endpoint))))
(defvar *socket-options-type* (make-hash-table)
"A table to store the foreign type of each socket option.")
@@ -165,7 +196,8 @@ context CONTEXT with type TYPE."
(error "Unknown socket option ~A." option))
(destructuring-bind (type length) info
(with-foreign-objects ((%value type length) (%size 'size-t))
- (call-ffi -1 '%getsockopt socket option %value %size)
+ (with-socket-locked (socket)
+ (call-ffi -1 '%getsockopt (socket-%socket socket) option %value %size))
(case option
(:identity
(let ((size (mem-ref %size 'size-t)))
@@ -187,14 +219,18 @@ context CONTEXT with type TYPE."
(let ((length (length value)))
(with-foreign-object (%value :char (+ length 1))
(lisp-string-to-foreign value %value (+ length 1))
- (call-ffi -1 '%setsockopt socket option %value length))))
+ (with-socket-locked (socket)
+ (call-ffi -1 '%setsockopt (socket-%socket socket) option
+ %value length)))))
(t
(with-foreign-object (%value type length)
(setf (mem-ref %value type) (case option
(:events (foreign-bitfield-value
'event-types value))
(t value)))
- (call-ffi -1 '%setsockopt socket option %value length)))))))
+ (with-socket-locked (socket)
+ (call-ffi -1 '%setsockopt (socket-%socket socket) option
+ %value length))))))))
(defun device (type frontend backend)
"Connect a frontend socket to a backend socket. This function always returns
@@ -328,20 +364,23 @@ the call, SOURCE is an empty message."
(defun send (socket message &optional flags)
"Queue MESSAGE to be sent on SOCKET."
- (call-ffi -1 '%send socket message
- (foreign-bitfield-value 'send-options flags)))
+ (with-socket-locked (socket)
+ (call-ffi -1 '%send (socket-%socket socket) message
+ (foreign-bitfield-value 'send-options flags))))
(defun recv (socket message &optional flags)
"Receive a message from SOCKET and store it in MESSAGE."
- (call-ffi -1 '%recv socket message
- (foreign-bitfield-value 'recv-options flags)))
+ (with-socket-locked (socket)
+ (call-ffi -1 '%recv (socket-%socket socket) message
+ (foreign-bitfield-value 'recv-options flags))))
(defmacro with-poll-items ((items-var size-var) items &body body)
"Evaluate BODY in an environment where ITEMS-VAR is bound to a foreign array
of poll items, and SIZE-VAR is bound to the number of polled items. Poll
items are filled according to ITEMS. ITEMS is a list where each element
describe a poll item. Each description is a list where the first element is
- a socket or file descriptor, and other elements are the events to watch
+ a socket instance, a foreign pointer to a zeromq socket, or a file
+ descriptor, and other elements are the events to watch
for, :POLLIN, :POLLOUT or :POLLERR."
(let ((i 0)
(pollitem-size (foreign-type-size 'pollitem)))
@@ -355,6 +394,8 @@ the call, SOURCE is an empty message."
(destructuring-bind (handle &rest event-list)
(list ,@item)
(cond
+ ((typep handle 'socket)
+ (setf socket (socket-%socket handle)))
((pointerp handle)
(setf socket handle))
(t
@@ -389,9 +430,13 @@ ITEMS."
(foreign-bitfield-value 'event-types events)) 0))
(defun poll-item-socket (poll-item)
- "Return the SOCKET of the poll item POLL-ITEM."
+ "Return a foreign pointer to the zeromq socket of the poll item POLL-ITEM."
(foreign-slot-value poll-item 'pollitem 'socket))
+(defun poll-item-fd (poll-item)
+ "Return the file descriptor of the poll item POLL-ITEM."
+ (foreign-slot-value poll-item 'pollitem 'fd))
+
(defun poll (items nb-items timeout)
"Poll ITEMS with a timeout of TIMEOUT microseconds, -1 meaning no time
limit. Return the number of items with signaled events."
View
60 test/multithreading.lisp
@@ -0,0 +1,60 @@
+
+(in-package :zmq-test)
+
+(in-suite multithreading)
+
+(test multiple-readers
+ (zmq:with-context (context 1)
+ (zmq:with-sockets ((pub-socket context :pub)
+ (sub-socket context :sub :thread-safe t))
+ (zmq:bind pub-socket "inproc://zmq-test")
+ (zmq:setsockopt sub-socket :subscribe "")
+ (zmq:connect sub-socket "inproc://zmq-test")
+ (let ((lock (bordeaux-threads:make-lock))
+ (messages nil))
+ (flet ((reader-main ()
+ (dotimes (i 100)
+ (zmq:with-msg-init (message)
+ (zmq:recv sub-socket message)
+ (bordeaux-threads:with-lock-held (lock)
+ (push (zmq:msg-data-string message) messages))))))
+ (let ((thread-1 (bordeaux-threads:make-thread
+ #'reader-main :name "zmq-test thread 1"))
+ (thread-2 (bordeaux-threads:make-thread
+ #'reader-main :name "zmq-test thread 2")))
+ (dotimes (i 200)
+ (zmq:with-msg-init-data (message "test")
+ (zmq:send pub-socket message)))
+ (bordeaux-threads:join-thread thread-1)
+ (bordeaux-threads:join-thread thread-2)
+ (is-true (and (= (length messages) 200)
+ (every (lambda (message)
+ (string= message "test"))
+ messages)))))))))
+
+(test multiple-writers
+ (zmq:with-context (context 1)
+ (zmq:with-sockets ((pub-socket context :pub :thread-safe t)
+ (sub-socket context :sub))
+ (zmq:bind pub-socket "inproc://zmq-test")
+ (zmq:setsockopt sub-socket :subscribe "")
+ (zmq:connect sub-socket "inproc://zmq-test")
+ (flet ((writer-main ()
+ (dotimes (i 100)
+ (zmq:with-msg-init-data (message "test")
+ (zmq:send pub-socket message)))))
+ (let ((thread-1 (bordeaux-threads:make-thread
+ #'writer-main :name "zmq-test thread 1"))
+ (thread-2 (bordeaux-threads:make-thread
+ #'writer-main :name "zmq-test thread 2")))
+ (let ((messages nil))
+ (dotimes (i 200)
+ (zmq:with-msg-init (message)
+ (zmq:recv sub-socket message)
+ (push (zmq:msg-data-string message) messages)))
+ (bordeaux-threads:join-thread thread-1)
+ (bordeaux-threads:join-thread thread-2)
+ (is-true (and (= (length messages) 200)
+ (every (lambda (message)
+ (string= message "test"))
+ messages)))))))))
View
3 test/suites.lisp
@@ -1,4 +1,5 @@
(in-package :zmq-test)
-(def-suite main :description "main test suite")
+(def-suite main :description "main tests")
+(def-suite multithreading :description "multithreading tests")
View
4 zmq-test.asd
@@ -4,10 +4,12 @@
:author "Nicolas Martyanoff"
:license "BSD"
:description "Tests for the zmq binding."
- :depends-on (:zmq :fiveam)
+ :depends-on (:zmq :fiveam :bordeaux-threads)
:components ((:module "test"
:components ((:file "packages")
(:file "suites"
:depends-on ("packages"))
(:file "main"
+ :depends-on ("suites"))
+ (:file "multithreading"
:depends-on ("suites"))))))
View
10 zmq.asd
@@ -4,11 +4,11 @@
(defsystem zmq
:name "zmq"
- :version "1.2.0"
+ :version "1.3.0"
:author "Nicolas Martyanoff"
:license "BSD"
:description "A binding of the zmq transport layer."
- :depends-on (:cffi)
+ :depends-on (:cffi :bordeaux-threads)
:in-order-to ((test-op (load-op zmq-test)))
:components ((:module "src"
:components ((:file "packages")
@@ -18,5 +18,7 @@
(:file "zmq" :depends-on ("ffi"))))))
(defmethod perform ((o asdf:test-op) (c (eql (find-system :zmq))))
- (funcall (intern "RUN!" :5am)
- (intern "MAIN" :zmq-test)))
+ (let ((suites '(main multithreading)))
+ (dolist (suite suites)
+ (funcall (intern "RUN!" :5am)
+ (intern (symbol-name suite) :zmq-test)))))

No commit comments for this range

Something went wrong with that request. Please try again.