-
Notifications
You must be signed in to change notification settings - Fork 8
/
tap.go
157 lines (137 loc) · 3.76 KB
/
tap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package walrus
import (
"context"
"expvar"
"time"
sgbucket "github.com/couchbase/sg-bucket"
)
// tapFeedImpl is the walrus implementation of a TAP mutation feed. Events are
// first pushed onto the events queue and then forwarded to channel by run().
type tapFeedImpl struct {
// bucket is the bucket that created this feed; Close() sets it to nil.
bucket *WalrusBucket
// channel carries events to the consumer; it is what Events() returns.
channel chan sgbucket.FeedEvent
// args are the arguments the feed was started with (e.g. KeysOnly).
args sgbucket.FeedArguments
// events holds pending events until run() forwards them to channel.
events *queue
}
// StartTapFeed starts a TAP feed on the bucket. Events can be read from the
// channel exposed by the returned feed's Events() method. To stop receiving
// events, call Close() on the feed.
//
// If args.Backfill is not FeedNoBackfill, the existing documents at or after
// that sequence are queued first, bracketed by begin/end backfill events.
// If args.Dump is set the feed ends after the queued events; otherwise it is
// registered with the bucket so later mutations are posted to it.
// dbStats is not used by this implementation. The returned error is always nil.
func (bucket *WalrusBucket) StartTapFeed(args sgbucket.FeedArguments, dbStats *expvar.Map) (sgbucket.MutationFeed, error) {
	feed := &tapFeedImpl{
		bucket:  bucket,
		channel: make(chan sgbucket.FeedEvent, 10),
		args:    args,
		events:  newQueue(),
	}
	pending := feed.events

	if backfillFrom := args.Backfill; backfillFrom != sgbucket.FeedNoBackfill {
		pending.push(&sgbucket.FeedEvent{Opcode: sgbucket.FeedOpBeginBackfill})
		bucket.enqueueBackfillEvents(backfillFrom, args.KeysOnly, pending)
		pending.push(&sgbucket.FeedEvent{Opcode: sgbucket.FeedOpEndBackfill})
	}

	if !args.Dump {
		// Live feed: register with the bucket so future notifications reach it.
		bucket.lock.Lock()
		bucket.tapFeeds = append(bucket.tapFeeds, feed)
		bucket.lock.Unlock()
	} else {
		// Dump-only feed: a nil entry marks end-of-feed for run().
		pending.push(nil)
	}

	go feed.run()
	return feed, nil
}
// StartDCPFeed emulates a DCP feed on top of a TAP feed: until a full DCP
// implementation is available, walrus starts a TAP feed and invokes callback
// for each event that feed delivers.
//
// Events are dispatched from a background goroutine, which sets each event's
// TimeReceived just before calling callback. The goroutine stops when
// args.Terminator becomes readable (e.g. is closed) or when the TAP feed's
// channel is closed; on exit it closes args.DoneChan if that is non-nil.
//
// ctx is not used by this implementation. The only error returned is one
// reported by StartTapFeed.
//
// NOTE(review): the underlying TAP feed is not closed when Terminator fires;
// confirm whether it should be, since it stays registered with the bucket.
func (bucket *WalrusBucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) error {
	tapFeed, err := bucket.StartTapFeed(args, dbStats)
	if err != nil {
		return err
	}

	events := tapFeed.Events()
	go func() {
	eventLoop:
		for {
			select {
			case event, ok := <-events:
				if !ok {
					// The feed's channel was closed (end of a dump feed, or
					// the feed was shut down). Without this check the receive
					// would keep yielding zero-value events and spin, calling
					// callback with bogus events.
					break eventLoop
				}
				event.TimeReceived = time.Now()
				callback(event)
			case <-args.Terminator:
				break eventLoop
			}
		}
		if args.DoneChan != nil {
			close(args.DoneChan)
		}
	}()
	return nil
}
// Events returns the receive-only side of the feed's event channel. The
// channel is closed by run() when the feed's run loop exits.
func (feed *tapFeedImpl) Events() <-chan sgbucket.FeedEvent {
return feed.channel
}
// WriteEvents returns the send-only side of the same channel that Events()
// exposes for reading.
func (feed *tapFeedImpl) WriteEvents() chan<- sgbucket.FeedEvent {
return feed.channel
}
// Close shuts down a TapFeed. Call this if you stop using a TapFeed before its
// channel ends.
//
// It clears the feed's slot in the bucket's tapFeeds list (so _postTapEvent no
// longer pushes events to it), closes the feed's event queue, and drops the
// feed's reference to the bucket. Calling Close on a feed that has already
// been closed is a no-op. The returned error is always nil.
func (feed *tapFeedImpl) Close() error {
	bucket := feed.bucket
	if bucket == nil {
		// Already closed: the first Close cleared feed.bucket, and
		// dereferencing it again would panic.
		return nil
	}

	bucket.lock.Lock()
	defer bucket.lock.Unlock()

	if feed.bucket == nil {
		// Another Close finished while we were waiting for the lock.
		return nil
	}

	// The slot is set to nil rather than removed; _postTapEvent skips nil
	// entries when iterating the list.
	for i, registered := range bucket.tapFeeds {
		if registered == feed {
			bucket.tapFeeds[i] = nil
		}
	}
	feed.events.close()
	feed.bucket = nil
	return nil
}
// run forwards queued events to the feed's channel until the queue yields
// something that is not a non-nil *sgbucket.FeedEvent (such as the nil
// end-of-feed marker), then closes the channel.
func (feed *tapFeedImpl) run() {
	defer close(feed.channel)
	for {
		next, isEvent := feed.events.pull().(*sgbucket.FeedEvent)
		if !isEvent || next == nil {
			return
		}
		// Send a copy of the event; the queued pointer may be shared.
		feed.channel <- *next
	}
}
// enqueueBackfillEvents pushes a mutation event onto q for every document in
// the bucket that has a non-nil raw body and a sequence >= startSequence.
// When keysOnly is true the events carry no Value. The bucket's read lock is
// held for the duration of the scan. Documents are visited in map iteration
// order, so the events are not ordered by sequence.
func (bucket *WalrusBucket) enqueueBackfillEvents(startSequence uint64, keysOnly bool, q *queue) {
	bucket.lock.RLock()
	defer bucket.lock.RUnlock()

	for id, doc := range bucket.Docs {
		if doc.Raw == nil || doc.Sequence < startSequence {
			continue
		}
		var body []byte
		if !keysOnly {
			body = doc.Raw
		}
		q.push(&sgbucket.FeedEvent{
			Opcode: sgbucket.FeedOpMutation,
			Key:    []byte(id),
			Value:  body,
			Cas:    doc.Sequence,
		})
	}
}
// _postTapEvent queues event on every registered feed. Feeds started with
// KeysOnly receive a copy of the event with its Value cleared.
// Caller must have the bucket's RLock, because this method iterates
// bucket.tapFeeds.
func (bucket *WalrusBucket) _postTapEvent(event sgbucket.FeedEvent) {
	// Assigning the struct copies it, so clearing Value leaves event intact.
	keysOnlyEvent := event
	keysOnlyEvent.Value = nil

	for _, feed := range bucket.tapFeeds {
		// Closed feeds leave a nil slot in the list; skip those.
		if feed == nil || feed.channel == nil {
			continue
		}
		payload := &event
		if feed.args.KeysOnly {
			payload = &keysOnlyEvent
		}
		feed.events.push(payload)
	}
}
// _postTapMutationEvent posts a mutation event for key to the bucket's feeds
// via _postTapEvent. The event's Value is produced by copySlice(value) and its
// Cas is seq. The locking requirement of _postTapEvent applies here as well.
func (bucket *WalrusBucket) _postTapMutationEvent(key string, value []byte, seq uint64) {
	mutation := sgbucket.FeedEvent{
		Opcode: sgbucket.FeedOpMutation,
		Key:    []byte(key),
		Value:  copySlice(value),
		Cas:    seq,
	}
	bucket._postTapEvent(mutation)
}
// _postTapDeletionEvent posts a deletion event for key to the bucket's feeds
// via _postTapEvent. The event has no Value and its Cas is seq. The locking
// requirement of _postTapEvent applies here as well.
func (bucket *WalrusBucket) _postTapDeletionEvent(key string, seq uint64) {
	deletion := sgbucket.FeedEvent{
		Opcode: sgbucket.FeedOpDeletion,
		Key:    []byte(key),
		Cas:    seq,
	}
	bucket._postTapEvent(deletion)
}