/
listener.go
96 lines (78 loc) · 2.6 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package firecache
import (
"context"
"cloud.google.com/go/firestore"
)
// listener multiplexes callbacks over shared database subscriptions, keeping
// one listenerCache entry per key produced by parseKey(path, query).
type listener struct {
// ctx is stored on the listener; it is not read by addListener or
// removeListener.
ctx context.Context
// database opens the underlying subscription through its addListener method.
database *database
// cache maps a parseKey(path, query) key to the state of that subscription.
cache map[string]*listenerCache
}
// listenerCache holds the state of one shared subscription.
type listenerCache struct {
// callbacks holds the registered callbacks, keyed by the pointer that was
// passed to addListener; removeListener deletes by that same pointer.
callbacks map[*func(event ListenerEvent)]func(event ListenerEvent)
// dataReceived is set to true once a snapshot has been dispatched to the
// registered callbacks.
dataReceived bool
// initEvent is overwritten on every snapshot with the current full state;
// addListener replays it to callbacks registered after dataReceived is true.
initEvent ListenerEvent
// unsubscribe is the value returned by database.addListener; removeListener
// calls it when the last callback has been removed.
unsubscribe context.CancelFunc
}
// addListener registers callback for snapshot events on the document or query
// identified by path and query.
//
// Registrations that resolve to the same parseKey(path, query) share a single
// underlying subscription, which is opened through l.database.addListener by
// the first registration. Every snapshot is converted into a ListenerEvent and
// handed to each registered callback. A callback that is added after a
// snapshot has already been dispatched is invoked once, immediately, with the
// cached initEvent.
//
// callback is keyed by pointer identity, so the same pointer must later be
// passed to removeListener. errorhook receives errors raised while reading a
// query snapshot; it may be nil, in which case those errors are dropped. Only
// the errorhook supplied by the registration that opened the subscription is
// ever used.
//
// NOTE(review): scope's fields are written here and read inside the snapshot
// callback without synchronization. If database.addListener invokes the
// callback on another goroutine this is a data race — confirm against its
// implementation.
func (l *listener) addListener(path string, query Q, callback *func(event ListenerEvent), errorhook func(error)) {
	// Writing to a nil map panics; make a zero-value listener usable.
	if l.cache == nil {
		l.cache = make(map[string]*listenerCache)
	}

	key := parseKey(path, query)
	scope, ok := l.cache[key]
	if !ok {
		scope = &listenerCache{
			callbacks: make(map[*func(event ListenerEvent)]func(event ListenerEvent)),
		}
		l.cache[key] = scope
		scope.unsubscribe = l.database.addListener(path, query, func(data any) {
			var event ListenerEvent
			if isDoc(path) {
				doc := data.(*firestore.DocumentSnapshot)
				// A missing document is reported as a nil Document pointer.
				var docpointer *Document
				if doc.Exists() {
					docdata := Document(doc.Data())
					docpointer = &docdata
				}
				scope.initEvent = ListenerEvent{Document: docpointer}
				event = ListenerEvent{Document: docpointer}
			} else {
				snapshot := data.(*firestore.QuerySnapshot)

				// Incremental changes carried by this snapshot.
				docChangeList := make(DocumentChangeList, 0, len(snapshot.Changes))
				for _, change := range snapshot.Changes {
					docChangeList = append(docChangeList, DocumentChangeEntry{
						Id:       change.Doc.Ref.ID,
						Document: change.Doc.Data(),
						Kind:     ChangeKind(change.Kind),
						NewIndex: change.NewIndex,
						OldIndex: change.OldIndex,
					})
				}

				docs, err := snapshot.Documents.GetAll()
				if err != nil {
					// errorhook is optional; calling a nil func would panic.
					if errorhook != nil {
						errorhook(err)
					}
					return
				}

				// Full result set, plus a synthetic change list that presents
				// every document as Added for callbacks that register late.
				docList := make(DocumentList, 0, len(docs))
				initDocChangeList := make(DocumentChangeList, 0, len(docs))
				for index, doc := range docs {
					// Data is called once per entry so the two lists do not
					// share the same underlying map.
					docList = append(docList, DocumentEntry{Id: doc.Ref.ID, Document: doc.Data()})
					initDocChangeList = append(initDocChangeList, DocumentChangeEntry{
						Id:       doc.Ref.ID,
						Document: doc.Data(),
						Kind:     Added,
						NewIndex: index,
						OldIndex: -1,
					})
				}
				scope.initEvent = ListenerEvent{DocumentList: &docList, DocumentChangeList: &initDocChangeList}
				event = ListenerEvent{DocumentList: &docList, DocumentChangeList: &docChangeList}
			}

			for _, cb := range scope.callbacks {
				cb(event)
			}
			scope.dataReceived = true
		})
	}

	scope.callbacks[callback] = *callback
	// Late subscribers get the cached state right away instead of waiting for
	// the next snapshot.
	if scope.dataReceived {
		(*callback)(scope.initEvent)
	}
}
// removeListener unregisters callback from the subscription identified by
// parseKey(path, query). It does nothing when no subscription exists for that
// key. Once the last callback is gone, the subscription's unsubscribe function
// is called and its entry is dropped from the cache.
func (l *listener) removeListener(path string, query Q, callback *func(event ListenerEvent)) {
	cacheKey := parseKey(path, query)

	entry, found := l.cache[cacheKey]
	if !found {
		return
	}

	delete(entry.callbacks, callback)
	if len(entry.callbacks) != 0 {
		// Other callbacks still depend on this subscription.
		return
	}

	entry.unsubscribe()
	delete(l.cache, cacheKey)
}