Skip to content

Commit

Permalink
Refactor configuration and option handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
coleifer committed Aug 26, 2016
1 parent 6e6a2e1 commit ec0ee32
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 133 deletions.
133 changes: 8 additions & 125 deletions huey/bin/huey_consumer.py
Original file line number Diff line number Diff line change
@@ -1,122 +1,18 @@
#!/usr/bin/env python

import logging
import optparse
import os
import sys
from logging import FileHandler

from huey.consumer import Consumer
from huey.consumer import ConsumerConfig
from huey.consumer import OptionParserHandler
from huey.utils import load_class


def err(s):
sys.stderr.write('\033[91m%s\033[0m\n' % s)


def get_loglevel(verbose=None):
if verbose is None:
return logging.INFO
elif verbose:
return logging.DEBUG
return logging.ERROR


def setup_logger(loglevel, logfile, worker_type):
if worker_type == 'process':
worker = '%(process)d'
else:
worker = '%(threadName)s'

log_format = ('[%(asctime)s] %(levelname)s:%(name)s:' + worker +
':%(message)s')
logging.basicConfig(level=loglevel, format=log_format)

if logfile:
handler = FileHandler(logfile)
handler.setFormatter(logging.Formatter(log_format))
logging.getLogger().addHandler(handler)


def get_option_parser():
parser = optparse.OptionParser(
'Usage: %prog [options] path.to.huey_instance')

log_opts = parser.add_option_group(
'Logging',
'The following options pertain to the logging system.')
log_opts.add_option('-l', '--logfile',
dest='logfile',
help='write logs to FILE',
metavar='FILE')
log_opts.add_option('-v', '--verbose',
action='store_true',
dest='verbose',
help='log debugging statements')
log_opts.add_option('-q', '--quiet',
action='store_false',
dest='verbose',
help='only log exceptions')

worker_opts = parser.add_option_group(
'Workers',
('By default huey uses a single worker thread. To specify a different '
'number of workers, or a different execution model (such as multiple '
'processes or greenlets), use the options below.'))
worker_opts.add_option('-w', '--workers',
dest='workers',
type='int',
help='number of worker threads/processes (default=1)',
default=1)
worker_opts.add_option('-k', '--worker-type',
dest='worker_type',
help='worker execution model (thread, greenlet, process).',
default='thread',
choices=['greenlet', 'thread', 'process', 'gevent'])
worker_opts.add_option('-d', '--delay',
dest='initial_delay',
type='float',
help='initial delay between polling intervals in seconds (default=0.1)',
default=0.1)
worker_opts.add_option('-m', '--max-delay',
dest='max_delay',
type='float',
help='maximum time to wait between polling the queue (default=10)',
default=10)
worker_opts.add_option('-b', '--backoff',
dest='backoff',
type='float',
help='amount to backoff delay when no results present (default=1.15)',
default=1.15)

scheduler_opts = parser.add_option_group(
'Scheduler',
('By default Huey will run the scheduler once every second to check '
'for tasks scheduled in the future, or tasks set to run at specific '
'intervals (periodic tasks). Use the options below to configure the '
'scheduler or to disable periodic task scheduling.'))
scheduler_opts.add_option('-s', '--scheduler-interval',
dest='scheduler_interval',
type='int',
help='Granularity of scheduler in seconds.',
default=1)
scheduler_opts.add_option('-n', '--no-periodic',
action='store_false',
default=True,
dest='periodic',
help='do NOT schedule periodic tasks')
scheduler_opts.add_option('-u', '--utc',
dest='utc',
action='store_true',
help='use UTC time for all tasks (default=True)',
default=True)
scheduler_opts.add_option('--localtime',
dest='utc',
action='store_false',
help='use local time for all tasks')
return parser


def load_huey(path):
try:
return load_class(path)
Expand All @@ -130,35 +26,22 @@ def load_huey(path):


def consumer_main():
parser = get_option_parser()
parser_handler = OptionParserHandler()
parser = parser_handler.get_option_parser()
options, args = parser.parse_args()

setup_logger(
get_loglevel(options.verbose),
options.logfile,
options.worker_type)

if len(args) == 0:
err('Error: missing import path to `Huey` instance')
err('Example: huey_consumer.py app.queue.huey_instance')
sys.exit(1)

if options.workers < 1:
err('You must have at least one worker.')
sys.exit(1)
config = ConsumerConfig(**options.__dict__)
config.validate()

huey_instance = load_huey(args[0])

consumer = Consumer(
huey_instance,
options.workers,
options.periodic,
options.initial_delay,
options.backoff,
options.max_delay,
options.utc,
options.scheduler_interval,
options.worker_type)
config.setup_logger()
consumer = config.create_consumer(huey_instance)
consumer.run()


Expand Down
138 changes: 130 additions & 8 deletions huey/consumer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import datetime
import logging
import operator
import optparse
import os
import signal
import threading
import time
from collections import defaultdict
from collections import namedtuple
from logging import FileHandler

from multiprocessing import Event as ProcessEvent
from multiprocessing import Process
Expand Down Expand Up @@ -312,40 +314,160 @@ def is_alive(self, proc):
WORKER_THREAD = 'thread'
WORKER_GREENLET = 'greenlet'
WORKER_PROCESS = 'process'
WORKER_TYPES = (WORKER_THREAD, WORKER_GREENLET, WORKER_PROCESS)


worker_to_environment = {
WORKER_THREAD: ThreadEnvironment,
WORKER_GREENLET: GreenletEnvironment,
'gevent': GreenletEnvironment, # Same as greenlet.
'gevent': GreenletEnvironment, # Preserved for backwards-compat.
WORKER_PROCESS: ProcessEnvironment,
}


config_defaults = (
('workers', 1),
('periodic', True),
('worker_type', WORKER_THREAD),
('initial_delay', 0.1),
('backoff', 1.15),
('max_delay', 10.0),
('utc', True),
('scheduler_interval', 1),
('worker_type', WORKER_THREAD),
('periodic', True),
('utc', True),
('logfile', None),
('verbose', None),
)
config_keys = [param for param, _ in config_defaults]


def option(name, **options):
if isinstance(name, tuple):
letter, opt_name = name
else:
opt_name = name.replace('_', '-')
letter = name[0]
options.setdefault('dest', name)
return ('-' + letter, '--' + opt_name, options)


class OptionParserHandler(object):
def get_worker_options(self):
return (
option('workers', default=1, type='int',
help='number of worker threads/processes (default=1)'),
option(('k', 'worker-type'), choices=WORKER_TYPES,
default=WORKER_THREAD, dest='worker_type',
help=('worker execution model (thread, greenlet, '
'process)')),
option('delay', default=0.1, dest='initial_delay',
help='minimum delay polling queue (default=0.1)',
metavar='SECONDS', type='float'),
option('max_delay', default=10, metavar='SECONDS',
help='maximum delay polling queue (default=10)',
type='float'),
option('backoff', default=1.15, metavar='SECONDS',
help='amount to back-off delay when queue is empty',
type='float'),
)

def get_scheduler_options(self):
return (
option('scheduler_interval', default=1, type='int',
help='Granularity of scheduler in seconds.'),
option('no_periodic', action='store_false', default=True,
dest='periodic', help='do NOT enqueue periodic tasks'),
option('utc', action='store_true', default=True,
help='use UTC time for all tasks (default=True)'),
option(('o', 'localtime'), action='store_false', dest='utc',
help='use local time for all tasks'),
)

def get_logging_options(self):
return (
option('logfile', metavar='FILE'),
option('verbose', action='store_true',
help='verbose logging (includes DEBUG statements)'),
option('quiet', action='store_false', dest='verbose',
help='minimal logging (only exceptions/errors)'),
)

def get_option_parser(self):
parser = optparse.OptionParser('Usage: %prog [options] '
'path.to.huey_instance')

def add_group(name, description, options):
group = parser.add_option_group(name, description)
for abbrev, name, kwargs in options:
group.add_option(abbrev, name, **kwargs)

add_group('Logging', 'The following options pertain to logging.',
ConsumerConfig.get_log_options())

add_group('Workers', (
'By default huey uses a single worker thread. To specify a '
'different number of workers, or a different execution model (such'
' as multiple processes or greenlets), use the options below.'),
ConsumerConfig.get_worker_options())

add_group('Scheduler', (
'By default Huey will run the scheduler once every second to check'
' for tasks scheduled in the future, or tasks set to run at '
'specfic intervals (periodic tasks). Use the options below to '
'configure the scheduler or to disable periodic task scheduling.'),
ConsumerConfig.get_scheduler_options())

return parser


class ConsumerConfig(namedtuple('_ConsumerConfig', config_keys)):
def __new__(cls, **kwargs):
config = dict(config_defaults)
config.update(kwargs)
args = [config[key] for key in config_keys]
return super(ConsumerConfig, cls).__new__(cls, *args)

def get_options(self):
return (
('-w', '--workers', {}),
)
def validate(self):
if self.backoff < 1:
raise ValueError('The backoff must be greater than 1.')
if not (0 < self.scheduler_interval <= 60):
raise ValueError('The scheduler must run at least once per '
'minute, and at most once per second (1-60).')

@property
def loglevel(self):
if self.verbose is None:
return logging.INFO
return logging.DEBUG if self.verbose else logging.ERROR

def setup_logger(self, logger=None):
if self.worker_type == 'process':
worker = '%(process)d'
else:
worker = '%(threadName)s'

logformat = ('[%(asctime)s] %(levelname)s:%(name)s:' + worker +
':%(message)s')
loglevel = self.loglevel
if logger is None:
logging.basicConfig(level=loglevel, format=logformat)
logger = logging.getLogger()
else:
logger.setLevel(loglevel)

if self.logfile:
handler = FileHandler(self.logfile)
handler.setFormatter(logging.Formatter(logformat))
logger.addHandler(handler)

def create_consumer(self, huey, **overrides):
options = {}
for key in config_keys:
if key in ('logfile', 'verbose'):
continue
options[key] = getattr(self, key)

options.update(overrides)
return Consumer(huey, **options)


class Consumer(object):
Expand Down

0 comments on commit ec0ee32

Please sign in to comment.