Skip to content

Commit

Permalink
Merge c8be406 into 32a1400
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinThoma authored Feb 10, 2021
2 parents 32a1400 + c8be406 commit de6e8bc
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 48 deletions.
3 changes: 2 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ pytest
pytest-cov
pytest-flake8
Sphinx
black==20.8b1
black==20.8b1
mypy
115 changes: 68 additions & 47 deletions schedule/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import random
import re
import time
from typing import Set, List, Optional, Callable, Union

logger = logging.getLogger("schedule")

Expand Down Expand Up @@ -81,10 +82,10 @@ class Scheduler(object):
handle their execution.
"""

def __init__(self):
self.jobs = []
def __init__(self) -> None:
self.jobs: List[Job] = []

def run_pending(self):
def run_pending(self) -> None:
"""
Run all jobs that are scheduled to run.
Expand All @@ -98,7 +99,7 @@ def run_pending(self):
for job in sorted(runnable_jobs):
self._run_job(job)

def run_all(self, delay_seconds=0):
def run_all(self, delay_seconds: int = 0) -> None:
"""
Run all jobs regardless if they are scheduled to run or not.
Expand All @@ -117,7 +118,7 @@ def run_all(self, delay_seconds=0):
self._run_job(job)
time.sleep(delay_seconds)

def get_jobs(self, tag=None):
def get_jobs(self, tag: Optional[Hashable] = None) -> List["Job"]:
"""
Gets scheduled jobs marked with the given tag, or all jobs
if tag is omitted.
Expand All @@ -130,7 +131,7 @@ def get_jobs(self, tag=None):
else:
return [job for job in self.jobs if tag in job.tags]

def clear(self, tag=None):
def clear(self, tag: Optional[Hashable] = None) -> None:
"""
Deletes scheduled jobs marked with the given tag, or all jobs
if tag is omitted.
Expand All @@ -139,13 +140,13 @@ def clear(self, tag=None):
jobs to delete
"""
if tag is None:
logger.debug('Deleting *all* jobs')
logger.debug("Deleting *all* jobs")
del self.jobs[:]
else:
logger.debug('Deleting all jobs tagged "%s"', tag)
self.jobs[:] = (job for job in self.jobs if tag not in job.tags)

def cancel_job(self, job):
def cancel_job(self, job: "Job") -> None:
"""
Delete a scheduled job.
Expand All @@ -157,7 +158,7 @@ def cancel_job(self, job):
except ValueError:
logger.debug('Cancelling not-scheduled job "%s"', str(job))

def every(self, interval=1):
def every(self, interval: int = 1) -> "Job":
"""
Schedule a new periodic job.
Expand All @@ -167,13 +168,13 @@ def every(self, interval=1):
job = Job(interval, self)
return job

def _run_job(self, job):
def _run_job(self, job: "Job") -> None:
ret = job.run()
if isinstance(ret, CancelJob) or ret is CancelJob:
self.cancel_job(job)

@property
def next_run(self):
def next_run(self) -> Optional[datetime.datetime]:
"""
Datetime when the next job should run.
Expand All @@ -185,7 +186,7 @@ def next_run(self):
return min(self.jobs).next_run

@property
def idle_seconds(self):
def idle_seconds(self) -> Optional[float]:
"""
:return: Number of seconds until
:meth:`next_run <Scheduler.next_run>`
Expand Down Expand Up @@ -214,40 +215,51 @@ class Job(object):
method, which also defines its `interval`.
"""

def __init__(self, interval, scheduler=None):
self.interval = interval # pause interval * unit between runs
self.latest = None # upper limit to the interval
self.job_func = None # the job job_func to run
self.unit = None # time units, e.g. 'minutes', 'hours', ...
self.at_time = None # optional time at which this job runs
self.last_run = None # datetime of the last run
self.next_run = None # datetime of the next run
self.period = None # timedelta between runs, only valid for
self.start_day = None # Specific day of the week to start on
self.tags = set() # unique set of tags for the job
self.scheduler = scheduler # scheduler to register with

def __lt__(self, other):
def __init__(self, interval: int, scheduler: Scheduler = None):
self.interval: int = interval # pause interval * unit between runs
self.latest: Optional[int] = None # upper limit to the interval
self.job_func: Optional[functools.partial] = None # the job job_func to run

# time units, e.g. 'minutes', 'hours', ...
self.unit: Optional[str] = None

# optional time at which this job runs
self.at_time: Optional[datetime.time] = None

# datetime of the last run
self.last_run: Optional[datetime.datetime] = None

# datetime of the next run
self.next_run: Optional[datetime.datetime] = None

# timedelta between runs, only valid for
self.period: Optional[datetime.timedelta] = None

# Specific day of the week to start on
self.start_day: Optional[str] = None

self.tags: Set[Hashable] = set() # unique set of tags for the job
self.scheduler: Optional[Scheduler] = scheduler # scheduler to register with

def __lt__(self, other) -> bool:
"""
PeriodicJobs are sortable based on the scheduled time they
run next.
"""
return self.next_run < other.next_run

def __str__(self):
def __str__(self) -> str:
if hasattr(self.job_func, "__name__"):
job_func_name = self.job_func.__name__
job_func_name = self.job_func.__name__ # type: ignore
else:
job_func_name = repr(self.job_func)

return (
"Job(interval={}, unit={}, do={}, args={}, kwargs={})"
).format(
return ("Job(interval={}, unit={}, do={}, args={}, kwargs={})").format(
self.interval,
self.unit,
job_func_name,
self.job_func.args,
self.job_func.keywords,
"()" if self.job_func is None else self.job_func.args,
"{}" if self.job_func is None else self.job_func.keywords,
)

def __repr__(self):
Expand Down Expand Up @@ -397,7 +409,7 @@ def sunday(self):
self.start_day = "sunday"
return self.weeks

def tag(self, *tags):
def tag(self, *tags: Hashable):
"""
Tags the job with one or more unique indentifiers.
Expand Down Expand Up @@ -444,6 +456,9 @@ def at(self, time_str):
if not re.match(r"^:[0-5]\d$", time_str):
raise ScheduleValueError(("Invalid time format for a minutely job"))
time_values = time_str.split(":")
hour: Union[str, int]
mintue: Union[str, int]
second: Union[str, int]
if len(time_values) == 3:
hour, minute, second = time_values
elif len(time_values) == 2 and self.unit == "minutes":
Expand All @@ -470,7 +485,7 @@ def at(self, time_str):
self.at_time = datetime.time(hour, minute, second)
return self

def to(self, latest):
def to(self, latest: int):
"""
Schedule the job to run at an irregular (randomized) interval.
Expand All @@ -485,7 +500,7 @@ def to(self, latest):
self.latest = latest
return self

def do(self, job_func, *args, **kwargs):
def do(self, job_func: Callable, *args, **kwargs):
"""
Specifies the job_func that should be called every time the
job runs.
Expand All @@ -499,14 +514,20 @@ def do(self, job_func, *args, **kwargs):
self.job_func = functools.partial(job_func, *args, **kwargs)
functools.update_wrapper(self.job_func, job_func)
self._schedule_next_run()
if self.scheduler is None:
raise ScheduleError(
"Unable to a add job to schedule. "
"Job is not associated with a scheduler."
)
self.scheduler.jobs.append(self)
return self

@property
def should_run(self):
def should_run(self) -> bool:
"""
:return: ``True`` if the job should be run now.
"""
assert self.next_run is not None, "must run _schedule_next_run before"
return datetime.datetime.now() >= self.next_run

def run(self):
Expand All @@ -521,7 +542,7 @@ def run(self):
self._schedule_next_run()
return ret

def _schedule_next_run(self):
def _schedule_next_run(self) -> None:
"""
Compute the instant when this job should run next.
"""
Expand Down Expand Up @@ -566,7 +587,7 @@ def _schedule_next_run(self):
kwargs["hour"] = self.at_time.hour
if self.unit in ["days", "hours"] or self.start_day is not None:
kwargs["minute"] = self.at_time.minute
self.next_run = self.next_run.replace(**kwargs)
self.next_run = self.next_run.replace(**kwargs) # type: ignore
# Make sure we run at the specified time *today* (or *this hour*)
# as well. This accounts for when a job takes so long it finished
# in the next period.
Expand Down Expand Up @@ -604,56 +625,56 @@ def _schedule_next_run(self):
jobs = default_scheduler.jobs # todo: should this be a copy, e.g. jobs()?


def every(interval=1):
def every(interval: int = 1) -> Job:
"""Calls :meth:`every <Scheduler.every>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
return default_scheduler.every(interval)


def run_pending():
def run_pending() -> None:
"""Calls :meth:`run_pending <Scheduler.run_pending>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
default_scheduler.run_pending()


def run_all(delay_seconds=0):
def run_all(delay_seconds: int = 0) -> None:
"""Calls :meth:`run_all <Scheduler.run_all>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
default_scheduler.run_all(delay_seconds=delay_seconds)


def get_jobs(tag=None):
def get_jobs(tag: Optional[Hashable] = None) -> List[Job]:
"""Calls :meth:`get_jobs <Scheduler.get_jobs>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
return default_scheduler.get_jobs(tag)


def clear(tag=None):
def clear(tag: Optional[Hashable] = None) -> None:
"""Calls :meth:`clear <Scheduler.clear>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
default_scheduler.clear(tag)


def cancel_job(job):
def cancel_job(job: Job) -> None:
"""Calls :meth:`cancel_job <Scheduler.cancel_job>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
default_scheduler.cancel_job(job)


def next_run():
def next_run() -> Optional[datetime.datetime]:
"""Calls :meth:`next_run <Scheduler.next_run>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
return default_scheduler.next_run


def idle_seconds():
def idle_seconds() -> Optional[float]:
"""Calls :meth:`idle_seconds <Scheduler.idle_seconds>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
Expand Down
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[mypy]
files=schedule
ignore_missing_imports = True
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ deps = -rrequirements-dev.txt
commands =
py.test test_schedule.py --flake8 schedule -v --cov schedule --cov-report term-missing
python setup.py check --strict --metadata --restructuredtext
python -m mypy -p schedule

[testenv:docs]
changedir = docs
Expand Down

0 comments on commit de6e8bc

Please sign in to comment.