Scheduler: adds optional support for redis and specifying redis client
- allow optional support for running the scheduler without a redis
  backend for persistent schedule storage. The default is to exit on
  startup error. The user can set `redis_startup_error_hard_kill` to
  false if they really want to run the scheduler with no backend db.
- allow users the ability to specify a redis client class. this will let
  them use redis-cluster if they wish. They can install the appropriate
  module and define `redis_client_class` (and if necessary class keyword
  arguments via `redis_client_class_kwargs`).
com4 committed Jan 16, 2019
1 parent eedb40e commit 0bed1a0
Showing 7 changed files with 186 additions and 104 deletions.
70 changes: 66 additions & 4 deletions docs/settings_file.rst
@@ -19,6 +19,13 @@ Default: False

Enable most verbose level of debug statements

hide_heartbeat_logs
===================
Default: True

This hides heartbeat messages in the logs. Disabling it will result in very
noisy log output.

max_sockets
===========
Default: 1024
@@ -91,15 +98,55 @@ Default: ''

Password to use when connecting to redis

redis_client_class
==================
Default: ``redis.StrictRedis``

The class to use as the redis client. This can be overridden if you want to use
a different module to connect to redis, for example
``rediscluster.StrictRedisCluster``. Note: you may get errors if you don't use
a strict-mode class.
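The scheduler presumably resolves this dotted path into a class object with
``importlib``. A minimal sketch of that resolution (the helper name is
hypothetical, not EventMQ's actual internals):

```python
from importlib import import_module


def resolve_client_class(dotted_path):
    """Resolve a dotted path like 'redis.StrictRedis' to a class.

    Hypothetical helper showing how a ``redis_client_class`` setting
    could be turned into a usable class object.
    """
    module_path, _, class_name = dotted_path.rpartition('.')
    module = import_module(module_path)
    return getattr(module, class_name)
```

Because the path is resolved at runtime, any installed client class with a
compatible interface can be named here without code changes.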

redis_client_class_kwargs
=========================
Default: {}

This is a JSON hash map of keyword arguments to pass to the Python class
constructor. This is useful for using ``redis-cluster-py`` on AWS Elasticache.
When using Elasticache this value should be set to
``{"skip_full_coverage_check": true}`` to prevent startup errors.
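Because the value is parsed as JSON, booleans must be lowercase
``true``/``false``. A quick check of how the Elasticache example above
deserializes into constructor keyword arguments:

```python
import json

# The raw setting value, exactly as it would appear in the config file.
raw = '{"skip_full_coverage_check": true}'

# JSON true becomes Python True after parsing; the scheduler can then
# pass these straight to the client constructor, e.g. client_class(**kwargs).
kwargs = json.loads(raw)
```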

redis_startup_error_hard_kill
=============================
Default: True

If there is an error connecting to the Redis server for persistent schedule
storage on startup, kill the app. This is useful if you want to prevent
accidentally accepting schedules that can't be saved to a persistent store. If
you would like to use redis you will need to ``pip install redis`` or
``pip install redis-py-cluster`` and define the necessary options.
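The startup behavior this setting controls can be sketched roughly as follows
(function and argument names are hypothetical, not EventMQ's actual code):

```python
import logging
import sys

logger = logging.getLogger(__name__)


def check_redis_on_startup(client, hard_kill=True):
    """Ping redis at startup; exit or degrade depending on hard_kill.

    Sketch of the behavior implied by ``redis_startup_error_hard_kill``.
    """
    try:
        client.ping()
        return True
    except Exception as e:
        if hard_kill:
            # Default: refuse to accept schedules that can't be persisted.
            logger.critical('Redis unavailable at startup: %s', e)
            sys.exit(1)
        # Opt-in degraded mode: run with no persistent schedule storage.
        logger.warning('Running without persistent schedule storage: %s', e)
        return False
```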

***********
Job Manager
***********

default_queue_name
==================
Default: default

This is the default queue a job manager will listen on if nothing is specified.

default_queue_weight
====================
Default: 10

This is the default weight for the default queue if it is not explicitly set.

concurrent_jobs
===============
Default: 4

This is the number of concurrent jobs the individual job manager should execute
at a time. If you are using the multiprocess or threading model this number
becomes important as you will want to control the load on your server. If the
load equals the number of cores on the server, processes will begin waiting for
@@ -115,7 +162,7 @@ queues
======
Default: [[10, "default"]]

Comma-separated list of queues to process jobs for, with their weights. This
list must be valid JSON, otherwise an error will be thrown.
Example: ``queues=[[10, "data_process"], [15, "email"]]``. With these
weights and the ``CONCURRENT_JOBS`` setting, you should be able to tune managers
@@ -131,14 +178,29 @@ number until the large box is ready to accept another q1 job.
default queue so that anything that is not explicitly assigned will still be
run.
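One way to think about the weights: higher-weighted queues receive
proportionally more of a manager's capacity. A rough illustration of
proportional distribution (not EventMQ's actual scheduling code):

```python
def pick_counts(queues, total_jobs):
    """Distribute total_jobs across queues in proportion to their weights.

    ``queues`` is the JSON structure from the ``queues`` setting,
    e.g. [[10, "data_process"], [15, "email"]].  Illustrative only.
    """
    total_weight = sum(weight for weight, _ in queues)
    return {
        name: total_jobs * weight // total_weight
        for weight, name in queues
    }
```

With the example weights above, a manager handling 25 jobs would split them
roughly 10 to ``data_process`` and 15 to ``email``.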

setup_callable/setup_path
=========================
Default: '' (Signifies no task will be attempted)

Strings containing the path and callable to run when a worker is spawned, if
applicable to that type of worker. Currently the only supported worker is
MultiProcessWorker; this is useful for pulling any global state into memory.
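Since the path and callable arrive as two separate strings, the worker
presumably imports the module and invokes the named attribute. A sketch of
that resolution (names are hypothetical; EventMQ's real logic may differ):

```python
from importlib import import_module


def run_setup(setup_path, setup_callable):
    """Resolve and invoke the worker setup function, if configured.

    An empty string for either setting signifies no task, matching the
    documented default.
    """
    if not setup_path or not setup_callable:
        return None  # '' signifies no task will be attempted
    module = import_module(setup_path)
    return getattr(module, setup_callable)()
```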

job_entry_func
==============
Default: '' (Signifies no function will be executed)

The function to execute before **every** job a worker thread executes. For
example: cleaning up stale database connections. (Django's
``django.db.connections[].close_if_unusable_or_obsolete()``)

job_exit_func
=============
Default: '' (Signifies no function will be executed)

The function to execute **after** every job a worker thread executes. For
example: closing any database handles that were left open.
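Together, the two hooks bracket each job roughly like this (a sketch of the
documented behavior, not EventMQ's actual worker loop):

```python
def run_job(job, entry_func=None, exit_func=None):
    """Call the optional entry hook, then the job, then the exit hook.

    The exit hook runs even if the job raises, mirroring cleanup tasks
    like closing database handles.
    """
    if entry_func:
        entry_func()
    try:
        return job()
    finally:
        if exit_func:
            exit_func()
```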

max_job_count
=============
Default: 1024
1 change: 1 addition & 0 deletions etc/eventmq.conf-dist
@@ -19,6 +19,7 @@ scheduler_addr=tcp://eventmq:47291
[router]

[scheduler]
redis_client_class = redis.StrictRedis

[jobmanager]
worker_addr=tcp://127.0.0.1:47290
2 changes: 1 addition & 1 deletion eventmq/__init__.py
@@ -1,5 +1,5 @@
__author__ = 'EventMQ Contributors'
__version__ = '0.3.9'
__version__ = '0.3.10-rc1'

PROTOCOL_VERSION = 'eMQP/1.0'

4 changes: 2 additions & 2 deletions eventmq/client/messages.py
@@ -226,14 +226,14 @@ def send_request(socket, message, reply_requested=False, guarantee=False,
}
}
Args:
socket: Socket (Sender or Receiver) to use when sending `message`
message: message to send to `socket`
reply_requested (bool): request the return value of func as a reply
guarantee (bool): (Give your best effort) to guarantee that func is
executed. Exceptions and things will be logged.
retry_count (int): How many times should be retried when encountering
an Exception or some other failure before giving up. (default: 0
or immediately fail)
timeout (int): How many seconds should we wait before killing the job
default: 0 which means infinite timeout
queue (str): Name of queue to use when executing the job. Default: is
1 change: 1 addition & 0 deletions eventmq/conf.py
@@ -90,6 +90,7 @@
RQ_PASSWORD = ''
REDIS_CLIENT_CLASS = 'redis.StrictRedis'
REDIS_CLIENT_CLASS_KWARGS = {}
REDIS_STARTUP_ERROR_HARD_KILL = True

MAX_JOB_COUNT = 1024
