Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add consumer profiling

  • Loading branch information...
commit c432bbca7a1382bdbb561c20a81c9ccb5e5b218f 1 parent 34320e7
Gavin M. Roy authored
Showing with 38 additions and 3 deletions.
  1. +8 −1 rejected/controller.py
  2. +5 −2 rejected/mcp.py
  3. +25 −0 rejected/process.py
View
9 rejected/controller.py
@@ -25,7 +25,8 @@ def _master_control_program(self):
"""
return mcp.MasterControlProgram(self._get_application_config(),
- consumer=self._options.consumer)
+ consumer=self._options.consumer,
+ profile=self._options.profile)
def _prepend_python_path(self, path): #pragma: no cover
"""Add the specified value to the python path.
@@ -76,6 +77,12 @@ def _cli_options(parser):
:param optparse.OptionParser parser: The option parser to add options to
"""
+ parser.add_option('-P', '--profile',
+ action='store',
+ default=None,
+ dest='profile',
+ help='Profile the consumer modules, specifying '
+ 'the output directory.')
parser.add_option('-o', '--only',
action='store',
default=None,
View
7 rejected/mcp.py
@@ -27,11 +27,13 @@ class MasterControlProgram(state.State):
_POLL_RESULTS_INTERVAL = 3.0
_SHUTDOWN_WAIT = 1
- def __init__(self, config, consumer=None):
+ def __init__(self, config, consumer=None, profile=None):
"""Initialize the Master Control Program
:param dict config: The full content from the YAML config file
:param str consumer: If specified, only run processes for this consumer
+ :param str profile: Optional profile output directory to
+ enable profiling
"""
self._set_process_name()
@@ -45,6 +47,7 @@ def __init__(self, config, consumer=None):
self._last_poll_results = dict()
self._poll_data = {'time': 0, 'processes': list()}
self._poll_timer = None
+ self._profile = profile
self._results_timer = None
self._stats = dict()
self._stats_queue = multiprocessing.Queue()
@@ -129,7 +132,6 @@ def _check_consumer_process_counts(self):
LOGGER.debug('Checking minimum consumer process levels')
for name in self._consumers:
for connection in self._consumers[name]['connections']:
-
processes_needed = self._process_spawn_qty(name, connection)
if processes_needed:
LOGGER.debug('Need to spawn %i processes for %s on %s',
@@ -287,6 +289,7 @@ def _new_process(self, consumer_name, connection_name):
kwargs = {'config': self._config,
'connection_name': connection_name,
'consumer_name': consumer_name,
+ 'profile': self._profile,
'stats_queue': self._stats_queue}
return process_name, process.Process(name=process_name, kwargs=kwargs)
View
25 rejected/process.py
@@ -8,6 +8,10 @@
import multiprocessing
import os
import pika
+try:
+ import cProfile as profile
+except ImportError:
+ import profile
from pika.adapters import tornado_connection
import signal
import sys
@@ -593,6 +597,18 @@ def process(self, channel=None, method=None, header=None, body=None):
self.ack_message(method.delivery_tag)
self.reset_state()
+ @property
+ def profile_file(self):
+ if not self._kwargs['profile']:
+ return None
+
+ if os.path.exists(self._kwargs['profile']) and \
+ os.path.isdir(self._kwargs['profile']):
+ return '%s/%s-%s.prof' % (os.path.normpath(self._kwargs['profile']),
+ os.getpid(),
+ self._kwargs['consumer_name'])
+ return None
+
def processing_failure(self):
"""Called when message processing failure happens due to a
ConsumerException or an unhandled exception.
@@ -698,6 +714,15 @@ def reset_state(self):
def run(self):
"""Start the consumer"""
+ if self.profile_file:
+ LOGGER.info('Profiling to %s', self.profile_file)
+ profile.runctx('self._run()', globals(), locals(),
+ self.profile_file)
+ else:
+ self._run()
+
+ def _run(self):
+ """Run method that can be profiled"""
try:
self.setup(self._kwargs['config'],
self._kwargs['connection_name'],
Please sign in to comment.
Something went wrong with that request. Please try again.