Added Etcd jobstore support (#777)
Co-authored-by: Yan Daojiang <yandaojiang@sensorsdata.cn>
Co-authored-by: Alex Grönholm <alex.gronholm@nextday.fi>
3 people committed Aug 20, 2023
1 parent 2cd3791 commit 677ce71
Showing 10 changed files with 249 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
@@ -23,6 +23,6 @@ jobs:
      - name: Start external services
        run: docker-compose up -d
      - name: Install the project and its dependencies
        run: pip install -e .[testing,asyncio,gevent,mongodb,redis,rethinkdb,sqlalchemy,tornado,twisted,zookeeper]
        run: pip install -e .[testing,asyncio,gevent,mongodb,redis,rethinkdb,sqlalchemy,tornado,twisted,zookeeper,etcd]
      - name: Test with pytest
        run: pytest
1 change: 1 addition & 0 deletions .readthedocs.yml
@@ -21,4 +21,5 @@ python:
        - tornado
        - twisted
        - zookeeper
        - etcd3
        - doc
1 change: 1 addition & 0 deletions README.rst
@@ -36,6 +36,7 @@ like. Supported backends for storing jobs include:
* `Redis <http://redis.io/>`_
* `RethinkDB <https://www.rethinkdb.com/>`_
* `ZooKeeper <https://zookeeper.apache.org/>`_
* `Etcd <https://etcd.io/>`_

APScheduler also integrates with several common Python frameworks, like:

162 changes: 162 additions & 0 deletions apscheduler/jobstores/etcd.py
@@ -0,0 +1,162 @@
import pickle
from datetime import datetime

from pytz import utc

from apscheduler.job import Job
from apscheduler.jobstores.base import (BaseJobStore, ConflictingIdError,
                                        JobLookupError)
from apscheduler.util import (datetime_to_utc_timestamp, maybe_ref,
                              utc_timestamp_to_datetime)

try:
    from etcd3 import Etcd3Client
except ImportError:  # pragma: nocover
    raise ImportError('EtcdJobStore requires etcd3 to be installed')


class EtcdJobStore(BaseJobStore):
    """
    Stores jobs in etcd. Any leftover keyword arguments are directly passed to
    etcd3's `etcd3.client
    <https://python-etcd3.readthedocs.io/en/latest/readme.html>`_.

    Plugin alias: ``etcd``

    :param str path: path to store jobs in
    :param client: a :class:`~etcd3.Etcd3Client` instance to use instead of
        providing connection arguments
    :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
        highest available
    """

    def __init__(self, path='/apscheduler', client=None, close_connection_on_exit=False,
                 pickle_protocol=pickle.DEFAULT_PROTOCOL, **connect_args):
        super(EtcdJobStore, self).__init__()
        self.pickle_protocol = pickle_protocol
        self.close_connection_on_exit = close_connection_on_exit

        if not path:
            raise ValueError('The "path" parameter must not be empty')

        self.path = path

        if client:
            self.client = maybe_ref(client)
        else:
            self.client = Etcd3Client(**connect_args)

    def lookup_job(self, job_id):
        node_path = self.path + '/' + str(job_id)
        try:
            content, _ = self.client.get(node_path)
            content = pickle.loads(content)
            job = self._reconstitute_job(content['job_state'])
            return job
        except BaseException:
            return None

    def get_due_jobs(self, now):
        timestamp = datetime_to_utc_timestamp(now)
        jobs = [job_record['job'] for job_record in self._get_jobs()
                if job_record['next_run_time'] is not None and
                job_record['next_run_time'] <= timestamp]
        return jobs

    def get_next_run_time(self):
        next_runs = [job_record['next_run_time'] for job_record in self._get_jobs()
                     if job_record['next_run_time'] is not None]
        return utc_timestamp_to_datetime(min(next_runs)) if len(next_runs) > 0 else None

    def get_all_jobs(self):
        jobs = [job_record['job'] for job_record in self._get_jobs()]
        self._fix_paused_jobs_sorting(jobs)
        return jobs

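    # put_if_not_exists() returns False when the key is already present, which is
    # reported as a conflicting (duplicate) job ID.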
    def add_job(self, job):
        node_path = self.path + '/' + str(job.id)
        value = {
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': job.__getstate__()
        }
        data = pickle.dumps(value, self.pickle_protocol)
        status = self.client.put_if_not_exists(node_path, value=data)
        if not status:
            raise ConflictingIdError(job.id)

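    # update_job() and remove_job() use an etcd transaction whose compare clause
    # (version(key) > 0) only succeeds when the job key already exists; otherwise
    # the empty failure branch runs and the False status is raised as JobLookupError.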
    def update_job(self, job):
        node_path = self.path + "/" + str(job.id)
        changes = {
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': job.__getstate__()
        }
        data = pickle.dumps(changes, self.pickle_protocol)
        status, _ = self.client.transaction(
            compare=[
                self.client.transactions.version(node_path) > 0
            ],
            success=[
                self.client.transactions.put(node_path, value=data)
            ],
            failure=[]
        )
        if not status:
            raise JobLookupError(job.id)

    def remove_job(self, job_id):
        node_path = self.path + "/" + str(job_id)
        status, _ = self.client.transaction(
            compare=[
                self.client.transactions.version(node_path) > 0
            ],
            success=[
                self.client.transactions.delete(node_path)
            ],
            failure=[]
        )
        if not status:
            raise JobLookupError(job_id)

    def remove_all_jobs(self):
        self.client.delete_prefix(self.path)

    def shutdown(self):
        self.client.close()

    def _reconstitute_job(self, job_state):
        job = Job.__new__(Job)
        job.__setstate__(job_state)
        job._scheduler = self._scheduler
        job._jobstore_alias = self._alias
        return job

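    # Job records that can no longer be reconstituted (for example because the
    # scheduled callable is no longer importable) are collected and removed so
    # that one broken job does not break every subsequent scan of the job store.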
    def _get_jobs(self):
        jobs = []
        failed_job_ids = []
        all_ids = list(self.client.get_prefix(self.path))

        for doc, _ in all_ids:
            try:
                content = pickle.loads(doc)
                job_record = {
                    'next_run_time': content['next_run_time'],
                    'job': self._reconstitute_job(content['job_state'])
                }
                jobs.append(job_record)
            except BaseException:
                content = pickle.loads(doc)
                failed_id = content['job_state']['id']
                failed_job_ids.append(failed_id)
                self._logger.exception('Unable to restore job "%s" -- removing it', failed_id)

        if failed_job_ids:
            for failed_id in failed_job_ids:
                self.remove_job(failed_id)
        paused_sort_key = datetime(9999, 12, 31, tzinfo=utc)
        return sorted(jobs, key=lambda job_record: job_record['job'].next_run_time
                      or paused_sort_key)

    def __repr__(self):
        return '<%s (client=%s)>' % (self.__class__.__name__, self.client)
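For reference, a minimal sketch of wiring the new job store into a scheduler via the 'etcd' plugin alias registered in setup.py below. The path, host and port values are illustrative placeholders rather than values from this commit; leftover keyword arguments are passed straight through to Etcd3Client, so an etcd server is assumed to be reachable on 127.0.0.1:2379 (the client default, matching the docker-compose service added here).

from apscheduler.schedulers.background import BackgroundScheduler

# '/my_jobs', host and port are placeholder values; any keyword arguments that
# EtcdJobStore does not consume itself are forwarded to etcd3.Etcd3Client.
scheduler = BackgroundScheduler()
scheduler.add_jobstore('etcd', path='/my_jobs', host='127.0.0.1', port=2379)
scheduler.start()

Each job added afterwards is pickled under a key of the form <path>/<job id>, as add_job() above shows.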
7 changes: 7 additions & 0 deletions docker-compose.yml
@@ -19,3 +19,10 @@ services:
    image: zookeeper
    ports:
      - 127.0.0.1:2181:2181

  etcd:
    image: docker.io/bitnami/etcd:3.5
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
    ports:
      - 127.0.0.1:2379:2379
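With this service in place, the existing docker-compose up -d step in the test workflow (or the same command run locally) also brings up an etcd 3.5 instance on 127.0.0.1:2379 with authentication disabled, which is what the new job store and its tests connect to by default.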
1 change: 1 addition & 0 deletions docs/contributing.rst
@@ -31,6 +31,7 @@ To fully run the test suite, you will need at least:
* A MongoDB server
* A Redis server
* A Zookeeper server
* An Etcd server

For other dependencies, it's best to look in tox.ini and install what is appropriate for the Python
version you're using.
33 changes: 33 additions & 0 deletions examples/jobstores/etcd.py
@@ -0,0 +1,33 @@
"""
This example demonstrates the use of the etcd job store.
On each run, it adds a new alarm that fires after ten seconds.
You can exit the program, restart it and observe that any previous alarms that have not fired yet
are still active. Running the example with the --clear switch will remove any existing alarms.
"""

import os
import sys
from datetime import datetime, timedelta

from apscheduler.schedulers.blocking import BlockingScheduler


def alarm(time):
    print('Alarm! This alarm was scheduled at %s.' % time)


if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_jobstore('etcd', alias="etcd", path='/example_jobs')
    if len(sys.argv) > 1 and sys.argv[1] == '--clear':
        scheduler.remove_all_jobs()

    alarm_time = datetime.now() + timedelta(seconds=10)
    scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
    print('To clear the alarms, run this example with the --clear argument.')
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass
7 changes: 4 additions & 3 deletions setup.py
@@ -1,7 +1,6 @@
import os.path

from setuptools import setup, find_packages

from setuptools import find_packages, setup

here = os.path.dirname(__file__)
readme_path = os.path.join(here, 'README.rst')
@@ -53,6 +52,7 @@
        'tornado': ['tornado >= 4.3'],
        'twisted': ['twisted'],
        'zookeeper': ['kazoo'],
        'etcd': ['etcd3', 'protobuf <= 3.21.0'],
        'testing': [
            'pytest',
            'pytest_asyncio',
@@ -88,7 +88,8 @@
            'mongodb = apscheduler.jobstores.mongodb:MongoDBJobStore [mongodb]',
            'rethinkdb = apscheduler.jobstores.rethinkdb:RethinkDBJobStore [rethinkdb]',
            'redis = apscheduler.jobstores.redis:RedisJobStore [redis]',
            'zookeeper = apscheduler.jobstores.zookeeper:ZooKeeperJobStore [zookeeper]'
            'zookeeper = apscheduler.jobstores.zookeeper:ZooKeeperJobStore [zookeeper]',
            'etcd = apscheduler.jobstores.etcd:EtcdJobStore [etcd]'
        ]
    }
)
42 changes: 38 additions & 4 deletions tests/test_jobstores.py
@@ -90,16 +90,26 @@ def zookeeperjobstore():
    store.shutdown()


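# Skipped via importorskip when apscheduler.jobstores.etcd (and hence etcd3) is not
# importable; actually running these tests requires a reachable etcd server such as
# the one added to docker-compose.yml.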
@pytest.fixture
def etcdjobstore():
    etcd = pytest.importorskip('apscheduler.jobstores.etcd')
    store = etcd.EtcdJobStore(path='/apscheduler_unittest')
    store.start(None, 'etcd')
    yield store
    store.remove_all_jobs()
    store.shutdown()


@pytest.fixture(params=['memjobstore', 'sqlalchemyjobstore', 'mongodbjobstore', 'redisjobstore',
                        'rethinkdbjobstore', 'zookeeperjobstore'],
                ids=['memory', 'sqlalchemy', 'mongodb', 'redis', 'rethinkdb', 'zookeeper'])
                        'rethinkdbjobstore', 'zookeeperjobstore', 'etcdjobstore'],
                ids=['memory', 'sqlalchemy', 'mongodb', 'redis', 'rethinkdb', 'zookeeper', 'etcd'])
def jobstore(request):
    return request.getfixturevalue(request.param)


@pytest.fixture(params=['sqlalchemyjobstore', 'mongodbjobstore', 'redisjobstore',
                        'rethinkdbjobstore', 'zookeeperjobstore'],
                ids=['sqlalchemy', 'mongodb', 'redis', 'rethinkdb', 'zookeeper'])
                        'rethinkdbjobstore', 'zookeeperjobstore', 'etcdjobstore'],
                ids=['sqlalchemy', 'mongodb', 'redis', 'rethinkdb', 'zookeeper', 'etcd'])
def persistent_jobstore(request):
    return request.getfixturevalue(request.param)

@@ -327,6 +337,11 @@ def test_repr_zookeeperjobstore(zookeeperjobstore):
    assert repr(zookeeperjobstore).startswith(class_sig)


def test_repr_etcdjobstore(etcdjobstore):
    class_sig = "<EtcdJobStore (client=<etcd3.client"
    assert repr(etcdjobstore).startswith(class_sig)


def test_memstore_close(memjobstore, create_add_job):
    create_add_job(memjobstore, dummy_job, datetime(2016, 5, 3))
    memjobstore.shutdown()
@@ -406,3 +421,22 @@ def test_zookeeper_null_path():
    zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper')
    exc = pytest.raises(ValueError, zookeeper.ZooKeeperJobStore, path='')
    assert '"path"' in str(exc.value)


def test_etcd_null_path():
    etcd = pytest.importorskip('apscheduler.jobstores.etcd')
    exc = pytest.raises(ValueError, etcd.EtcdJobStore, path='')
    assert '"path"' in str(exc.value)


def test_etcd_client_ref():
    global etcd_client
    etcd = pytest.importorskip('apscheduler.jobstores.etcd')
    etcd_client = etcd.Etcd3Client()
    try:
        etcdjobstore = etcd.EtcdJobStore(client='%s:etcd_client' % __name__)
        etcdjobstore.start(None, 'etcd')
        etcdjobstore.shutdown()
    finally:
        etcd_client.close()
        del etcd_client
1 change: 1 addition & 0 deletions tox.ini
@@ -15,6 +15,7 @@ extras = testing
    tornado
    twisted
    zookeeper
    etcd3
deps =
    {py36,py37,py38,py39,py310,py311}: PySide6
