/
user_event.go
123 lines (99 loc) · 2.95 KB
/
user_event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package sdk
import (
"fmt"
"github.com/blockchain-jd-com/framework-go/crypto/framework"
"github.com/blockchain-jd-com/framework-go/ledger_model"
"time"
)
/*
* Author: imuge
* Date: 2020/7/3 3:29 PM
*/
// UserEventPoint identifies one user event to listen to: the event account,
// the event name, and the sequence number at which listening starts.
type UserEventPoint struct {
	EventAccount string // address of the event account
	EventName    string // name of the event
	Sequence     int64  // sequence number
}
// NewUserEventPoint builds a UserEventPoint from the given event account
// address, event name and sequence number.
func NewUserEventPoint(eventAccount, eventName string, sequence int64) UserEventPoint {
	var point UserEventPoint
	point.EventAccount = eventAccount
	point.EventName = eventName
	point.Sequence = sequence
	return point
}
// UserEventListener is the event listener: implementations receive the user
// events fetched by a UserEventListenerHandle.
type UserEventListener interface {
	// OnEvent is called once per fetched event. The context carries the
	// ledger hash and the handle that delivered the event.
	OnEvent(event *ledger_model.Event, context *UserEventContext)
}
// UserEventContext is the context passed to a UserEventListener together
// with each event.
type UserEventContext struct {
	LedgerHash   *framework.HashDigest    // hash of the ledger the event was queried from
	EventHandler *UserEventListenerHandle // handle that delivered the event
}
// NewUserEventContext allocates a UserEventContext holding the given ledger
// hash and event handle.
func NewUserEventContext(ledgerHash *framework.HashDigest, eventHandler *UserEventListenerHandle) *UserEventContext {
	ctx := new(UserEventContext)
	ctx.LedgerHash = ledgerHash
	ctx.EventHandler = eventHandler
	return ctx
}
// UserEventListenerHandle is the event listening handle: it polls the query
// service for the configured event points and forwards every fetched event to
// the listener until Cancel is called.
//
// Copies of a handle share the same stop channel and sequence map, so calling
// Cancel on any copy (including the value returned by
// NewUserEventListenerHandle) stops the single polling goroutine.
type UserEventListenerHandle struct {
	queryService   ledger_model.BlockchainQueryService // blockchain query service
	ledgerHash     *framework.HashDigest               // ledger hash
	eventPoints    []UserEventPoint                    // event points being listened to
	listener       UserEventListener                   // event listener
	eventSequences map[string]int64                    // next start sequence per event, keyed by account+name
	stop           chan struct{}                       // buffered (cap 1); a token means polling must end
}

// NewUserEventListenerHandle creates a handle for the given event points and
// immediately starts a background goroutine that polls for new events about
// once per second. Call Cancel on the returned handle to stop that goroutine.
func NewUserEventListenerHandle(queryService ledger_model.BlockchainQueryService, ledgerHash *framework.HashDigest, eventPoints []UserEventPoint, listener UserEventListener) UserEventListenerHandle {
	// Seed the per-event start sequences from the requested event points.
	eventSequences := make(map[string]int64, len(eventPoints))
	for _, point := range eventPoints {
		eventSequences[point.EventAccount+point.EventName] = point.Sequence
	}

	handler := &UserEventListenerHandle{
		queryService:   queryService,
		ledgerHash:     ledgerHash,
		eventPoints:    eventPoints,
		listener:       listener,
		eventSequences: eventSequences,
		// The channel is shared by every copy of the handle. A plain bool
		// field would be duplicated when the handle is returned by value,
		// leaving the caller unable to stop the goroutine.
		stop: make(chan struct{}, 1),
	}

	// Start polling for events.
	go handler.start()

	return *handler
}

// EventPoints returns the event points this handle listens to.
func (h *UserEventListenerHandle) EventPoints() []UserEventPoint {
	return h.eventPoints
}

// Cancel asks the polling goroutine to stop. It never blocks, may be called
// from any goroutine (including from within a listener callback), and is safe
// to call more than once or on a zero-value handle.
func (h *UserEventListenerHandle) Cancel() {
	select {
	case h.stop <- struct{}{}:
	default:
		// A stop token is already pending, or the handle was never started
		// (nil channel); nothing more to do.
	}
}

// start runs the polling loop: it waits one second, loads events, and repeats
// until a stop token is received. A Cancel issued while waiting ends the loop
// without waiting for the remainder of the interval.
func (h *UserEventListenerHandle) start() {
	timer := time.NewTimer(time.Second)
	defer timer.Stop()

	for {
		select {
		case <-h.stop:
			return
		case <-timer.C:
			h.loadEvents()
			// Re-arm only after the pass completes so passes stay one
			// second apart, as with a sleep between passes.
			timer.Reset(time.Second)
		}
	}
}
// loadEvents performs one polling pass over all event points. For each point
// it requests up to batchSize events starting at the stored sequence, passes
// every returned event to the listener, and then advances the stored sequence
// to one past the highest sequence seen.
//
// A query error is printed and only that event point is skipped for this
// pass, so one failing point does not starve the points listed after it.
func (h *UserEventListenerHandle) loadEvents() {
	// Maximum number of events requested per event point in one pass.
	const batchSize = 10

	for _, point := range h.eventPoints {
		key := point.EventAccount + point.EventName

		startSequence := h.eventSequences[key]
		if startSequence < 0 {
			// Negative start sequences are clamped to 0 before querying.
			startSequence = 0
		}

		events, err := h.queryService.GetUserEvents(h.ledgerHash, point.EventAccount, point.EventName, startSequence, batchSize)
		if err != nil {
			fmt.Println(err)
			continue
		}

		maxSequence := startSequence
		for _, event := range events {
			if event.Sequence > maxSequence {
				maxSequence = event.Sequence
			}
			h.listener.OnEvent(event, NewUserEventContext(h.ledgerHash, h))
		}

		// Advance only when something was delivered; otherwise retry the
		// same start sequence on the next pass.
		if len(events) > 0 {
			h.eventSequences[key] = maxSequence + 1
		}
	}
}