Permalink
Browse files

-Added queue system

-Each actor now has three queues (in, out, err) rather than one
-Exported queue-related functions
-.asd converted to sequential. Followed what seems to be convention in putting my name in
  • Loading branch information...
1 parent 129b293 commit 1811022fe6b75a52a30daefd5ef1e26ef6f70946 inaimathi committed Jul 25, 2012
Showing with 108 additions and 106 deletions.
  1. +51 −97 actors.lisp
  2. +6 −1 cl-actors.asd
  3. +4 −8 package.lisp
  4. +47 −0 queue.lisp
View
@@ -1,131 +1,85 @@
(in-package #:cl-actors)
-;; ----------------------------------------------------------------------------
-(defclass actor()
+(defclass actor ()
((name :initarg :name
:initform (error ":name must be specified")
- :accessor name
- :documentation "Hold the name of actor")
+ :accessor name)
(behavior :initarg :behavior
- :initform (error ":behav must be specified")
- :accessor behavior
- :documentation "Behavior")
- (messages :initform '()
- :accessor messages
- :documentation "Message stream sent to actor")
- (lock :initform (bt:make-lock)
- :accessor lock
- :documentation
- "The lock is used when adding a message to the message queue")
- (cv :initarg :cv
- :initform (bt:make-condition-variable)
- :accessor cv
- :documentation "conditional variable used by the thread")
+ :initform (error ":behav must be specified")
+ :accessor behavior
+ :documentation "Behavior")
+
+ (in :initform (make-queue) :accessor in
+ :documentation "Queue of incoming messages")
+ (out :initform (make-queue 50)
+ :documentation "Queue of outgoing messages")
+ (err :initform (make-queue 10)
+ :documentation "Queue of error/debug messages")
+
thread))
-;; ----------------------------------------------------------------------------
-(defmethod initialize-instance :after((self actor) &key)
+(defmethod initialize-instance :after ((self actor) &key)
"Uses the main functiona name to create a thread"
(with-slots (name thread) self
- (setf thread
- (bt:make-thread #'(lambda() (main self))
- :name name))))
+ (setf thread
+ (bt:make-thread #'(lambda() (main self))
+ :name name))))
-;; ----------------------------------------------------------------------------
(defmethod send ((self actor) &rest message)
- "
-Creates a message sending thread which
-1. Holds lock to the message (queue)
-2. Appends messages (queue) with incoming message
-3. Releases lock
-4. Notifies the waiting thread that there is a message
-"
- (with-slots (messages lock cv) self
- (bt:make-thread #'(lambda ()
- (with-lock-held (lock)
- (setf messages (nconc messages (list message))))
- (condition-notify cv)))
- (values)))
+ "Creates a message sending thread to push a new message into the in` queue of the target actor."
+ (bt:make-thread
+ (lambda () (enqueue message (in self))))
+ (values))
-;; ----------------------------------------------------------------------------
-;; (defun stop-actor (actor) (destroy-thread (get-thread actor)))
(defmethod stop-actor ((self actor))
"Stops the actor thread"
- (with-slots (thread) self
- (destroy-thread thread)))
+ (with-slots (thread) self (destroy-thread thread)))
-;; ----------------------------------------------------------------------------
-;; (defun get-thread (actor) (first actor))
-(defmethod get-thread ((self actor))
- "Returns the handle of a thread"
- (with-slots (thread) self
- thread))
+;; I think that this should be more of an internal function
+;; than a method (experiment with funcallable-standard-class)
+;;;; -Naveen Sundar G.
-;; ----------------------------------------------------------------------------
-;; The main which is started as a thread from the constructor I think that this
-;; should be more of an internal function than a method (experiment with
-;; funcallable-standard-class)
-(defmethod main((self actor))
- (with-slots (lock cv (behav behavior) messages) self
+;; No opinion
+;;;; -Inaimathi
+(defmethod main ((self actor))
+ "The main which is started as a thread from the constructor."
+ (with-slots (behavior in) self
(loop
- (thread-yield)
- (with-lock-held (lock)
- (if (not (null messages))
- (setf behav (apply behav
- (pop messages)))
- (condition-wait cv lock ))
- (unless behav (return))))))
+ (let ((res (apply behavior (dequeue in))))
+ (unless res (return))))))
-;; ----------------------------------------------------------------------------
-;; Create a behavior that can be attached to any actor
-(defmacro behav (state vars &body body)
+;; Not sure whether the below would benefit from
+;; 1. automatically adding a next call after ,@body
+;; 2. automatically sending the result of ,@body to the out queue
+;;;; -Inaimathi
+(defmacro behav (state vars &body body)
+ "Create a behavior that can be attached to any actor."
`(let ,state
- (labels ((me ,(append vars `(&key self (next #'me next-supplied-p)))
+ (labels ((me ,(append vars `(&key self (next #'me next-supplied-p)))
(setf next (curry next :self self))
- x ,@body))
+ x ,@body))
#'me)))
-;; ----------------------------------------------------------------------------
-;; Macro for creating actors with the behavior specified by body
+;; Same concerns as above (in fact, I'm not entirely clear on why
+;; defactor doesn't simply call behav, since that seems to be the point)
+;;;; -Inaimathi
(defmacro defactor (name state vars &body body)
+ "Macro for creating actors with the behavior specified by body"
`(defun ,name (&key (self) ,@state)
(labels ((me ,(append vars `(&key (next #'me next-supplied-p)))
- (if next-supplied-p
- (setf next (curry next :self self)))
+ (when next-supplied-p
+ (setf next (curry next :self self)))
,@body))
- (setf self (make-actor #'me ,(string name))) self)))
+ (setf self (make-actor #'me ,(string name)))
+ self)))
-;; ----------------------------------------------------------------------------
-;; The shell of an actor
(defun make-actor (behav name)
+ "The shell of an actor"
(make-instance 'actor
:name (concatenate 'string "Actor: " name)
:behavior behav))
-;; ----------------------------------------------------------------------------
-(defun if-single (x)
- (if (eq (length x) 1)
- (car x)
- x))
-
-;; ----------------------------------------------------------------------------
-(defun sink (&rest args)
- (declare (ignore args)) #'sink)
-
-;; ----------------------------------------------------------------------------
-;; Currying.
(defun curry (f &rest args)
+ "Simple currying implementation."
(lambda (&rest rem)
- (apply f (append rem args) )))
-
-;; ----------------------------------------------------------------------------
-;; Easy priting to repl from threads.
-(defun pr (x)
- (print x *standard-output*)
- (format t "~%"))
-
-;; ----------------------------------------------------------------------------
-;; A printing actor
-(defactor printer ()
- (x)
- (pr x) next)
+ (apply f (append rem args))))
View
@@ -5,6 +5,9 @@
;;;; @author Naveen Sundar G. <naveensundarg@gmail.com>
;;;; @date Thu Apr 5 2012
;;;; @brief asdf-install package file for cl-actors
+;;;; @author Inaimathi <leo.zovic@gmail.com>
+;;;; @date Jup 25 2012
+;;;; @brief queue system and output queues
;;;;===========================================================================
(defpackage #:cl-actors-asd (:use #:asdf #:cl))
@@ -16,7 +19,9 @@
:licence "BSD"
:description ""
:depends-on (:bordeaux-threads)
+ :serial t
:components ((:file "package")
- (:file "actors" :depends-on ("package"))))
+ (:file "queue")
+ (:file "actors")))
View
@@ -8,11 +8,7 @@
(in-package #:cl-user)
(defpackage #:cl-actors
- (:use #:cl
- #:bordeaux-threads)
- (:export :defactor
- :next
- :behav
- :send
- :stop-actor
- :printer))
+ (:use #:cl #:bordeaux-threads)
+ (:export
+ #:defactor #:self #:next #:send #:stop-actor #:behav
+ #:make-queue #:enqueue #:dequeue #:len #:messages))
View
@@ -0,0 +1,47 @@
+(in-package #:cl-actors)
+
+(defclass message-queue ()
+ ((messages :accessor messages :initarg :messages :initform nil)
+ (last-cons :accessor last-cons :initarg :last-cons :initform nil
+ :documentation "Cached end of the list")
+ (len :accessor len :initarg :len :initform 0
+ :documentation "Cached message queue length. Modified by enqueue and dequeue")
+ (max-len :accessor max-len :initarg :max-len :initform nil
+ :documentation "If present, queue maintains at most this many elements")
+ (lock :initform (bt:make-lock) :accessor lock
+ :documentation "Lock for this message queue")
+ (flag :initform (bt:make-condition-variable) :accessor flag
+ :documentation "Condition variable used to notify that a message was enqueued")))
+
+(defun make-queue (&optional max-len)
+ (make-instance 'message-queue :max-len max-len))
+
+(defmethod enqueue (object (queue message-queue))
+ "Adds an element to the back of the given queue in a thread-safe way."
+ (with-slots (lock messages max-len len flag last-cons) queue
+ (with-lock-held (lock)
+ (let ((o (list object)))
+ (cond ((or (null messages) (null last-cons))
+ (setf messages o
+ last-cons messages
+ len 1))
+ ((= len max-len)
+ (pop messages)
+ (setf (cdr last-cons) o
+ last-cons o))
+ (t (setf (cdr last-cons) o
+ last-cons o)
+ (incf len)))))
+ (condition-notify flag)
+ messages))
+
+(defmethod dequeue ((queue message-queue))
+ "Pops a message from the given queue."
+ (with-slots (messages lock flag len) queue
+ (loop
+ (thread-yield)
+ (with-lock-held (lock)
+ (when messages
+ (decf len)
+ (return (pop messages)))
+ (condition-wait flag lock)))))

0 comments on commit 1811022

Please sign in to comment.