Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

zmqCentral: log messages with binary attachments

  • Loading branch information...
commit 5bfbd0684b37e63b21beb5b4e8a4a27a0deef906 1 parent e84d1c2
@trey0 trey0 authored
View
14 geocamUtil/zmq/exampleMessageWithAttachment.txt
@@ -0,0 +1,14 @@
+Content-Type: multipart/mixed; boundary="d3b9176a-9e55-4afd-aca4-5f1a09fce37b"
+
+--d3b9176a-9e55-4afd-aca4-5f1a09fce37b
+Content-Disposition: inline
+Content-Type: application/json; charset="utf-8"
+
+{"cameraId": 1}
+--d3b9176a-9e55-4afd-aca4-5f1a09fce37b
+Content-Disposition: attachment; filename="processedData.bmp"
+Content-Type: image/bmp
+Content-Transfer-Encoding: binary
+
+hello
+--d3b9176a-9e55-4afd-aca4-5f1a09fce37b
View
41 geocamUtil/zmq/testPublishAttachments.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+
+import logging
+
+from zmq.eventloop import ioloop
+ioloop.install()
+
+from geocamUtil.zmq.publisher import ZmqPublisher
+from geocamUtil.zmq.util import zmqLoop
+
+TEST_MESSAGE = open('exampleMessageWithAttachment.txt', 'rb').read()
+
+def pubMessage(p):
+ topic = 'dds.Resolve.RESOLVE_CAM_ProcessedImage'
+ body = TEST_MESSAGE
+ logging.debug('publishing: %s:%s', topic, body)
+ p.sendRaw(topic, body)
+
+
+def main():
+ import optparse
+ parser = optparse.OptionParser('usage: %prog')
+ ZmqPublisher.addOptions(parser, 'testPublishAttachments')
+ opts, args = parser.parse_args()
+ if args:
+ parser.error('expected no args')
+ logging.basicConfig(level=logging.DEBUG)
+
+ # set up networking
+ p = ZmqPublisher(**ZmqPublisher.getOptionValues(opts))
+ p.start()
+
+ # start publishing an arbitrary message that central should forward
+ pubTimer = ioloop.PeriodicCallback(lambda: pubMessage(p), 1000)
+ pubTimer.start()
+
+ zmqLoop()
+
+
+if __name__ == '__main__':
+ main()
View
35 geocamUtil/zmq/util.py
@@ -9,6 +9,7 @@
import time
import platform
import datetime
+import email.parser
from zmq.eventloop import ioloop
@@ -17,8 +18,10 @@
DEFAULT_CENTRAL_PUBLISH_PORT = 7816
-def getTimestamp():
- return int(time.time() * 1000000)
+def getTimestamp(posixTime=None):
+ if posixTime is None:
+ posixTime = time.time()
+ return int(posixTime * 1000000)
def getTimestampFields(timestampStr):
@@ -65,5 +68,33 @@ def parseEndpoint(endpoint,
raise ValueError('can\'t resolve endpoint format "%s"' % endpoint)
+def hasAttachments(msg):
+ colonIndex = msg.find(':')
+ ctype = ':Content-Type: '
+ return msg[colonIndex : (colonIndex + len(ctype))] == ctype
+
+def parseMessageBody(body):
+ if body.startswith('Content-Type:'):
+ msg = email.parser.Parser().parsestr(body)
+ assert msg.is_multipart()
+ jsonSection = msg.get_payload()[0]
+ attachments = msg.get_payload()[1:]
+
+ if attachments:
+ # parser quirk: remove last section if it's blank
+ lastSectionText = attachments[-1].get_payload()
+ if isinstance(lastSectionText, basestring) and re.match('^\s*$', lastSectionText):
+ attachments.pop()
+
+ return {'json': jsonSection.get_payload(), 'attachments': attachments}
+ else:
+ return {'json': body, 'attachments': []}
+
+def parseMessage(msg):
+ topic, body = msg.split(':', 1)
+ parsed = parseMessageBody(body)
+ parsed['topic'] = topic
+ return parsed
+
def zmqLoop():
ioloop.IOLoop.instance().start()
View
72 geocamUtil/zmq/zmqCentral.py
@@ -12,6 +12,7 @@
import time
import traceback
import atexit
+import random
import zmq
from zmq.eventloop.zmqstream import ZMQStream
@@ -25,7 +26,9 @@
DEFAULT_CENTRAL_SUBSCRIBE_PORT, \
DEFAULT_CENTRAL_PUBLISH_PORT, \
getTimestamp, \
- parseEndpoint
+ parseEndpoint, \
+ hasAttachments, \
+ parseMessage
THIS_MODULE = 'zmqCentral'
DEFAULT_KEEPALIVE_US = 10000000
@@ -49,12 +52,45 @@ def announceDisconnect(self, moduleName):
% (moduleName,
json.dumps({'timestamp': str(getTimestamp())})))
- def logMessage(self, msg):
+ def logMessage(self, msg, posixTime=None, attachmentDir='-'):
mlog = self.messageLog
- mlog.write('@@@ %d %d ' % (getTimestamp(), len(msg)))
+ mlog.write('@@@ %d %d %s ' % (getTimestamp(posixTime), len(msg), attachmentDir))
mlog.write(msg)
mlog.write('\n')
+ def logMessageWithAttachments0(self, msg):
+ parsed = parseMessage(msg)
+ posixTime = time.time()
+
+ # construct attachment directory
+ dt = datetime.datetime.utcfromtimestamp(posixTime)
+ dateText = dt.strftime('%Y-%m-%d')
+ timeText = dt.strftime('%H-%M-%S') + '.%06d' % dt.microsecond
+ uniq = '%08x' % random.getrandbits(32)
+ attachmentSuffix = os.path.join('attachments',
+ dateText,
+ timeText,
+ parsed['topic'],
+ uniq)
+ attachmentPath = os.path.join(self.logDir, attachmentSuffix)
+ os.makedirs(attachmentPath)
+
+ # write attachments to attachment directory
+ for attachment in parsed['attachments']:
+ fullName = os.path.join(attachmentPath, attachment.get_filename())
+ open(fullName, 'wb').write(attachment.get_payload())
+
+ # log message with a pointer to the attachment directory
+ self.logMessage(':'.join((parsed['topic'], parsed['json'])),
+ posixTime,
+ attachmentSuffix)
+
+ def logMessageWithAttachments(self, msg):
+ try:
+ return self.logMessageWithAttachments0(msg)
+ except: # pylint: disable=W0702
+ self.logException('logging message with attachments')
+
def handleHeartbeat(self, params):
moduleName = params['module'].encode('utf-8')
now = getTimestamp()
@@ -75,21 +111,27 @@ def handleHeartbeat(self, params):
def handleInfo(self):
return self.info
+ def logException(self, whileClause):
+ errClass, errObject, errTB = sys.exc_info()[:3]
+ errText = '%s.%s: %s' % (errClass.__module__,
+ errClass.__name__,
+ str(errObject))
+ logging.warning(''.join(traceback.format_tb(errTB)))
+ logging.warning(errText)
+ logging.warning('[error while %s at time %s]' % (whileClause, getTimestamp()))
+
def handleMessages(self, messages):
for msg in messages:
- self.logMessage(msg)
+ if hasAttachments(msg):
+ self.logMessageWithAttachments(msg)
+ else:
+ self.logMessage(msg)
if msg.startswith('central.heartbeat.'):
try:
_topic, body = msg.split(':', 1)
self.handleHeartbeat(json.loads(body))
except: # pylint: disable=W0702
- errClass, errObject, errTB = sys.exc_info()[:3]
- errText = '%s.%s: %s' % (errClass.__module__,
- errClass.__name__,
- str(errObject))
- logging.warning(''.join(traceback.format_tb(errTB)))
- logging.warning(errText)
- logging.warning('[error while handling heartbeat %s]', msg)
+ self.logException('handling heartbeat')
def handleRpcCall(self, messages):
for msg in messages:
@@ -111,13 +153,7 @@ def handleRpcCall(self, messages):
'error': None,
'id': callId}))
except: # pylint: disable=W0702
- errClass, errObject, errTB = sys.exc_info()[:3]
- errText = '%s.%s: %s' % (errClass.__module__,
- errClass.__name__,
- str(errObject))
- logging.warning(''.join(traceback.format_tb(errTB)))
- logging.warning(errText)
- logging.warning('while handling rpc message: %s', msg)
+ self.logException('handling rpc message')
self.rpcStream.send(json.dumps({'result': None,
'error': errText,
'id': callId}))
Please sign in to comment.
Something went wrong with that request. Please try again.