Skip to content

Commit

Permalink
Add support for a concurrency policy (#1)
Browse files Browse the repository at this point in the history
* Add MIT license
* Fix GitHub Actions workflow for package testing
  • Loading branch information
bradshjg committed Nov 21, 2020
1 parent e165dcb commit a14e2b5
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 29 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8 pytest
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
pip install flake8 freezegun pytest
pip install .
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
Expand Down
9 changes: 0 additions & 9 deletions .travis.yml

This file was deleted.

24 changes: 24 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Changelog
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [0.1.0] - 2020-11-20
### Added
- Support for setting a concurrency policy. The default policy is to wait for
the current worker to complete and immediately spawn a new one if the next start
time has lapsed. The other available policies are ``ConcurrencyPolic.SKIP`` which
will skip the lapsed schedule and ``ConcurrencyPolicy.ALLOW`` which will launch
a new worker concurrently if one is still running.
- MIT license.

## [0.0.1] - 2020-11-19
### Added
- Cron entrypoint that supports timezone-aware cron schedules.

[Unreleased]: https://github.com/bradshjg/nameko-cron/compare/0.1.0...HEAD
[0.1.0]: https://github.com/bradshjg/nameko-cron/compare/0.0.1...0.1.0
[0.0.1]: https://github.com/bradshjg/nameko-cron/releases/tag/0.0.1
15 changes: 15 additions & 0 deletions LICENSE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Copyright 2020 James Bradshaw

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions
of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,13 @@ class Service:
# executes every day at noon America/Chicago time
print("pong")
```

by default, if a worker takes longer than the next scheduled run the worker will wait until
the task is complete before immediately launching a new worker. This behavior can be controlled
via the ``concurrency`` keyword argument.

``ConcurrencyPolicy.WAIT`` is that default behavior.

``ConcurrencyPolicy.ALLOW`` will spawn a worker regardless of whether existing workers are still running.

``ConcurrencyPolicy.SKIP`` will skip a run if the previous worker lapsed the next scheduled run.
30 changes: 26 additions & 4 deletions nameko_cron/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
from enum import Enum
import time
from logging import getLogger

Expand All @@ -13,11 +14,22 @@
_log = getLogger(__name__)


class ConcurrencyPolicy(Enum):
ALLOW = 'allow'
SKIP = 'skip'
WAIT = 'wait'


class Cron(Entrypoint):
def __init__(self, schedule, tz=None, **kwargs):
def __init__(self, schedule: str, tz: str = None, concurrency: str = ConcurrencyPolicy.WAIT, **kwargs):
"""
Cron entrypoint. Fires according to a (possibly timezone-aware)
cron schedule. If no timezone info is passed, the default is UTC.
Set ``concurrency`` to ``ConcurrencyPolicy.ALLOW`` to allow multiple workers
to run simultaneously. Set ``concurrency`` to ``ConcurrencyPolicy.SKIP`` to
skip lapsed scheduled runs. The default behavior (``ConcurrencyPolicy.WAIT``)
is to wait until the running worker completes and immediately spawn another
if the schedule has lapsed.
Example::
Expand All @@ -32,6 +44,7 @@ def ping(self):
"""
self.schedule = schedule
self.tz = tz
self.concurrency = concurrency
self.should_stop = Event()
self.worker_complete = Event()
self.gt = None
Expand Down Expand Up @@ -74,10 +87,17 @@ def _run(self):

self.handle_timer_tick()

self.worker_complete.wait()
self.worker_complete.reset()
if self.concurrency != ConcurrencyPolicy.ALLOW:
self.worker_complete.wait()
self.worker_complete.reset()

sleep_time = next(interval)
print(sleep_time)

# a sleep time of zero represents that we've elapsed the next start time, so
# if the user set the policy to skip, we need to update the interval again.
if self.concurrency == ConcurrencyPolicy.SKIP and sleep_time == 0:
sleep_time = next(interval)

def handle_timer_tick(self):
args = ()
Expand All @@ -92,7 +112,9 @@ def handle_timer_tick(self):
self, args, kwargs, handle_result=self.handle_result)

def handle_result(self, worker_ctx, result, exc_info):
self.worker_complete.send()
# we only care about the worker completion if we're going to be waiting for it.
if self.concurrency != ConcurrencyPolicy.ALLOW:
self.worker_complete.send()
return result, exc_info


Expand Down
8 changes: 7 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="nameko-cron",
version="0.0.1",
version="0.1.0",
author="bradshjg",
author_email="james.g.bradshaw@gmail.com",
description="Nameko cron extension",
Expand All @@ -15,8 +15,14 @@
packages=setuptools.find_packages(exclude=['tests']),
classifiers=[
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Topic :: Internet",
"Topic :: Software Development :: Libraries :: Python Modules",
"Intended Audience :: Developers",
],
python_requires='>=3.6',
install_requires=[
Expand Down
32 changes: 20 additions & 12 deletions tests/test_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,51 @@
import eventlet
import freezegun
import pytest
from mock import Mock
from unittest.mock import Mock

from nameko.testing.services import get_extension
from nameko.testing.utils import wait_for_call

from nameko_cron import Cron, cron
from nameko_cron import ConcurrencyPolicy, Cron, cron


@pytest.fixture
def tracker():
return Mock()


def test_cron_runs(container_factory, tracker):
@pytest.mark.parametrize("timeout,concurrency,task_time,expected_calls", [
# the cron schedule is set to spawn a worker every second
(5, ConcurrencyPolicy.WAIT, 0, 5), # a short-lived worker run at 0, 1, 2, 3, 4, 5
(5, ConcurrencyPolicy.WAIT, 2, 3), # a long-lived worker should fire at 0, 2, 4
(5, ConcurrencyPolicy.ALLOW, 10, 5), # if concurrency is permitted, new workers spawn alongside existing ones
(5, ConcurrencyPolicy.SKIP, 1.5, 3), # skipping should run at 0, 2, and 4
(5, ConcurrencyPolicy.WAIT, 1.5, 4), # run at 0, 1.5, 3, 4.5 (always behind)
])
def test_cron_runs(timeout, concurrency, task_time, expected_calls, container_factory, tracker):
"""Test running the cron main loop."""
timeout = 2.0

class Service(object):
name = "service"

@cron('* * * * * *')
@cron('* * * * * *', concurrency=concurrency)
def tick(self):
eventlet.sleep(0)
tracker()
eventlet.sleep(task_time)

container = container_factory(Service, {})

# Check that Timer instance is initialized correctly
# Check that Cron instance is initialized correctly
instance = get_extension(container, Cron)
assert instance.schedule == '* * * * * *'
assert instance.tz is None
assert instance.concurrency == concurrency

container.start()
eventlet.sleep(timeout)
container.stop()
with freezegun.freeze_time('2020-11-20 23:59:59.5', tick=True):
container.start()
eventlet.sleep(timeout)
container.stop()

assert tracker.call_count == 2
assert tracker.call_count == expected_calls


@pytest.mark.parametrize("timezone,expected_first_interval_hours", [
Expand Down
1 change: 0 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ envlist = py36
[testenv]
deps =
freezegun
mock
pytest
commands =
pytest

0 comments on commit a14e2b5

Please sign in to comment.