@@ -11,7 +11,6 @@ import (
11
11
"sync/atomic"
12
12
"syscall"
13
13
"testing"
14
- "time"
15
14
16
15
"github.com/cockroachdb/errors"
17
16
"github.com/stretchr/testify/require"
@@ -41,7 +40,7 @@ func TestOnDiskFull_FS(t *testing.T) {
41
40
for name , fn := range filesystemWriteOps {
42
41
t .Run (name , func (t * testing.T ) {
43
42
innerFS := & enospcMockFS {}
44
- innerFS .enospcs . Store (1 )
43
+ innerFS .addENOSPC (1 )
45
44
var callbackInvocations int
46
45
fs := OnDiskFull (innerFS , func () {
47
46
callbackInvocations ++
@@ -55,7 +54,7 @@ func TestOnDiskFull_FS(t *testing.T) {
55
54
require .Equal (t , 1 , callbackInvocations )
56
55
// The inner filesystem should be invoked twice because of the
57
56
// retry.
58
- require .Equal (t , uint32 ( 2 ) , innerFS .invocations . Load () )
57
+ require .Equal (t , 2 , innerFS .mu . invocations )
59
58
})
60
59
}
61
60
}
@@ -72,7 +71,7 @@ func TestOnDiskFull_File(t *testing.T) {
72
71
require .NoError (t , err )
73
72
74
73
// The next Write should ENOSPC.
75
- innerFS .enospcs . Store (1 )
74
+ innerFS .addENOSPC (1 )
76
75
77
76
// Call the Write method on the wrapped file. The first call should return
78
77
// ENOSPC, but also that six bytes were written. Our registered callback
@@ -84,7 +83,7 @@ func TestOnDiskFull_File(t *testing.T) {
84
83
require .Equal (t , 1 , callbackInvocations )
85
84
// The inner filesystem should be invoked 3 times. Once during Create
86
85
// and twice during Write.
87
- require .Equal (t , uint32 ( 3 ) , innerFS .invocations . Load () )
86
+ require .Equal (t , 3 , innerFS .mu . invocations )
88
87
})
89
88
t .Run ("Sync" , func (t * testing.T ) {
90
89
innerFS := & enospcMockFS {bytesWritten : 6 }
@@ -98,30 +97,29 @@ func TestOnDiskFull_File(t *testing.T) {
98
97
99
98
// The next Sync should ENOSPC. The callback should be invoked, but a
100
99
// Sync cannot be retried.
101
- innerFS .enospcs . Store (1 )
100
+ innerFS .addENOSPC (1 )
102
101
103
102
err = f .Sync ()
104
103
require .Error (t , err )
105
104
require .Equal (t , 1 , callbackInvocations )
106
105
// The inner filesystem should be invoked 2 times. Once during Create
107
106
// and once during Sync.
108
- require .Equal (t , uint32 ( 2 ) , innerFS .invocations . Load () )
107
+ require .Equal (t , 2 , innerFS .mu . invocations )
109
108
})
110
109
}
111
110
112
111
func TestOnDiskFull_Concurrent (t * testing.T ) {
113
- innerFS := & enospcMockFS {
114
- opDelay : 10 * time .Millisecond ,
115
- }
116
- innerFS .enospcs .Store (10 )
112
+ const concurrentWriters = 10
113
+ innerFS := & enospcMockFS {}
114
+ innerFS .mu .enospcs = concurrentWriters
117
115
var callbackInvocations atomic.Int32
118
116
fs := OnDiskFull (innerFS , func () {
119
117
callbackInvocations .Add (1 )
120
118
})
121
119
122
120
var wg sync.WaitGroup
123
- for i := 0 ; i < 10 ; i ++ {
124
- wg . Add ( 1 )
121
+ wg . Add ( concurrentWriters )
122
+ for range concurrentWriters {
125
123
go func () {
126
124
defer wg .Done ()
127
125
_ , err := fs .Create ("foo" , WriteCategoryUnspecified )
@@ -133,26 +131,51 @@ func TestOnDiskFull_Concurrent(t *testing.T) {
133
131
// Since all operations should start before the first one returns an
134
132
// ENOSPC, the callback should only be invoked once.
135
133
require .Equal (t , int32 (1 ), callbackInvocations .Load ())
136
- require .Equal (t , uint32 ( 20 ) , innerFS .invocations . Load () )
134
+ require .Equal (t , 2 * concurrentWriters , innerFS .mu . invocations )
137
135
}
138
136
139
137
type enospcMockFS struct {
140
138
FS
141
- opDelay time.Duration
142
139
bytesWritten int
143
- enospcs atomic.Int32
144
- invocations atomic.Uint32
140
+
141
+ mu struct {
142
+ sync.Mutex
143
+ cond sync.Cond
144
+ enospcs int
145
+ invocations int
146
+ }
145
147
}
146
148
147
- func (fs * enospcMockFS ) maybeENOSPC () error {
148
- fs .invocations .Add (1 )
149
- v := fs .enospcs .Add (- 1 )
149
+ func (fs * enospcMockFS ) addENOSPC (n int ) {
150
+ fs .mu .Lock ()
151
+ defer fs .mu .Unlock ()
152
+ fs .mu .enospcs += n
153
+ }
150
154
151
- // Sleep before returning so that tests may issue concurrent writes that
152
- // fall into the same write generation.
153
- time .Sleep (fs .opDelay )
155
+ func (fs * enospcMockFS ) maybeENOSPC () error {
156
+ fs .mu .Lock ()
157
+ defer fs .mu .Unlock ()
158
+ if fs .mu .cond .L == nil {
159
+ fs .mu .cond .L = & fs .mu .Mutex
160
+ }
154
161
155
- if v >= 0 {
162
+ fs .mu .invocations ++
163
+
164
+ // If there are any ENOSPCs left, inject one and return an error.
165
+ if fs .mu .enospcs > 0 {
166
+ fs .mu .enospcs --
167
+ // If there are remaining ENOSPCs, we wait for the other goroutines to
168
+ // reach this point. We wait on the condition variable until all ENOSPCs
169
+ // have been consumed. If we are the last goroutine to reach this point,
170
+ // we wake up any other goroutines waiting on the condition variable.
171
+ // This ensures that all the gorouintes part of the same 'write
172
+ // generation' are blocked together.
173
+ if fs .mu .enospcs == 0 {
174
+ fs .mu .cond .Broadcast ()
175
+ }
176
+ for fs .mu .enospcs > 0 {
177
+ fs .mu .cond .Wait ()
178
+ }
156
179
// Wrap the error to test error unwrapping.
157
180
err := & os.PathError {Op : "mock" , Path : "mock" , Err : syscall .ENOSPC }
158
181
return errors .Wrap (err , "uh oh" )
0 commit comments