Skip to content

Commit

Permalink
#5 add in ZeroMQ publishing between the same percept in different pro…
Browse files Browse the repository at this point in the history
…cesses; allows direct access to other-process ratios. This could allow external history as well, if extended
  • Loading branch information
eastein committed Oct 22, 2011
1 parent 917d350 commit 2372f67
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 10 deletions.
File renamed without changes.
19 changes: 12 additions & 7 deletions lidless
Expand Up @@ -39,7 +39,6 @@ class ThreadManager(object) :

def usage() :
# TODO take a 'what you did wrong' string and display it with the usage
raise RuntimeError
print 'usage: lidless <configfile> [role]\n\n\tconfigfile should be json\n\trole is default, allows selecting certain roles for processing in different processes.'
sys.exit(1)

Expand All @@ -48,9 +47,12 @@ def all_in(stanza, keys) :
if k not in stanza :
usage()

def roled(role, stanza, thr) :
def roled(role, stanza, thr, deactivate=False) :
if stanza['role'] != role :
thr.stop()
if deactivate :
thr.deactivate()
else :
thr.stop()
return True
return False

Expand Down Expand Up @@ -96,19 +98,22 @@ if __name__ == '__main__' :
continue

auth = None
zm_auth_hash_secret = None
zm_ahs = None
zmq_url = None
if 'username' in stanza or 'password' in stanza :
all_in(stanza, ['username', 'password'])
auth = stanza['username'], stanza['password']
if 'zm_auth_hash_secret' in stanza :
zm_auth_hash_secret = stanza['zm_auth_hash_secret']
perc = percept.Percept(stanza['name'], stanza['url'], auth=auth, zm_auth_hash_secret=zm_auth_hash_secret)
zm_ahs = stanza['zm_auth_hash_secret']
if 'zmq_url' in stanza :
zmq_url = stanza['zmq_url']
perc = percept.Percept(stanza['name'], stanza['url'], auth=auth, zm_auth_hash_secret=zm_ahs, zmq_url=zmq_url)
t.add(perc)

perc.history = historical.History(perc.camname, perc)
t.add(perc.history)

disqualified &= roled(role, stanza, perc)
disqualified &= roled(role, stanza, perc, deactivate=True)
disqualified &= roled(role, stanza, perc.history)

percepts[perc.camname] = perc
Expand Down
5 changes: 4 additions & 1 deletion lidlessweb.py
Expand Up @@ -11,7 +11,7 @@
import ramirez.mcore.events
import os.path

CAM_MATCH = re.compile('^/api/([^/]+)/.*$')
CAM_MATCH = re.compile('^/api/([^/]+)(|/.*)$')
DEFAULT_RANGE_MATCH = re.compile('^/api/[^/]+/(ticks|history)$')
RANGE_MATCH = re.compile('^/api/[^/]+/history/([0-9]+)$')

Expand Down Expand Up @@ -207,6 +207,9 @@ def _wj(self, j) :

@tornado.web.asynchronous
def get(self, *a):
cn = self.__class__.__name__
cn += ' ' * (14 - len(cn))
print '[json/%s] %s GET args %s' % (cn, self.request.remote_ip, str(a))
result = self.process_request(*a)
if isinstance(result, (futures.Future, ProtoFuture)) :
result.add_done_callback(self.handle_response)
Expand Down
47 changes: 45 additions & 2 deletions percept.py
@@ -1,12 +1,14 @@
import zmstream
import cv
import zmq
import math
import pprint
import os
import sys
import tempfile
import threading
import time
import json

SOCKET_RETRY_SEC = 10
NO_FRAME_THR = 10
Expand All @@ -17,12 +19,14 @@
BUSY_THR = FPS * BUSY_SEC

class Percept(threading.Thread) :
def __init__(self, camname, url, auth=None, zm_auth_hash_secret=None) :
def __init__(self, camname, url, auth=None, zm_auth_hash_secret=None, zmq_url=None) :
self.camname = camname
self.url = url
self.auth = auth
self.zm_auth_hash_secret = zm_auth_hash_secret
self.zmq_url = zmq_url
self.ok = True
self.active = True
self.frame_time = None
self.ratio_busy = None
threading.Thread.__init__(self)
Expand Down Expand Up @@ -182,6 +186,12 @@ def ratio_lte_thr(self, img, thr) :
c += 1
return c / float(width * height)

def deactivate(self) :
if self.zmq_url is not None :
self.active = False
else :
self.stop()

def stop(self) :
self.ok = False
if hasattr(self, 'streamer') :
Expand Down Expand Up @@ -210,7 +220,7 @@ def busy(self) :
return None

return self.ratio_busy
else :
elif not self.active : #FIXME database based sharing is unreliable
if hasattr(self, 'history') :
b = self.history.busy
if b is not None :
Expand All @@ -223,7 +233,33 @@ def checkedwait(self, secs) :
break
time.sleep(0.1)

def run_zmq(self) :
c = zmq.Context(1)
s = c.socket(zmq.SUB)
s.connect(self.zmq_url)
s.setsockopt (zmq.SUBSCRIBE, "")

while self.ok :
r, w, x = zmq.core.poll.select([s], [], [], 0.1)
if r :
msg = s.recv()

msg = json.loads(msg)
if msg['camname'] == self.camname :
self.frame_time = msg['frame_time']
self.ratio_busy = msg['ratio_busy']

def run(self) :
zmq_socket = None
if self.zmq_url is not None :
if self.active :
c = zmq.Context(1)
zmq_socket = c.socket(zmq.PUB)
zmq_socket.bind(self.zmq_url)
else :
self.run_zmq()
return

edge_bin = {}
history = None

Expand Down Expand Up @@ -254,6 +290,13 @@ def run(self) :
history = self.frames_ago(blob_motion, history)

self.ratio_busy = self.ratio_lte_thr(history, BUSY_THR)
if zmq_socket is not None :
msg = {
'camname' : self.camname,
'ratio_busy' : self.ratio_busy,
'frame_time' : ts
}
zmq_socket.send(json.dumps(msg))
print '%s ratio busy: %0.3f' % (self.camname, self.ratio_busy)
#cv.SaveImage('cumulative.png', history)

Expand Down

0 comments on commit 2372f67

Please sign in to comment.