/
cache.py
311 lines (247 loc) · 9.37 KB
/
cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
"""Copyright 2009 Chris Davis
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
import time
import threading
from operator import itemgetter
from random import choice
from collections import defaultdict
from carbon.conf import settings
from carbon import events, log
from carbon.pipeline import Processor
from carbon.util import TaggedSeries
def by_timestamp(t_v):  # useful sort key function
    """Sort key for a ``(timestamp, value)`` pair: return its timestamp.

    The argument must unpack into exactly two items; anything else raises
    ``ValueError``.
    """
    stamp, _value = t_v
    return stamp
class CacheFeedingProcessor(Processor):
    """Pipeline processor (plugin name ``'write'``) that stores every
    incoming datapoint in the MetricCache."""

    plugin_name = 'write'

    def __init__(self, *args, **kwargs):
        super(CacheFeedingProcessor, self).__init__(*args, **kwargs)
        # The process-wide cache instance returned by MetricCache().
        self.cache = MetricCache()

    def process(self, metric, datapoint):
        """Normalize *metric*'s name, then store *datapoint* under it.

        If ``TaggedSeries.parse`` fails, the error is logged and the
        datapoint is stored under the name exactly as received.
        Always returns ``Processor.NO_OUTPUT``.
        """
        canonical = metric
        try:
            # normalize metric name (reorder tags)
            canonical = TaggedSeries.parse(metric).path
        except Exception as err:
            log.msg('Error parsing metric %s: %s' % (metric, err))
        self.cache.store(canonical, datapoint)
        return Processor.NO_OUTPUT
class DrainStrategy(object):
    """Base class for the policy deciding which cached metric is written next.

    Subclasses override :meth:`choose_item` to return the name of the next
    metric to pop from the backing cache (``None`` meaning "nothing right
    now"). They may also override :meth:`store` to track insertions.
    """

    def __init__(self, cache):
        # The backing cache this strategy drains.
        self.cache = cache

    def choose_item(self):
        """Return the next metric name to drain. Must be overridden."""
        raise NotImplementedError()

    def store(self, metric):
        """Hook called after a new datapoint for *metric* is cached. No-op here."""
        return None
class NaiveStrategy(DrainStrategy):
    """Pop points in no particular order.

    Each pass snapshots the cache's metric names and hands them out one by
    one, last name of the snapshot first. A fresh snapshot is taken when
    the previous one is used up. With an empty cache the generator loops
    without yielding, so it must only be asked when the cache is non-empty
    (``_MetricCache.drain_metric`` checks this before calling).
    """

    def __init__(self, cache):
        super(NaiveStrategy, self).__init__(cache)
        self.queue = self._snapshot_names()

    def _snapshot_names(self):
        """Endlessly yield metric names, one cache snapshot at a time."""
        while True:
            for name in reversed(list(self.cache.keys())):
                yield name

    def choose_item(self):
        return next(self.queue)
class MaxStrategy(DrainStrategy):
    """Always pop the metric that currently has the most cached points.

    This keeps pointsPerUpdate steady, but metrics that are updated rarely
    or irregularly may not be written until shutdown.
    """

    def choose_item(self):
        cache = self.cache
        # max() keeps the first of several equally large metrics, in the
        # cache's iteration order. Raises ValueError on an empty cache.
        return max(cache, key=lambda name: len(cache[name]))
class RandomStrategy(DrainStrategy):
    """Pop points randomly: each cached metric is equally likely to be chosen."""

    def choose_item(self):
        names = list(self.cache)
        return choice(names)  # nosec
class SortedStrategy(DrainStrategy):
    """Prefer metrics with more cached points, round by round.

    At the start of each round the cache's point counts are snapshotted and
    sorted. Every metric in that snapshot is then handed out exactly once,
    largest count first, before the next round begins.
    """

    def __init__(self, cache):
        super(SortedStrategy, self).__init__(cache)
        self.queue = self._rounds()

    def _rounds(self):
        """Endlessly yield metric names, largest point count first per round."""
        while True:
            started = time.time()
            # Ascending sort; names are taken from the tail, so the metric
            # with the most cached points comes out first.
            pending = sorted(self.cache.counts, key=itemgetter(1))
            total = len(pending)
            if settings.LOG_CACHE_QUEUE_SORTS and total:
                log.msg("Sorted %d cache queues in %.6f seconds" % (total, time.time() - started))
            while pending:
                name, _count = pending.pop()
                yield name
            if settings.LOG_CACHE_QUEUE_SORTS and total:
                log.msg("Queue consumed in %.6f seconds" % (time.time() - started))

    def choose_item(self):
        return next(self.queue)
class TimeSortedStrategy(DrainStrategy):
    """Prefer metrics that are lagging behind (oldest cached point first).

    Each round, every metric with cached points is handed out exactly once.
    When ``settings.MIN_TIMESTAMP_LAG`` is set, only metrics whose oldest
    point is more than that many seconds old take part. A round with no
    eligible metric yields ``None`` instead.
    """

    def __init__(self, cache):
        super(TimeSortedStrategy, self).__init__(cache)
        self.queue = self._rounds()

    def _rounds(self):
        """Endlessly yield metric names (or None), most lagging first per round."""
        while True:
            started = time.time()
            # watermarks are (name, oldest_ts, newest_ts). Sort descending on
            # oldest_ts so popping from the tail yields the most lagging one.
            ordered = sorted(self.cache.watermarks, key=itemgetter(1), reverse=True)
            if settings.MIN_TIMESTAMP_LAG:
                ordered = [w for w in ordered if started - w[1] > settings.MIN_TIMESTAMP_LAG]
            total = len(ordered)
            if settings.LOG_CACHE_QUEUE_SORTS and total:
                log.msg("Sorted %d cache queues in %.6f seconds" % (total, time.time() - started))
            if not ordered:
                # If there is nothing to do give a chance to sleep to the reader.
                yield None
                continue
            while ordered:
                yield ordered.pop()[0]
            if settings.LOG_CACHE_QUEUE_SORTS and total:
                log.msg("Queue consumed in %.6f seconds" % (time.time() - started))

    def choose_item(self):
        return next(self.queue)
class BucketMaxStrategy(DrainStrategy):
    """Same as the 'max' strategy, but sorts at insertion time (into buckets)
    instead of at pop() time.

    ``buckets[i]`` lists the metrics that currently have ``i + 1`` cached
    points, in the order they reached that size.
    """

    def __init__(self, cache):
        self.buckets = list()
        super(BucketMaxStrategy, self).__init__(cache)

    def choose_item(self):
        """Return the metric with the most points (first seen wins ties), or None."""
        buckets = self.buckets
        # Discard empty buckets at the large end.
        while buckets and not buckets[-1]:
            buckets.pop()
        if not buckets:
            return None  # buckets are empty
        return buckets[-1].pop(0)

    def store(self, metric):
        """Move *metric* into the bucket matching its new point count."""
        points = len(self.cache[metric])
        missing = points - len(self.buckets)
        if missing > 0:
            # No bucket of this size exists yet: create it (and any gaps).
            self.buckets.extend([] for _ in range(missing))
        if points > 1:
            # Take the metric out of the bucket for its previous size.
            self.buckets[points - 2].remove(metric)
        self.buckets[points - 1].append(metric)
class _MetricCache(defaultdict):
    """A Singleton dictionary of metric names and their cached datapoints.

    Maps metric name -> ``{timestamp: value}``.

    * ``size`` is the total number of cached datapoints across all metrics.
    * ``new_metrics`` lists, oldest first, metric names that entered the
      cache, so a consumer can check whether their db already exists.
    * ``strategy`` (optional) picks the drain order.
    * ``lock`` guards the mutations in :meth:`store` and :meth:`pop`, and the
      strategy's choice in :meth:`drain_metric`.
    """

    def __init__(self, strategy=None):
        """Create an empty cache.

        :param strategy: optional DrainStrategy subclass, instantiated with
            this cache. Without one, :meth:`drain_metric` drains in the
            dict's iteration order.
        """
        self.lock = threading.Lock()
        self.size = 0
        self.strategy = None
        self.new_metrics = []
        if strategy:
            self.strategy = strategy(self)
        super(_MetricCache, self).__init__(dict)

    @property
    def counts(self):
        """List of ``(metric, number_of_cached_points)`` pairs."""
        return [(metric, len(datapoints)) for (metric, datapoints)
                in self.items()]

    @property
    def watermarks(self):
        """List of ``(metric, oldest_timestamp, newest_timestamp)`` for non-empty metrics."""
        return [(metric, min(datapoints.keys()), max(datapoints.keys()))
                for (metric, datapoints) in self.items()
                if datapoints]

    @property
    def is_full(self):
        """True once ``size`` reaches ``settings.MAX_CACHE_SIZE`` (never when it is infinite)."""
        if settings.MAX_CACHE_SIZE == float('inf'):
            return False
        else:
            return self.size >= settings.MAX_CACHE_SIZE

    def _check_available_space(self):
        """Fire ``cacheSpaceAvailable`` once a too-full cache drops below the low watermark."""
        if state.cacheTooFull and self.size < settings.CACHE_SIZE_LOW_WATERMARK:
            log.msg("MetricCache below watermark: self.size=%d" % self.size)
            events.cacheSpaceAvailable()

    def drain_metric(self):
        """Returns a metric and its datapoints in order determined by the
        `DrainStrategy`_

        :returns: ``(metric, [(timestamp, value), ...])`` sorted by timestamp,
            or ``(None, [])`` if the cache is empty or the strategy chose
            nothing.
        """
        if not self:
            return (None, [])
        if self.strategy:
            with self.lock:
                metric = self.strategy.choose_item()
        else:
            # Avoid .keys() as it dumps the whole list
            metric = next(iter(self))
        if metric is None:
            return (None, [])
        return (metric, self.pop(metric))

    def get_datapoints(self, metric):
        """Return a list of currently cached datapoints sorted by timestamp"""
        return sorted(self.get(metric, {}).items(), key=by_timestamp)

    def get_new_metrics(self):
        """Yield pending new metrics oldest first, removing each as it is yielded."""
        while True:
            metric = self.pop_new_metric()
            if metric is None:
                return
            yield metric

    def pop_new_metric(self):
        """Remove and return the first-seen pending new metric, or None if empty."""
        try:
            return self.new_metrics.pop(0)
        except IndexError:
            return None

    def pop(self, metric):
        """Remove *metric* and return its datapoints sorted by timestamp.

        Raises ``KeyError`` if *metric* is not cached.
        """
        with self.lock:
            datapoint_index = defaultdict.pop(self, metric)
            self.size -= len(datapoint_index)
            self._check_available_space()
        return sorted(datapoint_index.items(), key=by_timestamp)

    def store(self, metric, datapoint):
        """Cache *datapoint*, a ``(timestamp, value)`` pair, for *metric*.

        A point for an already-cached timestamp overwrites the old value and
        leaves ``size`` unchanged. Otherwise:

        * If the cache is full, the point is dropped and ``cacheFull`` fires.
          No entry is created for *metric* and nothing is queued.
        * If there is room, the point is added. *metric* is queued in
          ``new_metrics`` if it was not cached yet, and the strategy is
          notified.
        """
        timestamp, value = datapoint
        with self.lock:
            # Use .get() rather than self[metric]: defaultdict's __getitem__
            # would insert an empty entry even when the point is dropped below.
            datapoints = self.get(metric)
            if datapoints is not None and timestamp in datapoints:
                # Updating a duplicate does not increase the cache size
                datapoints[timestamp] = value
                return
            # Not a duplicate, hence process if cache is not full
            if self.is_full:
                log.msg("MetricCache is full: self.size=%d" % self.size)
                events.cacheFull()
                return
            if datapoints is None:
                # Metric not in cache yet, push to new_metrics list so it
                # can be checked if the db already exists
                self.new_metrics.append(metric)
                datapoints = self[metric]  # defaultdict creates the empty dict
            self.size += 1
            datapoints[timestamp] = value
            if self.strategy:
                self.strategy.store(metric)
_Cache = None


def MetricCache():
    """Return the process-wide _MetricCache singleton, creating it on first call.

    The drain strategy is selected by ``settings.CACHE_WRITE_STRATEGY``. An
    unrecognised value falls back to no strategy: ``drain_metric`` then
    drains in the cache's iteration order. Falling back to the abstract
    DrainStrategy would make every drain raise NotImplementedError.
    """
    global _Cache
    if _Cache is not None:
        return _Cache

    # Initialize a singleton cache instance
    # TODO: use plugins.
    strategies = {
        'naive': NaiveStrategy,
        'max': MaxStrategy,
        'sorted': SortedStrategy,
        'timesorted': TimeSortedStrategy,
        'random': RandomStrategy,
        'bucketmax': BucketMaxStrategy,
    }
    write_strategy = strategies.get(settings.CACHE_WRITE_STRATEGY)
    if write_strategy is None:
        log.msg("Unknown CACHE_WRITE_STRATEGY %r, draining in cache iteration order"
                % (settings.CACHE_WRITE_STRATEGY,))

    _Cache = _MetricCache(write_strategy)
    return _Cache
# Avoid import circularities
from carbon import state # NOQA