-
Notifications
You must be signed in to change notification settings - Fork 30
/
shard_follow_progress.go
132 lines (106 loc) · 4.17 KB
/
shard_follow_progress.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package store
import (
"context"
"fmt"
"github.com/chrislusf/glog"
"github.com/chrislusf/vasto/topology"
"github.com/chrislusf/vasto/util"
)
type progressKey struct {
shardId VastoShardId
serverAdminAddress string
}
type progressValue struct {
segment uint32
offset uint64
}
type followProcess struct {
cancelFunc context.CancelFunc
}
var (
// VastoInternalKeyPrefix is a reserved key prefix for Vasto internal meta data
VastoInternalKeyPrefix = []byte("_vasto.")
)
func genSegmentOffsetKeys(serverAdminAddress string, shardId VastoShardId) (segmentKey []byte, offsetKey []byte) {
segmentKey = []byte(fmt.Sprintf("%snext.segment.%s.%d", VastoInternalKeyPrefix, serverAdminAddress, shardId))
offsetKey = []byte(fmt.Sprintf("%snext.offset.%s.%d", VastoInternalKeyPrefix, serverAdminAddress, shardId))
return
}
// implementing PeriodicTask
func (s *shard) EverySecond() {
// glog.V(2).Infof("%s every second", s)
s.followProcessesLock.Lock()
for pk, pv := range s.followProgress {
if segment, offset, hasProgress, err := s.loadProgress(pk.serverAdminAddress, pk.shardId); err == nil {
if !hasProgress || segment != pv.segment || offset != pv.offset {
s.saveProgress(pk.serverAdminAddress, pk.shardId, pv.segment, pv.offset)
}
}
}
s.followProcessesLock.Unlock()
}
func (s *shard) loadProgress(serverAdminAddress string, targetShardId VastoShardId) (segment uint32, offset uint64, hasProgress bool, err error) {
segmentKey, offsetKey := genSegmentOffsetKeys(serverAdminAddress, targetShardId)
nextSegment := uint32(0)
nextOffset := uint64(0)
t, err := s.db.Get(segmentKey)
if err == nil && len(t) > 0 {
hasProgress = true
nextSegment = util.BytesToUint32(t)
}
t, err = s.db.Get(offsetKey)
if err == nil && len(t) > 0 {
hasProgress = true
nextOffset = util.BytesToUint64(t)
}
return nextSegment, nextOffset, hasProgress, err
}
func (s *shard) saveProgress(serverAdminAddress string, targetShardId VastoShardId, segment uint32, offset uint64) (err error) {
glog.V(1).Infof("shard %s follow server %v shard %d next segment %d offset %d", s, serverAdminAddress, targetShardId, segment, offset)
segmentKey, offsetKey := genSegmentOffsetKeys(serverAdminAddress, targetShardId)
err = s.db.Put(segmentKey, util.Uint32toBytes(segment))
if err != nil {
return fmt.Errorf("setting %v = %d %d: %v", string(segmentKey), segment, offset, err)
}
err = s.db.Put(offsetKey, util.Uint64toBytes(offset))
if err != nil {
return fmt.Errorf("setting %v = %d %d: %v", string(offsetKey), segment, offset, err)
}
return nil
}
func (s *shard) clearProgress(serverAdminAddress string, targetShardId VastoShardId) {
glog.V(1).Infof("shard %s stops following server %v.%d", s, serverAdminAddress, targetShardId)
segmentKey, offsetKey := genSegmentOffsetKeys(serverAdminAddress, targetShardId)
s.db.Delete(segmentKey)
s.db.Delete(offsetKey)
}
func (s *shard) isFollowing(peer topology.ClusterShard) bool {
s.followProcessesLock.Lock()
_, found := s.followProcesses[peer]
s.followProcessesLock.Unlock()
return found
}
func (s *shard) startFollowProcess(peer topology.ClusterShard, cancelFunc context.CancelFunc) {
s.followProcessesLock.Lock()
s.followProcesses[peer] = &followProcess{cancelFunc: cancelFunc}
s.followProcessesLock.Unlock()
}
func (s *shard) insertInMemoryFollowProgress(serverAdminAddress string, targetShardId VastoShardId, segment uint32, offset uint64) {
s.followProgressLock.Lock()
s.followProgress[progressKey{targetShardId, serverAdminAddress}] = progressValue{segment, offset}
s.followProgressLock.Unlock()
}
func (s *shard) updateInMemoryFollowProgressIfPresent(serverAdminAddress string, targetShardId VastoShardId, segment uint32, offset uint64) (found bool) {
s.followProgressLock.Lock()
_, found = s.followProgress[progressKey{targetShardId, serverAdminAddress}]
if found {
s.followProgress[progressKey{targetShardId, serverAdminAddress}] = progressValue{segment, offset}
}
s.followProgressLock.Unlock()
return found
}
func (s *shard) deleteInMemoryFollowProgress(serverAdminAddress string, targetShardId VastoShardId) {
s.followProgressLock.Lock()
delete(s.followProgress, progressKey{targetShardId, serverAdminAddress})
s.followProgressLock.Unlock()
}