forked from samsarahq/thunder
/
live.go
203 lines (168 loc) · 4.94 KB
/
live.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
package livesql
import (
"context"
"sync"
"github.com/samsarahq/thunder/internal"
"github.com/samsarahq/thunder/reactive"
"github.com/samsarahq/thunder/sqlgen"
)
// dbResource tracks changes to a specific table matching a filter.
type dbResource struct {
// table is compared against the table of each incoming update.
table string
// tester is run against the before and after rows of each delta of an
// update for the same table.
tester sqlgen.Tester
// resource is what gets invalidated when an update affects this dbResource.
resource *reactive.Resource
}
// shouldInvalidate reports whether update affects r. An update for a different
// table never does. An update for the same table does when it carries an error
// or when the tester accepts the before or after row of any of its deltas.
func (r *dbResource) shouldInvalidate(update *update) bool {
	switch {
	case update.table != r.table:
		// Cheap early exit: the update is for some other table.
		return false
	case update.err != nil:
		// The update could not be parsed, so there is no way to tell what
		// changed. Invalidate to stay on the safe side.
		return true
	}

	// Any delta whose row matched before or after the change is enough.
	for _, delta := range update.deltas {
		if r.tester.Test(delta.before) {
			return true
		}
		if r.tester.Test(delta.after) {
			return true
		}
	}
	return false
}
// dbTracker tracks many dbResources.
type dbTracker struct {
// mu is held by add, remove, and processBinlog while they access resources.
mu sync.Mutex
// resources is the set of currently tracked dbResources.
resources map[*dbResource]struct{}
}
// newDbTracker returns a dbTracker whose resource set is allocated and empty.
func newDbTracker() *dbTracker {
	tracker := new(dbTracker)
	tracker.resources = map[*dbResource]struct{}{}
	return tracker
}
// add puts r into the set of tracked resources while holding the tracker's
// mutex.
func (t *dbTracker) add(r *dbResource) {
	t.mu.Lock()
	defer t.mu.Unlock()

	t.resources[r] = struct{}{}
}
// remove deletes r from the set of tracked resources while holding the
// tracker's mutex. Removing a resource that is not tracked is a no-op.
func (t *dbTracker) remove(r *dbResource) {
	t.mu.Lock()
	defer t.mu.Unlock()

	delete(t.resources, r)
}
// processBinlog processes a set of updates from the MySQL binlog: every
// tracked resource that the update affects is invalidated. The tracker's mutex
// is held for the whole pass over the resource set.
func (t *dbTracker) processBinlog(update *update) {
	t.mu.Lock()
	defer t.mu.Unlock()

	for tracked := range t.resources {
		if !tracked.shouldInvalidate(update) {
			continue
		}
		tracked.resource.Invalidate()
	}
}
// registerDependency creates a dbResource for table and tester backed by a new
// reactive.Resource, registers that resource as a dependency of ctx via
// reactive.AddDependency, and then adds the dbResource to the tracker.
func (t *dbTracker) registerDependency(ctx context.Context, table string, tester sqlgen.Tester) {
	tracked := &dbResource{
		table:    table,
		tester:   tester,
		resource: reactive.NewResource(),
	}

	// Registered as the resource's cleanup function so the tracker entry is
	// dropped together with the resource.
	untrack := func() {
		t.remove(tracked)
	}
	tracked.resource.Cleanup(untrack)

	reactive.AddDependency(ctx, tracked.resource)
	t.add(tracked)
}
// LiveDB is a SQL client that supports live updating queries.
// It relies on a reactive.Rerunner being in the context to register changes in the database (which
// are propagated through said rerunner to its clients). Without this rerunner being in the context
// it falls back to non-live (sqlgen) behavior.
// See https://godoc.org/github.com/samsarahq/thunder/reactive for information on reactive.
type LiveDB struct {
// DB is the embedded sqlgen client; its BaseQuery method executes the
// queries for both the live and the non-live paths.
*sqlgen.DB
// tracker holds the dependencies registered by live queries.
tracker *dbTracker
}
// NewLiveDB constructs a new LiveDB that embeds db and starts out with a
// fresh, empty dependency tracker.
func NewLiveDB(db *sqlgen.DB) *LiveDB {
	ldb := new(LiveDB)
	ldb.DB = db
	ldb.tracker = newDbTracker()
	return ldb
}
// queryCacheKey is the key under which LiveDB.query caches a query's result
// with reactive.Cache.
type queryCacheKey struct {
// clause is the SQL text produced by the select query's ToSQL.
clause string
// args holds the query arguments after internal.ToArray, which converts
// the args slice into an array so it can be stored as a map key.
args interface{}
}
// query reactively performs a SelectQuery.
//
// When ctx carries no reactive rerunner, or when a transaction is active, the
// query is passed straight to sqlgen and nothing is tracked. Otherwise the
// result is computed through reactive.Cache under a key built from the
// generated SQL clause and its arguments, and a dependency on the queried
// table and filter is registered with the tracker.
func (ldb *LiveDB) query(ctx context.Context, query *sqlgen.BaseSelectQuery) ([]interface{}, error) {
	// Live behavior needs a rerunner and must not run inside a transaction.
	live := reactive.HasRerunner(ctx) && !ldb.HasTx(ctx)
	if !live {
		return ldb.DB.BaseQuery(ctx, query)
	}

	selectQuery, err := query.MakeSelectQuery()
	if err != nil {
		return nil, err
	}

	// The args slice is converted into an array so that the key can be stored
	// as a map key.
	clause, args := selectQuery.ToSQL()
	key := queryCacheKey{
		clause: clause,
		args:   internal.ToArray(args),
	}

	compute := func(ctx context.Context) (interface{}, error) {
		table := query.Table.Name

		// The tester decides which row changes affect this query.
		tester, err := ldb.Schema.MakeTester(table, query.Filter)
		if err != nil {
			return nil, err
		}

		// The dependency is registered first and the query runs second, so
		// that no update between the two steps is missed.
		ldb.tracker.registerDependency(ctx, table, tester)

		// XXX: This will build the SQL string again... :(
		return ldb.DB.BaseQuery(ctx, query)
	}

	result, err := reactive.Cache(ctx, key, compute)
	if err != nil {
		return nil, err
	}
	return result.([]interface{}), nil
}
// Query fetches a collection of rows from the database and will invalidate ctx
// when the query result changes.
//
// result should be a pointer to a slice of pointers to structs, for example:
//
//	var users []*User
//	if err := ldb.Query(ctx, &users, nil, nil); err != nil {
//		return err
//	}
//
// Any error from building the select, running it, or copying the rows into
// result is returned unchanged.
func (ldb *LiveDB) Query(ctx context.Context, result interface{}, filter sqlgen.Filter, options *sqlgen.SelectOptions) error {
	var rows []interface{}

	selectQuery, err := ldb.Schema.MakeSelect(result, filter, options)
	if err == nil {
		rows, err = ldb.query(ctx, selectQuery)
	}
	if err != nil {
		return err
	}

	return sqlgen.CopySlice(result, rows)
}
// QueryRow fetches a single row from the database and will invalidate ctx when
// the query result changes.
//
// result should be a pointer to a pointer to a struct, for example:
//
//	var user *User
//	if err := ldb.QueryRow(ctx, &user, sqlgen.Filter{"id": 10}, nil); err != nil {
//		return err
//	}
//
// Any error from building the select, running it, or copying the row into
// result is returned unchanged.
func (ldb *LiveDB) QueryRow(ctx context.Context, result interface{}, filter sqlgen.Filter, options *sqlgen.SelectOptions) error {
	var rows []interface{}

	selectQuery, err := ldb.Schema.MakeSelectRow(result, filter, options)
	if err == nil {
		rows, err = ldb.query(ctx, selectQuery)
	}
	if err != nil {
		return err
	}

	return sqlgen.CopySingletonSlice(result, rows)
}
// Close calls Close on the Conn of the embedded sqlgen.DB and returns the
// resulting error.
func (ldb *LiveDB) Close() error {
	return ldb.DB.Conn.Close()
}