-
Notifications
You must be signed in to change notification settings - Fork 0
/
shard.go
161 lines (129 loc) · 3.38 KB
/
shard.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package tsdb
import (
"sync"
"github.com/cespare/xxhash"
)
const _partitionNum = 16
// Value 保存时间和值
type Value[T any] struct {
UnixNano int64
V T
}
// entry 保存 values,目的减少写入已存在系列的数据的锁争用
type entry[T any] struct {
mu sync.RWMutex
values []Value[T]
}
// newEntry copy Value 并构建一个新的 entry
func newEntry[T any](vs []Value[T]) *entry[T] {
values := make([]Value[T], 0, len(vs))
values = append(values, vs...)
return &entry[T]{values: values}
}
// add 往 entry 中写入数据
func (e *entry[T]) add(values []Value[T]) {
e.mu.Lock()
defer e.mu.Unlock()
e.values = append(e.values, values...)
}
// removeBefore 删除小于 unixNano 的数据
func (e *entry[T]) removeBefore(unixNano int64) {
e.mu.Lock()
defer e.mu.Unlock()
values := make([]Value[T], 0, len(e.values))
for _, v := range e.values {
if v.UnixNano >= unixNano {
values = append(values, v)
}
}
e.values = values
}
// valuesBetween 获取两个时间之间的 Value
func (e *entry[T]) valuesBetween(min, max int64) []Value[T] {
e.mu.RLock()
defer e.mu.RUnlock()
var values []Value[T]
for _, v := range e.values {
if v.UnixNano >= min && v.UnixNano <= max {
values = append(values, v)
}
}
return values
}
// partition hash ring 的一个分片,目的是减少新新系列的锁争用
type partition[T any] struct {
mu sync.RWMutex
// 存储系列和值
// {"series ex:host=A,region=SH":[value1, value2]}
store map[string]*entry[T]
}
func newPartition[T any]() *partition[T] {
store := make(map[string]*entry[T])
return &partition[T]{store: store}
}
// write 往分片中写入数据
func (p *partition[T]) write(key string, values []Value[T]) {
p.mu.RLock()
e := p.store[key]
p.mu.RUnlock()
if e != nil {
// 大部分情况会走进这个 if 里面,如果 系列 已经存在
e.add(values)
return
}
p.mu.Lock()
defer p.mu.Unlock()
// 因为中间有一段过程没锁,可能有别的协程已经写入,所以再检查一遍
if e := p.store[key]; e != nil {
e.add(values)
return
}
e = newEntry(values)
p.store[key] = e
}
// removeBefore 移除时间小于给定值的数据
func (p *partition[T]) removeBefore(unixNano int64) {
p.mu.Lock()
defer p.mu.Unlock()
store := make(map[string]*entry[T], len(p.store))
for k, e := range p.store {
e.removeBefore(unixNano)
// cap = 0 说明上次 remove 的时候已经没有 Value , 较大可能后续也没有 Value ,就不加入 store 了
if cap(e.values) != 0 {
store[k] = e
}
}
p.store = store
}
func (p *partition[T]) valuesBetween(key string, min, max int64) []Value[T] {
p.mu.RLock()
e := p.store[key]
p.mu.RUnlock()
if e == nil {
return nil
}
return e.valuesBetween(min, max)
}
type shard[T any] struct {
partitions []*partition[T]
}
func newShard[T any]() *shard[T] {
partitions := make([]*partition[T], 0, _partitionNum)
for i := 0; i < _partitionNum; i++ {
partitions = append(partitions, newPartition[T]())
}
return &shard[T]{partitions: partitions}
}
func (s *shard[T]) removeBefore(unixNano int64) {
for _, p := range s.partitions {
p.removeBefore(unixNano)
}
}
func (s *shard[T]) getPartitions(key string) *partition[T] {
return s.partitions[int(xxhash.Sum64([]byte(key))%uint64(len(s.partitions)))]
}
func (s *shard[T]) writeMulti(values map[string][]Value[T]) {
for k, v := range values {
s.getPartitions(k).write(k, v)
}
}