-
Notifications
You must be signed in to change notification settings - Fork 24
/
dist-cache.go
166 lines (144 loc) · 5.09 KB
/
dist-cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package distcache
import (
"context"
"strconv"
"sync"
"time"
"github.com/buraksezer/olric"
olricconfig "github.com/buraksezer/olric/config"
"github.com/clarketm/json"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/fx"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/structpb"
distcachev1 "github.com/fluxninja/aperture/v2/api/gen/proto/go/aperture/distcache/v1"
"github.com/fluxninja/aperture/v2/pkg/log"
"github.com/fluxninja/aperture/v2/pkg/metrics"
)
// DistCache is a peer to peer distributed cache.
//
// It bundles an Olric instance, an embedded client for that instance, the
// Olric configuration and the metrics that scrapeMetrics keeps up to date.
type DistCache struct {
// Embedded generated base for the DistCacheService server
// (presumably the usual gRPC forward-compatibility embed; GetStats is the
// method implemented in this file).
distcachev1.UnimplementedDistCacheServiceServer
// lock is held by NewDMap and DeleteDMap while they modify
// config.DMaps.Custom and call the client.
lock sync.Mutex
// config is the Olric configuration; its DMaps.Custom map gains an entry in
// NewDMap and loses it in DeleteDMap.
config *olricconfig.Config
// olric is the Olric instance handed to NewDistCache.
olric *olric.Olric
// client is the embedded client obtained from olric.NewEmbeddedClient(); it
// is used for DMap creation/deletion and for fetching stats.
client olric.Client
// metrics holds the gauges that scrapeMetrics updates.
metrics *DistCacheMetrics
// shutDowner is used to request shutdown when DMap creation fails or
// when stats scraping keeps failing.
shutDowner fx.Shutdowner
// statsFailureCount counts consecutive failed stats scrapes; it is reset to
// zero by a successful scrape.
statsFailureCount uint8
}
// NewDistCache creates a new instance of DistCache.
//
// The returned DistCache keeps the given config, Olric instance, metrics and
// shutdowner, and additionally stores an embedded client obtained from the
// Olric instance.
func NewDistCache(config *olricconfig.Config, olric *olric.Olric, metrics *DistCacheMetrics, shutDowner fx.Shutdowner) *DistCache {
	dc := new(DistCache)
	dc.config = config
	dc.olric = olric
	// Note: inside this function "olric" is the parameter, not the package.
	dc.client = olric.NewEmbeddedClient()
	dc.metrics = metrics
	dc.shutDowner = shutDowner
	return dc
}
// NewDMap creates a new DMap.
//
// The supplied config is stored under name in the custom DMap configuration
// before the DMap is requested from the client. If the client fails to create
// the DMap, the error is logged, a shutdown is requested and the error is
// returned with a nil DMap.
func (dc *DistCache) NewDMap(name string, config olricconfig.DMap) (olric.DMap, error) {
	dc.lock.Lock()
	defer dc.lock.Unlock()

	// Register the per-DMap configuration first, then ask the client for the DMap.
	dc.config.DMaps.Custom[name] = config

	created, err := dc.client.NewDMap(name)
	if err != nil {
		failure := log.Error().Err(err)
		failure.Msgf("Failed to create new DMap: %s, shutting down", name)
		// The shutdown result is deliberately dropped; the creation error is
		// what the caller receives.
		_ = dc.shutDowner.Shutdown()
		return nil, err
	}

	return created, nil
}
// DeleteDMap deletes a DMap.
//
// The custom configuration entry for name is removed on the way out, whether
// or not the client reports an error. The client's error, if any, is returned.
func (dc *DistCache) DeleteDMap(name string) error {
	dc.lock.Lock()
	defer dc.lock.Unlock()

	// Resolve the configuration map up front; the entry itself is dropped
	// only after the client call has finished.
	customConfigs := dc.config.DMaps.Custom
	defer func() {
		delete(customConfigs, name)
	}()

	return dc.client.DeleteDMap(name)
}
// scrapeMetrics fetches statistics through the Olric client and copies the
// DMap counters into the gauges held in dc.metrics, labelled with the member
// ID and member name reported in the stats.
//
// Failure handling: every failed Stats call bumps a consecutive-failure
// counter. Once maxConsecutiveStatsFailures failures have happened in a row, a
// shutdown is requested through dc.shutDowner on that and every following
// failure. A successful call resets the counter. The Stats error is returned
// to the caller.
//
// On success the function always returns (nil, nil). A gauge that cannot be
// resolved for the label set is logged at debug level and skipped; the
// remaining gauges are still updated.
func (dc *DistCache) scrapeMetrics(ctx context.Context) (proto.Message, error) {
	// Number of back-to-back Stats failures after which shutdown is requested.
	const maxConsecutiveStatsFailures = 10

	stats, err := dc.client.Stats(ctx, "")
	if err != nil {
		// Saturate at the threshold: the counter is a uint8, and letting it
		// grow unbounded would eventually wrap it back to zero and silently
		// restart the "in a row" count.
		if dc.statsFailureCount < maxConsecutiveStatsFailures {
			dc.statsFailureCount++
		}
		// ">=" so that shutdown is requested on the 10th consecutive failure,
		// which is what the log message states.
		if dc.statsFailureCount >= maxConsecutiveStatsFailures {
			log.Error().Err(err).Msgf("Failed to scrape Olric statistics %d times in a row, shutting down", maxConsecutiveStatsFailures)
			// Shutdown result is deliberately dropped; the scrape error is
			// what the caller receives.
			_ = dc.shutDowner.Shutdown()
		}
		log.Error().Err(err).Msgf("Failed to scrape Olric statistics")
		return nil, err
	}
	dc.statsFailureCount = 0

	metricLabels := prometheus.Labels{
		metrics.DistCacheMemberIDLabel:   strconv.FormatUint(stats.Member.ID, 10),
		metrics.DistCacheMemberNameLabel: stats.Member.Name,
	}

	// One row per exported gauge: the name used in the debug log, the lookup
	// that resolves the gauge for a label set, and the value to publish.
	gauges := []struct {
		name   string
		lookup func(prometheus.Labels) (prometheus.Gauge, error)
		value  float64
	}{
		{"entries total", dc.metrics.EntriesTotal.GetMetricWith, float64(stats.DMaps.EntriesTotal)},
		{"delete hits", dc.metrics.DeleteHits.GetMetricWith, float64(stats.DMaps.DeleteHits)},
		{"delete misses", dc.metrics.DeleteMisses.GetMetricWith, float64(stats.DMaps.DeleteMisses)},
		{"get misses", dc.metrics.GetMisses.GetMetricWith, float64(stats.DMaps.GetMisses)},
		{"get hits", dc.metrics.GetHits.GetMetricWith, float64(stats.DMaps.GetHits)},
		{"evicted total", dc.metrics.EvictedTotal.GetMetricWith, float64(stats.DMaps.EvictedTotal)},
	}
	for _, g := range gauges {
		gauge, lookupErr := g.lookup(metricLabels)
		if lookupErr != nil {
			log.Debug().Msgf("Could not extract %s gauge metric from olric instance: %v", g.name, lookupErr)
			continue
		}
		gauge.Set(g.value)
	}

	return nil, nil
}
// GetStats returns stats of the current Olric member.
//
// The stats are fetched through the client with a 5 second timeout, converted
// to a structpb.Struct by a JSON marshal/unmarshal round trip, and stripped of
// every entry under "partitions" whose "length" is zero. Any failure along the
// way is logged and returned with a nil result.
func (dc *DistCache) GetStats(ctx context.Context, _ *emptypb.Empty) (*structpb.Struct, error) {
	// Bound the stats call so the request cannot hang indefinitely.
	boundedCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	memberStats, err := dc.client.Stats(boundedCtx, "")
	if err != nil {
		log.Error().Err(err).Msgf("Failed to scrape Olric statistics")
		return nil, err
	}

	encoded, err := json.Marshal(memberStats)
	if err != nil {
		log.Error().Err(err).Msgf("Failed to marshal Olric statistics")
		return nil, err
	}

	result := new(structpb.Struct)
	if err = json.Unmarshal(encoded, result); err != nil {
		log.Error().Err(err).Msgf("Failed to unmarshal Olric statistics")
		return nil, err
	}

	// Drop empty partitions from the stats. The getter chain is nil-safe, so a
	// missing "partitions" field simply yields an empty iteration.
	partitions := result.GetFields()["partitions"].GetStructValue().GetFields()
	for id, partition := range partitions {
		if partition.GetStructValue().GetFields()["length"].GetNumberValue() != 0 {
			continue
		}
		delete(partitions, id)
	}

	return result, nil
}