// memdpipelineclient.go
package gocbcore
import (
"io"
"sync"
)
// memdPipelineClient represents a single connection slot belonging to a
// memdPipeline. Its Run loop repeatedly acquires a memdClient from the
// parent pipeline and pumps queued operations over it (see ioLoop).
type memdPipelineClient struct {
	parent    *memdPipeline   // owning pipeline; nil once shutdown has been requested
	address   string          // remote address, cached from the parent for logging
	client    *memdClient     // active connection; nil while disconnected
	consumer  *memdOpConsumer // active queue consumer; nil while not consuming
	lock      sync.Mutex      // guards parent, client and consumer
	closedSig chan struct{}   // closed by Run once the io loop has fully exited
}
// newMemdPipelineClient creates a pipeline client attached to the given
// parent pipeline, caching the pipeline's address for use in log messages.
func newMemdPipelineClient(parent *memdPipeline) *memdPipelineClient {
	pipecli := &memdPipelineClient{
		parent:    parent,
		address:   parent.address,
		closedSig: make(chan struct{}),
	}
	return pipecli
}
// ReassignTo switches this client over to a new parent pipeline. Any
// consumer attached to the previous pipeline's queue is closed, which wakes
// the io loop so it can fetch a fresh consumer from the new parent.
func (pipecli *memdPipelineClient) ReassignTo(parent *memdPipeline) {
	pipecli.lock.Lock()
	prevConsumer := pipecli.consumer
	pipecli.consumer = nil
	pipecli.parent = parent
	pipecli.lock.Unlock()

	// Close outside the lock so we never hold it across the wake-up.
	if prevConsumer != nil {
		prevConsumer.Close()
	}
}
// ioLoop drives a single client connection: it fetches consumers from the
// parent pipeline's queue and forwards popped requests to the client until
// the connection dies, the parent is removed (shutdown), or a write fails.
// It blocks until its internal watcher goroutine has also finished, so the
// client is fully cleaned up by the time this returns.
func (pipecli *memdPipelineClient) ioLoop(client *memdClient) {
	pipecli.lock.Lock()
	if pipecli.parent == nil {
		// We were closed before this loop even started; dispose of the
		// freshly-acquired client and bail out immediately.
		logDebugf("Pipeline client ioLoop started with no parent pipeline")
		pipecli.lock.Unlock()

		err := client.Close()
		if err != nil {
			logErrorf("Failed to close client for shut down ioLoop (%s)", err)
		}

		return
	}

	pipecli.client = client
	pipecli.lock.Unlock()

	// killSig is the handshake used by the watcher goroutine below to tell
	// us it has finished its cleanup; we wait on it before returning.
	killSig := make(chan struct{})

	// This goroutine is responsible for monitoring the client and handling
	// the cleanup whenever it shuts down. All cases of the client being
	// shut down flow through this goroutine, even cases where we may already
	// be aware that the client is shutdown, outside this scope.
	go func() {
		logDebugf("Pipeline client `%s/%p` client watcher starting...", pipecli.address, pipecli)

		<-client.CloseNotify()

		logDebugf("Pipeline client `%s/%p` client died", pipecli.address, pipecli)

		pipecli.lock.Lock()
		pipecli.client = nil
		activeConsumer := pipecli.consumer
		pipecli.consumer = nil
		pipecli.lock.Unlock()

		logDebugf("Pipeline client `%s/%p` closing consumer %p", pipecli.address, pipecli, activeConsumer)

		// If we have a consumer, we need to close it to signal the loop below that
		// something has happened. If there is no consumer, we don't need to signal
		// as the loop below will already be in the process of fetching a new one,
		// where it will inevitably detect the problem.
		if activeConsumer != nil {
			activeConsumer.Close()
		}

		killSig <- struct{}{}
	}()

	logDebugf("Pipeline client `%s/%p` IO loop starting...", pipecli.address, pipecli)

	var localConsumer *memdOpConsumer
	for {
		if localConsumer == nil {
			logDebugf("Pipeline client `%s/%p` fetching new consumer", pipecli.address, pipecli)

			pipecli.lock.Lock()

			if pipecli.consumer != nil {
				// If we still have an active consumer, lets close it to make room for the new one
				pipecli.consumer.Close()
				pipecli.consumer = nil
			}

			if pipecli.client == nil {
				// The client has disconnected from the server, this only occurs AFTER the watcher
				// goroutine running above has detected the client is closed and has cleaned it up.
				pipecli.lock.Unlock()
				break
			}

			if pipecli.parent == nil {
				// This pipelineClient has been shut down
				logDebugf("Pipeline client `%s/%p` found no parent pipeline", pipecli.address, pipecli)
				pipecli.lock.Unlock()

				// Close our client to force the watcher goroutine above to clean it up
				err := client.Close()
				if err != nil {
					logErrorf("Pipeline client `%s/%p` failed to shut down client socket (%s)", pipecli.address, pipecli, err)
				}

				break
			}

			// Fetch a new consumer to use for this iteration
			localConsumer = pipecli.parent.queue.Consumer()
			pipecli.consumer = localConsumer

			pipecli.lock.Unlock()
		}

		// Pop blocks until an operation is available or the consumer is
		// closed (which yields nil) — presumably; verify against the
		// memdOpConsumer implementation.
		req := localConsumer.Pop()
		if req == nil {
			// Set the local consumer to null, this will force our normal logic to run
			// which will clean up the original consumer and then attempt to acquire a
			// new one if we are not being cleaned up. This is a minor code-optimization
			// to avoid having to do a lock/unlock just to lock above anyways. It does
			// have the downside of not being able to detect where we've looped around
			// in error though.
			localConsumer = nil
			continue
		}

		err := client.SendRequest(req)
		if err != nil {
			logDebugf("Pipeline client `%s/%p` encountered a socket write error: %v", pipecli.address, pipecli, err)

			if err != io.EOF {
				// If we errored the write, and the client was not already closed,
				// lets go ahead and close it. This will trigger the shutdown
				// logic via the client watcher above. If the socket error was EOF
				// we already did shut down, and the watcher should already be
				// cleaning up.
				err := client.Close()
				if err != nil {
					logErrorf("Pipeline client `%s/%p` failed to shut down errored client socket (%s)", pipecli.address, pipecli, err)
				}
			}

			// We need to alert the caller that there was a network error
			req.tryCallback(nil, ErrNetwork)

			// Stop looping
			break
		}
	}

	logDebugf("Pipeline client `%s/%p` waiting for client shutdown", pipecli.address, pipecli)

	// We must wait for the close wait goroutine to die as well before we can continue.
	<-killSig

	logDebugf("Pipeline client `%s/%p` received client shutdown notification", pipecli.address, pipecli)
}
// Run is the main loop for this pipeline client. It repeatedly acquires a
// client connection from its parent pipeline and runs the io loop over it
// until the connection dies, exiting once the parent has been removed
// (via Close). On exit it closes closedSig so Close can wait for a full
// shutdown.
func (pipecli *memdPipelineClient) Run() {
	for {
		logDebugf("Pipeline Client `%s/%p` preparing for new client loop", pipecli.address, pipecli)

		pipecli.lock.Lock()
		pipeline := pipecli.parent
		pipecli.lock.Unlock()

		if pipeline == nil {
			// If our pipeline is nil, it indicates that we need to shut down.
			logDebugf("Pipeline Client `%s/%p` is shutting down", pipecli.address, pipecli)
			break
		}

		logDebugf("Pipeline Client `%s/%p` retrieving new client connection for parent %p", pipecli.address, pipecli, pipeline)
		client, err := pipeline.getClientFn()
		if err != nil {
			// Previously this error was silently discarded before retrying,
			// which made repeated connection failures invisible in the logs
			// and the resulting tight retry loop impossible to diagnose.
			logDebugf("Pipeline Client `%s/%p` failed to retrieve client connection: %v", pipecli.address, pipecli, err)
			continue
		}

		// Runs until the connection has died (for whatever reason)
		logDebugf("Pipeline Client `%s/%p` starting new client loop for %p", pipecli.address, pipecli, client)
		pipecli.ioLoop(client)
	}

	// Lets notify anyone who is watching that we are now shut down
	close(pipecli.closedSig)
}
// Close shuts down this pipeline client by detaching it from its parent
// pipeline, then blocks until the io loop has observed the shutdown and
// fully wound down. Always returns nil.
func (pipecli *memdPipelineClient) Close() error {
	logDebugf("Pipeline Client `%s/%p` received close request", pipecli.address, pipecli)

	// Dropping the parent reference is the shutdown signal: the io loop
	// detects the missing parent and performs its cleanup before exiting.
	pipecli.lock.Lock()
	pipecli.parent = nil
	consumer := pipecli.consumer
	pipecli.consumer = nil
	pipecli.lock.Unlock()

	// Closing the consumer (when one exists) unblocks an io loop that is
	// parked waiting for operations. When there is none, the loop is already
	// on its way to fetch one and will spot the nil parent by itself.
	if consumer != nil {
		consumer.Close()
	}

	// Wait for Run to confirm that everything has been torn down.
	<-pipecli.closedSig

	logDebugf("Pipeline Client `%s/%p` has exited", pipecli.address, pipecli)
	return nil
}