forked from celery/celery
-
Notifications
You must be signed in to change notification settings - Fork 40
/
timeutils.py
126 lines (93 loc) · 3.79 KB
/
timeutils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import math
from datetime import datetime, timedelta
from dateutil.parser import parse as parse_iso8601
from kombu.utils import partition
# Three-letter weekday abbreviations, Sunday first.
DAYNAMES = ("sun", "mon", "tue", "wed", "thu", "fri", "sat")

# Maps each abbreviation to its position in DAYNAMES (sun=0 ... sat=6).
WEEKDAYS = dict(zip(DAYNAMES, range(7)))

# Converts a count expressed per second/minute/hour into a count per second.
RATE_MODIFIER_MAP = {
    "s": lambda n: n,
    "m": lambda n: n / 60.0,
    "h": lambda n: n / 60.0 / 60.0,
}

# ``timedelta.total_seconds`` does not exist on older Python versions.
HAVE_TIMEDELTA_TOTAL_SECONDS = hasattr(timedelta, "total_seconds")

_MINUTE = 60
_HOUR = 60 * _MINUTE
_DAY = 24 * _HOUR


def _ceil_int(n):
    """Round *n* up to the next whole number and return it as an int."""
    return int(math.ceil(n))


def _two_decimals(n):
    """Format *n* as a string with two decimal places."""
    return "%.2f" % n


# (unit name, seconds per unit, formatter for the unit count),
# ordered from the largest unit to the smallest.
TIME_UNITS = (
    ("day", _DAY, _ceil_int),
    ("hour", _HOUR, _ceil_int),
    ("minute", _MINUTE, _ceil_int),
    ("second", 1, _two_decimals),
)
def timedelta_seconds(delta):  # pragma: no cover
    """Return the length of a :class:`datetime.timedelta` in seconds.

    Negative deltas are not supported: they are clamped to ``0``.

    :param delta: The :class:`~datetime.timedelta` to convert.
    :returns: Non-negative number of seconds.
    """
    if not HAVE_TIMEDELTA_TOTAL_SECONDS:
        # Manual computation for Pythons lacking ``total_seconds()``.
        # A negative timedelta always has a negative ``days`` field.
        if delta.days < 0:
            return 0
        whole_seconds = delta.days * 86400 + delta.seconds
        return whole_seconds + (delta.microseconds / 1e6)

    # Clamp negative durations to zero.
    total = delta.total_seconds()
    return max(total, 0)
def delta_resolution(dt, delta):
    """Truncate a datetime to the resolution of a timedelta.

    If the timedelta is at least one day long, the datetime is truncated
    to the start of its day; if it is at least one hour long, to the start
    of its hour; if it is at least one minute long, to the start of its
    minute.  Shorter (or negative) timedeltas return the original datetime
    unchanged.

    The ``tzinfo`` of `dt` is carried over to the truncated value, so an
    aware datetime stays aware.

    :param dt: The :class:`~datetime.datetime` to truncate.
    :param delta: The :class:`~datetime.timedelta` giving the resolution.
    :returns: A :class:`~datetime.datetime`.
    """
    seconds = timedelta_seconds(delta)

    # (number of leading datetime fields to keep, minimum delta in seconds)
    resolutions = ((3, 86400),   # year, month, day
                   (4, 3600),    # ... plus hour
                   (5, 60))      # ... plus minute

    fields = dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second
    for keep, threshold in resolutions:
        if seconds >= threshold:
            # Omitted fields default to zero; tzinfo must be passed
            # explicitly or an aware datetime would become naive.
            return datetime(*fields[:keep], tzinfo=dt.tzinfo)
    return dt
def remaining(start, ends_in, now=None, relative=True):
    """Calculate the remaining time for a start date and a timedelta.

    e.g. "how many seconds left for 30 seconds after start?"

    :param start: Start :class:`~datetime.datetime`.
    :param ends_in: The end delta as a :class:`~datetime.timedelta`.
    :keyword relative: If set to :const:`False`, the end time will be
        calculated using :func:`delta_resolution` (i.e. truncated to the
        resolution of `ends_in`).
    :keyword now: The current time, either as a
        :class:`~datetime.datetime` or as a function returning one.
        Defaults to :meth:`datetime.now() <datetime.datetime.now>`.
    :returns: A :class:`~datetime.timedelta`; it is negative when the
        end time is earlier than `now`.
    """
    # Accept a callable as well as a plain datetime value.
    if callable(now):
        now = now()
    now = now or datetime.now()

    end_date = start + ends_in
    if not relative:
        end_date = delta_resolution(end_date, ends_in)
    return end_date - now
try:
    # Python 2: ``basestring`` covers both ``str`` and ``unicode``.
    _RATE_STRING_TYPES = basestring
except NameError:
    # Python 3 has no ``basestring``.
    _RATE_STRING_TYPES = str


def rate(rate):
    """Parse a rate string such as `"100/m"` or `"2/h"` and return the
    equivalent number of operations per second.

    The part before the slash is an integer count and the part after it
    is a modifier: ``s`` (per second), ``m`` (per minute) or ``h``
    (per hour).  When the modifier is missing, per second is assumed.

    :param rate: A rate string, a number, or a false value.
    :returns: The rate per second for strings, the value itself for
        non-string true values, and ``0`` for anything false.
    :raises KeyError: if the modifier is not one of ``s``, ``m``, ``h``.
    :raises ValueError: if the count is not a valid integer.
    """
    if not rate:
        return 0
    if isinstance(rate, _RATE_STRING_TYPES):
        ops, _, modifier = rate.partition("/")
        return RATE_MODIFIER_MAP[modifier or "s"](int(ops)) or 0
    # Already numeric: pass it through untouched.
    return rate
def weekday(name):
    """Return the position of a weekday (0 - 6, where 0 is Sunday).

    Only the first three characters of `name` are considered, and the
    lookup is case-insensitive.

    Example::

        >>> weekday("sunday"), weekday("sun"), weekday("mon")
        (0, 0, 1)

    :param name: Full or abbreviated weekday name.
    :raises KeyError: if the name is not a known weekday; the exception
        carries the original `name`, not the abbreviation.
    """
    key = name[:3].lower()
    position = WEEKDAYS.get(key)
    if position is None:
        # Report the name as given by the caller, not the shortened key.
        raise KeyError(name)
    return position
def humanize_seconds(secs, prefix=""):
    """Show seconds in human form, e.g. 60 is "1 minute", 7200 is "2
    hours".

    The largest unit from ``TIME_UNITS`` that fits into `secs` is used,
    and the amount is formatted by that unit's formatter.  Values smaller
    than one second are reported as ``"now"``.

    :param secs: Number of seconds (int or float).
    :keyword prefix: Text inserted directly before the amount.
    :returns: The human readable string.
    """
    for unit, divider, formatter in TIME_UNITS:
        if secs >= divider:
            # Force float division so integer input is not floored on
            # Python 2 before the formatter gets to see the fraction.
            amount = secs / float(divider)
            label = unit + "s" if amount > 1 else unit
            return "%s%s %s" % (prefix, formatter(amount), label)
    return "now"
def maybe_iso8601(dt):
    """`Either datetime | str -> datetime or None -> None`

    False values yield :const:`None`, :class:`~datetime.datetime`
    instances are returned as they are, and anything else is handed to
    the ISO 8601 parser.
    """
    if dt:
        return dt if isinstance(dt, datetime) else parse_iso8601(dt)
    return None