This repository has been archived by the owner on Aug 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 106
/
notifier.go
111 lines (95 loc) · 2.93 KB
/
notifier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package mdata
import (
"encoding/json"
"github.com/raintank/schema"
"github.com/grafana/metrictank/consolidation"
"github.com/grafana/metrictank/idx"
"github.com/grafana/metrictank/stats"
log "github.com/sirupsen/logrus"
)
var (
notifiers []Notifier
// metric cluster.notifier.all.messages-received is a counter of messages received from cluster notifiers
messagesReceived = stats.NewCounter32("cluster.notifier.all.messages-received")
)
type Notifier interface {
Send(SavedChunk)
}
//PersistMessage format version
const PersistMessageBatchV1 = 1
type PersistMessageBatch struct {
Instance string `json:"instance"`
SavedChunks []SavedChunk `json:"saved_chunks"`
}
// SavedChunk represents a chunk persisted to the store
// Key is a stringified schema.AMKey
type SavedChunk struct {
Key string `json:"key"`
T0 uint32 `json:"t0"`
}
func SendPersistMessage(key string, t0 uint32) {
sc := SavedChunk{Key: key, T0: t0}
for _, h := range notifiers {
h.Send(sc)
}
}
func InitPersistNotifier(not ...Notifier) {
notifiers = not
}
type NotifierHandler interface {
// Handle handles an incoming message
Handle([]byte)
// PartitionOf is used for notifiers that want to flush and need partition information for metrics
PartitionOf(key schema.MKey) (int32, bool)
}
type DefaultNotifierHandler struct {
idx idx.MetricIndex
metrics Metrics
}
func NewDefaultNotifierHandler(metrics Metrics, idx idx.MetricIndex) DefaultNotifierHandler {
return DefaultNotifierHandler{
idx: idx,
metrics: metrics,
}
}
func (dn DefaultNotifierHandler) PartitionOf(key schema.MKey) (int32, bool) {
def, ok := dn.idx.Get(key)
return def.Partition, ok
}
func (dn DefaultNotifierHandler) Handle(data []byte) {
version := uint8(data[0])
if version == uint8(PersistMessageBatchV1) {
batch := PersistMessageBatch{}
err := json.Unmarshal(data[1:], &batch)
if err != nil {
log.Errorf("failed to unmarsh batch message: %s -- skipping", err)
return
}
messagesReceived.Add(len(batch.SavedChunks))
for _, c := range batch.SavedChunks {
amkey, err := schema.AMKeyFromString(c.Key)
if err != nil {
log.Errorf("notifier: failed to convert %q to AMKey: %s -- skipping", c.Key, err)
continue
}
// we only need to handle saves for series that we know about.
// if the series is not in the index, then we dont need to worry about it.
def, ok := dn.idx.Get(amkey.MKey)
if !ok {
log.Debugf("notifier: skipping metric with MKey %s as it is not in the index", amkey.MKey)
continue
}
agg := dn.metrics.GetOrCreate(amkey.MKey, def.SchemaId, def.AggId, uint32(def.Interval))
if amkey.Archive != 0 {
consolidator := consolidation.FromArchive(amkey.Archive.Method())
aggSpan := amkey.Archive.Span()
agg.(*AggMetric).SyncAggregatedChunkSaveState(c.T0, consolidator, aggSpan)
} else {
agg.(*AggMetric).SyncChunkSaveState(c.T0)
}
}
} else {
log.Errorf("notifier: unknown version %d", version)
}
return
}