-
Notifications
You must be signed in to change notification settings - Fork 3
/
stream.go
201 lines (164 loc) · 3.61 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package coal
import (
"sync"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
)
// Event defines the event type. It names the kind of change that a stream
// reports for a document; the values in use are Created, Updated and Deleted.
type Event string
// The event types that are forwarded to a Receiver.
const (
// Created is emitted when a document has been created. It is reported for
// change stream events with the operation type "insert".
Created Event = "created"
// Updated is emitted when a document has been updated. It is reported for
// change stream events with the operation type "replace" or "update".
Updated Event = "updated"
// Deleted is emitted when a document has been deleted. It is reported for
// change stream events with the operation type "delete".
Deleted Event = "deleted"
)
// Receiver is a callback that receives stream events. It is called with the
// event type, the id of the affected document and the decoded document. No
// document is decoded for Deleted events, so the model is left at its zero
// value in that case.
type Receiver func(Event, bson.ObjectId, Model)
// Stream simplifies the handling of change streams to receive changes to
// documents. It keeps re-opening the underlying change stream until it is
// closed and remembers a resume token between attempts.
type Stream struct {
// store is copied for every change stream that is opened.
store *Store
// model selects the collection that is watched and provides the metadata used
// to create the records that events are decoded into.
model Model
// Reporter is called with errors returned while tailing the change stream. It
// is skipped when nil.
Reporter func(error)
// mutex guards the current and closed fields.
mutex sync.Mutex
// current is the active change stream, if any, so that Close can close it.
current *mgo.ChangeStream
// token is the resume token handed to the next change stream that is opened.
token *bson.Raw
// closed is set by Close and stops the stream from being re-opened.
closed bool
}
// NewStream creates and returns a new stream for the specified model that
// uses the specified store. Nothing is opened until Open is called.
func NewStream(store *Store, model Model) *Stream {
	stream := new(Stream)
	stream.store = store
	stream.model = model

	return stream
}
// Open starts a goroutine that opens the stream and keeps forwarding events to
// the specified receiver until the stream is closed. It returns immediately.
// The provided open function is called once when the stream has been opened
// the first time; it may be nil.
func (s *Stream) Open(rec Receiver, open func()) {
	go func() {
		s.open(rec, open)
	}()
}
// Close will close the stream. It marks the stream as closed, so that it is not
// opened again, and closes the active change stream if there is one.
func (s *Stream) Close() {
	// hold the mutex for the whole operation
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// mark as closed
	s.closed = true

	// nothing more to do without an active change stream
	if s.current == nil {
		return
	}

	// close the active change stream, the error is deliberately discarded
	_ = s.current.Close()
}
// open tails the change stream in a loop until the stream has been closed.
// Errors returned by a tail attempt are handed to the Reporter, if one is set,
// before the next attempt is made. The open function is called at most once,
// no matter how often the change stream is re-opened, and may be nil.
func (s *Stream) open(rec Receiver, open func()) {
	// make sure the open callback fires only once
	var once sync.Once
	notify := func() {
		if open != nil {
			once.Do(open)
		}
	}

	// read the closed flag under the mutex
	isClosed := func() bool {
		s.mutex.Lock()
		defer s.mutex.Unlock()

		return s.closed
	}

	// keep tailing until closed and report eventual errors
	for !isClosed() {
		if err := s.tail(rec, notify); err != nil && s.Reporter != nil {
			s.Reporter(err)
		}
	}
}
// tail opens one change stream on the collection of the model and forwards its
// events to the receiver until the change stream stops yielding events or an
// error occurs. The change stream is resumed after the saved token, if any.
//
// The open function is called once the change stream has been opened and
// registered as the current one. If the stream has been closed in the
// meantime, tail returns nil without calling open or the receiver.
//
// Events with the operation types "insert", "replace", "update" and "delete"
// are forwarded; all other events are ignored. The resume token only advances
// after an event has been handed to the receiver, so an event that failed to
// decode is read again on the next attempt.
//
// It returns the error from opening the change stream, from decoding a
// document or from closing the change stream.
func (s *Stream) tail(rec Receiver, open func()) error {
	// copy store
	store := s.store.Copy()
	defer store.Close()

	// open change stream
	cs, err := store.C(s.model).Watch([]bson.M{}, mgo.ChangeStreamOptions{
		FullDocument: mgo.UpdateLookup,
		ResumeAfter:  s.token,
	})
	if err != nil {
		return err
	}

	// ensure stream is closed on every return path
	defer cs.Close()

	// save reference and get status
	s.mutex.Lock()
	closed := s.closed
	if !closed {
		s.current = cs
	}
	s.mutex.Unlock()

	// return if closed
	if closed {
		return nil
	}

	// unset reference on every return path, so that Close never sees a change
	// stream that has already been closed after an error
	defer func() {
		s.mutex.Lock()
		s.current = nil
		s.mutex.Unlock()
	}()

	// signal open
	open()

	// iterate on elements forever
	var ch change
	for cs.Next(&ch) {
		// parse operation type and skip unsupported operations
		var typ Event
		switch ch.OperationType {
		case "insert":
			typ = Created
		case "replace", "update":
			typ = Updated
		case "delete":
			typ = Deleted
		default:
			continue
		}

		// prepare record
		var record Model

		// unmarshal document for created and updated events
		if typ != Deleted {
			// unmarshal record
			record = s.model.Meta().Make()
			if err := ch.FullDocument.Unmarshal(record); err != nil {
				return err
			}

			// init record
			Init(record)
		}

		// call receiver
		rec(typ, ch.DocumentKey.ID, record)

		// save a private copy of the token: ch is reused by every call to Next,
		// so a pointer into it would be overwritten by later events before they
		// have been delivered
		token := bson.Raw{
			Kind: ch.ResumeToken.Kind,
			Data: append([]byte(nil), ch.ResumeToken.Data...),
		}
		s.token = &token
	}

	// close stream and check error
	return cs.Close()
}
// change is the subset of a change stream event that is decoded by the stream.
type change struct {
// ResumeToken is the "_id" of the event. It is saved to resume the change
// stream after this event.
ResumeToken bson.Raw `bson:"_id"`
// OperationType names the operation. The values "insert", "replace", "update"
// and "delete" are forwarded as events, all other values are ignored.
OperationType string `bson:"operationType"`
// DocumentKey holds the id of the affected document.
DocumentKey struct {
ID bson.ObjectId `bson:"_id"`
} `bson:"documentKey"`
// FullDocument is the raw document that is decoded into a record for created
// and updated events.
FullDocument bson.Raw `bson:"fullDocument"`
}