-
Notifications
You must be signed in to change notification settings - Fork 0
/
gtimer_entry.go
194 lines (175 loc) · 5.51 KB
/
gtimer_entry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
// Copyright GoFrame Author(https://github.com/gogf/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gtimer
import (
"time"
"github.com/gogf/gf/container/gtype"
)
// Entry is the timing job entry to wheel.
type Entry struct {
wheel *wheel // Belonged wheel.
job JobFunc // The job function.
singleton *gtype.Bool // Singleton mode.
status *gtype.Int // Job status.
times *gtype.Int // Limit running times.
create int64 // Timer ticks when the job installed.
interval int64 // The interval ticks of the job.
createMs int64 // The timestamp in milliseconds when job installed.
intervalMs int64 // The interval milliseconds of the job.
rawIntervalMs int64 // Raw input interval in milliseconds.
}
// JobFunc is the job function.
type JobFunc = func()
// addEntry adds a timing job to the wheel.
func (w *wheel) addEntry(interval time.Duration, job JobFunc, singleton bool, times int, status int) *Entry {
if times <= 0 {
times = defaultTimes
}
var (
ms = interval.Nanoseconds() / 1e6
num = ms / w.intervalMs
)
if num == 0 {
// If the given interval is lesser than the one of the wheel,
// then sets it to one tick, which means it will be run in one interval.
num = 1
}
nowMs := time.Now().UnixNano() / 1e6
ticks := w.ticks.Val()
entry := &Entry{
wheel: w,
job: job,
times: gtype.NewInt(times),
status: gtype.NewInt(status),
create: ticks,
interval: num,
singleton: gtype.NewBool(singleton),
createMs: nowMs,
intervalMs: ms,
rawIntervalMs: ms,
}
// Install the job to the list of the slot.
w.slots[(ticks+num)%w.number].PushBack(entry)
return entry
}
// addEntryByParent adds a timing job with parent entry.
func (w *wheel) addEntryByParent(interval int64, parent *Entry) *Entry {
num := interval / w.intervalMs
if num == 0 {
num = 1
}
nowMs := time.Now().UnixNano() / 1e6
ticks := w.ticks.Val()
entry := &Entry{
wheel: w,
job: parent.job,
times: parent.times,
status: parent.status,
create: ticks,
interval: num,
singleton: parent.singleton,
createMs: nowMs,
intervalMs: interval,
rawIntervalMs: parent.rawIntervalMs,
}
w.slots[(ticks+num)%w.number].PushBack(entry)
return entry
}
// Status returns the status of the job.
func (entry *Entry) Status() int {
return entry.status.Val()
}
// SetStatus custom sets the status for the job.
func (entry *Entry) SetStatus(status int) int {
return entry.status.Set(status)
}
// Start starts the job.
func (entry *Entry) Start() {
entry.status.Set(StatusReady)
}
// Stop stops the job.
func (entry *Entry) Stop() {
entry.status.Set(StatusStopped)
}
//Reset reset the job.
func (entry *Entry) Reset() {
entry.status.Set(StatusReset)
}
// Close closes the job, and then it will be removed from the timer.
func (entry *Entry) Close() {
entry.status.Set(StatusClosed)
}
// IsSingleton checks and returns whether the job in singleton mode.
func (entry *Entry) IsSingleton() bool {
return entry.singleton.Val()
}
// SetSingleton sets the job singleton mode.
func (entry *Entry) SetSingleton(enabled bool) {
entry.singleton.Set(enabled)
}
// SetTimes sets the limit running times for the job.
func (entry *Entry) SetTimes(times int) {
entry.times.Set(times)
}
// Run runs the job.
func (entry *Entry) Run() {
entry.job()
}
// check checks if the job should be run in given ticks and timestamp milliseconds.
func (entry *Entry) check(nowTicks int64, nowMs int64) (runnable, addable bool) {
switch entry.status.Val() {
case StatusStopped:
return false, true
case StatusClosed:
return false, false
case StatusReset:
return false, true
}
// Firstly checks using the ticks, this may be low precision as one tick is a little bit long.
if diff := nowTicks - entry.create; diff > 0 && diff%entry.interval == 0 {
// If not the lowest level wheel.
if entry.wheel.level > 0 {
diffMs := nowMs - entry.createMs
switch {
// Add it to the next slot, which means it will run on next interval.
case diffMs < entry.wheel.timer.intervalMs:
entry.wheel.slots[(nowTicks+entry.interval)%entry.wheel.number].PushBack(entry)
return false, false
// Normal rolls on the job.
case diffMs >= entry.wheel.timer.intervalMs:
// Calculate the leftover milliseconds,
// if it is greater than the minimum interval, then re-install it.
if leftMs := entry.intervalMs - diffMs; leftMs > entry.wheel.timer.intervalMs {
// Re-calculate and re-installs the job proper slot.
entry.wheel.timer.doAddEntryByParent(leftMs, entry)
return false, false
}
}
}
// Singleton mode check.
if entry.IsSingleton() {
// Note that it is atomic operation to ensure concurrent safety.
if entry.status.Set(StatusRunning) == StatusRunning {
return false, true
}
}
// Limit running times.
times := entry.times.Add(-1)
if times <= 0 {
// Note that it is atomic operation to ensure concurrent safety.
if entry.status.Set(StatusClosed) == StatusClosed || times < 0 {
return false, false
}
}
// This means it does not limit the running times.
// I know it's ugly, but it is surely high performance for running times limit.
if times < 2000000000 && times > 1000000000 {
entry.times.Set(defaultTimes)
}
return true, true
}
return false, true
}