Permalink
Browse files

added code for 0.6 py-amqp library

git-svn-id: http://evserver.googlecode.com/svn/trunk@84 e9bb6d7e-af12-11dd-bad7-87afd3b02348
  • Loading branch information...
1 parent 7b3f15f commit 658f13390d9b63516f52ac5d61db412fbb0a5933 majek04 committed Feb 9, 2009
Showing with 105 additions and 3 deletions.
  1. +2 −2 evserver/examples/amqp.py
  2. +97 −0 evserver/examples/amqp06.py
  3. +6 −1 evserver/status.py
@@ -1,5 +1,5 @@
"""
-Based on Barry Pederson code. Requires py-amqplib package.
+Based on Barry Pederson code. Requires py-amqplib 0.5 package.
To run consumer server:
evserver --exec="import evserver.examples.amqp; application=evserver.examples.amqp.wsgi_subscribe"
@@ -9,7 +9,7 @@
import sys
import time
-import amqplib.client_0_8 as blamqp
+import amqplib.client_0_8 as blamqp
import amqplib.nbclient_0_8 as nbamqp
def publish(msg_body):
@@ -0,0 +1,97 @@
+"""
+Based on Barry Pederson code. Requires py-amqplib 0.6 package.
+
+To run consumer server:
+ evserver --exec="import evserver.examples.amqp06; application=evserver.examples.amqp06.wsgi_subscribe"
+To produce message:
+ python -c "import evserver.examples.amqp06; evserver.examples.amqp06.publish('Hello World')"
+"""
+import sys
+import time
+import os.path
+import socket
+import logging
+log = logging.getLogger(os.path.basename(__file__))
+
+
+import amqplib.client_0_8 as amqp
+logging.getLogger('amqplib').setLevel(logging.INFO) # ignore msgs from there
+
+
+def publish(msg_body):
+ conn = amqp.Connection('localhost', userid='guest', password='guest')
+ ch = conn.channel()
+ ch.access_request('/data', active=True, write=True)
+ ch.exchange_declare('myfan', 'fanout', auto_delete=True)
+
+ msg = amqp.Message(msg_body, content_type='text/plain')
+ ch.basic_publish(msg, 'myfan')
+
+ ch.close()
+ conn.close()
+
+
+def set_ridiculously_high_buffers(sd):
+ for flag in [socket.SO_SNDBUF, socket.SO_RCVBUF]:
+ for i in range(10):
+ bef = sd.getsockopt(socket.SOL_SOCKET, flag)
+ sd.setsockopt(socket.SOL_SOCKET, flag, bef*2)
+ aft = sd.getsockopt(socket.SOL_SOCKET, flag)
+ if aft <= bef or aft >= 16777216: # 16M
+ break
+
+
+
+def wsgi_subscribe(environ, start_response):
+ start_response("200 OK", [('Content-type','text/plain')])
+
+ msgs = []
+ def callback(msg):
+ msgs.append(msg.body)
+ msg.channel.basic_ack(msg.delivery_tag)
+
+ t0 = time.time()
+ conn = amqp.Connection('localhost', userid='guest', password='guest')
+
+ ch = conn.channel()
+ ch.access_request('/data', active=True, read=True)
+
+ ch.exchange_declare('myfan', 'fanout', auto_delete=True)
+ qname, _, _ = ch.queue_declare()
+ ch.queue_bind(qname, 'myfan')
+ ch.basic_consume(qname, callback=callback)
+
+ sd = conn.transport.sock
+ sd.setblocking(False)
+ set_ridiculously_high_buffers(sd)
+
+ try:
+ yield 'setting up an amqp connection took %.3fms\n' % ((time.time()-t0)*1000.0, )
+
+ while ch.callbacks:
+ t0 = time.time()
+ try:
+ while True: # until exception
+ ch.wait()
+ except (TypeError,), e:
+ pass
+
+ yield '%.3fms: %r\n' % ((time.time()-t0)*1000.0, msgs)
+ while msgs:
+ msgs.pop()
+
+ yield environ['x-wsgiorg.fdevent.readable'](conn.transport.sock)
+ except GeneratorExit:
+ pass
+
+ try:
+ ch.close()
+ except Exception:
+ pass
+ try:
+ conn.close()
+ except Exception:
+ pass
+ return
+
+
View
@@ -49,12 +49,17 @@ def status_page(environ, start_response):
pagesize = resource.getpagesize()
vm, rm, sm = meminfo.memory()
+ gc0, gc1, gc2 = gc.get_count()
s.append(
- '''######## PID:%(pid)i total events:%(event_counter)i objects in memory:%(gc_objects)i file descriptors:%(file_descriptors)i/%(max_descriptors)i \n'''
+ '''######## PID:%(pid)i total events:%(event_counter)i python objects, unreachable:%(gc_unreachable)i total:%(gc_objects)i dirty:%(gc0)i/%(gc1)i/%(gc2)i file descriptors:%(file_descriptors)i/%(max_descriptors)i \n'''
'''######## virt memory:%(vm).0fMiB res memory:%(rm).0fMiB sys cpu time:%(sys).3fs user:%(user).3fs context switches, voluntary:%(vcs)i involuntary:%(ics)i \n''' %
{
'pid':pid,
'event_counter':server.event_counter,
+ 'gc0': gc0,
+ 'gc1': gc1,
+ 'gc2': gc2,
+ 'gc_unreachable': len(gc.garbage),
'gc_objects':len(gc.get_objects()),
'file_descriptors':len(os.listdir('/proc/%i/fd' % pid)),
'max_descriptors':resource.getrlimit(resource.RLIMIT_NOFILE)[1],

0 comments on commit 658f133

Please sign in to comment.