/
watch.go
73 lines (62 loc) · 1.58 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package main
import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"log"
"sync"
)
// setupWatch opens a change stream on every collection listed in
// watch.Collections and starts one iterateChangeStream goroutine per stream.
//
// Each stream is filtered so that only events whose operationType is contained
// in watch.EventTypes are delivered, and is opened with the UpdateLookup
// full-document option. waitGroup is incremented once for every goroutine that
// is started. The function panics if a change stream cannot be opened.
func setupWatch(watch Watch, database *mongo.Database, waitGroup *sync.WaitGroup) {
	// The $match stage does not depend on the collection, so it is built once
	// and reused for every stream.
	eventFilter := bson.D{{Key: "$in", Value: watch.EventTypes}}
	matchStage := bson.D{{
		Key:   "$match",
		Value: bson.D{{Key: "operationType", Value: eventFilter}},
	}}

	for _, name := range watch.Collections {
		streamOpts := options.ChangeStream().SetFullDocument(options.UpdateLookup)

		stream, err := database.Collection(name).Watch(
			context.Background(),
			mongo.Pipeline{matchStage},
			streamOpts,
		)
		if err != nil {
			panic(err)
		}

		// Register the worker before launching it so a concurrent Wait cannot
		// return early.
		waitGroup.Add(1)
		go iterateChangeStream(waitGroup, stream, watch)
	}
}
// iterateChangeStream consumes events from stream until Next reports that no
// further event is available. Every event is decoded into a bson.M, rendered
// with renderTemplate, and the resulting text is passed to notify together
// with watch.NotifyHook.
//
// Rendering and notification failures are logged and do not stop the loop; an
// event whose rendering failed is skipped instead of being sent. A decode
// failure panics. When the loop ends, any error recorded by the stream is
// logged, the stream is closed, and only then is waitGroup.Done called.
func iterateChangeStream(waitGroup *sync.WaitGroup, stream *mongo.ChangeStream, watch Watch) {
	// Deferred calls run last-in-first-out: registering Done first makes it run
	// after the stream has been closed, so whoever waits on the group does not
	// proceed while the stream is still open.
	defer waitGroup.Done()
	defer func() {
		if err := stream.Close(context.Background()); err != nil {
			log.Println("Error during change stream closing", err)
		}
	}()

	for stream.Next(context.Background()) {
		var data bson.M
		if err := stream.Decode(&data); err != nil {
			panic(err)
		}

		text, err := renderTemplate(data, watch)
		if err != nil {
			log.Println("Error during template rendering", err)
			// The rendered text cannot be trusted after a failure, so do not
			// send a notification for this event.
			continue
		}

		if err := notify(text, watch.NotifyHook); err != nil {
			log.Println("Error during sending notification", err)
		}
	}

	// Next returns false on failure as well as on normal termination; Err is
	// the only way to tell the two apart.
	if err := stream.Err(); err != nil {
		log.Println("Error during change stream iteration", err)
	}
}