-
Notifications
You must be signed in to change notification settings - Fork 369
/
buffer.rb
328 lines (271 loc) · 9.32 KB
/
buffer.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
# typed: true
require 'ddtrace/diagnostics/health'
# Trace buffer that accumulates traces for a consumer.
# Consumption can happen from a different thread.
module Datadog
# Buffer that stores objects. The buffer has a maximum size and when
# the buffer is full, a random object is discarded.
class Buffer
  # @param max_size [Integer] maximum number of stored items;
  #   a value of 0 (or less) means the buffer is unbounded
  def initialize(max_size)
    @max_size = max_size
    @items = []
    @closed = false
  end

  # Adds +item+ to the buffer without ever blocking the caller.
  # When the buffer is at capacity, a randomly chosen stored item
  # is evicted to make room.
  #
  # @return [Object, nil] the pushed item, or nil when the buffer is closed
  def push(item)
    return if closed?

    if full?
      replace!(item)
    else
      add!(item)
    end

    item
  end

  # Bulk alternative to {#push}: appends as many of +items+ as capacity
  # allows, then randomly replaces stored items with the remainder.
  # Prefer this over repeated {#push} calls for efficiency.
  def concat(items)
    return if closed?

    underflow, overflow = overflow_segments(items)
    # These fit without exceeding capacity.
    add_all!(underflow) unless underflow.nil?
    # Replace one at a time so eviction stays pseudo-random.
    overflow.each { |extra| replace!(extra) } unless overflow.nil?
  end

  # Returns all stored items and empties the buffer.
  def pop
    drain!
  end

  # Current number of stored items.
  def length
    @items.length
  end

  # Whether the buffer currently holds no items.
  def empty?
    @items.empty?
  end

  # Prevents any further pushes into this buffer.
  # Draining is still allowed.
  def close
    @closed = true
  end

  def closed?
    @closed
  end

  protected

  # Splits +items+ into the part that fits within capacity (underflow)
  # and the part that would exceed it once underflow is added (overflow).
  #
  # @return [Array(Array, Array)] pair of arrays; either element is nil
  #   when there is no underflow/overflow respectively
  def overflow_segments(items)
    excess = @max_size > 0 ? (@items.length + items.length) - @max_size : 0
    return [items, nil] unless excess > 0         # everything fits
    return [nil, items] if excess >= items.length # nothing fits

    split_at = items.length - excess
    [items[0...split_at], items[split_at..-1]]
  end

  def full?
    @max_size > 0 && @items.length >= @max_size
  end

  def add_all!(items)
    @items.concat(items)
  end

  def add!(item)
    @items << item
  end

  # Overwrites a random slot with +item+.
  #
  # @return [Object] the evicted item
  def replace!(item)
    slot = rand(@items.length)
    evicted = @items[slot]
    @items[slot] = item
    evicted
  end

  # Hands back the stored items and resets internal storage.
  def drain!
    drained = @items
    @items = []
    drained
  end
end
# Buffer that stores objects, has a maximum size, and
# can be safely used concurrently on any environment.
#
# Thread-safety is obtained by holding a {Mutex} for every public
# operation, which adds a small overhead to each call.
#
# Recommended for non-CRuby runtimes; on CRuby,
# {Datadog::CRubyBuffer} is a faster implementation with minimal compromise.
class ThreadSafeBuffer < Buffer
  def initialize(max_size)
    super
    @mutex = Mutex.new
  end

  # Adds +item+ without blocking the caller; when full, a random
  # stored item is evicted to make room.
  def push(item)
    @mutex.synchronize { super }
  end

  # Bulk alternative to {#push}.
  def concat(items)
    @mutex.synchronize { super }
  end

  # Current number of stored items.
  def length
    @mutex.synchronize { super }
  end

  # Whether the buffer currently holds no items.
  def empty?
    @mutex.synchronize { super }
  end

  # Returns all stored items and resets the buffer.
  def pop
    @mutex.synchronize { super }
  end

  # Prevents any further pushes; draining remains possible.
  def close
    @mutex.synchronize { super }
  end

  # Runs +block+ while holding this buffer's lock.
  def synchronize(&block)
    @mutex.synchronize(&block)
  end
end
# Buffer that stores objects, has a maximum size, and
# can be safely used concurrently with CRuby.
#
# Under extreme concurrency scenarios, this class can exceed
# its +max_size+ by up to 4%.
#
# Because singular +Array+ operations are thread-safe in CRuby,
# this buffer needs no explicit lock, trading that away for the
# possibility of briefly exceeding its maximum size under extreme
# circumstances.
#
# On the following scenario:
# * 4.5 million spans/second.
# * Pushed into a single CRubyTraceBuffer from 1000 threads.
# The buffer can exceed its maximum size by no more than 4%.
#
# This implementation allocates less memory and is faster
# than {Datadog::ThreadSafeBuffer}.
#
# @see spec/ddtrace/benchmark/buffer_benchmark_spec.rb Buffer benchmarks
# @see https://github.com/ruby-concurrency/concurrent-ruby/blob/c1114a0c6891d9634f019f1f9fe58dcae8658964/lib/concurrent-ruby/concurrent/array.rb#L23-L27
class CRubyBuffer < Buffer
  # Swaps a randomly chosen stored trace for +item+, without locking.
  #
  # @return [Object, nil] the evicted trace, or nil when nothing was
  #   actually replaced (see note below)
  def replace!(item)
    victim = @items.delete_at(rand(@items.size))
    @items << item
    # A concurrent drain may empty the buffer between the size check
    # and the delete, making +victim+ nil. In that case nothing was
    # replaced and this method performed a plain insertion.
    victim
  end
end
# Health metrics for trace buffers.
#
# Meant to be +prepend+ed to a buffer class (see {ThreadSafeTraceBuffer}
# and {CRubyTraceBuffer}): it wraps the buffer's internal mutation hooks
# (+add!+, +add_all!+, +replace!+, +drain!+) to keep running accept/drop
# totals and emits them through +Datadog.health_metrics+ on drain.
module MeasuredBuffer
  include Kernel # Ensure that kernel methods are always available (https://sorbet.org/docs/error-reference#7003)

  def initialize(*_)
    super
    # Running totals; all four are reset at the end of each #measure_pop.
    @buffer_accepted = 0
    @buffer_accepted_lengths = 0
    @buffer_dropped = 0
    @buffer_spans = 0
  end

  # Records one accepted trace after delegating the insert to the buffer.
  def add!(trace)
    super
    # Emit health metrics
    measure_accept(trace)
  end

  # Records each accepted trace after a bulk insert.
  def add_all!(traces)
    super
    # Emit health metrics
    traces.each { |trace| measure_accept(trace) }
  end

  # Records the accepted trace and, when an eviction actually happened,
  # the dropped trace as well.
  def replace!(trace)
    discarded_trace = super
    # Emit health metrics
    measure_accept(trace)
    # +discarded_trace+ can be nil (e.g. {CRubyBuffer#replace!} racing a
    # concurrent drain); nothing was dropped in that case.
    measure_drop(discarded_trace) if discarded_trace
    discarded_trace
  end

  # Stored traces are returned and the local buffer is reset.
  # Reports the aggregated metrics before handing the traces back.
  def drain!
    traces = super
    measure_pop(traces)
    traces
  end

  # Accumulates acceptance counters for one trace; errors are swallowed
  # (logged at debug) so bookkeeping never breaks tracing itself.
  def measure_accept(trace)
    @buffer_accepted += 1
    @buffer_accepted_lengths += trace.length
    @buffer_spans += trace.length
  rescue StandardError => e
    Datadog.logger.debug("Failed to measure queue accept. Cause: #{e.message} Source: #{Array(e.backtrace).first}")
  end

  # Accumulates drop counters for one evicted trace; errors are swallowed
  # (logged at debug) for the same reason as #measure_accept.
  def measure_drop(trace)
    @buffer_dropped += 1
    @buffer_spans -= trace.length
  rescue StandardError => e
    Datadog.logger.debug("Failed to measure queue drop. Cause: #{e.message} Source: #{Array(e.backtrace).first}")
  end

  # Emits the aggregated counters and current gauges, then resets the
  # running totals for the next drain cycle.
  def measure_pop(traces)
    # Accepted, cumulative totals
    Datadog.health_metrics.queue_accepted(@buffer_accepted)
    Datadog.health_metrics.queue_accepted_lengths(@buffer_accepted_lengths)
    # Dropped, cumulative totals
    Datadog.health_metrics.queue_dropped(@buffer_dropped)
    # TODO: are we missing a +queue_dropped_lengths+ metric?
    # Queue gauges, current values
    Datadog.health_metrics.queue_max_length(@max_size)
    Datadog.health_metrics.queue_spans(@buffer_spans)
    Datadog.health_metrics.queue_length(traces.length)
    # Reset aggregated metrics
    @buffer_accepted = 0
    @buffer_accepted_lengths = 0
    @buffer_dropped = 0
    @buffer_spans = 0
  rescue StandardError => e
    Datadog.logger.debug("Failed to measure queue. Cause: #{e.message} Source: #{Array(e.backtrace).first}")
  end
end
# Trace buffer that stores application traces, has a maximum size, and
# can be safely used concurrently on any environment.
#
# Combines {Datadog::ThreadSafeBuffer} storage with {MeasuredBuffer}
# health-metric instrumentation (prepended so it wraps the buffer hooks).
#
# @see Datadog::ThreadSafeBuffer
class ThreadSafeTraceBuffer < ThreadSafeBuffer
  prepend MeasuredBuffer
end
# Trace buffer that stores application traces, has a maximum size, and
# can be safely used concurrently with CRuby.
#
# Combines {Datadog::CRubyBuffer} storage with {MeasuredBuffer}
# health-metric instrumentation (prepended so it wraps the buffer hooks).
#
# @see Datadog::CRubyBuffer
class CRubyTraceBuffer < CRubyBuffer
  prepend MeasuredBuffer
end
# Trace buffer that stores application traces. The buffer has a maximum size and when
# the buffer is full, a random trace is discarded. This class is thread-safe and is used
# automatically by the ``Tracer`` instance when a ``Span`` is finished.
#
# The default TraceBuffer implementation is selected for the current platform:
# the lock-free variant on CRuby, the mutex-based one everywhere else.
#
# TODO We should restructure this module, so that classes are not declared at top-level ::Datadog.
# TODO Making such a change is potentially breaking for users manually configuring the tracer.
TraceBuffer =
  if Datadog::Core::Environment::Ext::RUBY_ENGINE == 'ruby'
    CRubyTraceBuffer
  else
    ThreadSafeTraceBuffer
  end
end