/
completion.go
193 lines (170 loc) · 6.14 KB
/
completion.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
// Copyright 2018 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package completion
import (
"context"
"errors"
"github.com/cilium/cilium/pkg/lock"
)
// WaitGroup waits for a collection of Completions to complete.
type WaitGroup struct {
// ctx is the context of all the Completions in the wait group.
ctx context.Context
// cancel is the function to call if any pending operation fails
cancel context.CancelFunc
// counterLocker locks all calls to AddCompletion and Wait, which must not
// be called concurrently.
counterLocker lock.Mutex
// pendingCompletions is the list of Completions returned by
// AddCompletion() which have not yet been completed.
pendingCompletions []*Completion
}
// NewWaitGroup returns a new WaitGroup using the given context.
func NewWaitGroup(ctx context.Context) *WaitGroup {
ctx2, cancel := context.WithCancel(ctx)
return &WaitGroup{ctx: ctx2, cancel: cancel}
}
// Context returns the context of all the Completions in the wait group.
func (wg *WaitGroup) Context() context.Context {
return wg.ctx
}
// AddCompletionWithCallback creates a new completion, adds it to the wait
// group, and returns it. The callback will be called upon completion.
// Completion can complete in a failure (err != nil)
func (wg *WaitGroup) AddCompletionWithCallback(callback func(err error)) *Completion {
wg.counterLocker.Lock()
defer wg.counterLocker.Unlock()
c := NewCompletion(wg.cancel, callback)
wg.pendingCompletions = append(wg.pendingCompletions, c)
return c
}
// AddCompletion creates a new completion, adds it into the wait group, and
// returns it.
func (wg *WaitGroup) AddCompletion() *Completion {
return wg.AddCompletionWithCallback(nil)
}
// updateError updates the error value to be returned from Wait()
// so that we return the most severe or consequential error
// encountered. The order of importance of error values is (from
// highest to lowest):
// 1. Non-context errors
// 2. context.Canceled
// 3. context.DeadlineExceeded
// 4. nil
func updateError(old, new error) error {
if new == nil {
return old
}
// 'old' error is overridden by a non-nil 'new' error value if
// 1. 'old' is nil, or
// 2. 'old' is a timeout, or
// 3. 'old' is a cancel and the 'new' error value is not a timeout
if old == nil || errors.Is(old, context.DeadlineExceeded) || (errors.Is(old, context.Canceled) && !errors.Is(new, context.DeadlineExceeded)) {
return new
}
return old
}
// Wait blocks until all completions added by calling AddCompletion are
// completed, or the context is canceled, whichever happens first.
// Returns the context's error if it is cancelled, nil otherwise.
// No callbacks of the completions in this wait group will be called after
// this returns.
// Returns the error value of one of the completions, if available, or the
// error value of the Context otherwise.
func (wg *WaitGroup) Wait() error {
wg.counterLocker.Lock()
defer wg.counterLocker.Unlock()
var err error
Loop:
for i, comp := range wg.pendingCompletions {
select {
case <-comp.Completed():
err = updateError(err, comp.Err()) // Keep the most severe error value we encounter
continue Loop
case <-wg.ctx.Done():
// Complete the remaining completions (if any) to make sure their completed
// channels are closed.
for _, comp := range wg.pendingCompletions[i:] {
// 'comp' may have already completed on a different error
compErr := comp.Complete(wg.ctx.Err())
err = updateError(err, compErr) // Keep the most severe error value we encounter
}
break Loop
}
}
wg.pendingCompletions = nil
return err
}
// Completion provides the Complete callback to be called when an asynchronous
// computation is completed.
type Completion struct {
// cancel is used to cancel the WaitGroup the completion belongs in case of an error
cancel context.CancelFunc
// lock is used to check and close the completed channel atomically.
lock lock.Mutex
// completed is the channel to be closed when Complete is called the first
// time.
completed chan struct{}
// callback is called when Complete is called the first time.
callback func(err error)
// err is the error the completion completed with
err error
}
// Err returns a non-nil error if the completion ended in error
func (c *Completion) Err() error {
c.lock.Lock()
defer c.lock.Unlock()
return c.err
}
// Complete notifies of the completion of the asynchronous computation.
// Idempotent.
// If the operation completed successfully 'err' is passed as nil.
// Returns the error state the completion completed with, which is
// gnerally different from 'err' if already completed.
func (c *Completion) Complete(err error) error {
c.lock.Lock()
select {
case <-c.completed:
err = c.err // return the error 'c' completed with
default:
c.err = err
if c.callback != nil {
// We must call the callbacks synchronously to guarantee
// that they are actually called before Wait() returns.
c.callback(err)
}
// Cancel the WaitGroup on failure
if err != nil && c.cancel != nil {
c.cancel()
}
close(c.completed)
}
c.lock.Unlock()
return err
}
// Completed returns a channel that's closed when the completion is completed,
// i.e. when Complete is called the first time, or when the call to the parent
// WaitGroup's Wait terminated because the context was canceled.
func (c *Completion) Completed() <-chan struct{} {
return c.completed
}
// NewCompletion creates a Completion which calls a function upon Complete().
// 'cancel' is called if the associated operation fails for any reason.
func NewCompletion(cancel context.CancelFunc, callback func(err error)) *Completion {
return &Completion{
cancel: cancel,
completed: make(chan struct{}),
callback: callback,
}
}