Skip to content

Commit

Permalink
limping towards usability
Browse files Browse the repository at this point in the history
  • Loading branch information
adlai committed Apr 25, 2015
1 parent 6352b83 commit 9a2e788
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 87 deletions.
121 changes: 71 additions & 50 deletions src/actors.lisp
Expand Up @@ -3,15 +3,19 @@
;;;; Copyright © 2015 Adlai Chandrasekhar
;;;;
;;;; Channel-Chattering Actors - A Prototype
;;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;; TODO: http://archive.adaic.com/standards/83rat/html/ratl-13-02.html#13.2.4
;;;; The goal is for channels to be as invisible as threads and pointers
;;;; When that happens, this may very well just belong in a separate library
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defpackage #:chanl.actors
(:use #:cl #:chanl) (:import-from #:chanl #:ensure-list)
(:export #:actor #:perform #:halt #:name #:slot-channel #:compute-tubes
#:execute #:command #:abbrev #:state #:ensure-running #:boss))
#:execute #:command #:abbrev #:state #:ensure-running #:boss #:die))

(in-package #:chanl.actors)

(defvar *boss*)

;;; TODO: factor all this apart (delegates -> sheeple? merge into bossing?)
(defclass actor ()
((name :initarg :name :reader name ; if you must, use (setf slot-value)
Expand All @@ -20,24 +24,31 @@
;; this is traditional "message passing", each actor gets its own state
(state :initform 'perform :documentation "Represents/performs actor's state")
(tubes :documentation "Channels used for communication")
(boss :documentation "For whom['s benefit] the bell tolls"
:reader boss :initarg :boss)
(boss :documentation "For whom['s benefit] the bell tolls" :reader boss
:initform *boss* :initarg :boss :type (or bt:thread boss))
(command :documentation "Command being executed by the actor")))

(defun slot-channel (actor slot) (cdr (assoc slot (slot-value actor 'tubes))))
(defun slot-channel (actor slot)
"Returns the channel associated with `slot' in `actor'"
(let ((spec (cdr (assoc slot (slot-value actor 'tubes)))))
(etypecase spec (channel spec) (symbol (slot-value actor spec)))))

(defgeneric compute-tubes (actor)
(:documentation "Calculates the list of communication slots for `actor'")
(:documentation "Calculates the list of communication slots for `actor'.
Methods should return a list of specifications (or a single one as an atom)")
(:method-combination list :most-specific-last) ; TODO: lazy-append
(:method :around ((actor actor))
(:method :around ((actor actor)) ; &rest?
"Combines the specifications, creating channels if necessary"
(mapcan (lambda (tubing)
(mapcar (lambda (tube)
(destructuring-bind (name . spec) (ensure-list tube)
(cons name (apply #'make-instance
(or spec '(channel))))))
(ensure-list tubing)))
(cons name ; ( data-slot :from channel-slot )
(if (member (car spec)'(:to :from)) (cadr spec)
(apply #'make-instance
(or spec '(channel)))))))
(ensure-list tubing))) ; in case a method returns an atom
(call-next-method)))
(:method list ((actor actor)) 'command))
(:method list ((actor actor)) '(command death)))

;;; from scalpl.util ; TODO: #.(if (find-package :scalpl.util) ...)
(defun strftime (&optional datep &aux bits)
Expand All @@ -50,29 +61,30 @@
(when datep (collect " " next "-" next)))))
(apply 'concatenate 'string bits))

(defgeneric christen (actor type)
(:method ((actor actor) (type (eql 'actor))) (strftime t))
(:method ((actor actor) (type (eql 'task)))
(defgeneric christen (actor)
(:method ((actor actor))
(with-slots (name abbrev) actor (format nil "~A~@[ ~A~]" name abbrev)))
(:method :around ((actor actor) (type (eql 'task)))
(:method :around ((actor actor))
(concatenate 'string (strftime) " " (call-next-method))))

(defmethod slot-unbound ((class t) (actor actor) (slot-name (eql 'name)))
(setf (slot-value actor 'name) (christen actor 'actor)))
(setf (slot-value actor 'name) (strftime t)))

(defmethod print-object ((actor actor) stream)
(print-unreadable-object (actor stream :type t :identity t)
(write-string (name actor) stream)))

(defmacro define-delegated-slot-operation (operation return)
`(defmethod slot-missing ((class t) (actor actor) slot-name
(operation (eql ',operation)) &optional new-value)
(declare (ignore new-value)) ; a sufficiently smart compiler...
(if (and (slot-boundp actor 'boss) (slot-boundp (boss actor) slot-name))
,return (call-next-method))))
(define-delegated-slot-operation slot-value (slot-value (boss actor) slot-name))

(define-method-combination select (&optional (sleep 1))
(macrolet ((delegate-slot-operation (op return) ;P
`(defmethod slot-missing ((class t) (actor actor) slot
(op (eql ',op)) &optional new-value)
(declare (ignore new-value)) ; a sufficiently smart compiler...
(with-slots (boss) actor
(if (and (typep boss 'actor) (slot-boundp boss slot))
,return (call-next-method))))))
(delegate-slot-operation slot-value (slot-value boss slot))
(delegate-slot-operation slot-boundp t))

(define-method-combination select (&optional (sleep 1/7))
((select *)) (:arguments actor)
(let (before after around recv send default)
(dolist (method select)
Expand Down Expand Up @@ -110,35 +122,31 @@

(defgeneric execute (actor command)
(:method ((actor actor) (command function)) (funcall command actor))
(:method ((actor actor) (command (eql :halt))) (throw :halt actor)))
(:method ((actor actor) (command (eql :die))) (throw :die (current-thread))))

(defun launch (actor)
(bt:make-thread (lambda ()
(catch :halt
(catch :die
(loop (funcall (slot-value actor 'state) actor))))
:name (christen actor 'task)))
:name (christen actor)))

(defgeneric ensure-running (actor)
(:method ((actor actor))
(with-slots (boss) actor
(if (not (slot-boundp actor 'boss)) (setf boss (launch actor))
(typecase boss
(bt:thread
(cond ((eq boss (bt:current-thread))
(warn "~A tried to revive itself" actor))
((bt:thread-alive-p boss)
(warn "~A revived before death" actor))
(t (warn "races ahoy!") (setf boss (launch actor)))))
(boss (send (slot-channel boss 'to-run) actor)))))))
(symbol-macrolet ((launch (setf boss (launch actor))))
(typecase boss
(null launch)
(bt:thread (cond ((eq boss (bt:current-thread)) ; nop
(warn "~A tried to revive itself" actor))
((bt:thread-alive-p boss) ; nop
(warn "~A revived before death" actor))
(t (warn "races ahoy!") launch)))
(boss (send (slot-channel boss 'to-run) actor)))))))

(defgeneric act (class &key)
(:method ((class symbol) &rest initargs) ; :metaclass actor-class
(ensure-running (apply #'make-instance class initargs))))

(defgeneric halt (actor) ; TODO: blocking? timeouts? kill?
(:documentation "Signals `actor' to terminate")
(:method ((actor actor)) (send (slot-channel actor 'command) :halt)))

(defmethod initialize-instance :before ((actor actor) &key)
(setf (slot-value actor 'tubes) (compute-tubes actor)))

Expand All @@ -155,19 +163,32 @@
(to-halt :documentation "Actor to halt, but keep its link")
(to-fire :documentation "Actor to both halt and unlink")))

(defmethod compute-tubes list ((boss boss)) '(to-run to-halt to-fire))
(defmethod compute-tubes list ((boss boss))
'((to-run unbounded-channel) to-halt to-fire))

(define-delegated-slot-operation slot-boundp t)
(defvar *boss*
(make-instance 'boss :name "atp" :boss
(prog1 (bt:make-thread #'list) (sleep 3)))) ; bootstrap!

(defun map-workers (boss function) ; ... i'm not sure what i expected
(mapcar function (mapcar #'car (slot-value boss 'workers))))

(defmethod ensure-running :after ((boss boss))
(unless (eq (bt:current-thread) (boss boss))
(send (slot-channel boss 'command)
(lambda (boss) (map-workers boss #'ensure-running)))))
(map-workers boss #'ensure-running))

(defun %kill (actor) (send (slot-channel actor 'command) :die))

(defgeneric die (actor) ; TODO: blocking? timeouts?
(:documentation "Signals `actor' to terminate")
(:method :before ((boss boss)) (map-workers boss #'halt))
(:method ((actor actor)) (%kill actor)))

(defun halt (actor)
(typecase (boss actor)
(bt:thread (die actor))
(boss (send (slot-channel (boss actor) 'to-halt) actor))))

(defmethod halt :before ((boss boss)) (map-workers boss #'halt))
(defun fire (actor) (send (slot-channel (boss actor) 'to-fire) actor))

(defmethod perform recv to-run ((boss boss))
(with-slots (to-run workers) boss
Expand All @@ -183,12 +204,12 @@
(if (eq boss to-halt) (warn "~A told to halt itself" boss)
(let ((link (assoc to-halt workers))) ; makes an ass out of you and me
(declare (type (or null (cons actor bt:thread)) link))
(and link (bt:thread-alive-p (cdr link)) (halt to-halt))))))
(and link (bt:thread-alive-p (cdr link)) (%kill to-halt))))))

(defmethod perform recv to-fire ((boss boss))
(with-slots (to-fire workers) boss ; FIXME: pater, pater everywhere, but...
(if (eq boss to-fire) (warn "~A told to fire itself" boss)
(let ((link (assoc to-fire workers))) ; this isn't even funny anymore
(declare (type (or null (cons actor bt:thread)) link))
(and link (or (not (bt:thread-alive-p (cdr link))) (halt to-fire))
(and link (or (not (bt:thread-alive-p (cdr link))) (%kill to-fire))
(setf workers (remove link workers)))))))
65 changes: 29 additions & 36 deletions tests/actors.lisp
Expand Up @@ -12,6 +12,9 @@
(:use :cl :chanl :chanl.actors) (:export #:run-all-tests))
(in-package :chanl.actors.tests)

(defun divulge-thread (actor &aux (channel (slot-channel actor 'command)))
(recv (send channel (lambda (actor) actor (send channel (current-thread))))))

(def-suite actors :in chanl)
(def-suite action :in actors)
(in-suite action)
Expand All @@ -20,14 +23,11 @@
(let ((actor (make-instance 'actor)))
(is (not (null (name actor))))
(is (eq 'perform (slot-value actor 'state)))
(is (threadp (slot-value actor 'boss)))
(is (channelp (slot-channel actor 'command)))
(let ((channel (make-instance 'channel)))
(send (slot-channel actor 'command)
(lambda (actor) (send channel actor)))
(is (eq actor (recv channel))))
(halt actor) (sleep 1)
(is (not (thread-alive-p (slot-value actor 'boss))))))
(let ((thread (divulge-thread actor)))
(is (threadp thread))
(die actor) (sleep 1)
(is (not (thread-alive-p thread))))))

(test actors-insanity
#.(let ((class-name (gensym "actor")))
Expand All @@ -38,17 +38,19 @@
(setf (slot-value actor 'output) (slot-value actor 'input)))
(defmethod perform send output ((actor ,class-name)))
(let ((actor (make-instance ',class-name)))
(is (equal '(command input output)
(mapcar 'car (slot-value actor 'tubes))))
(is (intersection '(command input output)
(mapcar 'car (slot-value actor 'tubes))))
(is (null (recv (slot-channel actor 'output))))
(is (null (recv (slot-channel actor 'output))))
(send (slot-channel actor 'input) ())
(is (null (recv (slot-channel actor 'output))))
(send (slot-channel actor 'input) 'tubes)
(is (eq 'tubes (recv (slot-channel actor 'output))))
(is (eq 'tubes (recv (slot-channel actor 'output))))
(halt actor) (sleep 1)
(is (not (thread-alive-p (slot-value actor 'boss)))))
(let ((thread (divulge-thread actor)))
(is (threadp thread))
(die actor) (sleep 1)
(is (not (thread-alive-p thread)))))
(flet ((remmethod (genfun quals specs)
(remove-method genfun (find-method genfun quals specs))))
(mapcar #'remmethod (list #'perform #'perform #'compute-tubes)
Expand All @@ -60,28 +62,19 @@
(in-suite bossing)

(test bossing-sanity
(let ((n (length (all-threads))))
(let ((boss (make-instance 'boss)))
(is (not (null (name boss))))
(is (eq 'perform (slot-value boss 'state)))
(is (threadp (slot-value boss 'boss)))
(is (channelp (slot-channel boss 'command)))
(let ((channel (make-instance 'channel)))
(send (slot-channel boss 'command)
(lambda (boss) (send channel boss)))
(is (eq boss (recv channel))))
(let ((worker (make-instance 'actor :boss boss)))
(let ((channel (make-instance 'channel)))
(send (slot-channel worker 'command)
(lambda (worker) (send channel worker)))
(is (eq worker (recv channel))))
(send (slot-channel boss 'chanl.actors::to-halt) worker) (sleep 2)
(is (= (1+ n) (length (all-threads))))
(send (slot-channel boss 'chanl.actors::to-run) worker) (sleep 1)
(let ((channel (make-instance 'channel)))
(send (slot-channel worker 'command)
(lambda (worker) (send channel worker)))
(is (eq worker (recv channel))))
(halt boss) (sleep 2)
(is (not (thread-alive-p (slot-value boss 'boss))))
(is (= n (length (all-threads))))))))
(let ((boss (make-instance 'boss :name "test boss")))
(is (not (null (name boss))))
(is (eq 'perform (slot-value boss 'state)))
(is (channelp (slot-channel boss 'command)))
(let ((boss-thread (divulge-thread boss)))
(is (threadp boss-thread))
(let ((worker (make-instance 'actor :boss boss :name "test worker")))
(let ((worker-thread (divulge-thread worker)))
(is (thread-alive-p worker-thread))
(halt worker) (sleep 1)
(is (not (thread-alive-p worker-thread)))
(send (slot-channel boss 'chanl.actors::to-run) worker)
(setf worker-thread (divulge-thread worker))
(is (thread-alive-p worker-thread))
(die boss) (sleep 1)
(is (notany #'thread-alive-p `(,boss-thread ,worker-thread))))))))
2 changes: 1 addition & 1 deletion tests/setup-tests.lisp
Expand Up @@ -28,7 +28,7 @@
(format t "~2&*******************~@
** Starting test **~@
*******************~%")
(run-all-tests)
(handler-bind ((style-warning #'muffle-warning)) (run-all-tests))
(format t "~2&*****************************************~@
** Tests finished **~@
*****************************************~@
Expand Down

0 comments on commit 9a2e788

Please sign in to comment.