forked from OpenBazaar/openbazaar-go
/
inbound_message_scanner.go
123 lines (108 loc) · 3.22 KB
/
inbound_message_scanner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package core
import (
"time"
"github.com/op/go-logging"
"gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer"
"github.com/OpenBazaar/openbazaar-go/net"
"github.com/OpenBazaar/openbazaar-go/pb"
"github.com/OpenBazaar/openbazaar-go/repo"
)
// Delays between two consecutive runs of the inbound message scanner.
const (
	// scannerTestingInterval is the delay selected when the node has
	// TestnetEnable set.
	scannerTestingInterval = time.Minute
	// scannerRegularInterval is the delay selected in every other case.
	scannerRegularInterval = 10 * time.Minute
)
// inboundMessageScanner is a background worker that periodically looks at
// stored inbound messages which previously errored and hands eligible ones
// back to their message handler (see PerformTask).
type inboundMessageScanner struct {
	// Dependencies consumed by PerformTask.

	// datastore is where errored messages are read from and where a
	// successfully re-handled message is marked as resolved.
	datastore repo.Datastore
	// service is the network service the scanner was constructed with.
	service net.NetworkService
	// getHandler returns the handler function for a message type; PerformTask
	// skips the message when it returns nil.
	getHandler func(t pb.Message_MessageType) func(peer.ID, *pb.Message, interface{}) (*pb.Message, error)
	// extractID turns the PeerPubkey bytes stored with a message into a peer ID.
	extractID func([]byte) (*peer.ID, error)
	// broadcast is the notifier channel the scanner was constructed with.
	broadcast chan repo.Notifier

	// State used by Run and Stop to drive the worker loop.

	// intervalDelay is the period of the ticker created in Run.
	intervalDelay time.Duration
	// logger receives the errors reported by PerformTask.
	logger *logging.Logger
	// watchdogTimer is the ticker created by Run; each tick triggers PerformTask.
	watchdogTimer *time.Ticker
	// stopWorker is created by Run; Stop sends on it and then closes it.
	stopWorker chan bool
}
// peerIDExtractor converts the raw bytes directly into a peer.ID and returns
// a pointer to the result. The returned error is always nil; the error result
// exists so the function matches the scanner's extractID field.
func peerIDExtractor(data []byte) (*peer.ID, error) {
	id := new(peer.ID)
	*id = peer.ID(data)
	return id, nil
}
// StartInboundMsgScanner creates the node's inbound message scanner from the
// node's datastore, network service and broadcast channel, stores it in
// n.InboundMsgScanner and starts its Run loop in a new goroutine.
func (n *OpenBazaarNode) StartInboundMsgScanner() {
	scanner := &inboundMessageScanner{
		datastore:     n.Datastore,
		service:       n.Service,
		getHandler:    n.Service.HandlerForMsgType,
		extractID:     peerIDExtractor,
		broadcast:     n.Broadcast,
		intervalDelay: n.scannerIntervalDelay(),
		logger:        logging.MustGetLogger("inboundMessageScanner"),
	}
	n.InboundMsgScanner = scanner

	// Run blocks until Stop is called, so it gets its own goroutine.
	go scanner.Run()
}
// scannerIntervalDelay returns the delay between scanner runs:
// scannerTestingInterval when TestnetEnable is set on the node, otherwise
// scannerRegularInterval.
func (n *OpenBazaarNode) scannerIntervalDelay() time.Duration {
	delay := scannerRegularInterval
	if n.TestnetEnable {
		delay = scannerTestingInterval
	}
	return delay
}
// Run is the scanner's worker loop. It creates a fresh ticker (period
// intervalDelay) and a fresh stopWorker channel, calls PerformTask once
// immediately, and then calls it again on every tick. It blocks until a value
// is received on stopWorker (see Stop), stops the ticker and returns.
func (scanner *inboundMessageScanner) Run() {
	ticker := time.NewTicker(scanner.intervalDelay)
	scanner.watchdogTimer = ticker

	stop := make(chan bool)
	scanner.stopWorker = stop

	// First pass happens right away; later passes are driven by the ticker.
	scanner.PerformTask()
	for {
		select {
		case <-stop:
			ticker.Stop()
			return
		case <-ticker.C:
			scanner.PerformTask()
		}
	}
}
// Stop signals the Run loop to exit and then closes the stop channel.
//
// The channel is unbuffered, so the send blocks until Run receives it, which
// Run only does between PerformTask calls.
//
// NOTE(review): stopWorker is only assigned inside Run, so calling Stop before
// Run has started sends on a nil channel and blocks; calling Stop a second
// time sends on a closed channel and panics. Callers appear expected to call
// it exactly once on a running scanner — confirm.
func (scanner *inboundMessageScanner) Stop() {
	stop := scanner.stopWorker
	stop <- true
	close(stop)
}
// PerformTask makes one pass over the errored messages in the datastore and
// retries those whose recorded error equals ErrInsufficientFunds.
//
// For each such message it looks up the handler for the message type,
// derives the sender's peer ID from the stored public key, decodes the stored
// JSON payload (when there is one) and invokes the handler. A message is
// marked as resolved only when the handler returns no error. Every failure is
// logged and the pass moves on to the next message; nothing is returned to
// the caller.
func (scanner *inboundMessageScanner) PerformTask() {
	msgs, err := scanner.datastore.Messages().GetAllErrored()
	if err != nil {
		scanner.logger.Error(err)
		return
	}

	for _, m := range msgs {
		// Only messages that failed with ErrInsufficientFunds are retried.
		if m.MsgErr != ErrInsufficientFunds.Error() {
			continue
		}
		msgType := pb.Message_MessageType(m.MessageType)

		// Get handler for this msg type
		handler := scanner.getHandler(msgType)
		if handler == nil {
			scanner.logger.Errorf("err fetching handler for msg: %v", msgType)
			continue
		}

		i, err := scanner.extractID(m.PeerPubkey)
		if err != nil {
			// %v (not %s): the record and its numeric type are not strings,
			// so %s would render them as %!s(...) noise.
			scanner.logger.Errorf("Error processing message %v. Type %v: %s", m, msgType, err.Error())
			continue
		}

		// An empty payload is passed to the handler as a zero-value message.
		msg := new(repo.Message)
		if len(m.Message) > 0 {
			if err = msg.UnmarshalJSON(m.Message); err != nil {
				scanner.logger.Errorf("Error processing message %v. Type %v: %s", m, msgType, err.Error())
				continue
			}
		}

		// Dispatch handler
		if _, err = handler(*i, &msg.Msg, nil); err != nil {
			scanner.logger.Errorf("%d handle message error from %s: %s", m.MessageType, m.PeerID, err)
			continue
		}

		// The handler succeeded, so the stored error no longer applies.
		if err = scanner.datastore.Messages().MarkAsResolved(m); err != nil {
			scanner.logger.Errorf("marking message resolved: %s", err)
		}
	}
}