Skip to content

Commit

Permalink
internal/appsec/dyngo: atomic instrumentation swapping (#1873)
Browse files Browse the repository at this point in the history
Co-authored-by: François Mazeau <francois.mazeau@datadoghq.com>
  • Loading branch information
Julio-Guerra and Hellzy committed Apr 17, 2023
1 parent b438db9 commit 8d92f3e
Show file tree
Hide file tree
Showing 12 changed files with 242 additions and 261 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ require (
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.40.0
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/trace v1.14.0
go.uber.org/atomic v1.10.0
)

require (
Expand All @@ -225,7 +226,6 @@ require (
github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/outcaste-io/ristretto v0.2.1 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
k8s.io/klog/v2 v2.30.0 // indirect
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
Expand Down
33 changes: 22 additions & 11 deletions internal/appsec/appsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/remoteconfig"

waf "github.com/DataDog/go-libddwaf"
)

// Enabled returns true when AppSec is up and running. Meaning that the appsec build tag is enabled, the env var
Expand Down Expand Up @@ -92,11 +94,11 @@ func setActiveAppSec(a *appsec) {
}

type appsec struct {
cfg *Config
unregisterWAF dyngo.UnregisterFunc
limiter *TokenTicker
rc *remoteconfig.Client
started bool
cfg *Config
limiter *TokenTicker
rc *remoteconfig.Client
wafHandle *waf.Handle
started bool
}

func newAppSec(cfg *Config) *appsec {
Expand All @@ -119,7 +121,7 @@ func (a *appsec) start() error {
a.limiter = NewTokenTicker(int64(a.cfg.traceRateLimit), int64(a.cfg.traceRateLimit))
a.limiter.Start()
// Register the WAF operation event listener
if err := a.swapWAF(a.cfg.rulesManager.raw()); err != nil {
if err := a.swapWAF(a.cfg.rulesManager.latest); err != nil {
return err
}
a.enableRCBlocking()
Expand All @@ -129,10 +131,19 @@ func (a *appsec) start() error {

// Stop AppSec by unregistering the security protections.
func (a *appsec) stop() {
if a.started {
a.started = false
a.unregisterWAF()
a.limiter.Stop()
a.disableRCBlocking()
if !a.started {
return
}
a.started = false
// Disable RC blocking first so that the following is guaranteed not to be concurrent anymore.
a.disableRCBlocking()

// Disable the currently applied instrumentation
dyngo.SwapRootOperation(nil)
if a.wafHandle != nil {
a.wafHandle.Close()
}
// TODO: block until no more requests are using dyngo operations

a.limiter.Stop()
}
7 changes: 6 additions & 1 deletion internal/appsec/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,13 @@ func newConfig() (*Config, error) {
return nil, err
}

r, err := newRulesManager(rules)
if err != nil {
return nil, err
}

return &Config{
rulesManager: newRulesManager(rules),
rulesManager: r,
wafTimeout: readWAFTimeoutConfig(),
traceRateLimit: readRateLimitConfig(),
obfuscator: readObfuscatorConfig(),
Expand Down
7 changes: 5 additions & 2 deletions internal/appsec/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
)

func TestConfig(t *testing.T) {
r, err := newRulesManager(nil)
require.NoError(t, err)
expectedDefaultConfig := &Config{
rulesManager: newRulesManager(nil),
rulesManager: r,
wafTimeout: defaultWAFTimeout,
traceRateLimit: defaultTraceRate,
obfuscator: ObfuscatorConfig{
Expand Down Expand Up @@ -124,7 +126,8 @@ func TestConfig(t *testing.T) {
os.Remove(file.Name())
}()
expCfg := *expectedDefaultConfig
expCfg.rulesManager = newRulesManager([]byte(staticRecommendedRules))
expCfg.rulesManager, err = newRulesManager([]byte(staticRecommendedRules))
require.NoError(t, err)
_, err = file.WriteString(staticRecommendedRules)
require.NoError(t, err)
os.Setenv(rulesEnvVar, file.Name())
Expand Down
128 changes: 40 additions & 88 deletions internal/appsec/dyngo/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ package dyngo

import (
"reflect"
"sort"
"sync"
"sync/atomic"

"gopkg.in/DataDog/dd-trace-go.v1/internal/log"

"go.uber.org/atomic"
)

// Operation interface type allowing to register event listeners to the
Expand All @@ -48,11 +48,10 @@ type Operation interface {
// that no other package can define it.
emitEvent(argsType reflect.Type, op Operation, v interface{})

// register the given event listeners and return the unregistration
// function allowing to remove the event listener from the operation.
// register is a private method implemented by the operation struct type so
// add the given event listeners to the operation.
// add is a private method implemented by the operation struct type so
// that no other package can define it.
register(...EventListener) UnregisterFunc
add(...EventListener)

// finish the operation. This method allows to pass the operation value to
// use to emit the finish event.
Expand All @@ -72,15 +71,18 @@ type EventListener interface {
Call(op Operation, v interface{})
}

// UnregisterFunc is a function allowing to unregister from an operation the
// previously registered event listeners.
type UnregisterFunc func()

var rootOperation = newOperation(nil)

// Register global operation event listeners to listen to.
func Register(listeners ...EventListener) UnregisterFunc {
return rootOperation.register(listeners...)
// Atomic *Operation so we can atomically read or swap it.
var rootOperation atomic.Pointer[Operation]

// SwapRootOperation allows to atomically swap the current root operation with
// the given new one. Concurrent uses of the old root operation on already
// existing and running operation are still valid.
func SwapRootOperation(new Operation) {
rootOperation.Swap(&new)
// Note: calling FinishOperation(old) could result into mem leaks because
// some finish event listeners, possibly releasing memory and resources,
// wouldn't be called anymore (because finish() disables the operation and
// removes the event listeners).
}

// operation structure allowing to subscribe to operation events and to
Expand All @@ -95,7 +97,16 @@ type operation struct {
mu sync.RWMutex
}

// NewOperation creates and returns a new operationIt must be started by calling
// NewRootOperation creates and returns a new root operation, with no parent
// operation. Root operations are meant to be the top-level operation of an
// operation stack, therefore receiving all the operation events. It allows to
// prepare a new set of event listeners, to then atomically swap it with the
// current one.
func NewRootOperation() Operation {
return newOperation(nil)
}

// NewOperation creates and returns a new operation. It must be started by calling
// StartOperation, and finished by calling FinishOperation. The returned
// operation should be used in wrapper types to provide statically typed start
// and finish functions. The following example shows how to wrap an operation
Expand All @@ -121,7 +132,9 @@ type operation struct {
// }
func NewOperation(parent Operation) Operation {
if parent == nil {
parent = rootOperation
if root := rootOperation.Load(); root != nil {
parent = *root
}
}
return newOperation(parent)
}
Expand Down Expand Up @@ -178,36 +191,19 @@ func (o *operation) disable() {
o.eventRegister.clear()
}

// Register allows to register the given event listeners to the operation. An
// unregistration function is returned allowing to unregister the event
// listeners from the operation.
func (o *operation) register(l ...EventListener) UnregisterFunc {
// eventRegisterIndex allows to lookup for the event listener in the event register.
type eventRegisterIndex struct {
key reflect.Type
id eventListenerID
}
// Add the given event listeners to the operation.
func (o *operation) add(l ...EventListener) {
o.mu.RLock()
defer o.mu.RUnlock()
if o.disabled {
return func() {}
return
}
indices := make([]eventRegisterIndex, len(l))
for i, l := range l {
for _, l := range l {
if l == nil {
continue
}
key := l.ListenedType()
id := o.eventRegister.add(key, l)
indices[i] = eventRegisterIndex{
key: key,
id: id,
}
}
return func() {
for _, ix := range indices {
o.eventRegister.remove(ix.key, ix.id)
}
o.eventRegister.add(key, l)
}
}

Expand Down Expand Up @@ -237,60 +233,16 @@ type (
// eventListenerMap is the map of event listeners. The list of listeners are
// indexed by the operation argument or result type the event listener
// expects.
eventListenerMap map[reflect.Type][]eventListenerMapEntry
eventListenerMapEntry struct {
id eventListenerID
listener EventListener
}

// eventListenerID is the unique ID of an event when registering it. It
// allows to find it back and remove it from the list of event listeners
// when unregistering it.
eventListenerID uint32
eventListenerMap map[reflect.Type][]EventListener
)

// lastID is the last event listener ID that was given to the latest event
// listener.
var lastID eventListenerID

// nextID atomically increments lastID and returns the new event listener ID to
// use.
func nextID() eventListenerID {
return eventListenerID(atomic.AddUint32((*uint32)(&lastID), 1))
}

func (r *eventRegister) add(key reflect.Type, l EventListener) eventListenerID {
func (r *eventRegister) add(key reflect.Type, l EventListener) {
r.mu.Lock()
defer r.mu.Unlock()
if r.listeners == nil {
r.listeners = make(eventListenerMap)
}
// id is computed when the lock is exclusively taken so that we know
// listeners are added in incremental id order.
// This allows to use the optimized sort.Search() function to remove the
// entry.
id := nextID()
r.listeners[key] = append(r.listeners[key], eventListenerMapEntry{
id: id,
listener: l,
})
return id
}

func (r *eventRegister) remove(key reflect.Type, id eventListenerID) {
r.mu.Lock()
defer r.mu.Unlock()
if r.listeners == nil {
return
}
listeners := r.listeners[key]
length := len(listeners)
i := sort.Search(length, func(i int) bool {
return listeners[i].id >= id
})
if i < length && listeners[i].id == id {
r.listeners[key] = append(listeners[:i], listeners[i+1:]...)
}
r.listeners[key] = append(r.listeners[key], l)
}

func (r *eventRegister) clear() {
Expand All @@ -307,7 +259,7 @@ func (r *eventRegister) emitEvent(key reflect.Type, op Operation, v interface{})
}()
r.mu.RLock()
defer r.mu.RUnlock()
for _, e := range r.listeners[key] {
e.listener.Call(op, v)
for _, listener := range r.listeners[key] {
listener.Call(op, v)
}
}
Loading

0 comments on commit 8d92f3e

Please sign in to comment.