Run a task completely even without time-periods #763

Merged

12 changes: 6 additions & 6 deletions docs/adding_tracks.rst
@@ -772,13 +772,13 @@ If you need more control, you need to implement a class. Below is the implementa
             self._cache = params.get("cache", False)
             # ... but we need to resolve "profession" lazily on each invocation later
             self._params = params
+            # Determines whether this parameter source will be "exhausted" at some point or
+            # Rally can draw values infinitely from it.
+            self.infinite = True

         def partition(self, partition_index, total_partitions):
             return self

-        def size(self):
-            return 1
-
         def params(self):
             # you must provide all parameters that the runner expects
             return {
@@ -803,17 +803,17 @@ In ``register`` you bind the name in the track specification to your parameter s

 * The constructor needs to have the signature ``__init__(self, track, params, **kwargs)``.
 * ``partition(self, partition_index, total_partitions)`` is called by Rally to "assign" the parameter source across multiple clients. Typically you can just return ``self``. If each client needs to act differently then you can provide different parameter source instances here as well.
-* ``size(self)``: This method helps Rally to provide a proper progress indication to users if you use a warmup time period. For bulk indexing, return the number of bulks (for a given client). As searches are typically executed with a pre-determined amount of iterations, just return ``1`` in this case.
 * ``params(self)``: This method returns a dictionary with all parameters that the corresponding "runner" expects. This method will be invoked once for every iteration during the race. In the example, we parameterize the query by randomly selecting a profession from a list.
+* ``infinite``: This property helps Rally to determine whether to let the parameter source determine when a task should be finished (when ``infinite`` is ``False``) or whether the task properties (e.g. ``iterations`` or ``time-period``) determine when a task should be finished. In the former case, the parameter source needs to raise ``StopIteration`` to indicate when it is finished.

-For cases, where you want to provide a progress indication but cannot calculate ``size`` up-front (e.g. when you generate bulk requests on-the fly up to a certain total size), you can implement a property ``percent_completed`` which returns a floating point value between ``0.0`` and ``1.0``. Rally will query this value before each call to ``params()`` and uses it to indicate progress. However:
+For cases where you want to provide a progress indication (this is typically the case when ``infinite`` is ``False``), you can implement a property ``percent_completed`` which returns a floating point value between ``0.0`` and ``1.0``. Rally will query this value before each call to ``params()`` and use it to indicate progress. However:

* Rally will not check ``percent_completed`` if it can derive progress in any other way.
* The value of ``percent_completed`` is purely informational and does not influence when Rally considers an operation to be completed.

.. note::

-    The method ``params(self)`` is called on a performance-critical path. Don't do anything in this method that takes a lot of time (avoid any I/O). For searches, you should usually throttle throughput anyway and there it does not matter that much but if the corresponding operation is run without throughput throttling, double-check that your custom parameter source does not introduce a bottleneck.
+    The method ``params(self)`` as well as the property ``percent_completed`` are called on a performance-critical path. Don't do anything that takes a lot of time (avoid any I/O). For searches, you should usually throttle throughput anyway, so it matters less there; but if the corresponding operation is run without throughput throttling, double-check that your custom parameter source does not introduce a bottleneck.

Custom parameter sources can use the Python standard API but using any additional libraries is not supported.
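
The contract described in this hunk (``infinite`` set to ``False``, ``StopIteration`` raised when exhausted, and an optional ``percent_completed`` property) can be sketched with a source that generates payload until a target size is reached. This is only an illustration: the class name ``SizeBoundedParamSource``, the ``target-bytes`` parameter and the returned ``body`` key are assumptions and not part of Rally.

```python
class SizeBoundedParamSource:
    """Hypothetical finite source that stops after roughly ``target-bytes`` of payload."""

    def __init__(self, track, params, **kwargs):
        # "target-bytes" is an assumed parameter name, not one defined by Rally.
        self._target_bytes = int(params.get("target-bytes", 60))
        self._generated_bytes = 0
        self._doc = '{"profession": "engineer"}'
        # Finite source: it decides itself when the task is finished.
        self.infinite = False

    def partition(self, partition_index, total_partitions):
        return self

    @property
    def percent_completed(self):
        # Plain arithmetic only, as this is read on the performance-critical path.
        if self._target_bytes <= 0:
            return 1.0
        return min(1.0, self._generated_bytes / self._target_bytes)

    def params(self):
        if self._generated_bytes >= self._target_bytes:
            # Signals that this source is exhausted.
            raise StopIteration()
        self._generated_bytes += len(self._doc)
        return {"body": self._doc}


# Drive the source by hand, the way a caller would until it is exhausted.
source = SizeBoundedParamSource(track=None, params={"target-bytes": 60})
initial_progress = source.percent_completed
calls = 0
while True:
    try:
        source.params()
        calls += 1
    except StopIteration:
        break
final_progress = source.percent_completed
```

Progress is clamped with ``min`` because the last generated document may overshoot the target size.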

61 changes: 61 additions & 0 deletions docs/migrate.rst
@@ -1,6 +1,67 @@
Migration Guide
===============

+Migrating to Rally 1.4.0
+------------------------
+
+Custom Parameter Sources
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+With Rally 1.4.0, we have changed the API for custom parameter sources. The ``size()`` method is now deprecated and replaced by a new property called ``infinite``. If you have previously returned ``None`` in ``size()``, ``infinite`` should be set to ``True``, otherwise ``False``. We also recommend implementing the property ``percent_completed``, as Rally might not be able to determine progress in some cases. See below for some examples.
+
+Old::
+
+    class CustomFiniteParamSource:
+        # ...
+        def size(self):
+            return calculate_size()
+
+        def params(self):
+            return next_parameters()
+
+    class CustomInfiniteParamSource:
+        # ...
+        def size(self):
+            return None
+
+        # ...
+
+
+New::
+
+    class CustomFiniteParamSource:
+        def __init__(self, track, params, **kwargs):
+            self.infinite = False
+            # to track progress
+            self.current_invocation = 0
+
+        # ...
+        # Note that we have removed the size() method
+
+        def params(self):
+            self.current_invocation += 1
+            return next_parameters()
+
+        # Implementing this is optional but recommended for proper progress reports
+        @property
+        def percent_completed(self):
+            # for demonstration purposes we use calculate_size() here
+            # to determine the expected number of invocations. However, if
+            # it is possible to determine this value upfront, it is best
+            # to cache it in a field and just reuse the value
+            return self.current_invocation / calculate_size()
+
+
+    class CustomInfiniteParamSource:
+        def __init__(self, track, params, **kwargs):
+            self.infinite = True
+            # ...
+
+        # ...
+        # Note that we have removed the size() method
+        # ...
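
The mapping stated in this migration note (``size()`` returning ``None`` corresponds to ``infinite = True``, anything else to ``False``) could be captured in a small compatibility helper for code that has to handle both old- and new-style sources during a transition. This is a sketch only: ``is_infinite`` and the three sample classes are hypothetical and not part of Rally.

```python
def is_infinite(param_source):
    """Hypothetical helper: derive ``infinite`` for old- and new-style sources."""
    if hasattr(param_source, "infinite"):
        # New-style source: trust the explicit property.
        return bool(param_source.infinite)
    # Old-style source: size() returning None meant "no natural end".
    return param_source.size() is None


class LegacyFinite:
    def size(self):
        return 10


class LegacyInfinite:
    def size(self):
        return None


class NewStyle:
    def __init__(self):
        self.infinite = False


legacy_finite = is_infinite(LegacyFinite())
legacy_infinite = is_infinite(LegacyInfinite())
new_style = is_infinite(NewStyle())
```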


Migrating to Rally 1.3.0
------------------------
Races now stored by ID instead of timestamp
210 changes: 132 additions & 78 deletions esrally/driver/driver.py
@@ -1260,105 +1260,159 @@ def schedule_for(current_track, task, client_index):
     runner_for_op = runner.runner_for(op.type)
     params_for_op = track.operation_parameters(current_track, op).partition(client_index, num_clients)

-    if task.warmup_time_period is not None or task.time_period is not None:
+    if requires_time_period_schedule(task, params_for_op):
         warmup_time_period = task.warmup_time_period if task.warmup_time_period else 0
-        logger.info("Creating time-period based schedule with [%s] distribution for [%s] with a warmup period of [%s] seconds and a "
-                    "time period of [%s] seconds.", task.schedule, task, str(warmup_time_period), str(task.time_period))
-        return time_period_based(sched, warmup_time_period, task.time_period, runner_for_op, params_for_op)
+        logger.info("Creating time-period based schedule with [%s] distribution for [%s] with a warmup period of [%s] "
+                    "seconds and a time period of [%s] seconds.", task.schedule, task.name,
+                    str(warmup_time_period), str(task.time_period))
+        loop_control = TimePeriodBased(warmup_time_period, task.time_period)
     else:
         warmup_iterations = task.warmup_iterations if task.warmup_iterations else 0
         if task.iterations:
             iterations = task.iterations
-        elif params_for_op.size():
-            iterations = params_for_op.size() - warmup_iterations
-        else:
+        elif params_for_op.infinite:
             # this is usually the case if the parameter source provides a constant
             iterations = 1
-        logger.info("Creating iteration-count based schedule with [%s] distribution for [%s] with [%d] warmup iterations and "
-                    "[%d] iterations." % (task.schedule, op, warmup_iterations, iterations))
-        return iteration_count_based(sched, warmup_iterations, iterations, runner_for_op, params_for_op)
+        else:
+            iterations = None
+        logger.info("Creating iteration-count based schedule with [%s] distribution for [%s] with [%s] warmup "
+                    "iterations and [%s] iterations.", task.schedule, task.name, str(warmup_iterations), str(iterations))
+        loop_control = IterationBased(warmup_iterations, iterations)
+
+    return generator_for_schedule(task.name, sched, loop_control, runner_for_op, params_for_op)


+def requires_time_period_schedule(task, params):
+    if task.warmup_time_period is not None or task.time_period is not None:
+        return True
+    # user has explicitly requested iterations
+    if task.warmup_iterations is not None or task.iterations is not None:
+        return False
+    # If the parameter source ends after a finite amount of iterations, we will run with a time-based schedule
+    return not params.infinite
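
The decision made by ``requires_time_period_schedule`` can be checked in isolation. The sketch below copies the function body from this hunk and feeds it stand-in objects; ``make_task`` and the ``SimpleNamespace`` stubs are assumptions for illustration and only model the attributes the function reads, not Rally's real task or parameter source classes.

```python
from types import SimpleNamespace


def requires_time_period_schedule(task, params):
    # Copied from the hunk so that the sketch is self-contained.
    if task.warmup_time_period is not None or task.time_period is not None:
        return True
    if task.warmup_iterations is not None or task.iterations is not None:
        return False
    return not params.infinite


def make_task(**overrides):
    # Stand-in for a task: only the four attributes read above.
    defaults = dict(warmup_time_period=None, time_period=None,
                    warmup_iterations=None, iterations=None)
    defaults.update(overrides)
    return SimpleNamespace(**defaults)


finite = SimpleNamespace(infinite=False)
infinite = SimpleNamespace(infinite=True)

decisions = {
    "time-period set": requires_time_period_schedule(make_task(time_period=60), infinite),
    "iterations set": requires_time_period_schedule(make_task(iterations=100), finite),
    "nothing set, finite source": requires_time_period_schedule(make_task(), finite),
    "nothing set, infinite source": requires_time_period_schedule(make_task(), infinite),
}
```

Explicit task properties take precedence. Only when neither a time period nor iterations are given does the parameter source's ``infinite`` property decide.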


-def time_period_based(sched, warmup_time_period, time_period, runner, params):
+def generator_for_schedule(task_name, sched, task_progress_control, runner, params):
     """
-    Calculates the necessary schedule for time period based operations.
+    Creates a generator that will yield individual task invocations for the provided schedule.

-    :param sched: The scheduler for this task. Must not be None.
-    :param warmup_time_period: The time period in seconds that is considered for warmup. Must not be None; provide zero instead.
-    :param time_period: The time period in seconds that is considered for measurement. May be None.
+    :param task_name: The name of the task for which the schedule is generated.
+    :param sched: The scheduler for this task.
+    :param task_progress_control: Controls how and how often this generator will loop.
     :param runner: The runner for a given operation.
     :param params: The parameter source for a given operation.
     :return: A generator for the corresponding parameters.
     """
     next_scheduled = 0
-    start = time.perf_counter()
     logger = logging.getLogger(__name__)
-    if time_period is None:
-        iterations = params.size()
-        if iterations:
-            logger.info("No time-period property specified. Will run for at most [%d] iterations.", iterations)
-            for it in range(0, iterations):
-                try:
-                    sample_type = metrics.SampleType.Warmup if time.perf_counter() - start < warmup_time_period else metrics.SampleType.Normal
-                    percent_completed = (it + 1) / iterations
-                    yield (next_scheduled, sample_type, percent_completed, runner, params.params())
-                    next_scheduled = sched.next(next_scheduled)
-                except StopIteration:
-                    logger.info("Time-period-based schedule stopped due to StopIteration.")
-                    return
-            logger.info("Time-period-based schedule stopped after the specified number of [%d] iterations.", iterations)
-        else:
-            logger.info("No time-period property specified. Will run as long as the parameter source provides values.")
-            param_source_knows_progress = hasattr(params, "percent_completed")
-            while True:
-                try:
-                    sample_type = metrics.SampleType.Warmup if time.perf_counter() - start < warmup_time_period else metrics.SampleType.Normal
-                    # does not contribute at all to completion. Hence, we cannot define completion.
-                    percent_completed = params.percent_completed if param_source_knows_progress else None
-                    yield (next_scheduled, sample_type, percent_completed, runner, params.params())
-                    next_scheduled = sched.next(next_scheduled)
-                except StopIteration:
-                    logger.info("Time-period-based schedule stopped due to StopIteration.")
-                    return
+    if task_progress_control.infinite:
+        logger.info("Parameter source will determine when the schedule for [%s] terminates.", task_name)
+        param_source_knows_progress = hasattr(params, "percent_completed")

Review comment (Contributor): I love the name of this variable 👍

+        task_progress_control.start()
+        while True:
+            try:
+                # does not contribute at all to completion. Hence, we cannot define completion.
+                percent_completed = params.percent_completed if param_source_knows_progress else None
+                yield (next_scheduled, task_progress_control.sample_type, percent_completed, runner, params.params())
+                next_scheduled = sched.next(next_scheduled)
+                task_progress_control.next()
+            except StopIteration:
+                logger.info("%s schedule for [%s] stopped due to StopIteration.", str(task_progress_control), task_name)
+                return
     else:
-        duration = warmup_time_period + time_period
-        end = start + duration
-        logger.info("Time-period-based schedule will run for a total of [%d] seconds.", duration)
-        while time.perf_counter() < end:
+        task_progress_control.start()
+        logger.info("%s schedule will determine when the schedule for [%s] terminates.",
+                    str(task_progress_control), task_name)
+        while not task_progress_control.completed:
             try:
-                elapsed = time.perf_counter() - start
-                sample_type = metrics.SampleType.Warmup if elapsed < warmup_time_period else metrics.SampleType.Normal
-                percent_completed = elapsed / duration
-                yield (next_scheduled, sample_type, percent_completed, runner, params.params())
+                yield (next_scheduled,
+                       task_progress_control.sample_type,
+                       task_progress_control.percent_completed,
+                       runner,
+                       params.params())
                 next_scheduled = sched.next(next_scheduled)
+                task_progress_control.next()
             except StopIteration:
+                logger.info("%s schedule for [%s] stopped due to StopIteration.", str(task_progress_control), task_name)
                 return
-        logger.info("Time-period-based schedule stopped after the specified time period of [%d] seconds.", duration)
+        logger.info("%s schedule for [%s] stopped regularly.", str(task_progress_control), task_name)
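
The "infinite" branch of ``generator_for_schedule`` relies on the parameter source raising ``StopIteration`` and on the generator catching it. A stripped-down sketch of that interaction follows. ``CountingSource`` and ``drain`` are hypothetical names, and the scheduler, runner and sample type are left out on purpose.

```python
class CountingSource:
    """Hypothetical finite parameter source that provides three parameter sets."""
    infinite = False

    def __init__(self, total=3):
        self._total = total
        self._issued = 0

    @property
    def percent_completed(self):
        return self._issued / self._total

    def params(self):
        if self._issued == self._total:
            raise StopIteration()
        self._issued += 1
        return {"n": self._issued}


def drain(params):
    # Simplified version of the branch in which the parameter source decides when to stop.
    knows_progress = hasattr(params, "percent_completed")
    while True:
        try:
            # Progress is read before params() is called, as in the hunk above.
            percent_completed = params.percent_completed if knows_progress else None
            yield percent_completed, params.params()
        except StopIteration:
            # Catching here matters: since PEP 479 (default from Python 3.7), a
            # StopIteration escaping a generator body becomes a RuntimeError.
            return


invocations = list(drain(CountingSource()))
```

Because progress is read before each call to ``params()``, the last value yielded in this sketch is below ``1.0`` even though the source is exhausted afterwards.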


-def iteration_count_based(sched, warmup_iterations, iterations, runner, params):
-    """
-    Calculates the necessary schedule based on a given number of iterations.
+class TimePeriodBased:
+    def __init__(self, warmup_time_period, time_period):
+        self._warmup_time_period = warmup_time_period
+        self._time_period = time_period
+        if warmup_time_period is not None and time_period is not None:
+            self._duration = self._warmup_time_period + self._time_period
+        else:
+            self._duration = None
+        self._start = None
+        self._now = None

-    :param sched: The scheduler for this task. Must not be None.
-    :param warmup_iterations: The number of warmup iterations to run. 0 if no warmup should be performed.
-    :param iterations: The number of measurement iterations to run.
-    :param runner: The runner for a given operation.
-    :param params: The parameter source for a given operation.
-    :return: A generator for the corresponding parameters.
-    """
-    next_scheduled = 0
-    total_iterations = warmup_iterations + iterations
-    logger = logging.getLogger(__name__)
-    if total_iterations == 0:
-        raise exceptions.RallyAssertionError("Operation must run at least for one iteration.")
-    for it in range(0, total_iterations):
-        try:
-            sample_type = metrics.SampleType.Warmup if it < warmup_iterations else metrics.SampleType.Normal
-            percent_completed = (it + 1) / total_iterations
-            yield (next_scheduled, sample_type, percent_completed, runner, params.params())
-            next_scheduled = sched.next(next_scheduled)
-        except StopIteration:
-            logger.info("Iteration-count-based schedule stopped due to StopIteration.")
-            return
-    logger.info("Iteration-count-based schedule stopped after [%d] warmup iterations and [%d] iterations.",
-                warmup_iterations, iterations)
+    def start(self):
+        self._now = time.perf_counter()
+        self._start = self._now
+
+    @property
+    def _elapsed(self):
+        return self._now - self._start
+
+    @property
+    def sample_type(self):
+        return metrics.SampleType.Warmup if self._elapsed < self._warmup_time_period else metrics.SampleType.Normal
+
+    @property
+    def infinite(self):
+        return self._time_period is None
+
+    @property
+    def percent_completed(self):
+        return self._elapsed / self._duration
+
+    @property
+    def completed(self):
+        return self._now >= (self._start + self._duration)
+
+    def next(self):
+        self._now = time.perf_counter()
+
+    def __str__(self):
+        return "time-period-based"
+
+
+class IterationBased:
+    def __init__(self, warmup_iterations, iterations):
+        self._warmup_iterations = warmup_iterations
+        self._iterations = iterations
+        if warmup_iterations is not None and iterations is not None:
+            self._total_iterations = self._warmup_iterations + self._iterations
+            if self._total_iterations == 0:
+                raise exceptions.RallyAssertionError("Operation must run at least for one iteration.")
+        else:
+            self._total_iterations = None
+        self._it = None
+
+    def start(self):
+        self._it = 0
+
+    @property
+    def sample_type(self):
+        return metrics.SampleType.Warmup if self._it < self._warmup_iterations else metrics.SampleType.Normal
+
+    @property
+    def infinite(self):
+        return self._iterations is None
+
+    @property
+    def percent_completed(self):
+        return (self._it + 1) / self._total_iterations
+
+    @property
+    def completed(self):
+        return self._it >= self._total_iterations
+
+    def next(self):
+        self._it += 1
+
+    def __str__(self):
+        return "iteration-count-based"
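
To see how a loop control object drives the finite branch of the generator, the following sketch runs a condensed copy of ``IterationBased`` by hand. The ``SampleType`` enum is a stand-in for ``metrics.SampleType``, and the zero-iterations check and the ``infinite`` property are omitted for brevity. It is an illustration under those assumptions, not the code from the pull request.

```python
from enum import Enum


class SampleType(Enum):
    # Stand-in for metrics.SampleType.
    Warmup = 0
    Normal = 1


class IterationBased:
    # Condensed copy of the class from the diff (assumes both arguments are integers).
    def __init__(self, warmup_iterations, iterations):
        self._warmup_iterations = warmup_iterations
        self._total_iterations = warmup_iterations + iterations
        self._it = None

    def start(self):
        self._it = 0

    @property
    def sample_type(self):
        return SampleType.Warmup if self._it < self._warmup_iterations else SampleType.Normal

    @property
    def percent_completed(self):
        return (self._it + 1) / self._total_iterations

    @property
    def completed(self):
        return self._it >= self._total_iterations

    def next(self):
        self._it += 1


control = IterationBased(warmup_iterations=1, iterations=3)
control.start()
observed = []
while not control.completed:
    # Same order as in the generator: read state, then advance.
    observed.append((control.sample_type.name, control.percent_completed))
    control.next()
```

Both control classes expose the same small protocol (``start``, ``next``, ``completed``, ``sample_type``, ``percent_completed``), which is what allows a single generator to serve time-based and iteration-based tasks.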