/
object.go
105 lines (97 loc) · 2.28 KB
/
object.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package realtime
import (
"encoding/json"
"fmt"
"log"
"sync"
"github.com/mattbaird/jsonpatch"
)
//key names an Object. It is assigned in Object.add and is used to tag
//outgoing updates and to index the per-user version table.
type key string
//An Object is embedded into a parent marshallable struct.
//An Object has N subscribers.
//When the parent changes, it is marshalled and sent to each subscriber.
type Object struct {
mut sync.Mutex //protects all object fields
//added is set by add; a second add on the same Object is rejected.
added bool
//key is the name given to add; it is copied into every update.
key key
//value is the parent value that gets JSON-marshalled on each computeUpdate.
value interface{}
//bytes holds the most recently marshalled JSON of value (the state
//subscribers are diffed against).
bytes []byte
//version starts at 1 in add and is incremented by computeUpdate each time
//a change is sent.
version int64
//subscribers are the users that receive updates for this object.
//NOTE(review): the meaning of the string map key is not visible here
//(presumably a user ID) — confirm against the code that subscribes users.
subscribers map[string]*User
//checked is true once computeUpdate has examined the current state;
//Update resets it to false to request a re-check.
checked bool
}
// add initialises the object under key k with value val and records the
// marshalled form of val as the initial state (version 1, no subscribers).
//
// It returns the object itself on success. It returns an error if the object
// has already been added, or if val cannot be marshalled to JSON. On error the
// object is left untouched, so a failed add can be retried with a
// marshallable value.
func (o *Object) add(k string, val interface{}) (*Object, error) {
	if o.added {
		return nil, fmt.Errorf("already been added to a handler")
	}
	// marshal before mutating anything: a failure must not leave the object
	// flagged as added while its fields are still uninitialised
	b, err := json.Marshal(val)
	if err != nil {
		return nil, fmt.Errorf("JSON marshalling failed: %s", err)
	}
	o.added = true
	o.bytes = b //initial state
	o.key = key(k)
	o.value = val
	o.version = 1
	o.subscribers = map[string]*User{}
	return o, nil
}
// Update requests that changes made to this object since the last update be
// sent to its subscribers. It only clears the checked flag; the next
// computeUpdate call then re-marshals the value and queues any difference.
func (o *Object) Update() {
	o.mut.Lock()
	defer o.mut.Unlock()
	o.checked = false
}
//update is the message queued on a subscriber when an Object changes.
//Field order is significant: it determines the key order of the marshalled JSON.
type update struct {
//Key is the key of the Object this update belongs to.
Key key
//Delta is true when Data holds a JSON patch against the previous version
//rather than the full document; it is omitted from the JSON when false.
Delta bool `json:",omitempty"`
//Version is the Object version the subscriber holds after applying Data.
//NOTE(review): "53 usable bits" presumably refers to the integer precision
//of float64 numbers on JavaScript clients — confirm.
Version int64 //53 usable bits
//Data is either the marshalled patch operations or the full marshalled value.
Data jsonBytes
}
// computeUpdate re-marshals the object's value, diffs it against the last
// marshalled state and queues the resulting update on every subscriber.
//
// It returns true when a new version was produced and queued. It returns
// false when the object has already been checked since the last Update call,
// when marshalling the value fails, or when nothing changed.
//
// Each subscriber receives either a JSON patch (Delta) or the full document:
// the patch is used only if the subscriber holds the immediately preceding
// version and the patch is smaller than the full document.
//
// Called by realtime.flusher ONLY!
func (o *Object) computeUpdate() bool {
	//ensure only 1 update computation at a time
	o.mut.Lock()
	defer o.mut.Unlock()
	if o.checked {
		return false
	}
	//mark as checked; Update() clears the flag to request another pass
	o.checked = true
	newBytes, err := json.Marshal(o.value)
	if err != nil {
		log.Printf("go-realtime: %s: marshal failed: %s", o.key, err)
		return false
	}
	//calculate change set
	hasPrev := len(o.bytes) > 0
	ops, patchErr := jsonpatch.CreatePatch(o.bytes, newBytes)
	if hasPrev {
		if patchErr != nil {
			//no usable patch: decide "changed or not" on the raw bytes
			//instead of silently dropping a real change
			if string(o.bytes) == string(newBytes) {
				return false
			}
		} else if len(ops) == 0 {
			return false
		}
	}
	//a delta is only a candidate when there is a previous state, a valid
	//patch, and the patch itself marshals cleanly
	var delta []byte
	deltaSmaller := false
	if hasPrev && patchErr == nil {
		if d, mErr := json.Marshal(ops); mErr == nil {
			delta = d
			//choose optimal update (send the smallest): the alternative
			//payload is the full new document, so compare against that
			deltaSmaller = len(delta) < len(newBytes)
		}
	}
	prev := o.version
	o.version++
	//send this new change to each subscriber
	for _, u := range o.subscribers {
		upd := &update{
			Key:     o.key,
			Version: o.version,
			Data:    newBytes,
		}
		u.mut.Lock()
		//a delta is only valid for users holding the previous version
		if deltaSmaller && u.versions[o.key] == prev {
			upd.Delta = true
			upd.Data = delta
		}
		//insert pending update
		u.pending = append(u.pending, upd)
		//user now has this version
		u.versions[o.key] = o.version
		u.mut.Unlock()
	}
	o.bytes = newBytes
	return true
}