@@ -10,12 +10,6 @@ import (
    "unsafe"
)

- const (
-     cacheLineSize  = 128
-     poolLocalSize  = 2 * cacheLineSize
-     poolLocalCap   = poolLocalSize/unsafe.Sizeof(*(*interface{})(nil)) - 1
- )
-
// A Pool is a set of temporary objects that may be individually saved and
// retrieved.
//
@@ -46,36 +40,21 @@ const (
// free list.
//
type Pool struct {
-     // The following fields are known to runtime.
-     next         *Pool      // for use by runtime
-     local        *poolLocal // local fixed-size per-P pool, actually an array
-     localSize    uintptr    // size of the local array
-     globalOffset uintptr    // offset of global
-     // The rest is not known to runtime.
+     local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
+     localSize uintptr        // size of the local array

    // New optionally specifies a function to generate
    // a value when Get would otherwise return nil.
    // It may not be changed concurrently with calls to Get.
    New func() interface{}
-
-     pad [cacheLineSize]byte
-     // Read-mostly date above this point, mutable data follows.
-     mu     Mutex
-     global []interface{} // global fallback pool
}

// Local per-P Pool appendix.
type poolLocal struct {
-     tail   int
-     unused int
-     buf    [poolLocalCap]interface{}
- }
-
- func init() {
-     var v poolLocal
-     if unsafe.Sizeof(v) != poolLocalSize {
-         panic("sync: incorrect pool size")
-     }
+     private interface{}   // Can be used only by the respective P.
+     shared  []interface{} // Can be used by any P.
+     Mutex                 // Protects shared.
+     pad     [128]byte     // Prevents false sharing.
}

// Put adds x to the pool.
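
For callers nothing changes in this CL: a Pool is still declared with an optional New function, and Get is paired with Put. A minimal caller-side sketch (the bufPool name and the bytes.Buffer payload are illustrative only, not part of this change):

package main

import (
    "bytes"
    "fmt"
    "sync"
)

// bufPool hands out *bytes.Buffer values; New runs only when the pool
// has nothing cached for the calling P.
var bufPool = sync.Pool{
    New: func() interface{} { return new(bytes.Buffer) },
}

func main() {
    b := bufPool.Get().(*bytes.Buffer) // may come from private, shared, or New
    b.Reset()
    b.WriteString("hello")
    fmt.Println(b.String())
    bufPool.Put(b) // lands in the calling P's private slot if that slot is empty
}

Values returned by Get should be treated as arbitrary: they may come from the calling P's private slot, from some shared list, or freshly from New.
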
@@ -90,14 +69,17 @@ func (p *Pool) Put(x interface{}) {
        return
    }
    l := p.pin()
-     t := l.tail
-     if t < int(poolLocalCap) {
-         l.buf[t] = x
-         l.tail = t + 1
-         runtime_procUnpin()
+     if l.private == nil {
+         l.private = x
+         x = nil
+     }
+     runtime_procUnpin()
+     if x == nil {
        return
    }
-     p.putSlow(l, x)
+     l.Lock()
+     l.shared = append(l.shared, x)
+     l.Unlock()
}

// Get selects an arbitrary item from the Pool, removes it from the
@@ -116,69 +98,49 @@ func (p *Pool) Get() interface{} {
        return nil
    }
    l := p.pin()
-     t := l.tail
-     if t > 0 {
-         t -= 1
-         x := l.buf[t]
-         l.tail = t
-         runtime_procUnpin()
+     x := l.private
+     l.private = nil
+     runtime_procUnpin()
+     if x != nil {
        return x
    }
-     return p.getSlow()
- }
-
- func (p *Pool) putSlow(l *poolLocal, x interface{}) {
-     // Grab half of items from local pool and put to global pool.
-     // Can not lock the mutex while pinned.
-     const N = int(poolLocalCap/2 + 1)
-     var buf [N]interface{}
-     buf[0] = x
-     for i := 1; i < N; i++ {
-         l.tail--
-         buf[i] = l.buf[l.tail]
+     l.Lock()
+     last := len(l.shared) - 1
+     if last >= 0 {
+         x = l.shared[last]
+         l.shared = l.shared[:last]
    }
-     runtime_procUnpin()
-
-     p.mu.Lock()
-     p.global = append(p.global, buf[:]...)
-     p.mu.Unlock()
+     l.Unlock()
+     if x != nil {
+         return x
+     }
+     return p.getSlow()
}

func (p *Pool) getSlow() (x interface{}) {
-     // Grab a batch of items from global pool and put to local pool.
-     // Can not lock the mutex while pinned.
-     runtime_procUnpin()
-     p.mu.Lock()
+     // See the comment in pin regarding ordering of the loads.
+     size := atomic.LoadUintptr(&p.localSize) // load-acquire
+     local := p.local                         // load-consume
+     // Try to steal one element from other procs.
    pid := runtime_procPin()
-     s := p.localSize
-     l := p.local
-     if uintptr(pid) < s {
-         l = indexLocal(l, pid)
-         // Get the item to return.
-         last := len(p.global) - 1
+     runtime_procUnpin()
+     for i := 0; i < int(size); i++ {
+         l := indexLocal(local, (pid+i+1)%int(size))
+         l.Lock()
+         last := len(l.shared) - 1
        if last >= 0 {
-             x = p.global[last]
-             p.global = p.global[:last]
-         }
-         // Try to refill local pool, we may have been rescheduled to another P.
-         if last > 0 && l.tail == 0 {
-             n := int(poolLocalCap / 2)
-             gl := len(p.global)
-             if n > gl {
-                 n = gl
-             }
-             copy(l.buf[:], p.global[gl-n:])
-             p.global = p.global[:gl-n]
-             l.tail = n
+             x = l.shared[last]
+             l.shared = l.shared[:last]
+             l.Unlock()
+             break
        }
+         l.Unlock()
    }
-     runtime_procUnpin()
-     p.mu.Unlock()

    if x == nil && p.New != nil {
        x = p.New()
    }
-     return
+     return x
}

// pin pins the current goroutine to P, disables preemption and returns poolLocal pool for the P.
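
Taken together, the new Put/Get fast paths and getSlow implement a simple search order: the calling P's private slot, then the tail of its own shared list, then the shared lists of the other Ps, and finally New. The sketch below is a simplified, runnable model of that order only; miniPool, local, and the explicit slot index id are inventions for illustration, whereas the real code derives the index from runtime_procPin and publishes local/localSize with the store-release/load-acquire pair used in pinSlow and getSlow.

package main

import (
    "fmt"
    "sync"
)

// local mirrors poolLocal: a private slot plus a mutex-protected shared slice.
type local struct {
    sync.Mutex
    private interface{}
    shared  []interface{}
}

// miniPool models the lookup order of the new Get.
type miniPool struct {
    locals []local
    New    func() interface{}
}

// get takes the caller's slot index explicitly; the real code derives it
// from runtime_procPin and unpins before touching other slots' locks.
func (p *miniPool) get(id int) interface{} {
    l := &p.locals[id]
    if x := l.private; x != nil {
        l.private = nil
        return x
    }
    l.Lock()
    if n := len(l.shared); n > 0 {
        x := l.shared[n-1]
        l.shared = l.shared[:n-1]
        l.Unlock()
        return x
    }
    l.Unlock()
    // Steal one element from the other slots, starting after our own.
    for i := 1; i < len(p.locals); i++ {
        v := &p.locals[(id+i)%len(p.locals)]
        v.Lock()
        if n := len(v.shared); n > 0 {
            x := v.shared[n-1]
            v.shared = v.shared[:n-1]
            v.Unlock()
            return x
        }
        v.Unlock()
    }
    if p.New != nil {
        return p.New()
    }
    return nil
}

func main() {
    p := miniPool{locals: make([]local, 4), New: func() interface{} { return "fresh" }}
    p.locals[2].shared = append(p.locals[2].shared, "stolen")
    fmt.Println(p.get(0)) // nothing local, steals "stolen" from slot 2
    fmt.Println(p.get(0)) // everything empty, falls back to New: "fresh"
}

The same access pattern is why poolLocal carries pad: each slot is touched almost exclusively by one P, so keeping slots on separate cache lines avoids false sharing when neighbours occasionally steal.
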
@@ -199,32 +161,63 @@ func (p *Pool) pin() *poolLocal {

func (p *Pool) pinSlow() *poolLocal {
    // Retry under the mutex.
+     // Can not lock the mutex while pinned.
    runtime_procUnpin()
-     p.mu.Lock()
-     defer p.mu.Unlock()
+     allPoolsMu.Lock()
+     defer allPoolsMu.Unlock()
    pid := runtime_procPin()
+     // poolCleanup won't be called while we are pinned.
    s := p.localSize
    l := p.local
    if uintptr(pid) < s {
        return indexLocal(l, pid)
    }
    if p.local == nil {
-         p.globalOffset = unsafe.Offsetof(p.global)
-         runtime_registerPool(p)
+         allPools = append(allPools, p)
    }
    // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
    size := runtime.GOMAXPROCS(0)
    local := make([]poolLocal, size)
-     atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.local)), unsafe.Pointer(&local[0])) // store-release
-     atomic.StoreUintptr(&p.localSize, uintptr(size))                                            // store-release
+     atomic.StorePointer((*unsafe.Pointer)(&p.local), unsafe.Pointer(&local[0])) // store-release
+     atomic.StoreUintptr(&p.localSize, uintptr(size))                            // store-release
    return &local[pid]
}

- func indexLocal(l *poolLocal, i int) *poolLocal {
-     return (*poolLocal)(unsafe.Pointer(uintptr(unsafe.Pointer(l)) + unsafe.Sizeof(*l)*uintptr(i))) // uh...
+ func poolCleanup() {
+     // This function is called with the world stopped, at the beginning of a garbage collection.
+     // It must not allocate and probably should not call any runtime functions.
+     // Defensively zero out everything, 2 reasons:
+     // 1. To prevent false retention of whole Pools.
+     // 2. If GC happens while a goroutine works with l.shared in Put/Get,
+     //    it will retain whole Pool. So next cycle memory consumption would be doubled.
+     for i, p := range allPools {
+         allPools[i] = nil
+         for i := 0; i < int(p.localSize); i++ {
+             l := indexLocal(p.local, i)
+             l.private = nil
+             for j := range l.shared {
+                 l.shared[j] = nil
+             }
+             l.shared = nil
+         }
+     }
+     allPools = []*Pool{}
+ }
+
+ var (
+     allPoolsMu Mutex
+     allPools   []*Pool
+ )
+
+ func init() {
+     runtime_registerPoolCleanup(poolCleanup)
+ }
+
+ func indexLocal(l unsafe.Pointer, i int) *poolLocal {
+     return &(*[1000000]poolLocal)(l)[i]
}

// Implemented in runtime.
- func runtime_registerPool(*Pool)
+ func runtime_registerPoolCleanup(cleanup func())
func runtime_procPin() int
func runtime_procUnpin()
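
indexLocal recovers a typed element from the untyped p.local pointer without allocating: it converts the unsafe.Pointer into a pointer to a large array type that is never actually materialized, then indexes it. A standalone sketch of the same idiom, using int32 in place of poolLocal (indexAt and the slice s are illustrative only):

package main

import (
    "fmt"
    "unsafe"
)

// indexAt mimics indexLocal: treat a pointer to the first element of a
// []int32 as a pointer to a huge array and index into it. The array length
// only gives the compiler an indexable type; no array of that size exists.
func indexAt(base unsafe.Pointer, i int) *int32 {
    return &(*[1000000]int32)(base)[i]
}

func main() {
    s := make([]int32, 8)
    for i := range s {
        s[i] = int32(i * i)
    }
    p := unsafe.Pointer(&s[0])
    fmt.Println(*indexAt(p, 3), *indexAt(p, 7)) // 9 49
}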