/
event.go
107 lines (91 loc) · 2.79 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package delaytrigger
import (
"fmt"
"time"
redigoplus "github.com/cheetah-fun-gs/goplus/dao/redigo"
redigo "github.com/gomodule/redigo/redis"
)
// EventStatus is the status of an event.
type EventStatus int
// Event status values.
const (
	EventStatusDisable  EventStatus = iota // disabled (closed)
	EventStatusActived                     // activated
	EventStatusFinished                    // finished
)
// Event holds the data of a single event.
type Event struct {
	ID        string      `json:"id,omitempty"`         // event ID
	Status    EventStatus `json:"status,omitempty"`     // event status
	TriggerTs int64       `json:"trigger_ts,omitempty"` // trigger timestamp
	Data      string      `json:"data,omitempty"`       // event payload
	IsKeep    bool        `json:"is_keep,omitempty"`    // whether to keep the event after the target has finished handling it; if not kept it is unregistered automatically
}
// EventRegister registers a custom event by storing it in the trigger's
// Redis hash under the field event.ID.
//
// If isCover is true the entry is written with HSet; otherwise it is written
// with HSetNX.
//
// An error is returned when event is nil, when its status is
// EventStatusFinished, when it is active and TriggerTs lies more than
// triggerTsMax past the current Unix time, or when the Redis call fails.
func (trigger *DelayTrigger) EventRegister(event *Event, isCover bool) error {
	// Guard against a nil event so callers get an error instead of a panic.
	if event == nil {
		return fmt.Errorf("event is nil")
	}
	if event.Status == EventStatusFinished {
		return fmt.Errorf("Status is finished")
	}

	now := time.Now()
	if event.Status == EventStatusActived && event.TriggerTs > now.Unix()+triggerTsMax {
		return fmt.Errorf("TriggerTs is too later")
	}

	conn := trigger.pool.Get()
	defer conn.Close()

	key := trigger.getTriggerKey()
	if isCover {
		return redigoplus.HSet(conn, key, event.ID, event)
	}

	// NOTE(review): the first return value of HSetNX is discarded, so a
	// registration that did not overwrite an existing ID presumably returns
	// nil here as well; confirm against redigoplus if callers need to know.
	_, err := redigoplus.HSetNX(conn, key, event.ID, event)
	return err
}
// EventRegisterTimer registers an event that fires at a fixed time.
//
// It builds an active Event from eventID, triggerTs and eventData and passes
// it to EventRegister together with isCover. IsKeep is left at its zero
// value.
//
// triggerTs must lie within [now+triggerTsMin, now+triggerTsMax], where now
// is the current Unix time; otherwise an error is returned and nothing is
// written.
func (trigger *DelayTrigger) EventRegisterTimer(eventID string, triggerTs int64, eventData string, isCover bool) error {
	nowTs := time.Now().Unix()
	if triggerTs < nowTs+triggerTsMin || triggerTs > nowTs+triggerTsMax {
		return fmt.Errorf("TriggerTs is out of range")
	}

	event := &Event{
		ID:        eventID,
		TriggerTs: triggerTs,
		Data:      eventData,
		Status:    EventStatusActived,
	}
	return trigger.EventRegister(event, isCover)
}
// EventUnregister unregisters an event and removes all information stored
// for it: the event's own key is deleted and its field is removed from the
// trigger hash.
//
// Both commands are queued on one connection and flushed together; they are
// not wrapped in a transaction. The first error encountered while sending,
// flushing or reading a reply is returned.
func (trigger *DelayTrigger) EventUnregister(eventID string) error {
	conn := trigger.pool.Get()
	defer conn.Close()

	triggerKey := trigger.getTriggerKey()
	dataKey := trigger.getEventKey(eventID)

	err := conn.Send("DEL", dataKey)
	if err == nil {
		err = conn.Send("HDEL", triggerKey, eventID)
	}
	if err == nil {
		err = conn.Flush()
	}

	// Read one reply per queued command, stopping at the first failure.
	for pending := 2; err == nil && pending > 0; pending-- {
		_, err = conn.Receive()
	}
	return err
}
// EventExists reports whether an event with the given ID is present in the
// trigger hash. It returns false together with the error when the Redis call
// or the reply conversion fails.
func (trigger *DelayTrigger) EventExists(eventID string) (bool, error) {
	conn := trigger.pool.Get()
	defer conn.Close()

	reply, err := conn.Do("HEXISTS", trigger.getTriggerKey(), eventID)
	found, err := redigo.Int(reply, err)
	if err != nil {
		return false, err
	}
	return found > 0, nil
}