forked from couchbase/gocbcore
/
memdqpackets.go
130 lines (105 loc) · 3.25 KB
/
memdqpackets.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package gocbcore
import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/opentracing/opentracing-go"
)
// memdQResponse is the data for a response from a server. It embeds
// the packet data (memdPacket) and adds some useful meta-data
// describing where the response came from.
type memdQResponse struct {
memdPacket
// NOTE(review): presumably the address of the server that sent this
// response; it is not populated in this file - confirm at the read site.
sourceAddr string
// NOTE(review): presumably an identifier of the connection the response
// arrived on; it is not populated in this file - confirm at the read site.
sourceConnId string
}
// callback is the signature of the completion handler stored on a
// memdQRequest. tryCallback invokes it with the response and error it
// was given, together with the request the handler belongs to.
type callback func(resp *memdQResponse, req *memdQRequest, err error)
// memdQRequest is the data for a request that can be queued with a
// memdqueueconn, and can potentially be rerouted to multiple servers
// due to configuration changes.
//
// The struct embeds a sync.Mutex and fields that are accessed through
// sync/atomic, so it must be handled by pointer and never copied; use
// cloneNew to derive a fresh request from an existing one.
type memdQRequest struct {
memdPacket
// Static routing properties.
//
// ReplicaIdx: NOTE(review): presumably selects which replica the request
// is routed to - it is not read in this file, confirm against the router.
ReplicaIdx int
// Callback is the handler invoked by tryCallback.
Callback callback
// Persistent changes how tryCallback treats completion: for a persistent
// request the callback is invoked without marking the request completed,
// so it can fire repeatedly until Cancel is called.
Persistent bool
// Owner represents the agent which created and currently owns
// this request. This is used for specialized routing such as
// not-my-vbucket and enhanced errors. Cancel also calls
// owner.cancelReqTrace on it.
owner *Agent
// This tracks when the request was dispatched so that we can
// properly prioritize older requests to try and meet timeout
// requirements.
dispatchTime time.Time
// This stores a pointer to the queue that currently owns
// this request. This allows us to remove it from that queue
// whenever the request is cancelled. It is read with
// atomic.LoadPointer and interpreted as a *memdOpQueue by Cancel.
queuedWith unsafe.Pointer
// This stores a pointer to the client that is currently holding
// this request while it is in flight. This allows us to remove it
// from that client whenever the request is cancelled. It is read with
// atomic.LoadPointer and interpreted as a *memdClient by Cancel.
waitingIn unsafe.Pointer
// This keeps track of whether the request has been 'completed'.
// It is set to 1 by tryCallback when the callback of a non-persistent
// request is invoked, and by Cancel. Persistent requests are only
// marked completed by Cancel.
// This is an integer to allow us to atomically control it.
isCompleted uint32
// This is used to lock access to the request when processing
// a timeout, a response or spans. Cancel holds it for its
// entire duration.
processingLock sync.Mutex
// This stores the number of times that the item has been
// retried. It is used for various non-linear retry
// algorithms.
retryCount uint32
// RootTraceContext is the tracing span context associated with the
// request; it is carried over to clones by cloneNew.
// NOTE(review): presumably the parent context under which the spans
// below are started - confirm against the tracing code.
RootTraceContext opentracing.SpanContext
// NOTE(review): presumably the span covering the whole command and the
// span covering the network round-trip respectively; neither is touched
// in this file - confirm against the tracing code.
cmdTraceSpan opentracing.Span
netTraceSpan opentracing.Span
// NOTE(review): presumably the names of the collection and scope this
// request targets. They are not read in this file and are NOT copied
// by cloneNew - confirm that this is intended.
CollectionName string
ScopeName string
}
// cloneNew returns a newly allocated request that carries over the
// packet, the static routing properties (ReplicaIdx, Callback,
// Persistent), the owning agent and the root trace context of req.
//
// Every other field starts at its zero value: the queue/client pointers,
// the completion flag, the processing lock, the retry count, the
// dispatch time and the trace spans.
//
// NOTE(review): CollectionName and ScopeName are not carried over either;
// confirm that callers do not rely on them being present on the clone.
func (req *memdQRequest) cloneNew() *memdQRequest {
	dup := new(memdQRequest)

	dup.memdPacket = req.memdPacket
	dup.ReplicaIdx = req.ReplicaIdx
	dup.Callback = req.Callback
	dup.Persistent = req.Persistent
	dup.owner = req.owner
	dup.RootTraceContext = req.RootTraceContext

	return dup
}
// tryCallback invokes the request's Callback with resp and err unless
// the request has already been completed, and reports whether the
// callback was invoked.
//
// For a non-persistent request the completion flag is atomically
// swapped to 1, so at most one caller ever gets to run the callback.
// For a persistent request the flag is only read, never set, so the
// callback may run on every call until the request is completed
// elsewhere (see Cancel).
func (req *memdQRequest) tryCallback(resp *memdQResponse, err error) bool {
	var alreadyDone bool
	if req.Persistent {
		// Peek only: persistent requests stay open after a callback.
		alreadyDone = atomic.LoadUint32(&req.isCompleted) != 0
	} else {
		// Claim completion; only the caller that flips 0 -> 1 proceeds.
		alreadyDone = atomic.SwapUint32(&req.isCompleted, 1) != 0
	}

	if alreadyDone {
		return false
	}

	req.Callback(resp, req, err)
	return true
}
// isCancelled reports whether the request's completion flag is set.
// The flag is shared with tryCallback, so this returns true for any
// completed request, not only for one that went through Cancel.
func (req *memdQRequest) isCancelled() bool {
	completed := atomic.LoadUint32(&req.isCompleted)
	return completed != 0
}
// Cancel marks the request as completed and detaches it from whatever
// is currently holding it. It returns true if this call performed the
// cancellation, or false if the request had already been completed
// (by a callback or by an earlier Cancel).
//
// On a successful cancellation the request is removed from the op queue
// referenced by queuedWith (if any), cancelled on the client referenced
// by waitingIn (if any), and the owner is asked to close out tracing via
// cancelReqTrace with ErrCancelled.
//
// processingLock is held for the whole call and released with defer, so
// the lock is not left held if one of the calls below panics.
func (req *memdQRequest) Cancel() bool {
	req.processingLock.Lock()
	defer req.processingLock.Unlock()

	if atomic.SwapUint32(&req.isCompleted, 1) != 0 {
		// Someone already completed this request
		return false
	}

	// Still sitting in a queue: pull it out of there.
	if queuedWith := (*memdOpQueue)(atomic.LoadPointer(&req.queuedWith)); queuedWith != nil {
		queuedWith.Remove(req)
	}

	// Already handed to a client: cancel it there.
	if waitingIn := (*memdClient)(atomic.LoadPointer(&req.waitingIn)); waitingIn != nil {
		waitingIn.CancelRequest(req)
	}

	req.owner.cancelReqTrace(req, ErrCancelled)
	return true
}