Permalink
Browse files

Add ECL threads implementation to swank

  • Loading branch information...
1 parent a4e85dc commit 5415cdcce2bef683ff632ef68e1f5ab03d9a1a3d Geo Carncross committed Dec 15, 2007
Showing with 157 additions and 0 deletions.
  1. +157 −0 swank-ecl.lisp
View
@@ -244,3 +244,160 @@
;;;; Definitions
(defimplementation find-definitions (name) nil)
+
+;;;; Threads
+
+#+threads
+(progn
+ (defvar *thread-id-counter* 0)
+
+ (defvar *thread-id-counter-lock*
+ (mp:make-lock :name "thread id counter lock"))
+
+ (defun next-thread-id ()
+ (mp:with-lock (*thread-id-counter-lock*)
+ (incf *thread-id-counter*)))
+
+ (defparameter *thread-id-map* (make-hash-table))
+
+ (defvar *thread-id-map-lock*
+ (mp:make-lock :name "thread id map lock"))
+
+ ; ecl doesn't have weak pointers
+ (defimplementation spawn (fn &key name)
+ (let ((thread (mp:make-process :name name))
+ (id (next-thread-id)))
+ (mp:process-preset
+ thread
+ #'(lambda ()
+ (unwind-protect
+ (mp:with-lock (*thread-id-map-lock*)
+ (setf (gethash id *thread-id-map*) thread))
+ (funcall fn)
+ (mp:with-lock (*thread-id-map-lock*)
+ (remhash id *thread-id-map*)))))
+ (mp:process-enable thread)))
+
+ (defimplementation thread-id (thread)
+ (block thread-id
+ (mp:with-lock (*thread-id-map-lock*)
+ (loop for id being the hash-key in *thread-id-map*
+ using (hash-value thread-pointer)
+ do (if (eq thread thread-pointer)
+ (return-from thread-id id))))))
+
+ (defimplementation find-thread (id)
+ (mp:with-lock (*thread-id-map-lock*)
+ (gethash id *thread-id-map*)))
+
+ (defimplementation thread-name (thread)
+ (mp:process-name thread))
+
+ (defimplementation thread-status (thread)
+ (if (mp:process-active-p thread)
+ "RUNNING"
+ "STOPPED"))
+
+ (defimplementation make-lock (&key name)
+ (mp:make-lock :name name))
+
+ (defimplementation call-with-lock-held (lock function)
+ (declare (type function function))
+ (mp:with-lock (lock) (funcall function)))
+
+ (defimplementation make-recursive-lock (&key name)
+ (mp:make-lock :name name))
+
+ (defimplementation call-with-recursive-lock-held (lock function)
+ (declare (type function function))
+ (mp:with-lock (lock) (funcall function)))
+
+ (defimplementation current-thread ()
+ mp:*current-process*)
+
+ (defimplementation all-threads ()
+ (mp:all-processes))
+
+ (defimplementation interrupt-thread (thread fn)
+ (mp:interrupt-process thread fn))
+
+ (defimplementation kill-thread (thread)
+ (mp:process-kill thread))
+
+ (defimplementation thread-alive-p (thread)
+ (mp:process-active-p thread))
+
+ (defvar *mailbox-lock* (mp:make-lock :name "mailbox lock"))
+
+ (defstruct (mailbox (:conc-name mailbox.))
+ (mutex (mp:make-lock :name "process mailbox"))
+ (queue '() :type list))
+
+ (defun mailbox (thread)
+ "Return THREAD's mailbox."
+ (mp:with-lock (*mailbox-lock*)
+ (or (find thread *mailboxes* :key #'mailbox.thread)
+ (let ((mb (make-mailbox :thread thread)))
+ (push mb *mailboxes*)
+ mb))))
+
+ (defimplementation send (thread message)
+ (let* ((mbox (mailbox thread))
+ (mutex (mailbox.mutex mbox)))
+ (mp:interrupt-process
+ thread
+ (lambda ()
+ (mp:with-lock (mutex)
+ (setf (mailbox.queue mbox)
+ (nconc (mailbox.queue mbox) (list message))))))))
+
+ (defimplementation receive ()
+ (block got-mail
+ (let* ((mbox (mailbox mp:*current-process*))
+ (mutex (mailbox.mutex mbox)))
+ (loop
+ (mp:with-lock (mutex)
+ (if (mailbox.queue mbox)
+ (return-from got-mail (pop (mailbox.queue mbox)))))
+ ;interrupt-process will halt this if it takes longer than 1sec
+ (sleep 1)))))
+
+ ;; Auto-flush streams
+ (defvar *auto-flush-interval* 0.15
+ "How often to flush interactive streams. This valu is passed
+ directly to cl:sleep.")
+
+ (defvar *auto-flush-lock* (make-recursive-lock :name "auto flush"))
+
+ (defvar *auto-flush-thread* nil)
+
+ (defvar *auto-flush-streams* '())
+
+ (defimplementation make-stream-interactive (stream)
+ (call-with-recursive-lock-held
+ *auto-flush-lock*
+ (lambda ()
+ (pushnew stream *auto-flush-streams*)
+ (unless *auto-flush-thread*
+ (setq *auto-flush-thread*
+ (spawn #'flush-streams
+ :name "auto-flush-thread"))))))
+
+ (defmethod stream-finish-output ((stream stream))
+ (finish-output stream))
+
+ (defun flush-streams ()
+ (loop
+ (call-with-recursive-lock-held
+ *auto-flush-lock*
+ (lambda ()
+ (setq *auto-flush-streams*
+ (remove-if (lambda (x)
+ (not (and (open-stream-p x)
+ (output-stream-p x))))
+ *auto-flush-streams*))
+ (mapc #'stream-finish-output *auto-flush-streams*)))
+ (sleep *auto-flush-interval*)))
+
+ )
+

0 comments on commit 5415cdc

Please sign in to comment.