Skip to content

Commit

Permalink
simple monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
popravich committed Jan 13, 2015
1 parent cff5158 commit 41b49a1
Showing 1 changed file with 30 additions and 25 deletions.
55 changes: 30 additions & 25 deletions aiozmq/cli/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

def get_arguments():
ap = argparse.ArgumentParser(description="ZMQ Proxy tool")
# ap.set_defaults(action=lambda opt: ap.print_help())
ap.set_defaults(action=lambda opt: ap.print_help())

def common_arguments(ap):
ap.add_argument('--front-bind', metavar="ADDR", action='append',
Expand Down Expand Up @@ -55,8 +55,12 @@ def common_arguments(ap):
sub = parsers.add_parser(
'monitor',
help="Connects/binds to monitor socket and dumps all traffic")
sub.set_defaults(actions=monitor)

sub.set_defaults(action=monitor)
sub.add_argument('--connect', metavar="ADDR",
help="Connect to monitor socket")
sub.add_argument('--bind', metavar="ADDR",
help="Bind monitor socket")
sub.add_argument('--echo', default=False, action='store_true')
return ap


Expand Down Expand Up @@ -91,8 +95,10 @@ def serve_proxy(options):
bind_connect(back, options.back_bind, options.back_connect)
try:
if monitor:
print("Connecting with capture socket")
zmq.proxy(front, back, monitor)
else:
print("Connecting without capture socket")
zmq.proxy(front, back)
except:
return
Expand All @@ -101,27 +107,6 @@ def serve_proxy(options):
back.close()


def serve_proxy_async(cfg):
ctx = zmq.Context().instance()

TYPES = {
'queue': (zmq.ROUTER, zmq.DEALER),
'forwarder': (zmq.XSUB, zmq.XPUB),
'streamer': (zmq.PULL, zmq.PUSH),
}
front_type, back_type = TYPES[cfg['type']]

front = ctx.socket(front_type)
back = ctx.socket(back_type)

bind_connect(front, cfg['front-bind'], cfg['front-connect'])
bind_connect(back, cfg['back-bind'], cfg['back-connect'])
try:
zmq.proxy(front, back)
except:
return


def bind_connect(sock, bind=None, connect=None):
if bind:
for address in bind:
Expand All @@ -132,7 +117,27 @@ def bind_connect(sock, bind=None, connect=None):


def monitor(options):
pass
ctx = zmq.Context.instance()
sock = ctx.socket(zmq.SUB)

bind = [options.bind] if options.bind else []
connect = [options.connect] if options.connect else []
bind_connect(sock, bind, connect)
sock.setsockopt(zmq.SUBSCRIBE, b'')

try:
while True:
try:
msg = sock.recv(zmq.NOBLOCK)
except Exception as err:
print("Error: {!r}".format(err))
else:
print("Got message")
if options.echo:
print(msg)
finally:
sock.close()
ctx.term()


if __name__ == "__main__":
Expand Down

0 comments on commit 41b49a1

Please sign in to comment.