-
Notifications
You must be signed in to change notification settings - Fork 672
/
timeout_manager.go
142 lines (115 loc) · 2.84 KB
/
timeout_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// Copyright (C) 2019-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package timer
import (
"container/list"
"sync"
"time"
"github.com/ava-labs/avalanchego/ids"
)
// timeout is a single pending entry tracked by TimeoutManager.
type timeout struct {
// id is the key the entry is registered under.
id ids.ID
// handler is the callback handed back for execution once the entry expires.
handler func()
// timer is the time at which the entry was added (put sets it to time.Now()).
timer time.Time
}
// TimeoutManager tracks pending timeouts keyed by ID. All entries share one
// duration; once an entry has been pending for longer than that duration its
// handler is executed.
//
// The zero value is not usable: the map, list and timer are only created by
// Initialize.
type TimeoutManager struct {
// lock is held by Put, Remove and Timeout while they access the fields below.
lock sync.Mutex
duration time.Duration // Amount of time before a timeout
// timeoutMap locates the list element that holds a given ID's entry.
timeoutMap map[ids.ID]*list.Element
// timeoutList holds the pending entries in insertion order, oldest first.
timeoutList *list.List
timer *Timer // Timer that will fire to clear the timeouts
}
// Initialize prepares tm for use: it records the timeout duration, allocates
// the empty bookkeeping structures and creates the timer that invokes
// tm.Timeout. It has to be called before any other method, since the zero
// value of TimeoutManager holds a nil map, list and timer.
func (tm *TimeoutManager) Initialize(duration time.Duration) {
	tm.duration = duration

	// The list keeps entries in insertion order; the map indexes into it by ID.
	tm.timeoutList = list.New()
	tm.timeoutMap = make(map[ids.ID]*list.Element)

	tm.timer = NewTimer(tm.Timeout)
}
// Dispatch forwards to the underlying timer's Dispatch method.
// NOTE(review): presumably this blocks while servicing timer events and is
// meant to run in its own goroutine — confirm against Timer.Dispatch.
func (tm *TimeoutManager) Dispatch() {
tm.timer.Dispatch()
}
// Stop forwards to the underlying timer's Stop method, which is expected to
// stop further timeouts from being executed. It does not take tm.lock and
// does not touch the pending entries.
func (tm *TimeoutManager) Stop() {
tm.timer.Stop()
}
// Put registers handler under id. Any entry already stored for id is
// replaced, so its timeout restarts from the current time. Put holds tm.lock
// for the duration of the call.
func (tm *TimeoutManager) Put(id ids.ID, handler func()) {
tm.lock.Lock()
defer tm.lock.Unlock()
tm.put(id, handler)
}
// Remove discards the pending entry for id, if there is one, without running
// its handler. Remove holds tm.lock for the duration of the call.
func (tm *TimeoutManager) Remove(id ids.ID) {
tm.lock.Lock()
defer tm.lock.Unlock()
tm.remove(id)
}
// Timeout runs the handler of every entry that has expired and then re-arms
// the timer for the next pending entry. Initialize installs it as the timer's
// handler. tm.lock is acquired here, but it is released around each handler
// invocation (see timeout).
func (tm *TimeoutManager) Timeout() {
tm.lock.Lock()
defer tm.lock.Unlock()
tm.timeout()
}
// timeout pops every expired entry off the front of the list and runs its
// handler, then schedules the timer for whatever entry is now at the front.
//
// The caller must hold tm.lock. The lock is dropped while each handler runs
// and re-acquired afterwards, so it is held again when timeout returns.
func (tm *TimeoutManager) timeout() {
	// Entries added before this instant have been pending longer than tm.duration.
	cutoff := time.Now().Add(-tm.duration)

	// removeExpiredHead yields nil once the front entry is not expired or the
	// list is empty.
	for handler := tm.removeExpiredHead(cutoff); handler != nil; handler = tm.removeExpiredHead(cutoff) {
		// Don't execute a callback with a lock held
		tm.lock.Unlock()
		handler()
		tm.lock.Lock()
	}

	tm.registerTimeout()
}
// put stores handler under id, stamped with the current time, at the back of
// the list. An existing entry for id is dropped first, so each id appears at
// most once. The caller must hold tm.lock.
func (tm *TimeoutManager) put(id ids.ID, handler func()) {
	tm.remove(id)

	entry := timeout{
		id:      id,
		handler: handler,
		timer:   time.Now(),
	}
	tm.timeoutMap[id] = tm.timeoutList.PushBack(entry)

	// The timer is only (re)armed here when the new entry is the sole entry,
	// i.e. it has just become the head of the list.
	if tm.timeoutList.Len() != 1 {
		return
	}
	tm.registerTimeout()
}
// remove drops the entry registered under id from both the map and the list.
// It does nothing when id is not present. The caller must hold tm.lock.
func (tm *TimeoutManager) remove(id ids.ID) {
	if elem, ok := tm.timeoutMap[id]; ok {
		delete(tm.timeoutMap, id)
		tm.timeoutList.Remove(elem)
	}
}
// removeExpiredHead inspects the oldest entry. If it was added strictly
// before t, the entry is removed and its handler is returned so the caller
// can run it. Otherwise, or when there are no entries, nil is returned and
// nothing changes. The caller must hold tm.lock.
func (tm *TimeoutManager) removeExpiredHead(t time.Time) func() {
	front := tm.timeoutList.Front()
	if front == nil {
		// Empty list: nothing can have expired.
		return nil
	}

	head := front.Value.(timeout)
	if !head.timer.Before(t) {
		// The oldest entry is still within its timeout window.
		return nil
	}

	tm.remove(head.id)
	return head.handler
}
// registerTimeout schedules the timer according to the oldest pending entry,
// or cancels the timer when no entries remain. The caller must hold tm.lock.
func (tm *TimeoutManager) registerTimeout() {
	front := tm.timeoutList.Front()
	if front == nil {
		// There are no pending timeouts
		tm.timer.Cancel()
		return
	}

	head := front.Value.(timeout)

	// head.timer - (now - duration) is the time left until the head entry has
	// been pending for tm.duration.
	cutoff := time.Now().Add(-tm.duration)
	tm.timer.SetTimeoutIn(head.timer.Sub(cutoff))
}