This repository has been archived by the owner on Jan 20, 2020. It is now read-only.
/
filters.py
205 lines (162 loc) · 5.79 KB
/
filters.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""
Filters used by GrandFatherSon to decide which datetimes to keep.
"""
import calendar
from datetime import datetime, time, timedelta, tzinfo
# As gleefully stolen from the python datetime docs
class UTC(tzinfo):
"""UTC"""
ZERO = timedelta(0)
def utcoffset(self, dt):
return self.ZERO
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return self.ZERO
class Filter(object):
"""Base class."""
@classmethod
def mask(cls, dt, **options):
"""
Return a datetime with the same value as ``dt``, keeping only
significant values.
"""
raise NotImplemented
@classmethod
def start(cls, now, number, **options):
"""
Return the starting datetime: ``number`` of units before ``now``.
"""
return (cls.mask(now, **options) -
timedelta(**{cls.__name__.lower(): number - 1}))
@classmethod
def filter(cls, datetimes, number, now=None, **options):
"""Return a set of datetimes, after filtering ``datetimes``.
The result will be the ``datetimes`` which are ``number`` of
units before ``now``, until ``now``, with approximately one
unit between each of them. The first datetime for any unit is
kept, later duplicates are removed.
If there are ``datetimes`` after ``now``, they will be
returned unfiltered.
"""
if number < 0 or not isinstance(number, (int, long)):
raise ValueError('Invalid number: %s' % number)
datetimes = tuple(datetimes)
# Sample the first datetime to see if it is timezone-aware
tzinfo = None
if datetimes and datetimes[0].tzinfo is not None:
tzinfo = UTC()
if now is None:
now = datetime.now(tzinfo)
if not hasattr(now, 'second'):
# now looks like a date, so convert it into a datetime
now = datetime.combine(now, time(23, 59, 59, 999999, tzinfo=tzinfo))
# Always keep datetimes from the future
future = set(dt for dt in datetimes if dt > now)
if number == 0:
return future
# Don't consider datetimes from before the start
start = cls.start(now, number, **options)
valid = (dt for dt in datetimes if start <= dt <= now)
# Deduplicate datetimes with the same mask() value by keeping
# the oldest.
kept = {}
for dt in sorted(valid):
kept.setdefault(cls.mask(dt), dt)
return set(kept.values()) | future
class Seconds(Filter):
@classmethod
def mask(cls, dt, **options):
"""
Return a datetime with the same value as ``dt``, to a
resolution of seconds.
"""
return dt.replace(microsecond=0)
class Minutes(Filter):
@classmethod
def mask(cls, dt, **options):
"""
Return a datetime with the same value as ``dt``, to a
resolution of minutes.
"""
return dt.replace(second=0, microsecond=0)
class Hours(Filter):
@classmethod
def mask(cls, dt, **options):
"""
Return a datetime with the same value as ``dt``, to a
resolution of hours.
"""
return dt.replace(minute=0, second=0, microsecond=0)
class Days(Filter):
@classmethod
def mask(cls, dt, **options):
"""
Return a datetime with the same value as ``dt``, to a
resolution of days.
"""
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
class Weeks(Filter):
DAYS_IN_WEEK = 7
@classmethod
def start(cls, now, number, firstweekday=calendar.SATURDAY, **options):
"""
Return the starting datetime: ``number`` of weeks before ``now``.
``firstweekday`` determines when the week starts. It defaults
to Saturday.
"""
week = cls.mask(now, firstweekday=firstweekday, **options)
days = (number - 1) * cls.DAYS_IN_WEEK
return week - timedelta(days=days)
@classmethod
def mask(cls, dt, firstweekday=calendar.SATURDAY, **options):
"""
Return a datetime with the same value as ``dt``, to a
resolution of weeks.
``firstweekday`` determines when the week starts. It defaults
to Saturday.
"""
correction = (dt.weekday() - firstweekday) % cls.DAYS_IN_WEEK
week = dt - timedelta(days=correction)
return week.replace(hour=0, minute=0, second=0, microsecond=0)
class Months(Filter):
MONTHS_IN_YEAR = 12
@classmethod
def start(cls, now, number, **options):
"""
Return the starting datetime: ``number`` of months before ``now``.
"""
year = now.year
month = now.month - number + 1
# Handle negative months
if month < 0:
year = year + (month / cls.MONTHS_IN_YEAR)
month = month % cls.MONTHS_IN_YEAR
# Handle December
if month == 0:
year = year - 1
month = 12
return cls.mask(now, **options).replace(year=year, month=month)
@classmethod
def mask(cls, dt, **options):
"""
Return a datetime with the same value as ``dt``, to a
resolution of months.
"""
return dt.replace(day=1,
hour=0, minute=0, second=0, microsecond=0)
class Years(Filter):
@classmethod
def start(cls, now, number, **options):
"""
Return the starting datetime: ``number`` of years before ``now``.
"""
return cls.mask(now).replace(year=(now.year - number + 1))
@classmethod
def mask(cls, dt, **options):
"""
Return a datetime with the same value as ``dt``, to a
resolution of years.
"""
return dt.replace(month=1, day=1,
hour=0, minute=0, second=0, microsecond=0)