-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
pool_manager.go
125 lines (102 loc) · 2.77 KB
/
pool_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-2020 Datadog, Inc.
package packets
import (
"sync"
"unsafe"
"go.uber.org/atomic"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
// genericPool is the minimal object-pool contract the PoolManager relies on:
// something objects can be handed back to and fetched from. Both methods deal
// in untyped values, so callers are responsible for type assertions.
type genericPool interface {
	// Put hands obj back to the pool.
	Put(obj interface{})
	// Get fetches an object from the pool.
	Get() interface{}
}
// PoolManager helps manage sync pools so multiple references to the same pool objects may be held.
//
// In passthru mode Put forwards objects straight to the underlying pool. Outside
// of passthru mode, Put tracks each object by pointer and only returns it to the
// pool once it has been put a second time.
type PoolManager struct {
	// pool is the underlying pool objects are fetched from and returned to.
	pool genericPool
	// refs tracks objects that have been put once and are awaiting their second
	// put; keys are the objects' pointers, values are the objects themselves.
	refs sync.Map
	// passthru, when true, makes Put bypass the reference accounting.
	passthru *atomic.Bool

	// The embedded lock does not guard refs itself: Put and Count take it for
	// reading, Flush takes it for writing so nothing is added mid-flush.
	sync.RWMutex
}
// NewPoolManager builds a PoolManager wrapping the supplied genericPool.
// The returned manager starts out in passthru mode, i.e. Put forwards objects
// directly to gp until SetPassthru(false) is called.
func NewPoolManager(gp genericPool) *PoolManager {
	manager := new(PoolManager)
	manager.pool = gp
	manager.passthru = atomic.NewBool(true)
	return manager
}
// Get fetches an object from the underlying pool. It performs no reference
// accounting and behaves the same whether or not passthru mode is enabled.
func (p *PoolManager) Get() interface{} {
	obj := p.pool.Get()
	return obj
}
// Put declares intent to return an object to the pool. In passthru mode the object is immediately
// returned to the pool, otherwise we wait until the object is put by all (only 2 currently supported)
// reference holders before actually returning it to the object pool.
//
// Outside of passthru mode only *[]byte and *Packet values are supported; any
// other type is reported with a debug log and is neither tracked nor returned
// to the pool.
func (p *PoolManager) Put(x interface{}) {
	if p.IsPassthru() {
		p.pool.Put(x)
		return
	}

	// Objects are tracked by their address, so derive a key from the pointer.
	var ref unsafe.Pointer
	switch t := x.(type) {
	case *[]byte:
		ref = unsafe.Pointer(t)
	case *Packet:
		ref = unsafe.Pointer(t)
	default:
		log.Debugf("Unsupported type by pool manager")
		return
	}

	// This lock is not to guard the map, it's here to
	// avoid adding items to the map while flushing.
	p.RLock()

	// LoadOrStore makes "check whether the other holder already put this
	// object, otherwise record our put" a single atomic step. A separate Load
	// followed by Store would let two holders putting concurrently both miss
	// and both store, leaving the object stuck in the map instead of going
	// back to the pool.
	if _, loaded := p.refs.LoadOrStore(ref, x); loaded {
		// The other holder already put this object: stop tracking it and
		// actually return it to the pool.
		p.refs.Delete(ref)
		p.pool.Put(x)
	}

	// relatively hot path so not deferred
	p.RUnlock()
}
// IsPassthru reports whether the PoolManager is currently in passthru mode,
// as read from its atomic passthru flag.
func (p *PoolManager) IsPassthru() bool {
	enabled := p.passthru.Load()
	return enabled
}
// SetPassthru sets the passthru mode to the specified value. When passthru is
// being enabled, the flag is switched on first and the pending accounting is
// then flushed back to the pool; disabling passthru only updates the flag.
func (p *PoolManager) SetPassthru(b bool) {
	p.passthru.Store(b)
	if !b {
		return
	}

	// Passthru was just enabled: release everything still being tracked.
	p.Flush()
}
// Count returns the number of elements accounted by the PoolManager, i.e. the
// number of entries currently held in the reference map. The read lock is held
// for the duration of the walk.
func (p *PoolManager) Count() int {
	p.RLock()
	defer p.RUnlock()

	var tracked int
	countEntry := func(_, _ interface{}) bool {
		tracked++
		return true // keep iterating over every entry
	}
	p.refs.Range(countEntry)

	return tracked
}
// Flush returns every tracked object to the underlying pool and removes it
// from the reference map, so no pending objects remain tracked afterwards.
// The write lock is held throughout, which keeps Put from adding entries
// while the flush is in progress.
func (p *PoolManager) Flush() {
	p.Lock()
	defer p.Unlock()

	release := func(ref, obj interface{}) bool {
		p.pool.Put(obj)
		p.refs.Delete(ref)
		return true // visit every entry
	}
	p.refs.Range(release)
}