Skip to content
This repository has been archived by the owner on Feb 14, 2018. It is now read-only.

Commit

Permalink
Fix connection handling in the RabbitMQ handler, update the HeapReque…
Browse files Browse the repository at this point in the history
…stHandler and add a HeapRequestHandler output report tool
  • Loading branch information
Gavin M. Roy committed Oct 23, 2012
1 parent 26a4543 commit eeeb56c
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 58 deletions.
9 changes: 6 additions & 3 deletions setup.py
Expand Up @@ -25,7 +25,7 @@
handle.write('include %s\n' % filename)

setup(name='tinman',
version='0.9.2',
version='0.9.3',
description=("Tornado application wrapper and toolset for Tornado "
"development"),
long_description=('Tinman is a take what you need package designed to '
Expand All @@ -50,7 +50,8 @@
'tinman.session',
'tinman.utilities'],
install_requires=requirements,
extras_require={'LDAP': 'python-ldap',
extras_require={'Heapy': 'guppy',
'LDAP': 'python-ldap',
'MsgPack Sessions': 'msgpack',
'PostgreSQL': 'psycopg2',
'RabbitMQ': 'pika',
Expand All @@ -60,5 +61,7 @@
data_files=[(key, data_files[key]) for key in data_files.keys()],
entry_points=dict(console_scripts=['tinman=tinman.controller:main',
'tinman-init=tinman.utilities.'
'initialize:main']),
'initialize:main',
'tinman-heap-report=tinman.utilities.'
'heapy_report:main']),
zip_safe=True)
2 changes: 1 addition & 1 deletion tinman/__init__.py
@@ -1,2 +1,2 @@
__version__ = '0.9.2'
__version__ = '0.9.3'
__desc__ = 'Tinman adds a little more stack to Tornado'
43 changes: 24 additions & 19 deletions tinman/handlers/heapy.py
Expand Up @@ -7,9 +7,14 @@
"""
import guppy
import logging
import re
from tornado import web

LOGGER = logging.getLogger(__name__)
MAX_REFERRER_DEPTH = 4
MAX_ROW_COUNT_PER_LEVEL = 5

REPORT_TOTAL = re.compile('^Partition of a set of ([\d]+) objects\.'
' Total size = ([\d]+) bytes\.')
REPORT_HEADER = re.compile('^ Index Count % Size % Cumulative % (.*)',
Expand All @@ -18,11 +23,13 @@
'([\d]+)\s+([\d]+)\s+([\d]+)\s+(.*)', re.MULTILINE)


def get_report_data(heapy_obj):
def get_report_data(heapy_obj, depth=1):
LOGGER.debug('Getting report data at depth %i', depth)
report = {'total_objects': 0, 'total_bytes': 0, 'rows': []}
totals = REPORT_TOTAL.findall(str(heapy_obj))
report['total_objects'], report['total_bytes'] = (int(totals[0][0]),
int(totals[0][1]))
if totals:
report['total_objects'], report['total_bytes'] = (int(totals[0][0]),
int(totals[0][1]))
items = REPORT_ITEMS.findall(str(heapy_obj))
for index, row in enumerate(items):
report['rows'].append({'item': row[-1],
Expand All @@ -32,6 +39,20 @@ def get_report_data(heapy_obj):
'percent': int(row[4])},
'cumulative': {'value': int(row[5]),
'percent': int(row[6])}})
if depth < MAX_REFERRER_DEPTH:
try:
rows = len(heapy_obj.byrcs[index])
except IndexError:
LOGGER.warning('Could not process item at index %i', index)
report['rows'][index]['error'] = 'Could not get referrers'
continue
if rows > MAX_ROW_COUNT_PER_LEVEL:
rows = MAX_ROW_COUNT_PER_LEVEL
for referrer_index in range(0, rows):
report['rows'][index]['referrers'] =\
get_report_data(heapy_obj.byrcs[index].referrers.byrcs,
depth + 1)

header = REPORT_HEADER.findall(str(heapy_obj))
if header:
report['title'] = header[0]
Expand All @@ -47,21 +68,5 @@ def initialize(self):
def get(self):
heap = self._heapy.heap()
report = get_report_data(heap.byrcs)
rows = len(heap.byrcs)
for index in range(0, 10 if rows > 10 else rows):
report['rows'][index]['referrers'] = \
get_report_data(heap.byrcs[index].referrers.byrcs)
self.write(report)
self.finish()


if __name__ == '__main__':

heapy = guppy.hpy()
heap = heapy.heap()
report = get_report_data(heap.byrcs)
rows = len(heap.byrcs)
for index in range(0, 10 if rows > 10 else rows):
report['rows'][index]['referrers'] = \
get_report_data(heap.byrcs[index].referrers.byrcs)
print report
73 changes: 38 additions & 35 deletions tinman/handlers/rabbitmq.py
Expand Up @@ -20,6 +20,10 @@

from tinman import exceptions

message_stack = list()
pending_rabbitmq_connection = None
rabbitmq_connection = None


class RabbitMQRequestHandler(web.RequestHandler):
"""The request handler will connect to RabbitMQ on the first request,
Expand Down Expand Up @@ -51,13 +55,6 @@ class RabbitMQRequestHandler(web.RequestHandler):
CHANNEL = 'rabbitmq_channel'
CONNECTION = 'rabbitmq_connection'

def __init__(self, application, request, **kwargs):
self._message_stack = list()
self._rabbitmq_channel_opening = False
self._rabbitmq_connection_opening = False
super(RabbitMQRequestHandler, self).__init__(application, request,
**kwargs)

def _add_to_publish_stack(self, exchange, routing_key, message, properties):
"""Temporarily add the message to the stack to publish to RabbitMQ
Expand All @@ -67,13 +64,15 @@ def _add_to_publish_stack(self, exchange, routing_key, message, properties):
:param pika.BasicProperties: The message properties
"""
self._message_stack.append((exchange, routing_key, message, properties))
global message_stack
message_stack.append((exchange, routing_key, message, properties))

def _connect_to_rabbitmq(self):
"""Connect to RabbitMQ and assign a local attribute"""
if not self._rabbitmq_connection_opening:
self._rabbitmq_connection_opening = True
self._set_rabbitmq_connection(self._new_rabbitmq_connection())
global pending_rabbitmq_connection, rabbitmq_connection
if not rabbitmq_connection:
LOGGER.info('Creating a new RabbitMQ connection')
pending_rabbitmq_connection = self._new_rabbitmq_connection()

def _new_message_properties(self, content_type=None, content_encoding=None,
headers=None, delivery_mode=None, priority=None,
Expand Down Expand Up @@ -119,11 +118,11 @@ def _publish_deferred_messages(self):
any requests buffered.
"""
if self._rabbitmq_channel and self._message_stack:
LOGGER.info('Publishing %i deferred message(s)',
len(self._message_stack))
while self._message_stack:
self._publish_message(*self._message_stack.pop())
global message_stack
if not self._rabbitmq_is_closed and message_stack:
LOGGER.info('Publishing %i deferred message(s)', len(message_stack))
while message_stack:
self._publish_message(*message_stack.pop())

def _publish_message(self, exchange, routing_key, message, properties):
"""Publish the message to RabbitMQ
Expand All @@ -135,7 +134,7 @@ def _publish_message(self, exchange, routing_key, message, properties):
"""
if self._rabbitmq_is_closed or not self._rabbitmq_channel:
LOGGER.info('Temporarily buffering message to publish')
LOGGER.warning('Temporarily buffering message to publish')
self._add_to_publish_stack(exchange, routing_key,
message, properties)
return
Expand All @@ -154,12 +153,13 @@ def _rabbitmq_config(self):
raise exceptions.ConfigurationException('rabbitmq')
return config

@property
def _rabbitmq_connection(self):
return getattr(self.application.tinman, self.CONNECTION, None)

@property
def _rabbitmq_channel(self):
"""Return the Pika channel from the tinman object assignment.
:rtype: pika.channel.Channel
"""
return getattr(self.application.tinman, self.CHANNEL, None)

@property
Expand All @@ -169,9 +169,8 @@ def _rabbitmq_is_closed(self):
:rtype: bool
"""
return (not self._rabbitmq_connection or
(not self._rabbitmq_connection.is_open and
not self._rabbitmq_connection_opening))
global rabbitmq_connection
return not rabbitmq_connection and not pending_rabbitmq_connection

@property
def _rabbitmq_parameters(self):
Expand All @@ -191,10 +190,12 @@ def _rabbitmq_parameters(self):
return pika.ConnectionParameters(**kwargs)

def _set_rabbitmq_channel(self, channel):
setattr(self.application.tinman, self.CHANNEL, channel)
"""Assign the channel object to the tinman global object.
:param pika.channel.Channel channel: The pika channel
def _set_rabbitmq_connection(self, connection):
setattr(self.application.tinman, self.CONNECTION, connection)
"""
setattr(self.application.tinman, self.CHANNEL, channel)

def on_rabbitmq_close(self, reply_code, reply_text):
"""Called when RabbitMQ has been connected to.
Expand All @@ -203,9 +204,10 @@ def on_rabbitmq_close(self, reply_code, reply_text):
:param str reply_text: The disconnect reason
"""
global rabbitmq_connection
LOGGER.warning('RabbitMQ has disconnected (%s): %s',
reply_code, reply_text)
self._set_rabbitmq_connection(None)
rabbitmq_connection = None
self._set_rabbitmq_channel(None)
self._connect_to_rabbitmq()

Expand All @@ -215,19 +217,21 @@ def on_rabbitmq_conn_open(self, connection):
:param pika.connection.Connection connection: The pika connection
"""
LOGGER.debug('RabbitMQ has connected')
self._rabbitmq_connection.add_on_close_callback(self.on_rabbitmq_close)
self._rabbitmq_connection_opening = False
self._rabbitmq_connection.channel(self.on_rabbitmq_channel_open)
global pending_rabbitmq_connection, rabbitmq_connection
LOGGER.info('RabbitMQ has connected')
rabbitmq_connection = connection
rabbitmq_connection.add_on_close_callback(self.on_rabbitmq_close)
rabbitmq_connection.channel(self.on_rabbitmq_channel_open)
pending_rabbitmq_connection = None

def on_rabbitmq_channel_open(self, channel):
"""Called when the RabbitMQ accepts the channel open request.
:param pika.channel.Channel channel: The channel opened with RabbitMQ
"""
LOGGER.debug('Channel %i is opened for communication with RabbitMQ',
channel.channel_number)
LOGGER.info('Channel %i is opened for communication with RabbitMQ',
channel.channel_number)
self._set_rabbitmq_channel(channel)
self._publish_deferred_messages()

Expand All @@ -237,6 +241,5 @@ def prepare(self):
"""
super(RabbitMQRequestHandler, self).prepare()
# Connect to RabbitMQ if disconnected
if self._rabbitmq_is_closed:
self._connect_to_rabbitmq()
43 changes: 43 additions & 0 deletions tinman/utilities/heapy_report.py
@@ -0,0 +1,43 @@
#!/usr/bin/env python
"""Will generate a plaintext report from the JSON document created
by the HeapyRequestHandler.
Usage: tinman-heap-report file.json
"""
import json
import os
import sys


def print_row(row, depth):
prefix = ''.join([' ' for offset in range(0, depth * 4)])
item = '%s - %s' % (prefix, row['item'])
parts = [item.ljust(80),
('%(value)s' % (row['count'])).rjust(10),
(' %(percent)s%%' % (row['count'])).rjust(7),
('%(value)s' % (row['size'])).rjust(10),
(' %(percent)s%%' % (row['size'])).rjust(7)]
print ''.join(parts)


def main():
if len(sys.argv) == 1 or not (os.path.exists(sys.argv[1]) and
os.path.isfile(sys.argv[1])):
print 'Usage: tinman-heap-report heap-file.json\n'
sys.exit(-1)
with open(sys.argv[1], "r") as handle:
report = json.load(handle)
print ''.join(['Item'.ljust(80), 'Count'.rjust(17), 'Size'.rjust(17)])
print ''.join(['-' for position in xrange(0, 114)])
for row in report['rows']:
print
print_row(row, 0)
for child in row['referrers']['rows']:
print
print_row(child, 1)
for grandchild in child['referrers']['rows']:
print_row(grandchild, 2)

if __name__ == '__main__':
main()

0 comments on commit eeeb56c

Please sign in to comment.