- Does not miss future events.
- Works even when the period is shorter than the execution time.
- Tested only on Linux.
- Uses lightweight multiprocessing to schedule jobs.
- Uses a 24-hour clock only.
- Schedules any number of jobs simultaneously.
- Recurring jobs are scheduled precisely.
- Event jobs are executed within the minute.
- Schedule the same function again under a different job_name.
- On execution failure, set the number of reattempts.
- Control the duration between reattempts.
- Runs only in the background, so it is easy to integrate with frameworks (e.g. Django, Flask).
- Set start and stop timestamps to ensure that your function executes only within that window.
From PyPI:
pip install simple-scheduler
There are two schedulers:
- Event scheduler (event occurs at an exact time)
from simple_scheduler.event import event_scheduler
from time import sleep, ctime, time
def print_args(a, b=1, *args, **kwargs):
    print(ctime(time()), a, b, args, kwargs)

event_scheduler.add_job(
    target=print_args,
    args=(0,),
    kwargs={"b": 2},
    tz="Asia/Kolkata",
    job_name="print-args-1",
    number_of_reattempts=2,
    when=['fri|14:28', '*|14:**'],
    reattempt_duration_in_seconds=5
)
event_scheduler.add_job(
    target=print_args,
    args=(0, 2, "arg1", "arg2"),
    kwargs={
        "key1": "value1",
        "key2": "value2"
    },
    when=['fri|14:28', '*|14:**'],
    tz="Asia/Kolkata",
    start="Jul 23 13:00:00 2021",
    stop="Jul 23 15:00:00 2021",
    job_name="print-args-2",
    number_of_reattempts=2,
    reattempt_duration_in_seconds=5
)
event_scheduler.run()
- Recurring scheduler (tasks recur every "x" seconds)
from simple_scheduler.recurring import recurring_scheduler
from time import sleep, ctime, time
def wait_10_secs(t):
    # t arrives via kwargs below; this wrapper always waits 10 seconds
    wait_X_secs(10)

def wait_X_secs(t):
    began_at = ctime(time())
    sleep(t)
    print(f"I waited {t} seconds. [From: {began_at} to {ctime(time())}]")

recurring_scheduler.add_job(
    target=wait_10_secs,
    period_in_seconds=5,  # period < execution time
    start="Jul 23 12:18:00 2021",
    stop="Jul 23 12:19:00 2021",
    job_name="ten",
    number_of_reattempts=0,
    reattempt_duration_in_seconds=0,
    kwargs={"t": 10},
    tz="Asia/Kolkata"
)
recurring_scheduler.add_job(
    target=wait_X_secs,
    kwargs={"t": 3},  # period > execution time
    period_in_seconds=5,
    job_name="three"
)
recurring_scheduler.run()
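The start and stop values in the examples above are strings such as "Jul 23 12:18:00 2021". As a rough illustration of what an execution window means, the sketch below parses that format with the standard library and checks whether a given moment falls inside the window. The helper names (parse_timestamp, within_window), the format string, and the inclusive bounds are assumptions made for illustration; they are not part of simple_scheduler, and time zone handling is left out for brevity.

```python
from datetime import datetime

# Assumed format, inferred from the examples: "Jul 23 12:18:00 2021"
TIMESTAMP_FORMAT = "%b %d %H:%M:%S %Y"


def parse_timestamp(text):
    """Parse a timestamp string into a naive datetime (hypothetical helper)."""
    return datetime.strptime(text, TIMESTAMP_FORMAT)


def within_window(moment, start, stop):
    """Return True if moment lies in [start, stop]; inclusive bounds are an assumption."""
    return parse_timestamp(start) <= moment <= parse_timestamp(stop)


if __name__ == "__main__":
    moment = datetime(2021, 7, 23, 12, 18, 30)
    print(within_window(moment, "Jul 23 12:18:00 2021", "Jul 23 12:19:00 2021"))
```

A job whose window has already closed would simply never pass this check, which is the behaviour the start and stop arguments are meant to give.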
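# toggle verbose output per scheduler
event_scheduler.verbose=False
recurring_scheduler.verbose=True
# summary of the scheduled jobs
event_scheduler.job_summary()
recurring_scheduler.job_summary()
# number of reattempts on failure (other add_job arguments omitted)
event_scheduler.add_job(number_of_reattempts=3)
recurring_scheduler.add_job(number_of_reattempts=0)
# seconds to wait between reattempts (other add_job arguments omitted)
event_scheduler.add_job(reattempt_duration_in_seconds=10)
recurring_scheduler.add_job(reattempt_duration_in_seconds=10)
# start of the execution window (other add_job arguments omitted)
event_scheduler.add_job(start="Dec 31 23:59:59 2021")
recurring_scheduler.add_job(start="Dec 31 23:59:59 2021")
# end of the execution window (other add_job arguments omitted)
event_scheduler.add_job(stop="Dec 31 23:59:59 2021")
recurring_scheduler.add_job(stop="Dec 31 23:59:59 2021")
# remove a single job by its job_name
event_scheduler.remove_job(job_name)
recurring_scheduler.remove_job(job_name)
# remove all jobs
event_scheduler.clear()
recurring_scheduler.clear()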
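To make the meaning of number_of_reattempts and reattempt_duration_in_seconds more concrete, here is a minimal sketch of a retry loop with the same two knobs. It is not the package's implementation; run_with_reattempts is a hypothetical helper, and the choice to re-raise the last error once the reattempts are exhausted is an assumption.

```python
import time


def run_with_reattempts(target, number_of_reattempts=0,
                        reattempt_duration_in_seconds=0,
                        args=(), kwargs=None):
    """Call target; on failure, retry up to number_of_reattempts more times."""
    kwargs = kwargs or {}
    for attempt in range(number_of_reattempts + 1):
        try:
            return target(*args, **kwargs)
        except Exception:
            # Assumption: once the reattempts are used up, the error propagates.
            if attempt == number_of_reattempts:
                raise
            time.sleep(reattempt_duration_in_seconds)


if __name__ == "__main__":
    calls = {"count": 0}

    def flaky():
        # Fails on the first two calls, then succeeds.
        calls["count"] += 1
        if calls["count"] < 3:
            raise RuntimeError("temporary failure")
        return "ok"

    print(run_with_reattempts(flaky, number_of_reattempts=2))
```

With number_of_reattempts=0 the function is called exactly once, which matches the idea of disabling reattempts.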
# `when` is a List[str]: a collection of "day|HH:MM" strings
# each entry can take forms like
day|HH:MM,
*|HH:MM,
*|*H:MM,
*|H*:MM,
*|**:MM,
*|**:*M,
*|**:M*
# day:str = mon/tue/wed/thu/fri/sat/sun
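One way to read these patterns is that "*" stands for "any day" in the day position and "any digit" in the time position. The sketch below matches a pattern against a datetime under that reading. It is only an illustration of the format; the matches function and the wildcard interpretation are assumptions, not the package's own matching code.

```python
from datetime import datetime

# Index matches datetime.weekday(): Monday is 0, Sunday is 6.
DAYS = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]


def matches(pattern, moment):
    """Return True if moment fits a "day|HH:MM" pattern (hypothetical helper)."""
    day, clock = pattern.split("|")
    if day != "*" and day.lower() != DAYS[moment.weekday()]:
        return False
    actual = moment.strftime("%H:%M")  # 24-hour clock, zero padded
    if len(clock) != len(actual):
        return False
    # Each pattern character must be "*" or equal the actual character.
    return all(p in ("*", a) for p, a in zip(clock, actual))


if __name__ == "__main__":
    friday = datetime(2021, 7, 23, 14, 28)  # 23 July 2021 was a Friday
    for pattern in ["fri|14:28", "*|14:**", "*|**:*8", "mon|14:28"]:
        print(pattern, matches(pattern, friday))
```

Under this reading, '*|14:**' fires every minute from 14:00 to 14:59 on every day, while 'fri|14:28' fires once a week.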
In app.py, ensure that the scheduler is started at module (global) scope and not within main():
event_scheduler.run()  # module level, runs on import
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
Also, when running under gunicorn, pass the --preload argument. This ensures that
only one instance of the scheduler is running.
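If you prefer a gunicorn configuration file over the command-line flag, the preload_app setting is the config-file counterpart of --preload. The fragment below is a minimal sketch; the file name, bind address and worker count are placeholders chosen for illustration, so adjust them to your deployment.

```python
# gunicorn.conf.py (illustrative values; adjust to your deployment)

# Same effect as passing --preload: the application module is imported once
# in the master process before the workers are forked.
preload_app = True

# Placeholder values, not recommendations.
bind = "0.0.0.0:5000"
workers = 2
```

It could then be used with something like `gunicorn -c gunicorn.conf.py app:app`, where `app:app` assumes a Flask object named app inside app.py.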