-
Notifications
You must be signed in to change notification settings - Fork 438
/
operation.go
324 lines (276 loc) · 9.82 KB
/
operation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.
// Package dyngo is the Go implementation of Datadog's Instrumentation Gateway
// which provides an event-based instrumentation API based on a stack
// representation of instrumented functions along with nested event listeners.
// It allows to both correlate past and future function calls in order to
// react and monitor specific function call scenarios, while keeping the
// monitoring state local to the monitoring logic thanks to nested Go function
// closures.
// dyngo is not intended to be directly used and should be instead wrapped
// behind statically and strongly typed wrapper types. Indeed, dyngo is a
// generic implementation relying on empty interface values (values of type
// `interface{}`) and using it directly can be error-prone due to the lack of
// compile-time type-checking. For example, AppSec provides the package
// `httpsec`, built on top of dyngo, as its HTTP instrumentation API and which
// defines the abstract HTTP operation representation expected by the AppSec
// monitoring.
package dyngo
import (
"sync"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"go.uber.org/atomic"
)
// Operation is the handle to a node of the operation stack. It is the type
// accepted by the package-level functions On, OnFinish, OnData and EmitData to
// register listeners on an operation and to emit data from it, and it is the
// constraint of the operation type parameter of StartOperation and
// FinishOperation. The event listeners are automatically removed from the
// operation once it finishes (see FinishOperation) so that they no longer can
// be called on finished operations.
type Operation interface {
// Parent returns the parent operation, or nil for the root operation.
Parent() Operation
// unwrap returns the underlying *operation. Being unexported, it cannot be
// declared by types of other packages, which can therefore only satisfy
// Operation by embedding a value created by this package (see NewOperation).
unwrap() *operation
}
// ArgOf marks a particular type as being the argument type of a given operation
// type. This allows this type to be listened to by an operation start listener
// (see On) and to be given to StartOperation along with an operation of type O.
// This removes the possibility of incorrectly pairing an operation and payload
// when setting up listeners, as it allows compiler-assisted coherence checks.
type ArgOf[O Operation] interface {
// IsArgOf is a marker method: its parameter type names the operation type
// the implementing type is the argument of. It is never called in this file.
IsArgOf(O)
}
// ResultOf marks a particular type as being the result type of a given
// operation. This allows this type to be listened to by an operation finish
// listener (see OnFinish) and to be given to FinishOperation along with an
// operation of type O.
// This removes the possibility of incorrectly pairing an operation and payload
// when setting up listeners, as it allows compiler-assisted coherence checks.
type ResultOf[O Operation] interface {
// IsResultOf is a marker method: its parameter type names the operation type
// the implementing type is the result of. It is never called in this file.
IsResultOf(O)
}
// EventListener is the function type of operation event listeners. A listener
// receives the operation of type O the event relates to, along with the event
// payload of type T: the operation arguments for listeners registered with On,
// or the operation results for listeners registered with OnFinish. The
// instantiated function type is also what listeners are indexed by when they
// are registered and looked up (see addEventListener and emitEvent).
type EventListener[O Operation, T any] func(O, T)
// rootOperation points to the current root operation. It is an atomic
// *Operation so that it can be atomically read (see NewOperation) or swapped
// (see SwapRootOperation). Its zero value holds a nil pointer, meaning no root
// operation was set yet.
var rootOperation atomic.Pointer[Operation]
// SwapRootOperation atomically replaces the current root operation with the
// given new one. Operations that were created under the previous root keep
// referencing it as their ancestor, so concurrent uses of the old root
// operation through already existing and running operations are still valid.
func SwapRootOperation(new Operation) {
	// The previous root operation is deliberately dropped without being
	// finished: finishing it would disable it and remove its event listeners,
	// so the finish event listeners it holds, possibly releasing memory and
	// resources, wouldn't be called anymore, which could result in mem leaks.
	_ = rootOperation.Swap(&new)
}
// operation is the implementation of Operation. It allows to subscribe to
// operation events and to navigate in the operation stack through the parent
// pointer. Events bubble-up the operation stack, which allows listening to
// future events that might happen in the operation lifetime.
// Note that both embedded types declare `listeners` and `mu` fields as well as
// a `clear` method: those must be accessed through the explicit embedded field
// name (e.g. o.eventRegister.clear()), while o.mu is the operation's own mutex.
type operation struct {
// parent is the enclosing operation, nil for an operation without parent.
parent *operation
// eventRegister holds the event listeners registered with On and OnFinish.
eventRegister
// dataBroadcaster holds the data listeners registered with OnData.
dataBroadcaster
// disabled is set by disable() when the operation finishes. Once set,
// On, OnFinish, OnData, EmitData and FinishOperation become no-ops for this
// operation.
disabled bool
// mu guards disabled: it is read-locked by the functions checking the flag
// and write-locked by disable().
mu sync.RWMutex
}
// Parent returns the parent operation, or nil when the operation has no parent
// (i.e. it is a root operation).
func (o *operation) Parent() Operation {
	// Returning o.parent unconditionally would wrap a nil *operation into a
	// non-nil Operation interface value, so that callers comparing the result
	// to nil would never detect the top of the operation stack.
	if o.parent == nil {
		return nil
	}
	return o.parent
}
// unwrap returns the receiver itself: *operation is the one true Operation
// implementation, every other Operation ends up unwrapping to it.
func (o *operation) unwrap() *operation {
	return o
}
// NewRootOperation creates and returns a new root operation, that is an
// operation without parent. Root operations are meant to sit at the top of an
// operation stack, where they receive all the operation events bubbling up.
// Creating one does not install it: it allows to prepare a new set of event
// listeners first, to then atomically swap it with the current root operation
// using SwapRootOperation.
func NewRootOperation() Operation {
	// The zero value is a ready-to-use operation: no parent, no listeners,
	// not disabled.
	root := new(operation)
	return root
}
// NewOperation creates and returns a new operation whose parent is the given
// operation. When parent is nil, the current root operation, if any, is used
// as parent instead. The new operation must be started by calling
// StartOperation, and finished by calling FinishOperation. The returned
// operation should be used in wrapper types to provide statically typed start
// and finish functions. The following example shows how to wrap an operation
// so that its functions are statically typed:
//
//	package mypackage
//
//	import "dyngo"
//
//	type (
//		MyOperation struct {
//			dyngo.Operation
//		}
//		MyOperationArgs struct { /* ... */ }
//		MyOperationRes  struct { /* ... */ }
//	)
//
//	func StartOperation(args MyOperationArgs, parent dyngo.Operation) MyOperation {
//		op := MyOperation{Operation: dyngo.NewOperation(parent)}
//		dyngo.StartOperation(op, args)
//		return op
//	}
//
//	func (op MyOperation) Finish(res MyOperationRes) {
//		dyngo.FinishOperation(op, res)
//	}
func NewOperation(parent Operation) Operation {
	created := &operation{}

	if parent == nil {
		// No explicit parent: fall back to the current root operation, which
		// may itself be unset or nil, in which case the new operation simply
		// has no parent.
		root := rootOperation.Load()
		if root == nil || *root == nil {
			return created
		}
		parent = *root
	}

	created.parent = parent.unwrap()
	return created
}
// StartOperation starts a new operation along with its arguments and emits a
// start event with the operation arguments.
func StartOperation[O Operation, E ArgOf[O]](op O, args E) {
// Bubble-up the start event starting from the parent operation as you can't
// listen for your own start event
for current := op.unwrap().parent; current != nil; current = current.parent {
emitEvent(¤t.eventRegister, op, args)
}
}
// FinishOperation finishes the operation along with its results and emits a
// finish event with the operation results.
// The operation is then disabled and its event listeners removed.
func FinishOperation[O Operation, E ResultOf[O]](op O, results E) {
o := op.unwrap()
defer o.disable() // This will need the RLock below to be released...
o.mu.RLock()
defer o.mu.RUnlock() // Deferred and stacked on top of the previously deferred call to o.disable()
if o.disabled {
return
}
for current := o; current != nil; current = current.parent {
emitEvent(¤t.eventRegister, op, results)
}
}
// disable marks the operation as disabled and removes all its event
// listeners. Calling it on an already disabled operation does nothing.
//
// NOTE(review): only the eventRegister is cleared here; the listeners held by
// the dataBroadcaster are left in place - confirm whether this is intended.
func (o *operation) disable() {
	o.mu.Lock()
	defer o.mu.Unlock()

	if !o.disabled {
		o.disabled = true
		o.eventRegister.clear()
	}
}
// On registers an event listener on the given operation. The listener will be
// called when an operation of type O, having op among its ancestors, is
// started with arguments of type E (see StartOperation). Nothing is registered
// when op is already disabled.
func On[O Operation, E ArgOf[O]](op Operation, l EventListener[O, E]) {
	target := op.unwrap()

	target.mu.RLock()
	defer target.mu.RUnlock()

	if !target.disabled {
		addEventListener(&target.eventRegister, l)
	}
}
// OnFinish registers an event listener on the given operation. The listener
// will be called when an operation of type O, being either op itself or one of
// its descendants, is finished with results of type E (see FinishOperation).
// Nothing is registered when op is already disabled.
func OnFinish[O Operation, E ResultOf[O]](op Operation, l EventListener[O, E]) {
	target := op.unwrap()

	target.mu.RLock()
	defer target.mu.RUnlock()

	if !target.disabled {
		addEventListener(&target.eventRegister, l)
	}
}
// OnData registers a data listener on the given operation. The listener will
// be called with the values of type T emitted with EmitData from op itself or
// from one of its descendants. Nothing is registered when op is already
// disabled.
func OnData[T any](op Operation, l DataListener[T]) {
	target := op.unwrap()

	target.mu.RLock()
	defer target.mu.RUnlock()

	if !target.disabled {
		addDataListener(&target.dataBroadcaster, l)
	}
}
// EmitData sends a data event up the operation stack. Listeners will be matched
// based on `T`. Callers may need to manually specify T when the static type of
// the value is more specific that the intended data event type.
func EmitData[T any](op Operation, data T) {
o := op.unwrap()
o.mu.RLock()
defer o.mu.RUnlock()
if o.disabled {
return
}
// Bubble up the data to the stack of operations. Contrary to events,
// we also send the data to ourselves since SDK operations are leaf operations
// that both emit and listen for data (errors).
for current := o; current != nil; current = current.parent {
emitData(¤t.dataBroadcaster, data)
}
}
type (
// eventRegister implements a thread-safe list of event listeners: every
// access to listeners is done while holding mu (see addEventListener,
// emitEvent and clear).
eventRegister struct {
listeners eventListenerMap
mu sync.RWMutex
}
// eventListenerMap is the map of event listeners. The lists of listeners are
// indexed by a typeID value instantiated with the listener function type
// EventListener[O, T], that is by the operation type along with the
// operation argument or result type the event listener expects.
eventListenerMap map[any][]any
// typeID is a zero-size type only used as map key: values of distinct
// instantiations have distinct dynamic types, hence are distinct keys of
// maps indexed by `any`.
typeID[T any] struct{}
// dataBroadcaster is the data listeners counterpart of eventRegister: a list
// of data listeners whose accesses are done while holding mu (see
// addDataListener, emitData and clear).
dataBroadcaster struct {
listeners dataListenerMap
mu sync.RWMutex
}
// DataListener is the function type of data listeners, called with the
// emitted value of type T (see OnData and EmitData).
DataListener[T any] func(T)
// dataListenerMap is the map of data listeners, indexed by a typeID value
// instantiated with the listener function type DataListener[T].
dataListenerMap map[any][]any
)
// addDataListener appends the given listener to the list of data listeners of
// b registered for type T, lazily allocating the listeners map on first use.
func addDataListener[T any](b *dataBroadcaster, l DataListener[T]) {
	var key any = typeID[DataListener[T]]{}

	b.mu.Lock()
	defer b.mu.Unlock()

	if b.listeners == nil {
		b.listeners = make(dataListenerMap)
	}
	registered := b.listeners[key]
	b.listeners[key] = append(registered, l)
}
// clear removes every data listener of the broadcaster by dropping its
// listeners map.
func (b *dataBroadcaster) clear() {
	b.mu.Lock()
	b.listeners = nil
	b.mu.Unlock()
}
// emitData calls, in their registration order, the data listeners of b
// registered for type T with the given value. A panic raised by a listener is
// recovered and logged, which also skips the remaining listeners of b.
func emitData[T any](b *dataBroadcaster, v T) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		log.Error("appsec: recovered from an unexpected panic from an event listener: %+v", recovered)
	}()

	b.mu.RLock()
	defer b.mu.RUnlock()

	key := typeID[DataListener[T]]{}
	for _, registered := range b.listeners[key] {
		call := registered.(DataListener[T])
		call(v)
	}
}
// addEventListener appends the given listener to the list of event listeners
// of r registered for the listener type EventListener[O, T], lazily allocating
// the listeners map on first use.
func addEventListener[O Operation, T any](r *eventRegister, l EventListener[O, T]) {
	var key any = typeID[EventListener[O, T]]{}

	r.mu.Lock()
	defer r.mu.Unlock()

	if r.listeners == nil {
		r.listeners = make(eventListenerMap, 2)
	}
	registered := r.listeners[key]
	r.listeners[key] = append(registered, l)
}
// clear removes every event listener of the register by dropping its
// listeners map.
func (r *eventRegister) clear() {
	r.mu.Lock()
	r.listeners = nil
	r.mu.Unlock()
}
// emitEvent calls, in their registration order, the event listeners of r
// registered for the listener type EventListener[O, T] with the given
// operation and value. A panic raised by a listener is recovered and logged,
// which also skips the remaining listeners of r.
func emitEvent[O Operation, T any](r *eventRegister, op O, v T) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		log.Error("appsec: recovered from an unexpected panic from an event listener: %+v", recovered)
	}()

	r.mu.RLock()
	defer r.mu.RUnlock()

	key := typeID[EventListener[O, T]]{}
	for _, registered := range r.listeners[key] {
		call := registered.(EventListener[O, T])
		call(op, v)
	}
}