Permalink
Browse files

Added threaded model

  • Loading branch information...
1 parent cd4c67d commit f2cf384dc9fb4ddad1f57a6a2cb07f76cb5a83b0 @deavid committed Apr 7, 2011
Showing with 101 additions and 52 deletions.
  1. +4 −0 bjsonrpc/__init__.py
  2. +93 −51 bjsonrpc/connection.py
  3. +4 −1 bjsonrpc/request.py
View
@@ -23,6 +23,10 @@
"exceptions"
]
+bjsonrpc_options = {
+ 'threaded' : False
+}
+
from bjsonrpc.main import createserver, connect
import bjsonrpc.server
View
@@ -38,6 +38,7 @@
from bjsonrpc.proxies import Proxy
from bjsonrpc.request import Request
from bjsonrpc.exceptions import EofError, ServerError
+from bjsonrpc import bjsonrpc_options
import bjsonrpc.jsonlib as json
import select
@@ -215,7 +216,12 @@ def __init__(self, sck, address = None, handler_factory = None):
self.method = Proxy(self, sync_type=1)
self.notify = Proxy(self, sync_type=2)
self._wbuffer = []
-
+ self.write_lock = threading.RLock()
+ self.read_lock = threading.RLock()
+ self.reading_event = threading.Event()
+ self.threaded = bjsonrpc_options['threaded']
+
+
@property
def socket(self):
"""
@@ -449,9 +455,8 @@ def dispatch_until_empty(self):
count += 1
newline_idx = self._buffer.find('\n')
return count
-
-
- def read_and_dispatch(self, timeout=None):
+
+ def read_and_dispatch(self, timeout=None, thread=True, condition=None):
"""
Read one message from socket (with timeout specified by the optional
argument *timeout*) and dispatches that message.
@@ -469,28 +474,56 @@ def read_and_dispatch(self, timeout=None):
been received.
"""
- data = self.read(timeout=timeout)
- if not data:
- return False
+ self.read_lock.acquire()
+ self.reading_event.set()
try:
- item = json.loads(data, self)
- if type(item) is list: # batch call
- for i in item:
- self.dispatch_item(i)
- elif type(item) is dict: # std call
- self.dispatch_item(item)
- else: # Unknown format :-(
- print "Received message with unknown format type:" , type(item)
+ if condition:
+ if condition() == False:
+ return False
+ if thread:
+ dispatch_item = self.dispatch_item_threaded
+ else:
+ dispatch_item = self.dispatch_item_single
+
+ data = self.read(timeout=timeout)
+ if not data:
+ return False
+ try:
+ item = json.loads(data, self)
+ if type(item) is list: # batch call
+ for i in item:
+ dispatch_item(i)
+ elif type(item) is dict: # std call
+ if 'result' in item:
+ self.dispatch_item_single(item)
+ else:
+ dispatch_item(item)
+ else: # Unknown format :-(
+ print "Received message with unknown format type:" , type(item)
+ return False
+ except Exception:
+ print traceback.format_exc()
return False
- except Exception:
- print traceback.format_exc()
- return False
+ return True
+ finally:
+ self.reading_event.clear()
+ self.read_lock.release()
- return True
-
- def dispatch_item(self, item):
+ def dispatch_item_threaded(self, item):
+ if self.threaded:
+ th1 = threading.Thread(target = self.dispatch_item_single, args = [ item ] )
+ th1.start()
+ return True
+ else:
+ return self.dispatch_item_single(item)
+
+
+
+
+
+ def dispatch_item_single(self, item):
"""
Given a JSON item received from socket, determine its type and
process the message.
@@ -591,32 +624,36 @@ def write_line(self, data):
String containing the data to be sent.
"""
assert('\n' not in data)
- if self._debug_socket:
- print "<:%d:" % len(data), data[:80]
+ self.write_lock.acquire()
+ try:
+ if self._debug_socket:
+ print "<:%d:" % len(data), data[:130]
- self._wbuffer += list(str(data + '\n'))
- sbytes = 0
- while len(self._wbuffer) > 0:
- try:
- sbytes = self._sck.send("".join(self._wbuffer))
- except IOError:
- print "Read socket error: IOError (timeout: %s)" % (
- repr(self._sck.gettimeout()) )
- print traceback.format_exc(0)
- return ''
- except socket.error:
- print "Read socket error: socket.error (timeout: %s)" % (
- repr(self._sck.gettimeout()) )
- print traceback.format_exc(0)
- return ''
- except:
- raise
- if sbytes == 0:
- break
- self._wbuffer[0:sbytes] = []
- if len(self._wbuffer):
- print "warn: %d bytes left in write buffer" % len(self._wbuffer)
- return len(self._wbuffer)
+ self._wbuffer += list(str(data + '\n'))
+ sbytes = 0
+ while len(self._wbuffer) > 0:
+ try:
+ sbytes = self._sck.send("".join(self._wbuffer))
+ except IOError:
+ print "Read socket error: IOError (timeout: %s)" % (
+ repr(self._sck.gettimeout()) )
+ print traceback.format_exc(0)
+ return ''
+ except socket.error:
+ print "Read socket error: socket.error (timeout: %s)" % (
+ repr(self._sck.gettimeout()) )
+ print traceback.format_exc(0)
+ return ''
+ except:
+ raise
+ if sbytes == 0:
+ break
+ self._wbuffer[0:sbytes] = []
+ if len(self._wbuffer):
+ print "warn: %d bytes left in write buffer" % len(self._wbuffer)
+ return len(self._wbuffer)
+ finally:
+ self.write_lock.release()
@@ -630,11 +667,16 @@ def read_line(self):
Returns the line of *data* received from the socket.
"""
- data = self._readn()
- if len(data) and self._debug_socket:
- print ">:%d:" % len(data), data[:80]
- return data
-
+
+ self.read_lock.acquire()
+ try:
+ data = self._readn()
+ if len(data) and self._debug_socket:
+ print ">:%d:" % len(data), data[:130]
+ return data
+ finally:
+ self.read_lock.release()
+
def settimeout(self, operation, timeout):
"""
configures a timeout for the connection for a given operation.
View
@@ -96,8 +96,11 @@ def wait(self):
Block until there is a response. Will manage the socket and dispatch
messages until the response is found.
"""
+ #if self.response is None:
+ # self.conn.read_ensure_thread()
+
while self.response is None:
- self.conn.read_and_dispatch()
+ self.conn.read_and_dispatch(condition=lambda: self.response is None)
@property
def value(self):

0 comments on commit f2cf384

Please sign in to comment.