/
filter.py
370 lines (332 loc) · 13.8 KB
/
filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
from .utils import *
from datetime import timedelta, datetime, date
import time
import re
import logging
# Logger named after this module.
logger = logging.getLogger(__name__)

# Regular-expression templates keyed by filter kind.  The ``{0}`` placeholder
# is filled in with ``str.format`` when a filter is built: a date regex for
# the time-based kinds, the user-supplied value for ``prefix``/``suffix``.
# The time-based templates capture the datestamp in a group named ``date``.
REGEX_MAP = dict(
    timestring=r'^.*{0}.*$',
    newer_than=r'(?P<date>{0})',
    older_than=r'(?P<date>{0})',
    prefix=r'^{0}.*$',
    suffix=r'^.*{0}$',
)

# Supported strftime directive letters mapped to the repetition count that
# get_date_regex() places inside ``\d{...}`` for that directive.
DATE_REGEX = dict(
    Y='4',      # four-digit year
    y='2',      # two-digit year
    m='2',      # month
    W='2',      # week of year
    U='2',      # week of year
    d='2',      # day of month
    H='2',      # hour
    M='2',      # minute
    S='2',      # second
    j='1,3',    # day of year, one to three digits
)
def build_filter(
        kindOf=None, time_unit=None, timestring=None,
        groupname='date', value=None,
):
    """
    Build the dictionary of arguments that describes one filter.

    On success the result always contains a ``pattern`` key holding a regular
    expression string.  Time-based filters additionally carry ``groupname``,
    ``time_unit``, ``timestring``, ``value`` and ``method``; an ``exclude``
    filter carries ``exclude: True``.  An empty dictionary is returned when
    `kindOf` is not recognised, when `value` is empty (zero is accepted), or
    when a time-based filter lacks `time_unit` or `timestring`.

    :arg kindOf: Can be one of:
        [older_than|newer_than|suffix|prefix|regex|timestring|exclude].
        This option defines what kind of filter you will be building.
    :arg groupname: The name of a named capture in pattern.  Currently only
        acts on 'date'
    :arg timestring: An strftime string to match the datestamp in an index
        name.  Only used for time-based filtering.
    :arg time_unit: One of ``hours``, ``days``, ``weeks``, ``months``.
        Only used for time-based filtering.
    :arg value: Depends on `kindOf`.  It's a time-unit multiplier for
        `older_than` and `newer_than`.  It is the strftime string if `kindOf`
        is `timestring`.  It's used to build the regular expression for other
        kinds.
    :rtype: dict
    """
    time_kinds = ('older_than', 'newer_than')
    affix_kinds = ('prefix', 'suffix')
    known_kinds = time_kinds + affix_kinds + ('regex', 'exclude', 'timestring')

    if kindOf not in known_kinds:
        logger.error('{0}: Invalid value for kindOf'.format(kindOf))
        return {}

    # Zero is a legitimate value; any other falsy value (None, '', ...)
    # means there is nothing to build a filter from.
    if value == 0:
        logger.info("I found a zero value")
    elif not value:
        return {}

    argdict = {}
    if kindOf in time_kinds:
        if not time_unit:
            logger.error("older_than and newer_than require time_unit parameter")
            return {}
        if not timestring:
            logger.error("older_than and newer_than require timestring parameter")
            return {}
        argdict = dict(
            groupname=groupname, time_unit=time_unit,
            timestring=timestring, value=value,
            method=kindOf,
        )
        regex = REGEX_MAP[kindOf].format(get_date_regex(timestring))
    elif kindOf == 'timestring':
        # Here `value` is itself the strftime string.
        regex = r'^.*{0}.*$'.format(get_date_regex(value))
    elif kindOf in affix_kinds:
        regex = REGEX_MAP[kindOf].format(value)
    else:
        # 'regex' and 'exclude' both use `value` as the pattern unchanged;
        # 'exclude' additionally flags the filter as a negative match.
        regex = r'{0}'.format(value)
        if kindOf == 'exclude':
            argdict['exclude'] = True

    logger.debug("REGEX = {0}".format(regex))
    argdict['pattern'] = regex
    logger.debug("Added filter: {0}".format(argdict))
    return argdict
def apply_filter(
        items, pattern=None, exclude=False, groupname=None, timestring=None,
        time_unit=None, method=None, value=None, utc_now=None):
    """
    Return the entries of `items` selected by `pattern`.

    Three modes exist:

    * `exclude` is true: keep every item in which `pattern` is *not* found
      (``search`` semantics).  `items` is iterated as given in this mode; it
      is not passed through ``ensure_list`` first.
    * `groupname` is set: ``search`` each item; when the named group is
      ``date`` and captured something, the captured text is handed to
      ``timestamp_check`` and the item is kept if that returns true.  Any
      other group name never selects an item.
    * otherwise: keep items for which `pattern` matches at the start of the
      name (``match`` semantics).

    :arg items: A list of indices or snapshots to act on
    :arg pattern: A regular expression to iterate all indices against.
    :arg exclude: If `True`, exclude matches rather than include
    :arg groupname: The name of a named capture in pattern.  Currently only
        acts on 'date'
    :arg timestring: An strftime string to match the datestamp in an index
        name.  Only used for time-based filtering.
    :arg time_unit: One of ``hours``, ``days``, ``weeks``, ``months``.
        Only used for time-based filtering.
    :arg method: Either ``older_than`` or ``newer_than``.  Only used for
        time-based filtering.
    :arg value: `time_unit` multiplier used to calculate time window.  Only
        used for time-based filtering.
    :arg utc_now: Used for testing.  Overrides current time with specified
        time.
    :rtype: list, or `None` when `pattern` is missing
    """
    if not pattern:
        logger.error("Missing required pattern parameter.")
        return None

    compiled = re.compile(pattern)
    if exclude:
        return [entry for entry in items if not compiled.search(entry)]

    selected = []
    for entry in ensure_list(items):
        if groupname:
            keep = False
            found = compiled.search(entry)
            if found and found.group(groupname) and groupname == "date":
                # timestamp_check() answers with a plain boolean
                keep = timestamp_check(
                    found.group(groupname), timestring=timestring,
                    time_unit=time_unit, method=method,
                    value=value, utc_now=utc_now,
                )
        else:
            keep = compiled.match(entry) is not None
        if keep:
            selected.append(entry)
    return selected
def get_date_regex(timestring):
    """
    Return a regex string based on a provided strftime timestring.

    Every directive listed in ``DATE_REGEX`` (a letter directly preceded by
    ``%``) becomes a digit match of the configured width.  The ``%`` marker
    itself contributes nothing.  All remaining characters are matched
    literally: letters and digits are copied unchanged, anything else is
    backslash-escaped.

    :arg timestring: An strftime pattern
    :rtype: str
    """
    pieces = []
    prev = ''
    for curr in timestring:
        if curr == '%':
            # Directive marker: emits nothing, only qualifies the next char.
            pass
        elif prev == '%' and curr in DATE_REGEX:
            pieces.append(r'\d{' + DATE_REGEX[curr] + '}')
        elif curr.isalnum():
            # A backslash in front of a letter or digit changes its meaning
            # in a regex (class, anchor, backreference) or is rejected as a
            # bad escape, so alphanumerics must be emitted bare.
            pieces.append(curr)
        else:
            pieces.append('\\' + curr)
        prev = curr
    regex = ''.join(pieces)
    logger.debug("regex = {0}".format(regex))
    return regex
def get_datetime(index_timestamp, timestring):
    """
    Parse the datestamp taken from an index name into a datetime.

    Patterns that do not pin down a specific day are completed before
    parsing by appending a directive to `timestring` and a literal ``1`` to
    `index_timestamp`:

    * week-of-year patterns (``%W`` or ``%U``) get ``%w`` / ``1``, i.e.
      Monday of that week;
    * patterns with ``%m`` but no ``%d`` get ``%d`` / ``1``, i.e. the first
      day of that month.

    :arg index_timestamp: The timestamp extracted from an index name
    :arg timestring: An strftime pattern
    :rtype: Datetime object
    """
    week_based = '%W' in timestring or '%U' in timestring
    if week_based:
        timestring += '%w'
        index_timestamp += '1'
    elif '%m' in timestring and '%d' not in timestring:
        timestring += '%d'
        index_timestamp += '1'
    return datetime.strptime(index_timestamp, timestring)
def month_bump(datestamp, sign='positive'):
    """
    Return the first day of the month adjacent to `datestamp`.

    Mind the direction: ``positive`` steps *back* to the previous month,
    ``negative`` steps *forward* to the next month.  Year boundaries are
    handled (January -> previous December, December -> next January).

    :arg datestamp: A datetime datestamp
    :arg sign: Either `positive` or `negative`
    :rtype: ``datetime.date`` set to day 1 of the resulting month
    :raises ValueError: if `sign` is neither `positive` nor `negative`
    """
    # Compare by value: identity checks against string literals only work
    # by accident of interning.
    if sign not in ('positive', 'negative'):
        raise ValueError(
            "sign must be 'positive' or 'negative', got {0!r}".format(sign))
    step = -1 if sign == 'positive' else 1
    # Zero-based running month number lets divmod take care of year rollover.
    year, month_index = divmod(
        datestamp.year * 12 + (datestamp.month - 1) + step, 12)
    return date(year, month_index + 1, 1)
def get_target_month(month_count, utc_now=None):
    """
    Return midnight on the first day of the month that lies `month_count`
    months before the current month.

    A positive `month_count` moves into the past, a negative one into the
    future, and zero yields the first day of the current month.  The current
    month is taken from `utc_now` when given, otherwise from the current UTC
    time.

    :arg month_count: Number of *full* months
    :arg utc_now: Used for testing.  Overrides current time with specified
        time.
    :rtype: Datetime object
    """
    reference = utc_now if utc_now else datetime.utcnow()
    # Zero-based running month number; divmod handles any number of year
    # rollovers in either direction without stepping month by month.
    year, month_index = divmod(
        reference.year * 12 + (reference.month - 1) - month_count, 12)
    return datetime(year, month_index + 1, 1)
def get_cutoff(unit_count=None, time_unit='days', utc_now=None):
    """
    Find the cutoff time based on `unit_count` and `time_unit`.

    The current time is first truncated to the start of the current period
    (top of the hour; midnight for ``days`` and ``months``; Monday midnight
    for ``weeks``).  For ``months`` the cutoff is delegated to
    ``get_target_month``.  For the other units a non-negative `unit_count`
    yields ``(unit_count - 1)`` units before that truncated time, while a
    negative `unit_count` yields ``abs(unit_count)`` units after it.

    :arg unit_count: `time_unit` multiplier
    :arg time_unit: One of ``hours``, ``days``, ``weeks``, ``months``.
        (default: ``days``)
    :arg utc_now: Used for testing.  Overrides current time with specified
        time.
    :rtype: Datetime object, or `False` when `unit_count` is missing or not
        an integer
    """
    if unit_count is None:
        logger.error("Missing value for unit_count.")
        return False
    # bool is a subclass of int, but True/False is not a usable multiplier.
    if isinstance(unit_count, bool) or not isinstance(unit_count, int):
        logger.error("Non-integer value for unit_count.")
        return False

    # time-injection for test purposes only
    utc_now = utc_now if utc_now else datetime.utcnow()
    # Snap to the start of the current period so that a partially elapsed
    # period never shifts the cutoff.
    utc_now = utc_now.replace(minute=0, second=0, microsecond=0)
    if time_unit in ('days', 'months'):
        utc_now = utc_now.replace(hour=0)
    elif time_unit == 'weeks':
        # Since week math always uses Monday as the start of the week,
        # this work-around resets utc_now to be Monday of the current week.
        weeknow = utc_now.strftime('%Y-%W')
        utc_now = get_datetime(weeknow, '%Y-%W')

    if time_unit == 'months':
        return get_target_month(unit_count, utc_now=utc_now)

    # This cutoff must be a multiple of time_units
    offset = unit_count if unit_count < 0 else unit_count - 1
    return utc_now - timedelta(**{time_unit: offset})
def timestamp_check(timestamp, timestring=None, time_unit=None,
                    method='older_than', value=None, utc_now=None):
    """
    Check `timestamp` to see if it is `value` * `time_unit`
    `method` (``older_than`` or ``newer_than``) the calculated cutoff.

    `False` is returned (after logging) when no cutoff can be computed, when
    `timestamp` cannot be parsed with `timestring`, or when `method` is not
    one of the two supported values.

    :arg timestamp: An strftime parsable date string.
    :arg timestring: An strftime string to match against ``timestamp``.
    :arg time_unit: One of ``hours``, ``days``, ``weeks``, ``months``.
    :arg method: ``older_than`` or ``newer_than``.
    :arg value: `time_unit` multiplier used to calculate time window.
    :arg utc_now: Used for testing.  Overrides current time with specified
        time.
    :rtype: bool
    """
    cutoff = get_cutoff(unit_count=value, time_unit=time_unit, utc_now=utc_now)
    if not cutoff:
        logger.error('No cutoff value.')
        return False

    try:
        object_time = get_datetime(timestamp, timestring)
    except ValueError:
        logger.error('Could not extract a timestamp matching {0} from timestring {1}'.format(timestamp, timestring))
        return False

    if method == "older_than":
        within = object_time < cutoff
    elif method == "newer_than":
        within = object_time > cutoff
    else:
        # Callers may pass method=None (or anything else); report it here
        # instead of failing on method.replace() in the debug message below.
        logger.error('{0}: Invalid value for method'.format(method))
        return False

    if within:
        return True
    logger.debug('Timestamp "{0}" is outside the cutoff period ({1} {2} {3}).'.format(
        timestamp, method.replace('_', ' '), value, time_unit))
    return False
def filter_by_space(client, indices, disk_space=None, reverse=True):
    """
    Return the indices that should be removed to bring the summed store size
    of the open indices in `indices` under `disk_space` gigabytes.

    Open indices are sorted by name (reverse-alphabetically by default) and
    their sizes are accumulated in that order; every index encountered once
    the running total exceeds the limit is added to the result.

    The default is usually what you will want.  If only one kind of index is
    provided--for example, indices matching ``logstash-%Y.%m.%d``--then
    reverse alphabetical sorting will mean the oldest get removed first,
    because lower numbers in the dates mean older indices.

    By setting `reverse` to `False`, then ``index3`` will be deleted before
    ``index2``, which will be deleted before ``index1``

    :arg client: The Elasticsearch client connection
    :arg indices: A list of indices to act on
    :arg disk_space: Filter indices over *n* gigabytes
    :arg reverse: The filtering direction. (default: `True`)
    :rtype: list, or `False` when `disk_space` is missing (or zero)
    """
    def get_stat_list(stats):
        # Flatten a stats response into (index_name, size_in_bytes) tuples.
        retval = []
        for index_name in stats['indices']:
            size = stats['indices'][index_name]['total']['store']['size_in_bytes']
            logger.debug('Index: {0} Size: {1}'.format(index_name, size))
            retval.append((index_name, size))
        return retval

    # Ensure that disk_space is a float
    if not disk_space:
        logger.error("Mising value for disk_space.")
        return False
    disk_space = float(disk_space)

    disk_usage = 0.0
    disk_limit = disk_space * 2**30     # gigabytes -> bytes
    delete_list = []

    not_closed = [i for i in indices if not index_closed(client, i)]
    # Because we're building a csv list of indices to pass, we need to ensure
    # that we actually have at least one index before calling
    # client.indices.stats, otherwise the call will match _all indices, which
    # is very bad.
    # See https://github.com/elastic/curator/issues/254
    logger.debug('List of open indices: {0}'.format(sorted(not_closed)))
    if not_closed:
        if len(to_csv(not_closed)) > 3072:
            # The joined index list is too long for a single request.
            logger.warning('Very large list of indices. Breaking it up into smaller chunks.')
            statlist = []
            for chunk in chunk_index_list(not_closed):
                statlist.extend(get_stat_list(client.indices.stats(index=to_csv(chunk))))
        else:
            statlist = get_stat_list(client.indices.stats(index=to_csv(not_closed)))

        # Tuples sort by index name first, which drives the deletion order.
        for index_name, index_size in sorted(statlist, reverse=reverse):
            disk_usage += index_size
            if disk_usage > disk_limit:
                delete_list.append(index_name)
                logger.info('Deleting {0}, summed disk usage is {1:.3f} GB and disk limit is {2:.3f} GB.'.format(index_name, disk_usage/2**30, disk_limit/2**30))
            else:
                logger.info('skipping {0}, summed disk usage is {1:.3f} GB and disk limit is {2:.3f} GB.'.format(index_name, disk_usage/2**30, disk_limit/2**30))
    return delete_list