-
Notifications
You must be signed in to change notification settings - Fork 3
/
timeouttrigger.go
78 lines (69 loc) · 1.75 KB
/
timeouttrigger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package chunkinfo
import (
"github.com/FavorLabs/favorX/pkg/boson"
"strings"
"sync"
"time"
)
const (
TimeOut = 30
Time = 5
)
// timeoutTrigger
type timeoutTrigger struct {
sync.RWMutex
// rootCid_overlay : Timestamp
trigger map[string]int64
}
func newTimeoutTrigger() *timeoutTrigger {
return &timeoutTrigger{trigger: make(map[string]int64)}
}
// updateTimeOutTrigger
func (tt *timeoutTrigger) updateTimeOutTrigger(rootCid, overlay []byte) {
tt.Lock()
defer tt.Unlock()
key := boson.NewAddress(rootCid).String() + "_" + boson.NewAddress(overlay).String()
tt.trigger[key] = time.Now().Unix()
}
// removeTimeOutTrigger
func (tt *timeoutTrigger) removeTimeOutTrigger(rootCid, overlay boson.Address) {
tt.Lock()
defer tt.Unlock()
key := rootCid.String() + "_" + overlay.String()
delete(tt.trigger, key)
}
// getTimeOutRootCidAndNode
func (tt *timeoutTrigger) getTimeOutRootCidAndNode() (boson.Address, boson.Address) {
tt.RLock()
defer tt.RUnlock()
for k, t := range tt.trigger {
if t+TimeOut <= time.Now().Unix() {
arr := strings.Split(k, "_")
return boson.MustParseHexAddress(arr[0]), boson.MustParseHexAddress(arr[1])
}
}
return boson.ZeroAddress, boson.ZeroAddress
}
// triggerTimeOut
func (ci *ChunkInfo) triggerTimeOut() {
t := time.NewTicker(Time * time.Second)
go func() {
for {
<-t.C
rootCid, overlay := ci.timeoutTrigger.getTimeOutRootCidAndNode()
if !rootCid.Equal(boson.ZeroAddress) {
q := ci.getQueue(rootCid.String())
if q != nil {
msgChan, ok := ci.syncMsg.Load(rootCid.String())
if ok {
msgChan.(chan bool) <- false
}
q.popNode(Pulling, overlay.Bytes())
q.push(UnPull, overlay.Bytes())
} else {
ci.timeoutTrigger.removeTimeOutTrigger(rootCid, overlay)
}
}
}
}()
}