/
snowflakehelper.go
146 lines (118 loc) · 4.09 KB
/
snowflakehelper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package snowflakehelper
import (
"context"
"errors"
"fmt"
"strconv"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
const (
workerNodePrefix = "_worker" //节点key前缀, 节点值为CurrentWorkNodeNum, ex: servicename_worker/1, servicename_worker/2
lockKeyPrefix = "_snowflake_lock" //锁的名字, ex: servicename__snowflake_lock
dialTimeout = 2 * time.Second
businessTimeout = 5 * time.Second
leaseTime = int64(60) //租期时间 60s
maxNodeNum = 10240
)
//GenerateSnowFlakeWorkerID 产生一个workerID
func GenerateSnowFlakeWorkerID(config *clientv3.Config, serviceName string) (int64, error) {
workerNodeDir := serviceName + workerNodePrefix
lockDir := serviceName + lockKeyPrefix
client, err := clientv3.New(*config)
if err != nil {
//glog.Errorf("create client err:%v", err)
return 0, err
}
//get a lock
session, err := concurrency.NewSession(client)
if err != nil {
//glog.Errorf("concurrency.NewSession error:%v", err)
return 0, err
}
ctx, cancel := context.WithTimeout(context.Background(), businessTimeout)
defer cancel()
//需要一把锁,防止多个进程重启获得相同的workerId
m := concurrency.NewMutex(session, lockDir)
if err := m.Lock(ctx); err != nil {
//glog.Errorf("lock fail:%v", err)
return 0, err
}
defer m.Unlock(ctx)
ctx2, cancel := context.WithTimeout(context.Background(), businessTimeout)
defer cancel()
resp, err := client.Get(ctx2, workerNodeDir, clientv3.WithPrefix()) //获取所有前缀为worker的节点
if err != nil {
return 0, errors.New("get prefix worker node error")
}
existNodeMap := make(map[int]int) //定义一个map,保存已经存在的节点
for _, ev := range resp.Kvs {
num, _ := strconv.Atoi(string(ev.Value))
existNodeMap[num] = num //put到existNodeMap中
//glog.V(8).Infof("%s, %s", ev.Key, ev.Value)
}
count := 1
var validWorkNodeNum int //从1到1024找最小的number
for ; count < maxNodeNum+1; count++ {
if _, ok := existNodeMap[count]; !ok { //如果不存在,就会直接break
validWorkNodeNum = count
break
}
}
if count == maxNodeNum+1 { //代表maxNodeNum个节点都已经用完了,或者部分节点已经挂掉了,然后key的租期还没有结束,可以重新启动
return 0, errors.New("服务节点数目大于maxNodeNum")
}
err = activeCurrentWorkerNode(client, validWorkNodeNum, workerNodeDir) //启动一个协程一直激活当前key,如果当前服务挂了,key就会在租期结束后查询不到了
if err != nil {
//glog.Errorf("activeCurrentWorkerNode error:%v", err)
return 0, err
}
return int64(validWorkNodeNum), nil
}
//InitSnowFlake ...
func InitSnowFlake(config *clientv3.Config, serviceName string) (*Node, error) {
currentWorkNodeNum, err := GenerateSnowFlakeWorkerID(config, serviceName)
if err != nil {
return nil, err
}
instance, err := NewNode(currentWorkNodeNum)
if err != nil {
//glog.Errorf("create snowflake error:%v", err)
return nil, err
}
return instance, nil
}
func activeCurrentWorkerNode(client *clientv3.Client, workerID int, workerNodeDir string) error {
lease := clientv3.NewLease(client)
//glog.V(8).Infof("active currerntNode : %v", workerID)
ctx, cancel := context.WithTimeout(context.Background(), businessTimeout)
defer cancel()
leaseRes, err := lease.Grant(ctx, leaseTime)
if err != nil {
//glog.Errorf("Grant error:%v", err)
return err
}
key := fmt.Sprintf("%v/%v", workerNodeDir, workerID)
val := fmt.Sprintf("%v", workerID)
ctx, cancel = context.WithTimeout(context.Background(), businessTimeout)
defer cancel()
_, err = client.Put(ctx, key, val, clientv3.WithLease(leaseRes.ID))
if err != nil {
//glog.Errorf("Put error:%v", err)
return err
}
//glog.V(8).Infof("activeCurrentWorkerNode key:%v val:%v success!", key, val)
leaseRespChan, err := client.KeepAlive(context.Background(), leaseRes.ID)
if err != nil {
//glog.Errorf("keepalive error:%v", err)
return err
}
go func() {
for _ = range leaseRespChan {
//glog.V(10).Infof("续约成功 %v", leaseKeepResp)
}
//glog.V(10).Infof("关闭续租")
}()
return nil
}