-
Notifications
You must be signed in to change notification settings - Fork 277
/
metric_uid.go
136 lines (123 loc) · 3.62 KB
/
metric_uid.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package index
import (
"go.uber.org/zap"
"github.com/eleme/lindb/kv"
"github.com/eleme/lindb/pkg/logger"
"github.com/eleme/lindb/pkg/tree"
"github.com/eleme/lindb/pkg/util"
)
//MetricUid represents metric name unique id under the database
type MetricUID struct {
partition uint32
metrics *tree.BTree //Key is the ascii of the first letter of metric
sequenceID uint32 //unique sequence id
family kv.Family
dbField zap.Field
}
//NewMetricUID creation requires kvFamily
func NewMetricUID(metricFamily kv.Family) *MetricUID {
//Get the last written sequenceID, otherwise start from 0
seq := uint32(0)
metricFamily.Lookup(MetricSequenceIDKey, func(bytes []byte) bool {
seq = util.BytesToUint32(bytes)
return true
})
return &MetricUID{
metrics: tree.NewBTree(),
sequenceID: seq,
family: metricFamily,
dbField: zap.String("db", "db"),
}
}
//GetOrCreateMetricID returns find the metric ID associated with a given name or create it.
func (m *MetricUID) GetOrCreateMetricID(metricName string, create bool) (uint32, bool) {
if len(metricName) > 0 {
nameBytes := []byte(metricName)
partition := getPartition(nameBytes)
id := m.getMetricIDFromDisk(partition, nameBytes)
if id == NotFoundMetricID {
// if not exists
if partition != m.partition {
err := m.Flush()
if nil != err {
logger.GetLogger("tsdb/index").Error("flush metric tree error!", m.dbField)
}
m.partition = partition
m.metrics.Clear()
}
if create {
m.sequenceID++
m.metrics.Put(nameBytes, int(m.sequenceID))
return m.sequenceID, true
}
return NotFoundMetricID, false
}
return id, true
}
return NotFoundMetricID, false
}
//SuggestMetrics returns suggestions of metric names given a search prefix.
func (m *MetricUID) SuggestMetrics(prefix string, limit int16) map[string]struct{} {
if len(prefix) > 0 {
nameBytes := []byte(prefix)
partition := getPartition(nameBytes)
m.family.Lookup(partition, func(bytes []byte) bool {
treeReader := tree.NewReader(bytes)
it := treeReader.Seek(nameBytes)
if nil != it {
//todo
logger.GetLogger("tsdb/index").Error("", m.dbField)
}
return true
})
} else {
//todo
logger.GetLogger("tsdb/index").Warn("", m.dbField)
//partitions := m.getSortPartition()
}
return nil
}
//Flush represents forces a flush of in-memory data, and clear it
func (m *MetricUID) Flush() error {
if m.metrics.Len() == 0 {
return nil
}
flusher := m.family.NewFlusher()
bs := util.Uint32ToBytes(m.sequenceID)
err := flusher.Add(MetricSequenceIDKey, bs)
if nil != err {
logger.GetLogger("tsdb/index").Error("write metric sequenceID error!", m.dbField, logger.Error(err))
return err
}
writer := tree.NewWriter(m.metrics)
byteArray, err := writer.Encode()
if nil != err {
logger.GetLogger("tsdb/index").Error(" metricTree encode error!", m.dbField, logger.Error(err))
}
err = flusher.Add(m.partition, byteArray)
if nil != err {
logger.GetLogger("tsdb/index").Error("write metric tree error!",
m.dbField, zap.String("partition", string(m.partition)))
return err
}
m.metrics.Clear()
return flusher.Commit()
}
// getMetricIdFromDisk return unique int32 id, return -1 if not found
func (m *MetricUID) getMetricIDFromDisk(partition uint32, metric []byte) uint32 {
var metricID = NotFoundMetricID
m.family.Lookup(partition, func(bytes []byte) bool {
treeReader := tree.NewReader(bytes)
v, ok := treeReader.Get(metric)
if ok {
metricID = uint32(v)
return true
}
return false
})
return metricID
}
//getPartition returns determine partition according to the first byte
func getPartition(name []byte) uint32 {
return uint32(name[0])
}