/
consistent.go
158 lines (138 loc) · 3.42 KB
/
consistent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package sinking_sdk_go
import (
"errors"
"hash/crc32"
"sort"
"strconv"
"sync"
)
//声明新切片类型
type units []uint32
// Len 返回切片长度
func (x units) Len() int {
return len(x)
}
// Less 比对两个数大小
func (x units) Less(i, j int) bool {
return x[i] < x[j]
}
// Swap 切片中两个值的交换
func (x units) Swap(i, j int) {
x[i], x[j] = x[j], x[i]
}
//当hash环上没有数据时,提示错误
var errEmpty = errors.New("no hash data")
// Consistent 创建结构体保存一致性hash信息
type Consistent struct {
//hash环,key为哈希值,值存放节点的信息
circle map[uint32]string
//已经排序的节点hash切片
sortedHashes units
//虚拟节点个数,用来增加hash的平衡性
VirtualNode int
//map 读写锁
sync.RWMutex
}
// NewConsistent 创建一致性hash算法结构体,设置默认节点数量
func NewConsistent() *Consistent {
return &Consistent{
//初始化变量
circle: make(map[uint32]string),
//设置虚拟节点个数
VirtualNode: 20,
}
}
// generateKey 自动生成key值
func (c *Consistent) generateKey(element string, index int) string {
//副本key生成逻辑
return element + strconv.Itoa(index)
}
// hashKey 获取hash位置
func (c *Consistent) hashKey(key string) uint32 {
if len(key) < 64 {
//声明一个数组长度为64
var scratch [64]byte
//拷贝数据到数组中
copy(scratch[:], key)
//使用IEEE 多项式返回数据的CRC-32校验和
return crc32.ChecksumIEEE(scratch[:len(key)])
}
return crc32.ChecksumIEEE([]byte(key))
}
//更新排序,方便查找
func (c *Consistent) updateSortedHashes() {
hashes := c.sortedHashes[:0]
//判断切片容量,是否过大,如果过大则重置
if cap(c.sortedHashes)/(c.VirtualNode*4) > len(c.circle) {
hashes = nil
}
//添加hashes
for k := range c.circle {
hashes = append(hashes, k)
}
//对所有节点hash值进行排序,
//方便之后进行二分查找
sort.Sort(hashes)
//重新赋值
c.sortedHashes = hashes
}
// Add 向hash环中添加节点
func (c *Consistent) Add(element string) {
//加锁
c.Lock()
//解锁
defer c.Unlock()
c.add(element)
}
// add 添加节点
func (c *Consistent) add(element string) {
//循环虚拟节点,设置副本
for i := 0; i < c.VirtualNode; i++ {
//根据生成的节点添加到hash环中
c.circle[c.hashKey(c.generateKey(element, i))] = element
}
//更新排序
c.updateSortedHashes()
}
// remove 删除节点
func (c *Consistent) remove(element string) {
for i := 0; i < c.VirtualNode; i++ {
delete(c.circle, c.hashKey(c.generateKey(element, i)))
}
c.updateSortedHashes()
}
// Remove 删除一个节点
func (c *Consistent) Remove(element string) {
c.Lock()
defer c.Unlock()
c.remove(element)
}
// search 顺时针查找最近的节点
func (c *Consistent) search(key uint32) int {
//查找算法
f := func(x int) bool {
return c.sortedHashes[x] > key
}
//使用"二分查找"算法来搜索指定切片满足条件的最小值
i := sort.Search(len(c.sortedHashes), f)
//如果超出范围则设置i=0
if i >= len(c.sortedHashes) {
i = 0
}
return i
}
// Get 根据数据标示获取最近的服务器节点信息
func (c *Consistent) Get(name string) (string, error) {
//添加锁
c.RLock()
//解锁
defer c.RUnlock()
//如果为零则返回错误
if len(c.circle) == 0 {
return "", errEmpty
}
//计算hash值
key := c.hashKey(name)
i := c.search(key)
return c.circle[c.sortedHashes[i]], nil
}