/
memcache_client.go
executable file
·220 lines (197 loc) · 6.32 KB
/
memcache_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package multitenant
import (
"fmt"
"net"
"sort"
"sync"
"time"
"context"
"github.com/bradfitz/gomemcache/memcache"
opentracing "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/scope/report"
)
// Prometheus collectors for the memcache layer. All live in the "scope"
// namespace and are registered with the default registry in init.
var (
// memcacheRequests counts the report keys requested from memcache.
// FetchReports adds len(keys) to it on every call.
memcacheRequests = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "scope",
Name: "memcache_requests_total",
Help: "Total count of reports requested from memcache that were not found in our in-memory cache.",
})
// memcacheHits counts the reports that FetchReports fetched from memcache
// and decoded successfully.
memcacheHits = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "scope",
Name: "memcache_hits_total",
Help: "Total count of reports found in memcache that were not found in our in-memory cache.",
})
// memcacheRequestDuration records the latency of memcache operations,
// labelled by "method" and "status_code" (see memcacheStatusCode for the
// error-to-status mapping). It uses the default Prometheus buckets.
memcacheRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "scope",
Name: "memcache_request_duration_seconds",
Help: "Total time spent in seconds doing memcache requests.",
Buckets: prometheus.DefBuckets,
}, []string{"method", "status_code"})
)
// init registers the memcache collectors with the default Prometheus
// registry. MustRegister panics if any registration fails (for example on a
// duplicate metric name), so a misconfiguration surfaces at start-up.
func init() {
	prometheus.MustRegister(
		memcacheRequests,
		memcacheHits,
		memcacheRequestDuration,
	)
}
// MemcacheClient is a memcache client that gets its server list from SRV
// records, and periodically updates that ServerList.
//
// Instances are created with NewMemcacheClient, which also starts the
// background refresh goroutine; Stop signals that goroutine and waits for it.
type MemcacheClient struct {
// client is the underlying gomemcache client used for GetMulti and Set.
client *memcache.Client
// serverList is the selector handed to client; updateMemcacheServers
// replaces its contents after each SRV lookup.
serverList *memcache.ServerList
// expiration is the value placed in memcache.Item.Expiration when storing
// a report; it is derived from MemcacheConfig.Expiration in whole seconds.
expiration int32
// hostname is the name passed to the SRV lookup.
hostname string
// service is the service passed to the SRV lookup.
service string
// compressionLevel is copied from MemcacheConfig.CompressionLevel.
// NOTE(review): not read by any method visible in this file — confirm where it is used.
compressionLevel int
// quit is closed by Stop to signal the update loop.
quit chan struct{}
// wait tracks the update-loop goroutine so Stop can block until it exits.
wait sync.WaitGroup
}
// MemcacheConfig defines how a MemcacheClient should be constructed.
type MemcacheConfig struct {
// Host is the domain name used for the SRV lookup of memcache servers.
Host string
// Service is the SRV service name used in the lookup (protocol is "tcp").
Service string
// Timeout is assigned to the underlying memcache client's Timeout field.
Timeout time.Duration
// UpdateInterval is the period between refreshes of the server list.
UpdateInterval time.Duration
// Expiration is how long stored reports are kept; it is converted to
// whole seconds (int32) before being sent to memcache.
Expiration time.Duration
// CompressionLevel is copied onto the client unchanged.
CompressionLevel int
}
// NewMemcacheClient builds a MemcacheClient from config, performs an initial
// SRV lookup to populate the server list, and starts a background goroutine
// that repeats the lookup every config.UpdateInterval.
//
// A failure of the initial lookup is logged but not returned: the client is
// still handed back and relies on later refreshes to fill the server list.
// Call Stop to terminate the background goroutine.
func NewMemcacheClient(config MemcacheConfig) *MemcacheClient {
	// The selector is shared between the gomemcache client and our wrapper so
	// that refreshing it is immediately visible to the client.
	selector := new(memcache.ServerList)
	backend := memcache.NewFromSelector(selector)
	backend.Timeout = config.Timeout

	mc := &MemcacheClient{
		client:           backend,
		serverList:       selector,
		expiration:       int32(config.Expiration.Seconds()),
		hostname:         config.Host,
		service:          config.Service,
		compressionLevel: config.CompressionLevel,
		quit:             make(chan struct{}),
	}

	if err := mc.updateMemcacheServers(); err != nil {
		log.Errorf("Error setting memcache servers to '%v': %v", config.Host, err)
	}

	// Register the goroutine with the WaitGroup before launching it so Stop
	// can never observe a zero counter while the loop is starting.
	mc.wait.Add(1)
	go mc.updateLoop(config.UpdateInterval)

	return mc
}
// Stop the memcache client. It closes the quit channel to signal the
// background update goroutine and then blocks until the WaitGroup that
// tracks that goroutine is released.
//
// Stop must be called at most once: a second call would close an
// already-closed channel, which panics.
func (c *MemcacheClient) Stop() {
close(c.quit)
c.wait.Wait()
}
// updateLoop refreshes the memcache server list every updateInterval until
// the quit channel is closed, at which point it stops the ticker, releases
// the WaitGroup and returns nil.
//
// Refresh failures are logged and otherwise ignored so that a transient DNS
// problem does not end the loop; the previous server list stays in effect.
// The error return value exists for signature compatibility and is always nil.
//
// updateInterval must be positive; time.NewTicker panics otherwise.
func (c *MemcacheClient) updateLoop(updateInterval time.Duration) error {
	defer c.wait.Done()

	ticker := time.NewTicker(updateInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := c.updateMemcacheServers(); err != nil {
				log.Warningf("Error updating memcache servers: %v", err)
			}
		case <-c.quit:
			// A closed channel is always ready to receive, so we must leave
			// the loop here; otherwise it would spin forever and Stop would
			// never see the WaitGroup released.
			return nil
		}
	}
}
// updateMemcacheServers resolves the SRV records for the configured service
// and hostname (over "tcp") and installs the resulting "target:port" list as
// the client's memcache servers. SRV priority and weight are ignored.
//
// It returns the lookup error, or whatever error SetServers reports.
func (c *MemcacheClient) updateMemcacheServers() error {
	_, records, err := net.LookupSRV(c.service, "tcp", c.hostname)
	if err != nil {
		return err
	}

	endpoints := make([]string, len(records))
	for i, rec := range records {
		endpoints[i] = fmt.Sprintf("%s:%d", rec.Target, rec.Port)
	}

	// The server list maps each key to an index into this slice, while DNS
	// may hand records back in a different order on every query. Sorting
	// keeps the key-to-server assignment as stable as possible across
	// refreshes.
	sort.Strings(endpoints)

	return c.serverList.SetServers(endpoints...)
}
// memcacheStatusCode translates a gomemcache error into an HTTP-like status
// string for use as the "status_code" metric label:
//
//	nil                      -> "200"
//	memcache.ErrCacheMiss    -> "404"
//	memcache.ErrMalformedKey -> "400"
//	anything else            -> "500"
//
// See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables
func memcacheStatusCode(err error) string {
	if err == nil {
		return "200"
	}
	if err == memcache.ErrCacheMiss {
		return "404"
	}
	if err == memcache.ErrMalformedKey {
		return "400"
	}
	return "500"
}
// FetchReports gets reports from memcache.
//
// It issues a single GetMulti for keys, then decodes every returned item
// concurrently. The results are:
//
//   - a map from key to decoded report, for every key fetched and decoded;
//   - the sorted list of keys that were absent from memcache or whose stored
//     bytes could not be decoded (nil when there are none);
//   - a non-nil error only when the GetMulti call itself fails, in which case
//     the map is nil and all of keys are returned as missing.
//
// The call is traced as "Memcache.FetchReports" and updates the request, hit
// and duration metrics.
func (c *MemcacheClient) FetchReports(ctx context.Context, keys []string) (map[string]report.Report, []string, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "Memcache.FetchReports")
	defer span.Finish()
	defer memcacheRequests.Add(float64(len(keys)))

	var items map[string]*memcache.Item
	fetch := func(_ context.Context) (err error) {
		items, err = c.client.GetMulti(keys)
		return err
	}
	err := instrument.TimeRequestHistogramStatus(ctx, "Memcache.GetMulti", memcacheRequestDuration, memcacheStatusCode, fetch)
	span.LogFields(otlog.Int("keys", len(keys)), otlog.Int("hits", len(items)))
	if err != nil {
		return nil, keys, err
	}

	// Decode all the reports in parallel. The channel is buffered for the
	// worst case of one result per key, so no decoder ever blocks on send.
	type decoded struct {
		key string
		rep *report.Report
	}
	results := make(chan decoded, len(keys))

	var missing []string
	inFlight := 0
	for _, key := range keys {
		item, hit := items[key]
		if !hit {
			missing = append(missing, key)
			continue
		}
		inFlight++
		go func(key string, item *memcache.Item) {
			rep, err := report.MakeFromBytes(item.Value)
			if err != nil {
				log.Warningf("Corrupt report in memcache %v: %v", key, err)
				// A nil report tells the collector to treat the key as missing.
				rep = nil
			}
			results <- decoded{key: key, rep: rep}
		}(key, item)
	}

	// Collect exactly one result per decoder that was started.
	reports := make(map[string]report.Report)
	for ; inFlight > 0; inFlight-- {
		res := <-results
		if res.rep != nil {
			reports[res.key] = *res.rep
			continue
		}
		missing = append(missing, res.key)
	}

	if len(missing) > 0 {
		sort.Strings(missing)
		log.Warningf("Missing %d reports from memcache: %v", len(missing), missing)
	}

	memcacheHits.Add(float64(len(reports)))
	return reports, missing, nil
}
// StoreReportBytes stores a report: it writes the already-serialised bytes
// rpt to memcache under key, using the client's configured expiration. The
// write is timed under the "Memcache.Put" method label.
//
// It returns len(rpt) together with any error from the memcache Set; the
// length is returned even when the write fails.
func (c *MemcacheClient) StoreReportBytes(ctx context.Context, key string, rpt []byte) (int, error) {
	put := func(_ context.Context) error {
		return c.client.Set(&memcache.Item{
			Key:        key,
			Value:      rpt,
			Expiration: c.expiration,
		})
	}
	err := instrument.TimeRequestHistogramStatus(ctx, "Memcache.Put", memcacheRequestDuration, memcacheStatusCode, put)
	return len(rpt), err
}