Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Branch: master
Fetching contributors…

Cannot retrieve contributors at this time

83 lines (66 sloc) 2.269 kB
"""A lightweight actor implementation for Python.
"""
import time
from functools import wraps
from Queue import Queue
from threading import Thread, currentThread
def basic_actorify(o):
    """Attach a message queue and a running worker thread to ``o``.

    Two attributes are set on ``o``:

    * ``_messages`` -- a ``Queue`` holding zero-argument callables.
    * ``_thread`` -- a daemon ``Thread`` whose target is the loop returned
      by ``loop_over_queue(o._messages)``.

    The thread is started before this function returns. Nothing is
    returned; ``o`` is modified in place.
    """
    o._messages = Queue()
    o._thread = Thread(target=loop_over_queue(o._messages))
    # Set the ``daemon`` attribute directly instead of calling the legacy
    # ``setDaemon()`` setter, which is deprecated in current Python releases.
    o._thread.daemon = True
    o._thread.start()
def message(method):
    """Decorate ``method`` so that it runs on the actor's own thread.

    The returned handler expects ``self`` to carry the ``_thread`` and
    ``_messages`` attributes (as installed by ``basic_actorify``).

    * Called from any thread other than ``self._thread``: the call is
      wrapped in a zero-argument callable and put on ``self._messages``;
      the handler returns immediately.
    * Called from ``self._thread`` itself: ``method`` is invoked inline.

    In both cases the handler returns ``None``; the return value of
    ``method`` is discarded.
    """
    # wraps() keeps the decorated method's __name__/__doc__ intact so that
    # introspection and tracebacks refer to the real method.
    @wraps(method)
    def handler(self, *args, **kwargs):
        if currentThread() is not self._thread:
            # Foreign thread: enqueue the call for the actor to run later.
            self._messages.put(lambda: method(self, *args, **kwargs))
        else:
            # Already on the actor thread: run directly.
            method(self, *args, **kwargs)
    return handler
def sync_message(method):
    """Decorate ``method`` to run on the actor thread and return its result.

    The returned handler expects ``self`` to carry the ``_thread`` and
    ``_messages`` attributes (as installed by ``basic_actorify``).

    * Called from ``self._thread`` itself: ``method`` is invoked inline and
      its result returned.
    * Called from any other thread: the call is queued on
      ``self._messages`` and the caller blocks until the actor has run it,
      then receives the method's return value.

    If ``method`` raises an ``Exception`` while running on the actor
    thread, the same exception is raised in the waiting caller instead of
    leaving it blocked forever. The exception is also re-raised on the
    actor thread, so whatever is driving the message loop sees it as well.
    """
    @wraps(method)
    def handler(self, *args, **kwargs):
        if currentThread() is self._thread:
            return method(self, *args, **kwargs)

        # One-slot queue used to hand the outcome back to the caller as
        # a (succeeded, payload) pair.
        outcome = Queue(maxsize=1)

        def job():
            try:
                value = method(self, *args, **kwargs)
            except Exception as exc:
                # Wake the caller before propagating; otherwise it would
                # wait on ``outcome`` forever.
                outcome.put((False, exc))
                raise
            outcome.put((True, value))

        self._messages.put(job)
        succeeded, payload = outcome.get()
        if not succeeded:
            raise payload
        return payload
    return handler
def loop_over_queue(q):
    """Build a function that endlessly pulls callables from ``q`` and runs them.

    The returned function takes no arguments. Each iteration blocks on
    ``q.get()`` and then calls whatever it received. The loop has no exit
    condition of its own; it only stops when a fetched action raises.
    """
    def loop():
        while 1:
            action = q.get()
            action()
    return loop
def alarm(seconds, callback):
    """Call ``callback()`` after ``seconds`` seconds, on a new thread.

    A fresh thread is started that sleeps for ``seconds`` and then invokes
    ``callback`` with no arguments. This function itself returns
    immediately with ``None``.
    """
    def sleep_then_fire():
        time.sleep(seconds)
        callback()

    timer_thread = Thread(target=sleep_then_fire)
    timer_thread.start()
def fire_and_forget(func, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` on a newly started thread.

    Returns ``None`` immediately; the result of ``func`` is not collected.
    ``func`` should not mutate its arguments.
    """
    runner = Thread(target=func, args=args, kwargs=kwargs)
    runner.start()
class ThreadComplete(Exception):
    """Signal exception: raised by ``complete``, swallowed by ``wait_for_complete``."""
def wait_for_complete(fun):
    """Decorate ``fun`` so that a raised ``ThreadComplete`` is swallowed.

    The wrapper forwards all positional and keyword arguments to ``fun``.

    Returns:
        Whatever ``fun`` returns, or ``None`` if ``fun`` raised
        ``ThreadComplete``.

    Raises:
        Any exception from ``fun`` other than ``ThreadComplete`` propagates
        unchanged.
    """
    @wraps(fun)
    def inner(*args, **kwargs):
        try:
            return fun(*args, **kwargs)
        # ``except X:`` (no ``, e`` binding) is valid on both Python 2 and 3;
        # the exception object itself is not needed.
        except ThreadComplete:
            return None
    return inner
@message
def complete(self):
    """Message whose body raises ``ThreadComplete`` when it is executed."""
    raise ThreadComplete
Jump to Line
Something went wrong with that request. Please try again.