Permalink
Browse files

initial check-in as a git repo

  • Loading branch information...
0 parents commit e0a1900d424d8902af3e9f4e7ae8c267e2c7f6b4 @trey0 trey0 committed Jul 19, 2010
@@ -0,0 +1 @@
+.svn
@@ -0,0 +1,5 @@
+
+from PubSubWrapper import PubSubWrapper
+
+class CorbaWrapper(PubSubWrapper):
+ pollForMessages = True
@@ -0,0 +1,130 @@
+
+import sys
+import SharedScheduler
+from comExceptions import *
+
+class Dispatcher:
+ def __init__(self, moduleName, scheduler = None, period = 0.1):
+ if scheduler == None:
+ import SharedScheduler
+ scheduler = SharedScheduler.scheduler
+ self._moduleName = moduleName
+ self._scheduler = scheduler
+ self._period = period
+ self._handlers = {}
+ self._protocols = {}
+ self._polling = False
+
+ def connectToNotificationService(self, notificationService):
+ protocol, addr = self._splitProtocol(notificationService)
+ protocol.connectToNotificationService(self._moduleName, addr)
+ self._startHandling()
+
+ def getProtocol(self, protoName, endpoint=None):
+ if endpoint == None:
+ endpoint = protoName
+ try:
+ return self._protocols[protoName]
+ except KeyError:
+ protocol = self._loadProtocol(protoName, endpoint)
+ self._protocols[protoName] = protocol
+ return protocol
+
+ def subscribe(self, event, handler):
+ if event in self._handlers:
+ raise comExceptions.EventAlreadyBoundError(event)
+ else:
+ self._handlers[event] = handler
+ protocol, addr = self._splitProtocol(event)
+ protocol.subscribe(addr)
+
+ def unsubscribe(self, event):
+ protocol, eventName = self._splitProtocol(event)
+ protocol.unsubscribe(addr)
+ del self._handlers[eventName]
+
+ def publish(self, event, data):
+ protocol, eventName = self._splitProtocol(event)
+ protocol.publish(eventName, data)
+
+ def handleMessage(self):
+ for protocolName, protocol in self._protocols.iteritems():
+ if protocol.pollForMessages and protocol.isMessage():
+ eventName, data = protocol.getMessage()
+ event = ':'.join((protocolName, eventName))
+ self._handlers[event](event, data)
+ return True
+ return False
+
+ def handleMessages(self):
+ while self.handleMessage():
+ pass
+
+ def connect(self, endpoint, lineMode=True, readHandler=None, connectHandler=None,
+ lineHandler=None):
+ protocol, addr = self._splitProtocol(endpoint)
+ sock = protocol.connect(self, addr,
+ dict(lineMode=lineMode,
+ readHandler=readHandler,
+ connectHandler=connectHandler,
+ lineHandler=lineHandler))
+ self._startHandling()
+ return sock
+
+ def listen(self, endpoint, maxConnections=1, lineMode=True, acceptHandler=None,
+ connectHandler=None, readHandler=None, lineHandler=None, createSocketHandler=None):
+ protocol, addr = self._splitProtocol(endpoint)
+ sock = protocol.listen(self, addr,
+ dict(maxConnections=maxConnections,
+ lineMode=lineMode,
+ acceptHandler=acceptHandler,
+ createSocketHandler=createSocketHandler,
+ connectHandler=connectHandler,
+ readHandler=readHandler,
+ lineHandler=lineHandler))
+ self._startHandling()
+ return sock
+
+ def findServices(self, protoName, announceServices=[], serviceHandler=None):
+ if ':' in protoName:
+ protoName, _ = self._splitProtocol(protoName)
+ self.getProtocol(protoName).findServices(announceServices, serviceHandler)
+
+ def runForever(self):
+ return self._scheduler.runForever()
+
+ def waitForResponse(self, collectResponseHandler):
+ return self._scheduler.waitForResponse(collectResponseHandler)
+
+ def close(self):
+ if self._polling:
+ self._scheduler.cancelPeriodic(self._handleMessagesTimer)
+ for protocol in self._protocols.itervalues():
+ protocol.close()
+
+ def _loadProtocol(self, protoName, endpoint):
+ if protoName == 'console':
+ protoName = 'file' # alias
+ implName = '%sWrapper' % protoName.capitalize() # e.g. TcpWrapper
+ modName = 'irgCom.%s' % implName # e.g. irgCom.tcpWrapper
+ try:
+ __import__(modName)
+ except ImportError:
+ raise BadEndpointError("can't load protocol", endpoint)
+ mod = sys.modules[modName]
+ cls = getattr(mod, implName) # e.g. irgCom.TcpWrapper.TcpWrapper
+ return cls(self, protoName)
+
+ def _splitProtocol(self, endpoint):
+ try:
+ protoName, rest = endpoint.split(':', 1)
+ except ValueError:
+ raise BadEndpointError('not in form "protocol:address"', endpoint)
+ return self.getProtocol(protoName), rest
+
+ def _startHandling(self, period=0.1):
+ if not self._polling:
+ if sum([p.pollForMessages for p in self._protocols.itervalues()]):
+ self._handleMessagesTimer = (self._scheduler.enterPeriodic
+ (period=self._period, action=self.handleMessages))
+ self._polling = True
@@ -0,0 +1,66 @@
+
+import asyncore, sys, traceback, re
+
+OPTS_KEYS = ('lineMode', 'connectHandler', 'lineHandler')
+
+class FileStreamSocket(asyncore.file_dispatcher):
+ """Implement part of async_chat but on top of asyncore.file_dispatcher instead
+ of asyncore.dispatcher. Also implement default handlers for simple line-oriented
+ I/O."""
+ def __init__(self, optsDict):
+ for k in OPTS_KEYS:
+ setattr(self, '_' + k, optsDict[k])
+ assert self._lineMode # wrapper only currently supports line buffering
+
+ def connect(self):
+ inputFd = sys.stdin.fileno()
+ self._outputFile = sys.stdout
+ asyncore.file_dispatcher.__init__(self, inputFd)
+ self.set_terminator('\n') # default, can change
+ self._ibuffer = []
+
+ def write(self, text):
+ self._outputFile.write(text)
+
+ def set_terminator(self, terminator):
+ self._terminator = terminator
+
+ def handle_connect(self):
+ if self._connectHandler:
+ self._connectHandler(self)
+
+ def writable(self):
+ return False
+
+ def handle_read(self):
+ remainingData = self.read(1024)
+ while 1:
+ remainingData = self._processUpToTerminator(remainingData)
+ if not remainingData:
+ break
+
+ def _processUpToTerminator(self, data):
+ termIndex = data.find(self._terminator)
+ if termIndex == -1:
+ self.collect_incoming_data(data)
+ return ''
+ else:
+ self.collect_incoming_data(data[:termIndex])
+ self.found_terminator()
+ return data[(termIndex+1):]
+
+ def collect_incoming_data(self, data):
+ self._ibuffer.append(data)
+
+ def found_terminator(self):
+ line = ''.join(self._ibuffer)
+ line = re.sub(r'\r$', '', line)
+ self.handleLine(line)
+ self._ibuffer = []
+
+ def handle_error(self):
+ raise # pass the buck to scheduler error handling
+
+ def handleLine(self, line):
+ if self._lineHandler != None:
+ self._lineHandler(self, line)
@@ -0,0 +1,15 @@
+
+from FileStreamSocket import FileStreamSocket
+
+class FileWrapper:
+ pollForMessages = False
+
+ def __init__(self, dispatcher, protoName):
+ self._dispatcher = dispatcher
+ self._protoName = protoName
+
+ def connect(self, dispatcher, unusedAddr, optsDict):
+ sock = FileStreamSocket(optsDict)
+ assert unusedAddr == ''
+ sock.connect()
+ return sock
@@ -0,0 +1,6 @@
+
+from cmuIpcPackage import CmuIpc
+from PubSubWrapper import PubSubWrapper
+
+class IpcWrapper(CmuIpc, PubSubWrapper):
+ pollForMessages = True
@@ -0,0 +1,14 @@
+
+class PubSubBaseSocket(object):
+ def __init__(self, protocol, dispatcher):
+ self._protocol = protocol
+ self._dispatcher = dispatcher
+ self._subscriptions = []
+
+ def _subscribe(self, name, handler):
+ self._protocol.subscribeWithHandler(name, handler)
+ self._subscriptions.append(name)
+
+ def close(self):
+ for sub in self._subscriptions:
+ self._dispatcher.unsubscribe(sub)
@@ -0,0 +1,47 @@
+
+import time
+from PubSubBaseSocket import PubSubBaseSocket
+from PubSubStreamSocket import PubSubStreamSocket
+
+OPTS_KEYS = ('maxConnections', 'acceptHandler', 'createSocketHandler')
+
+class PubSubListenSocket(PubSubBaseSocket):
+ def __init__(self, protocol, dispatcher, optsDict):
+ super(PubSubListenSocket, self).__init__(protocol, dispatcher)
+ for k in OPTS_KEYS:
+ setattr(self, '_'+k, optsDict[k])
+ self._optsDict = optsDict
+
+ def listen(self, listenEvent):
+ self.endpoint = '%s:%s' % (self._protocol.protoName, listenEvent)
+ self._listenEvent = listenEvent
+ self._protocol.subscribeWithHandler(self._listenEvent, self._pingHandler)
+
+ def _pingHandler(self, name, data):
+ cmd, self._acceptSendEvent = data.split()
+ assert cmd == 'ping'
+ self.handleAccept()
+ self._acceptSendEvent = None
+
+ def accept(self):
+ recvEvent = '%s-%f' % (self._listenEvent, time.time())
+ self._protocol.publish(self._acceptSendEvent, 'ack ' + recvEvent)
+ acceptSocket = self.handleCreateSocket()
+ acceptSocket._initServerSocket(self._acceptSendEvent, recvEvent)
+ return acceptSocket
+
+ def handleAccept(self):
+ """What to do when a client tries to connect."""
+ # clientSock, clientEventName = self.accept()
+ # ...
+ if self._acceptHandler != None:
+ self._acceptHandler(self)
+ else:
+ self.accept()
+
+ def handleCreateSocket(self):
+ """How to create a stream socket."""
+ if self._createSocketHandler != None:
+ return self._createSocketHandler(self)
+ else:
+ return PubSubStreamSocket(self._protocol, self._dispatcher, self._optsDict)
@@ -0,0 +1,89 @@
+
+import re, platform, os, time
+from PubSubBaseSocket import PubSubBaseSocket
+
+OPTS_KEYS = ('lineMode', 'connectHandler', 'readHandler', 'lineHandler')
+
+class PubSubStreamSocket(PubSubBaseSocket):
+ def __init__(self, protocol, dispatcher, optsDict):
+ super(PubSubStreamSocket, self).__init__(protocol, dispatcher)
+ for k in OPTS_KEYS:
+ setattr(self, '_'+k, optsDict[k])
+ self._clear()
+
+ def _getUniqueName(self):
+ return '%s:%s-%d-%f' % (self._protocol._protoName, platform.node(), os.getpid(), time.time())
+
+ def _clear(self):
+ self._connected = False
+ self._recvBuf = []
+
+ def connect(self, pingEvent):
+ self.endpoint = '%s:%s' % (self._protocol.protoName, pingEvent)
+ self._pingEvent = pingEvent
+ self._recvEvent = self._getUniqueName()
+ self._subscribe(self._recvEvent, self._recvHandler)
+ self._protocol.publish(self._pingEvent, 'ping ' + self._recvEvent)
+
+ def _initServerSocket(self, sendEvent, recvEvent):
+ self._sendEvent = sendEvent
+ self._recvEvent = recvEvent
+ self._subscribe(self._recvEvent, self._recvHandler)
+ self._connected = True
+ self.handleConnect()
+
+ def _recvHandler(self, name, data):
+ if self._connected:
+ if self._lineMode:
+ self._checkForNewLine(data)
+ else:
+ self._recvBuf.append(data)
+ self.handleRead()
+ else:
+ cmd, self._sendEvent = data.split()
+ assert cmd == 'ack'
+ self._connected = True
+ self.handleConnect()
+
+ def _checkForNewLine(self, data):
+ newLineIndex = data.find('\n')
+ if newLineIndex == -1:
+ self._recvBuf.append(data)
+ else:
+ line = ''.join(self._recvBuf) + data[:newLineIndex]
+ line = re.sub('\r$', '', line)
+ self._recvBuf = [data[(newLineIndex+1):]]
+ self.handleLine(line)
+
+ def read(self, numBytes=None):
+ joined = ''.join(self._recvBuf)
+ if numBytes != None and numBytes < len(joined):
+ self._recvBuf = [joined[numBytes:]]
+ return joined[:numBytes]
+ else:
+ self._recvBuf = []
+ return joined
+
+ def write(self, data):
+ self._protocol.publish(self._sendEvent, data)
+
+ push = write # emulate async_chat.push
+
+ def close(self):
+ self._clear()
+ super(PubSubStreamSocket, self).close()
+
+ def handleLine(self, line):
+ """What to do when lineMode is True and we get a new line."""
+ if self._lineHandler != None:
+ self._lineHandler(self, line)
+
+ def handleRead(self):
+ """What to do when lineMode is False and there is data available for reading."""
+ if self._readHandler != None:
+ self._readHandler(self)
+
+ def handleConnect(self):
+ """What to do when there is a valid connection."""
+ if self._connectHandler != None:
+ self._connectHandler(self)
Oops, something went wrong. Retry.

0 comments on commit e0a1900

Please sign in to comment.