/
event.go
98 lines (86 loc) · 2.77 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package service
import (
"encoding/json"
"time"
"github.com/IBM/fablet/api"
"github.com/gorilla/websocket"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/pkg/errors"
)
// EventReq is the request decoded from the first websocket message of a
// block event subscription. It embeds BaseRequest and adds the ID of the
// channel whose block events should be monitored.
type EventReq struct {
	BaseRequest

	ChannelID string `json:"channelID"`
}
// BlockEventResult is the JSON payload written to the websocket for each
// block event.
//
// Number is the block number, TXNumber the count of filtered transactions in
// the block, UpdateTime a timestamp in milliseconds since the Unix epoch, and
// SourceURL the URL reported as the source of the event.
type BlockEventResult struct {
	Number     uint64 `json:"number"`
	TXNumber   int    `json:"TXNumber"`
	UpdateTime int64  `json:"updateTime"`
	SourceURL  string `json:"sourceURL"`
}
const (
	// WSPingInterval is the period between two websocket ping messages.
	WSPingInterval = 10 * time.Second
	// WSWriteDeadline is the time limit for a single websocket write.
	WSWriteDeadline = 10 * time.Second
)
// HandleBlockEvent serves one block event subscription over a websocket.
//
// It reads a single EventReq from wsConn, obtains a connection for it with
// getConnOfReq, and starts api.MonitorBlockEvent in a separate goroutine for
// the requested channel. It then loops until one of the following happens:
//
//   - an event arrives on the event channel: it is converted to a
//     BlockEventResult and written to wsConn as a JSON text message;
//   - the ping ticker fires (every WSPingInterval): a websocket ping is written;
//   - a value arrives on the event-close channel: the function returns nil.
//
// A non-nil error is returned when wsConn is nil, when the request cannot be
// read or resolved to a connection, when a nil event is received, or when
// marshalling or writing to the websocket fails. On every return path after
// the monitor has been started, a value is sent on closeChan and the ping
// ticker is stopped.
func HandleBlockEvent(wsConn *websocket.Conn) error {
	logger.Info("Service HandleBlockEvent")

	// Guard before the first use of wsConn; it is never reassigned, so a
	// single check up front covers the whole function.
	if wsConn == nil {
		return errors.Errorf("Websocket connection is nil.")
	}

	reqBody := &EventReq{}
	if err := wsConn.ReadJSON(reqBody); err != nil {
		return err
	}
	conn, err := getConnOfReq(reqBody.GetReqConn(), true)
	if err != nil {
		return err
	}

	eventChan := make(chan *fab.FilteredBlockEvent, 1)
	// Buffered with capacity 1 so the deferred send below never blocks.
	closeChan := make(chan int, 1)
	eventCloseChan := make(chan int, 1)
	go api.MonitorBlockEvent(conn, reqBody.ChannelID, eventChan, closeChan, eventCloseChan)

	pingTicker := time.NewTicker(WSPingInterval)
	defer func() {
		logger.Info("Service HandleBlockEvent end")
		// TODO closeChan might be a little later, so then MonitorBlockEvent will ends a little later.
		closeChan <- 0
		pingTicker.Stop()
	}()

	for {
		select {
		case event := <-eventChan:
			// Check for nil before touching any field: a nil event must
			// produce an error, not a nil pointer dereference.
			if event == nil {
				return errors.Errorf("Event is nil")
			}
			logger.Debugf("Get an event from %s.", event.SourceURL)

			// TODO In fact, we don't know the block time now.
			// UpdateTime is therefore the current wall-clock time in milliseconds.
			blockEventRes := BlockEventResult{
				Number:     event.FilteredBlock.GetNumber(),
				TXNumber:   len(event.FilteredBlock.GetFilteredTransactions()),
				UpdateTime: time.Now().UnixNano() / 1000000,
				SourceURL:  event.SourceURL,
			}
			resultJSON, err := json.Marshal(blockEventRes)
			if err != nil {
				return errors.WithMessage(err, "Error of marshal event data.")
			}
			if err := wsConn.WriteMessage(websocket.TextMessage, resultJSON); err != nil {
				return errors.WithMessage(err, "Error of write event data.")
			}

		case <-pingTicker.C:
			// TODO ping is not enough, there always be 60 seconds later after connection close.
			if err := wsConn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
				return errors.WithMessage(err, "Error of write websocket ping.")
			}

		case <-eventCloseChan:
			return nil
		}
	}
}