/
connection.go
143 lines (119 loc) · 3.22 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package autoupdate
import (
"context"
"encoding/json"
"fmt"
)
// Connection holds the state of a client. It has to be created by colling
// Connect() on a autoupdate.Service instance.
type Connection struct {
autoupdate *Autoupdate
uid int
kb KeysBuilder
tid uint64
filter filter
}
// Next returns the next data for the user.
//
// When Next is called for the first time, it does not block. In this case, it
// is possible, that it returns an empty map.
//
// On every other call, it blocks until there is new data. In this case, the map
// is never empty.
func (c *Connection) Next(ctx context.Context) (map[string]json.RawMessage, error) {
firstTime := c.filter.empty()
var data map[string]json.RawMessage
for len(data) == 0 {
keys, err := c.keys(ctx)
if err != nil {
return nil, fmt.Errorf("getting keys: %w", err)
}
data, err = c.autoupdate.RestrictedData(ctx, c.uid, keys...)
if err != nil {
return nil, fmt.Errorf("get first time restricted data: %w", err)
}
c.filter.filter(data)
if firstTime {
// On firstTime return the data, even when it is empty.
return data, nil
}
}
return data, nil
}
func (c *Connection) keys(ctx context.Context) ([]string, error) {
if c.filter.empty() {
keys, err := c.allKeys(ctx)
if err != nil {
return nil, fmt.Errorf("get all keys: %w", err)
}
return keys, nil
}
keys, err := c.nextKeys(ctx)
if err != nil {
return nil, fmt.Errorf("get next keys: %w", err)
}
return keys, nil
}
func (c *Connection) allKeys(ctx context.Context) ([]string, error) {
if c.tid == 0 {
c.tid = c.autoupdate.topic.LastID()
}
if err := c.kb.Update(ctx); err != nil {
return nil, fmt.Errorf("create keys for keysbuilder: %w", err)
}
return c.kb.Keys(), nil
}
// nextKeys blocks until there are new keys for the user.
func (c *Connection) nextKeys(ctx context.Context) ([]string, error) {
var keys []string
for len(keys) == 0 {
// Blocks until the topic is closed (on server exit) or the context is done.
tid, changedKeys, err := c.autoupdate.topic.Receive(ctx, c.tid)
if err != nil {
return nil, fmt.Errorf("get updated keys: %w", err)
}
c.tid = tid
changedSlice := make(map[string]bool, len(changedKeys))
for _, key := range changedKeys {
var uid int
if _, err := fmt.Sscanf(key, fullUpdateFormat, &uid); err == nil {
// The key is a fullUpdate key. Do not use it, exept of a full
// update.
if uid == -1 || uid == c.uid {
return c.allKeys(ctx)
}
continue
}
changedSlice[key] = true
}
oldKeys := c.kb.Keys()
// Update keysbuilder get new list of keys.
if err := c.kb.Update(ctx); err != nil {
return nil, fmt.Errorf("update keysbuilder: %w", err)
}
// Start with keys hat are new for the user.
keys = keysDiff(oldKeys, c.kb.Keys())
// Append keys that are old but have been changed.
for _, key := range oldKeys {
if !changedSlice[key] {
continue
}
keys = append(keys, key)
}
}
return keys, nil
}
func keysDiff(old []string, new []string) []string {
keySet := make(map[string]bool, len(old))
for _, key := range old {
keySet[key] = true
}
added := []string{}
for _, key := range new {
if keySet[key] {
continue
}
added = append(added, key)
}
return added
}