-
Notifications
You must be signed in to change notification settings - Fork 5
/
monitor.go
107 lines (101 loc) · 3.32 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package chains
import (
"context"
"strings"
"time"
"github.com/celer-network/goutils/eth/mon2"
"github.com/celer-network/goutils/log"
"github.com/celer-network/im-executor/alert"
"github.com/celer-network/im-executor/contracts"
"github.com/celer-network/im-executor/dal"
"github.com/celer-network/im-executor/sgn-v2/eth"
"github.com/celer-network/im-executor/types"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
)
func (c *Chain) startMonitoringEvents(filters contracts.ReceiverContracts) {
c.monitorBusExecuted(filters)
}
func (c *Chain) monitorBusExecuted(filters contracts.ReceiverContracts) {
addrConfig := mon2.PerAddrCfg{
Addr: c.MessageBus.Address,
ChkIntv: time.Minute,
AbiStr: eth.MessageBusABI,
}
msgbusABI, err := abi.JSON(strings.NewReader(eth.MessageBusABI))
if err != nil {
log.Fatalf("failed to parse MessageBus ABI")
}
evExecuted, ok := msgbusABI.Events["Executed"]
if !ok {
log.Fatalf("failed to get Executed event MessageBus ABI")
}
filters = filters.GetByChain(c.ChainID)
if len(filters) > 0 {
// first topic would be a signature of the event
// while we switch eventName later, so we don't filter event signature here.
addrs := filters.AddressHashes(c.ChainID)
addrConfig.Topics = [][]common.Hash{{evExecuted.ID}, addrs}
} else {
log.Infof("no address filter for chain %d, stop monitoring MessageBus Executed on this chain", c.ChainID)
return
}
log.Infof("monitoring MessageBus Executed on chain %d with address filters %#s", c.ChainID, addrConfig.Topics[1])
db := dal.GetDB()
go c.monitor2.MonAddr(addrConfig, func(_ string, eLog ethtypes.Log) {
e, err := c.MessageBus.ParseExecuted(eLog)
if err != nil {
log.Errorln("cannot parse event Executed", err)
return
}
log.Infof("monitorBusExecuted: got event Executed %v", e)
status, err := types.NewExecutionStatus(e.Status)
if err != nil {
log.Errorln("monitorBusExecuted: ", err)
return
}
err = db.UpdateStatus(e.MsgId[:], status)
if err != nil {
log.Errorf("failed to update execution_context %x: %v", e.MsgId[:], err)
}
})
}
func (c *Chain) startMonitoringBalance(signers []eth.Addr) {
alertConfig := alert.GetConfig()
if alertConfig == nil || len(alertConfig.LowGasThresholds) == 0 {
log.Infof("opt out of low gas monitoring because alert.low_gas_thresholds for chain %d is not configured", c.ChainID)
return
}
for _, addr := range signers {
log.Infof("monitoring native token balance on chain %d at %s", c.ChainID, addr)
localAddr := addr
go func() {
t := time.NewTicker(types.BalanceCheckInterval)
defer t.Stop()
for {
select {
case <-c.balAlertCtl:
log.Infof("Terminated monitor for native token balance on chain %d at %s", c.ChainID, localAddr)
return
case <-t.C:
c.checkSignerBalanceThenAlert(alertConfig, localAddr)
}
}
}()
}
}
func (c *Chain) checkSignerBalanceThenAlert(config *alert.AlertConfig, addr eth.Addr) {
threshold, found := config.GetLowGasThreshold(c.ChainID)
if !found {
return
}
balance, err := c.EthClient.PendingBalanceAt(context.Background(), addr)
if err != nil {
log.Errorf("failed to query pending balance at %s: %s", addr, err.Error())
return
}
if balance.Cmp(threshold) < 0 {
alert.LowGasAlert(c.ChainID, addr, balance, threshold)
}
}