Skip to content

Commit

Permalink
Refactor periodic tasks.
Browse files Browse the repository at this point in the history
This review allows periodic tasks to be enabled or disabled in the
decorator, and also to be disabled by specifying a negative interval.

The spacing between runs of a periodic task is now specified in
seconds, with zero meaning the default spacing which is currently 60
seconds.

There is also a new argument to the decorator (external_process_ok)
which indicates whether a periodic task _needs_ to be run in the
nova-compute process. There is also a flag (run_external_periodic_tasks)
which can be used to move the periodic tasks that do not need to run
there out of the nova-compute process.

I also remove the periodic_interval flag from services, as the interval
between runs is now dynamic, based on the number of seconds that a
periodic task wants to wait for its next run. For callers who want to
twiddle the sleep period (for example unit tests), there is a
create() argument, periodic_interval_max, which lets the period that
periodic_tasks() specifies be overridden. This is not exposed as a
flag because I cannot see a use case for that; it is, however, needed
for unit testing.

DocImpact. Resolves bug 939087.

Change-Id: I7f245a88b8d229a481c1b65a4c0f1e2769bf3901
  • Loading branch information
mikalstill committed Jan 4, 2013
1 parent 1be4d7b commit 9fb647e
Show file tree
Hide file tree
Showing 14 changed files with 310 additions and 85 deletions.
21 changes: 10 additions & 11 deletions nova/compute/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@
default=120,
help='Interval in seconds for querying the host status'),
cfg.IntOpt("image_cache_manager_interval",
default=40,
help="Number of periodic scheduler ticks to wait between "
"runs of the image cache manager."),
default=2400,
help='Number of seconds to wait between runs of the image '
'cache manager'),
cfg.IntOpt('reclaim_instance_interval',
default=0,
help='Interval in seconds for reclaiming deleted instances'),
Expand Down Expand Up @@ -155,9 +155,9 @@
"Valid options are 'noop', 'log' and 'reap'. "
"Set to 'noop' to disable."),
cfg.IntOpt("running_deleted_instance_poll_interval",
default=30,
help="Number of periodic scheduler ticks to wait between "
"runs of the cleanup task."),
default=1800,
help="Number of seconds to wait between runs of the cleanup "
"task."),
cfg.IntOpt("running_deleted_instance_timeout",
default=0,
help="Number of seconds after being deleted when a running "
Expand Down Expand Up @@ -3115,7 +3115,7 @@ def _report_driver_status(self, context):
capability['host_ip'] = CONF.my_ip
self.update_service_capabilities(capabilities)

@manager.periodic_task(ticks_between_runs=10)
@manager.periodic_task(spacing=600.0)
def _sync_power_states(self, context):
"""Align power states between the database and the hypervisor.
Expand Down Expand Up @@ -3289,8 +3289,7 @@ def update_available_resource(self, context):
new_resource_tracker_dict[nodename] = rt
self._resource_tracker_dict = new_resource_tracker_dict

@manager.periodic_task(
ticks_between_runs=CONF.running_deleted_instance_poll_interval)
@manager.periodic_task(spacing=CONF.running_deleted_instance_poll_interval)
def _cleanup_running_deleted_instances(self, context):
"""Cleanup any instances which are erroneously still running after
having been deleted.
Expand Down Expand Up @@ -3411,8 +3410,8 @@ def remove_aggregate_host(self, context, host, slave_info=None,
aggregate, host,
isinstance(e, exception.AggregateError))

@manager.periodic_task(
ticks_between_runs=CONF.image_cache_manager_interval)
@manager.periodic_task(spacing=CONF.image_cache_manager_interval,
external_process_ok=True)
def _run_image_cache_manager_pass(self, context):
"""Run a single pass of the image cache manager."""

Expand Down
4 changes: 4 additions & 0 deletions nova/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,10 @@ class InvalidUUID(Invalid):
message = _("Expected a uuid but received %(uuid)s.")


# Raised by the periodic_task decorator when it is handed an argument it does
# not accept (it is raised for the old 'ticks_between_runs' keyword). The
# offending argument name is interpolated into the message as %(arg)s.
class InvalidPeriodicTaskArg(Invalid):
message = _("Unexpected argument for periodic task creation: %(arg)s.")


class ConstraintNotMet(NovaException):
message = _("Constraint not met.")
code = 412
Expand Down
96 changes: 77 additions & 19 deletions nova/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,34 +54,61 @@
"""

import eventlet
import time

from nova.db import base
from nova import exception
from nova.openstack.common import cfg
from nova.openstack.common import log as logging
from nova.openstack.common.plugin import pluginmanager
from nova.openstack.common.rpc import dispatcher as rpc_dispatcher
from nova.scheduler import rpcapi as scheduler_rpcapi
from nova import version


# Options that control which periodic tasks this process runs.
# When run_external_periodic_tasks is False, tasks declared with
# external_process_ok=True are marked as disabled by the periodic_task
# decorator.
periodic_opts = [
cfg.BoolOpt('run_external_periodic_tasks',
default=True,
help=('Some periodic tasks can be run in a separate process. '
'Should we run them here?')),
]

CONF = cfg.CONF
# periodic_task() reads CONF.run_external_periodic_tasks while decorating a
# function, so the option is registered here at module import time.
CONF.register_opts(periodic_opts)
CONF.import_opt('host', 'nova.config')
LOG = logging.getLogger(__name__)

# Starting value for the idle time that periodic_tasks() returns; tasks that
# are due sooner lower it.
# NOTE(review): unit is presumably seconds, as the task spacings are - confirm.
DEFAULT_INTERVAL = 60.0


def periodic_task(*args, **kwargs):
"""Decorator to indicate that a method is a periodic task.
This decorator can be used in two ways:
1. Without arguments '@periodic_task', this will be run on every tick
1. Without arguments '@periodic_task', this will be run on every cycle
of the periodic scheduler.
2. With arguments, @periodic_task(ticks_between_runs=N), this will be
run on every N ticks of the periodic scheduler.
2. With arguments, @periodic_task(spacing=N), this will be
run on approximately every N seconds. If this number is negative the
periodic task will be disabled.
"""
def decorator(f):
# Test for old style invocation
if 'ticks_between_runs' in kwargs:
raise exception.InvalidPeriodicTaskArg(arg='ticks_between_runs')

# Control if run at all
f._periodic_task = True
f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0)
f._periodic_external_ok = kwargs.pop('external_process_ok', False)
if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
f._periodic_enabled = False
else:
f._periodic_enabled = kwargs.pop('enabled', True)

# Control frequency
f._periodic_spacing = kwargs.pop('spacing', 0)
f._periodic_last_run = time.time()
return f

# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
Expand Down Expand Up @@ -117,17 +144,39 @@ def __init__(cls, names, bases, dict_):
cls._periodic_tasks = []

try:
cls._ticks_to_skip = cls._ticks_to_skip.copy()
cls._periodic_last_run = cls._periodic_last_run.copy()
except AttributeError:
cls._periodic_last_run = {}

try:
cls._periodic_spacing = cls._periodic_spacing.copy()
except AttributeError:
cls._ticks_to_skip = {}
cls._periodic_spacing = {}

for value in cls.__dict__.values():
if getattr(value, '_periodic_task', False):
task = value
name = task.__name__
if task._ticks_between_runs >= 0:
cls._periodic_tasks.append((name, task))
cls._ticks_to_skip[name] = task._ticks_between_runs

if task._periodic_spacing < 0:
LOG.info(_('Skipping periodic task %(task)s because '
'its interval is negative'),
{'task': name})
continue
if not task._periodic_enabled:
LOG.info(_('Skipping periodic task %(task)s because '
'it is disabled'),
{'task': name})
continue

# A periodic spacing of zero indicates that this task should
# be run every pass
if task._periodic_spacing == 0:
task._periodic_spacing = None

cls._periodic_tasks.append((name, task))
cls._periodic_spacing[name] = task._periodic_spacing
cls._periodic_last_run[name] = task._periodic_last_run


class Manager(base.Base):
Expand Down Expand Up @@ -158,30 +207,39 @@ def create_rpc_dispatcher(self):

def periodic_tasks(self, context, raise_on_error=False):
    """Run every periodic task that is due and say how long to idle.

    A task whose spacing is None runs on every pass. Any other task runs
    once its spacing has elapsed since its last run, or slightly early
    when it is due within the next 0.2 seconds.

    :param context: request context handed to each task
    :param raise_on_error: if True, re-raise an exception raised by a
                           task instead of logging it and carrying on
    :returns: the number of seconds the caller may wait before calling
              this method again (at most DEFAULT_INTERVAL)
    """
    idle_for = DEFAULT_INTERVAL
    for task_name, task in self._periodic_tasks:
        full_task_name = '.'.join([self.__class__.__name__, task_name])
        spacing = self._periodic_spacing[task_name]

        if spacing is not None:
            # Seconds left until the task is next due. This must be
            # "due - now": the reverse is negative until the task is due,
            # which would run tasks that are not due and skip overdue ones.
            due = self._periodic_last_run[task_name] + spacing
            wait = due - time.time()

            # If a periodic task is _nearly_ due, then we'll run it early
            if wait > 0.2:
                idle_for = min(idle_for, wait)
                continue

        LOG.debug(_("Running periodic task %(full_task_name)s"),
                  {'full_task_name': full_task_name})
        self._periodic_last_run[task_name] = time.time()

        try:
            task(self, context)
        except Exception as e:
            if raise_on_error:
                raise
            LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
                          {'full_task_name': full_task_name, 'e': e})

        # The task just ran, so it is next due one full spacing from now.
        if spacing is not None and spacing < idle_for:
            idle_for = spacing

        # After finishing a task, allow the manager to do other work
        # (report_state, processing AMQP requests etc.)
        eventlet.sleep(0)

    return idle_for

def init_host(self):
"""Hook to do additional manager initialization when one requests
the service be started. This is called before any service record
Expand Down
6 changes: 3 additions & 3 deletions nova/network/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@
'entries in multi host mode'),
cfg.IntOpt("dns_update_periodic_interval",
default=-1,
help='Number of periodic scheduler ticks to wait between '
'runs of updates to DNS entries.'),
help='Number of seconds to wait between runs of updates to DNS '
'entries.'),
cfg.StrOpt('dhcp_domain',
default='novalocal',
help='domain to use for building the hostnames'),
Expand Down Expand Up @@ -1973,7 +1973,7 @@ def get_vif_by_mac_address(self, context, mac_address):
mac_address)

@manager.periodic_task(
ticks_between_runs=CONF.dns_update_periodic_interval)
spacing=CONF.dns_update_periodic_interval)
def _periodic_update_dns(self, context):
"""Update local DNS entries of all networks on this host"""
networks = self.db.network_get_all_by_host(context, self.host)
Expand Down
38 changes: 21 additions & 17 deletions nova/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@
cfg.IntOpt('report_interval',
default=10,
help='seconds between nodes reporting state to datastore'),
cfg.IntOpt('periodic_interval',
default=60,
help='seconds between running periodic tasks'),
cfg.BoolOpt('periodic_enable',
default=True,
help='enable periodic tasks'),
cfg.IntOpt('periodic_fuzzy_delay',
default=60,
help='range of seconds to randomly delay when starting the'
Expand Down Expand Up @@ -371,7 +371,8 @@ class Service(object):
it state to the database services table."""

def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_interval=None, periodic_fuzzy_delay=None,
periodic_enable=None, periodic_fuzzy_delay=None,
periodic_interval_max=None,
*args, **kwargs):
self.host = host
self.binary = binary
Expand All @@ -380,8 +381,9 @@ def __init__(self, host, binary, topic, manager, report_interval=None,
manager_class = importutils.import_class(self.manager_class_name)
self.manager = manager_class(host=self.host, *args, **kwargs)
self.report_interval = report_interval
self.periodic_interval = periodic_interval
self.periodic_enable = periodic_enable
self.periodic_fuzzy_delay = periodic_fuzzy_delay
self.periodic_interval_max = periodic_interval_max
self.saved_args, self.saved_kwargs = args, kwargs
self.timers = []
self.backdoor_port = None
Expand Down Expand Up @@ -433,15 +435,15 @@ def start(self):
if pulse:
self.timers.append(pulse)

if self.periodic_interval:
if self.periodic_enable:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
initial_delay = None

periodic = utils.LoopingCall(self.periodic_tasks)
periodic.start(interval=self.periodic_interval,
initial_delay=initial_delay)
periodic = utils.DynamicLoopingCall(self.periodic_tasks)
periodic.start(initial_delay=initial_delay,
periodic_interval_max=self.periodic_interval_max)
self.timers.append(periodic)

def _create_service_ref(self, context):
Expand All @@ -460,17 +462,18 @@ def __getattr__(self, key):

@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
report_interval=None, periodic_interval=None,
periodic_fuzzy_delay=None):
report_interval=None, periodic_enable=None,
periodic_fuzzy_delay=None, periodic_interval_max=None):
"""Instantiates class and passes back application object.
:param host: defaults to CONF.host
:param binary: defaults to basename of executable
:param topic: defaults to bin_name - 'nova-' part
:param manager: defaults to CONF.<topic>_manager
:param report_interval: defaults to CONF.report_interval
:param periodic_interval: defaults to CONF.periodic_interval
:param periodic_enable: defaults to CONF.periodic_enable
:param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
:param periodic_interval_max: if set, the max time to wait between runs
"""
if not host:
Expand All @@ -486,14 +489,15 @@ def create(cls, host=None, binary=None, topic=None, manager=None,
manager = CONF.get(manager_cls, None)
if report_interval is None:
report_interval = CONF.report_interval
if periodic_interval is None:
periodic_interval = CONF.periodic_interval
if periodic_enable is None:
periodic_enable = CONF.periodic_enable
if periodic_fuzzy_delay is None:
periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
service_obj = cls(host, binary, topic, manager,
report_interval=report_interval,
periodic_interval=periodic_interval,
periodic_fuzzy_delay=periodic_fuzzy_delay)
periodic_enable=periodic_enable,
periodic_fuzzy_delay=periodic_fuzzy_delay,
periodic_interval_max=periodic_interval_max)

return service_obj

Expand Down Expand Up @@ -529,7 +533,7 @@ def wait(self):
def periodic_tasks(self, raise_on_error=False):
    """Run the manager's periodic tasks once under an admin context.

    The manager is invoked exactly once per call, and its result is
    handed back so that the looping call driving this method can use it.

    :param raise_on_error: passed through to the manager's
                           periodic_tasks()
    :returns: whatever the manager's periodic_tasks() returns
    """
    ctxt = context.get_admin_context()
    return self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)


class WSGIService(object):
Expand Down
2 changes: 1 addition & 1 deletion nova/servicegroup/db_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def join(self, member_id, group_id, service=None):
' ServiceGroup driver'))
report_interval = service.report_interval
if report_interval:
pulse = utils.LoopingCall(self._report_state, service)
pulse = utils.FixedIntervalLoopingCall(self._report_state, service)
pulse.start(interval=report_interval,
initial_delay=report_interval)
return pulse
Expand Down

0 comments on commit 9fb647e

Please sign in to comment.