-
Notifications
You must be signed in to change notification settings - Fork 0
/
buffered.go
167 lines (141 loc) · 4.46 KB
/
buffered.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package broadcaster
import (
"errors"
"io"
"sync"
)
// Buffered keeps track of one or more observers watching the progress
// of an operation. For example, if multiple clients are trying to pull an
// image, they share a Buffered struct for the download operation.
type Buffered struct {
sync.Mutex
// c is a channel that observers block on, waiting for the operation
// to finish.
c chan struct{}
// cond is a condition variable used to wake up observers when there's
// new data available.
cond *sync.Cond
// history is a buffer of the progress output so far, so a new observer
// can catch up. The history is stored as a slice of separate byte
// slices, so that if the writer is a WriteFlusher, the flushes will
// happen in the right places.
history [][]byte
// wg is a WaitGroup used to wait for all writes to finish on Close
wg sync.WaitGroup
// result is the argument passed to the first call of Close, and
// returned to callers of Wait
result error
}
// NewBuffered returns an initialized Buffered structure.
func NewBuffered() *Buffered {
b := &Buffered{
c: make(chan struct{}),
}
b.cond = sync.NewCond(b)
return b
}
// closed returns true if and only if the broadcaster has been closed
func (broadcaster *Buffered) closed() bool {
select {
case <-broadcaster.c:
return true
default:
return false
}
}
// receiveWrites runs as a goroutine so that writes don't block the Write
// function. It writes the new data in broadcaster.history each time there's
// activity on the broadcaster.cond condition variable.
func (broadcaster *Buffered) receiveWrites(observer io.Writer) {
n := 0
broadcaster.Lock()
// The condition variable wait is at the end of this loop, so that the
// first iteration will write the history so far.
for {
newData := broadcaster.history[n:]
// Make a copy of newData so we can release the lock
sendData := make([][]byte, len(newData), len(newData))
copy(sendData, newData)
broadcaster.Unlock()
for len(sendData) > 0 {
_, err := observer.Write(sendData[0])
if err != nil {
broadcaster.wg.Done()
return
}
n++
sendData = sendData[1:]
}
broadcaster.Lock()
// If we are behind, we need to catch up instead of waiting
// or handling a closure.
if len(broadcaster.history) != n {
continue
}
// detect closure of the broadcast writer
if broadcaster.closed() {
broadcaster.Unlock()
broadcaster.wg.Done()
return
}
broadcaster.cond.Wait()
// Mutex is still locked as the loop continues
}
}
// Write adds data to the history buffer, and also writes it to all current
// observers.
func (broadcaster *Buffered) Write(p []byte) (n int, err error) {
broadcaster.Lock()
defer broadcaster.Unlock()
// Is the broadcaster closed? If so, the write should fail.
if broadcaster.closed() {
return 0, errors.New("attempted write to a closed broadcaster.Buffered")
}
// Add message in p to the history slice
newEntry := make([]byte, len(p), len(p))
copy(newEntry, p)
broadcaster.history = append(broadcaster.history, newEntry)
broadcaster.cond.Broadcast()
return len(p), nil
}
// Add adds an observer to the broadcaster. The new observer receives the
// data from the history buffer, and also all subsequent data.
func (broadcaster *Buffered) Add(w io.Writer) error {
// The lock is acquired here so that Add can't race with Close
broadcaster.Lock()
defer broadcaster.Unlock()
if broadcaster.closed() {
return errors.New("attempted to add observer to a closed broadcaster.Buffered")
}
broadcaster.wg.Add(1)
go broadcaster.receiveWrites(w)
return nil
}
// CloseWithError signals to all observers that the operation has finished. Its
// argument is a result that should be returned to waiters blocking on Wait.
func (broadcaster *Buffered) CloseWithError(result error) {
broadcaster.Lock()
if broadcaster.closed() {
broadcaster.Unlock()
return
}
broadcaster.result = result
close(broadcaster.c)
broadcaster.cond.Broadcast()
broadcaster.Unlock()
// Don't return until all writers have caught up.
broadcaster.wg.Wait()
}
// Close signals to all observers that the operation has finished. It causes
// all calls to Wait to return nil.
func (broadcaster *Buffered) Close() {
broadcaster.CloseWithError(nil)
}
// Wait blocks until the operation is marked as completed by the Close method,
// and all writer goroutines have completed. It returns the argument that was
// passed to Close.
func (broadcaster *Buffered) Wait() error {
<-broadcaster.c
broadcaster.wg.Wait()
return broadcaster.result
}