/
cache_shared.go
146 lines (132 loc) · 3.87 KB
/
cache_shared.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package db
import (
"encoding/json"
"fmt"
"github.com/gme-sh/gme.sh-api/pkg/gme-sh/short"
"strings"
)
const (
// SCacheChannelBreak -> Channel to subscribe for cache break notifications
SCacheChannelBreak = "gme.sh-scache:break"
// SCacheChannelUpdate -> Channel to subscribe for cache update notifications
SCacheChannelUpdate = "gme.sh-scache:update"
)
// SharedCache only makes sense if you want to run multiple backend shards / servers at the same time.
// If a request is then cached on one server, this cache is passed on to all other servers via PubSub,
// whereby the requests to the database are brought to a minimum.
type SharedCache struct {
NodeID string
pubSub PubSub
local *LocalCache
}
// NewSharedCache creates a new SharedCache object and returns it
func NewSharedCache(pubSub PubSub) *SharedCache {
return &SharedCache{
NodeID: string(short.GenerateID(6, short.AlwaysTrue, 0)),
pubSub: pubSub,
local: NewLocalCache(),
}
}
// UpdateCache adds a new ShortURL object to the cache.
func (s *SharedCache) UpdateCache(u *short.ShortURL) (err error) {
err = s.local.UpdateCache(u)
if err != nil {
return
}
err = s.pubSub.Publish(s.createSCacheUpdatePayload(u))
return
}
// BreakCache removes the ShortURL object from the cache that matches the ShortID.
// No further check is made whether it was already in the cache or not.
// returns an error if there was an error publishing the break notification
func (s *SharedCache) BreakCache(id *short.ShortID) (err error) {
// since the BreakCache from LocalCache always returns nil,
// we don't have to deal with any exception here
_ = s.local.BreakCache(id)
err = s.pubSub.Publish(s.createSCacheBreakPayload(id))
return
}
func (s *SharedCache) GetShortURL(id *short.ShortID) *short.ShortURL {
i, found := s.local.Get(id.String())
if !found {
return nil
}
u, ok := i.(*short.ShortURL)
if !ok {
return nil
}
if u.IsExpired() {
_ = s.BreakCache(id)
u = nil
}
return u
}
// Get returns an interface from the cache if it exists.
// Otherwise the interface is nil and the return value is false.
// Alias for LocalCache.Get()
func (s *SharedCache) Get(key string) (interface{}, bool) {
return s.local.Get(key)
}
func extractID(in *string) (id string) {
// strip
*in = strings.TrimSpace(*in)
// find space
space := strings.Index(*in, " ")
// extract id
id = (*in)[:space]
// update in
*in = strings.TrimSpace((*in)[space:])
return
}
func (s *SharedCache) sameID(id string) bool {
if id == "" {
return true
}
if id == s.NodeID {
return true
}
return false
}
// publish gme.sh-scache:update <nodeid> <json>
func (s *SharedCache) createSCacheUpdatePayload(u *short.ShortURL) (string, string) {
js, _ := json.Marshal(u)
return SCacheChannelUpdate, fmt.Sprintf("%s %s", s.NodeID, string(js))
}
// publish gme.sh-scache:break <nodeid> <id>
func (s *SharedCache) createSCacheBreakPayload(i *short.ShortID) (string, string) {
return SCacheChannelBreak, fmt.Sprintf("%s %s", s.NodeID, i.String())
}
// Subscribe subscribes to SCacheChannelBreak + SCacheChannelUpdate channels and processes their messages
func (s *SharedCache) Subscribe() (err error) {
err = s.pubSub.Subscribe(func(channel, payload string) {
switch channel {
case SCacheChannelUpdate:
// publish gme.sh-scache:update <nodeid> <json>
// get node id
nodeID := extractID(&payload)
if s.sameID(nodeID) {
return
}
// decode json to shortURL object
var sh *short.ShortURL
if err := json.Unmarshal([]byte(payload), &sh); err != nil {
return
}
// save to PersistentDatabase
_ = s.local.UpdateCache(sh)
break
case SCacheChannelBreak:
// publish gme.sh-scache:break <nodeid> <id>
// get node id
nodeID := extractID(&payload)
if s.sameID(nodeID) {
return
}
id := short.ShortID(payload)
// remove from cache
_ = s.local.BreakCache(&id)
break
}
}, SCacheChannelBreak, SCacheChannelUpdate)
return
}