/
watch.go
157 lines (133 loc) · 3.76 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package crdb
import (
"context"
"encoding/json"
"errors"
"fmt"
"sort"
v0 "github.com/authzed/authzed-go/proto/authzed/api/v0"
"github.com/shopspring/decimal"
"github.com/authzed/spicedb/internal/datastore"
)
// queryChangefeed is the statement template Watch uses to open a changefeed.
// It takes two string placeholders: Watch fills the first with the table name
// and the second (the cursor) with the revision after which changes are wanted.
// NOTE(review): the `updated` and `resolved` options presumably cause the
// emitted rows to carry the "updated" / "resolved" fields that Watch decodes;
// confirm against the CockroachDB changefeed documentation.
const queryChangefeed = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '1s';"
// Watch runs the changefeed query for the tuple table, starting after
// afterRevision, in a background goroutine and reports what it reads on the
// two returned channels. Both channels are closed when the goroutine exits.
//
// The first channel receives one *datastore.RevisionChanges per update
// timestamp. Row changes are buffered until a "resolved" entry arrives; every
// buffered group whose revision is less than or equal to the resolved
// timestamp is then sent, in ascending revision order. The send is
// non-blocking: if the updates channel cannot accept a group, a watch
// disconnected error is reported and the watch ends.
//
// The second channel receives at most one error before the watch ends. Query,
// scan and row-iteration failures are reported as a watch canceled error when
// ctx has been canceled, and as the underlying error otherwise. Decoding
// failures are reported as-is.
func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Revision) (<-chan *datastore.RevisionChanges, <-chan error) {
	updates := make(chan *datastore.RevisionChanges, cds.watchBufferLength)
	errs := make(chan error, 1)

	feedQuery := fmt.Sprintf(queryChangefeed, tableTuple, afterRevision)

	go func() {
		defer close(updates)
		defer close(errs)

		// fail reports an error from the query/rows layer, substituting the
		// watch canceled error when the context itself was canceled.
		fail := func(cause error) {
			if errors.Is(ctx.Err(), context.Canceled) {
				errs <- datastore.NewWatchCanceledErr()
				return
			}
			errs <- cause
		}

		// Row changes grouped by their "updated" timestamp string, waiting
		// for a resolved entry that covers them.
		buffered := make(map[string]*datastore.RevisionChanges)

		rows, err := cds.conn.Query(ctx, feedQuery)
		if err != nil {
			fail(err)
			return
		}

		// We call Close async here because it can be slow and blocks closing the channels. There is
		// no return value so we're not really losing anything.
		defer func() { go rows.Close() }()

		for rows.Next() {
			var (
				ignored  interface{}
				keyJSON  []byte
				bodyJSON []byte
			)
			if scanErr := rows.Scan(&ignored, &keyJSON, &bodyJSON); scanErr != nil {
				fail(scanErr)
				return
			}

			var event struct {
				Resolved string
				Updated  string
				After    interface{}
			}
			if decodeErr := json.Unmarshal(bodyJSON, &event); decodeErr != nil {
				errs <- decodeErr
				return
			}

			if event.Resolved == "" {
				// A row change: decode the six primary key columns and buffer
				// the resulting update under its timestamp.
				var key [6]string
				if keyErr := json.Unmarshal(keyJSON, &key); keyErr != nil {
					errs <- keyErr
					return
				}

				rev, revErr := decimal.NewFromString(event.Updated)
				if revErr != nil {
					errs <- fmt.Errorf("malformed update timestamp: %w", revErr)
					return
				}

				// A missing "after" value is treated as a deletion of the row.
				op := v0.RelationTupleUpdate_TOUCH
				if event.After == nil {
					op = v0.RelationTupleUpdate_DELETE
				}

				update := &v0.RelationTupleUpdate{
					Operation: op,
					Tuple: &v0.RelationTuple{
						ObjectAndRelation: &v0.ObjectAndRelation{
							Namespace: key[0],
							ObjectId:  key[1],
							Relation:  key[2],
						},
						User: &v0.User{
							UserOneof: &v0.User_Userset{
								Userset: &v0.ObjectAndRelation{
									Namespace: key[3],
									ObjectId:  key[4],
									Relation:  key[5],
								},
							},
						},
					},
				}

				group, seen := buffered[event.Updated]
				if !seen {
					group = &datastore.RevisionChanges{
						Revision: rev,
					}
					buffered[event.Updated] = group
				}
				group.Changes = append(group.Changes, update)
				continue
			}

			// A resolved entry: everything buffered at or before this
			// timestamp can now be emitted.
			watermark, wmErr := decimal.NewFromString(event.Resolved)
			if wmErr != nil {
				errs <- wmErr
				return
			}

			var ready []*datastore.RevisionChanges
			for stamp, group := range buffered {
				if group.Revision.LessThanOrEqual(watermark) {
					delete(buffered, stamp)
					ready = append(ready, group)
				}
			}

			// Map iteration order is random, so order the groups by revision
			// before sending them.
			sort.Slice(ready, func(a, b int) bool {
				return ready[a].Revision.LessThan(ready[b].Revision)
			})

			for _, group := range ready {
				select {
				case updates <- group:
				default:
					// The consumer is not keeping up with the buffer.
					errs <- datastore.NewWatchDisconnectedErr()
					return
				}
			}
		}

		if iterErr := rows.Err(); iterErr != nil {
			fail(iterErr)
		}
	}()

	return updates, errs
}