-
Notifications
You must be signed in to change notification settings - Fork 0
/
metrics.go
127 lines (113 loc) · 6.09 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Copyright (c) 2018 The MATRIX Authors
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php
package man
import (
"github.com/MatrixAINetwork/go-matrix/metrics"
"github.com/MatrixAINetwork/go-matrix/p2p"
)
var (
propTxnInPacketsMeter = metrics.NewRegisteredMeter("man/prop/txns/in/packets", nil)
propTxnInTrafficMeter = metrics.NewRegisteredMeter("man/prop/txns/in/traffic", nil)
propTxnOutPacketsMeter = metrics.NewRegisteredMeter("man/prop/txns/out/packets", nil)
propTxnOutTrafficMeter = metrics.NewRegisteredMeter("man/prop/txns/out/traffic", nil)
propHashInPacketsMeter = metrics.NewRegisteredMeter("man/prop/hashes/in/packets", nil)
propHashInTrafficMeter = metrics.NewRegisteredMeter("man/prop/hashes/in/traffic", nil)
propHashOutPacketsMeter = metrics.NewRegisteredMeter("man/prop/hashes/out/packets", nil)
propHashOutTrafficMeter = metrics.NewRegisteredMeter("man/prop/hashes/out/traffic", nil)
propBlockInPacketsMeter = metrics.NewRegisteredMeter("man/prop/blocks/in/packets", nil)
propBlockInTrafficMeter = metrics.NewRegisteredMeter("man/prop/blocks/in/traffic", nil)
propBlockOutPacketsMeter = metrics.NewRegisteredMeter("man/prop/blocks/out/packets", nil)
propBlockOutTrafficMeter = metrics.NewRegisteredMeter("man/prop/blocks/out/traffic", nil)
reqHeaderInPacketsMeter = metrics.NewRegisteredMeter("man/req/headers/in/packets", nil)
reqHeaderInTrafficMeter = metrics.NewRegisteredMeter("man/req/headers/in/traffic", nil)
reqHeaderOutPacketsMeter = metrics.NewRegisteredMeter("man/req/headers/out/packets", nil)
reqHeaderOutTrafficMeter = metrics.NewRegisteredMeter("man/req/headers/out/traffic", nil)
reqBodyInPacketsMeter = metrics.NewRegisteredMeter("man/req/bodies/in/packets", nil)
reqBodyInTrafficMeter = metrics.NewRegisteredMeter("man/req/bodies/in/traffic", nil)
reqBodyOutPacketsMeter = metrics.NewRegisteredMeter("man/req/bodies/out/packets", nil)
reqBodyOutTrafficMeter = metrics.NewRegisteredMeter("man/req/bodies/out/traffic", nil)
reqStateInPacketsMeter = metrics.NewRegisteredMeter("man/req/states/in/packets", nil)
reqStateInTrafficMeter = metrics.NewRegisteredMeter("man/req/states/in/traffic", nil)
reqStateOutPacketsMeter = metrics.NewRegisteredMeter("man/req/states/out/packets", nil)
reqStateOutTrafficMeter = metrics.NewRegisteredMeter("man/req/states/out/traffic", nil)
reqReceiptInPacketsMeter = metrics.NewRegisteredMeter("man/req/receipts/in/packets", nil)
reqReceiptInTrafficMeter = metrics.NewRegisteredMeter("man/req/receipts/in/traffic", nil)
reqReceiptOutPacketsMeter = metrics.NewRegisteredMeter("man/req/receipts/out/packets", nil)
reqReceiptOutTrafficMeter = metrics.NewRegisteredMeter("man/req/receipts/out/traffic", nil)
miscInPacketsMeter = metrics.NewRegisteredMeter("man/misc/in/packets", nil)
miscInTrafficMeter = metrics.NewRegisteredMeter("man/misc/in/traffic", nil)
miscOutPacketsMeter = metrics.NewRegisteredMeter("man/misc/out/packets", nil)
miscOutTrafficMeter = metrics.NewRegisteredMeter("man/misc/out/traffic", nil)
)
// meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of
// accumulating the above defined metrics based on the data stream contents.
type meteredMsgReadWriter struct {
p2p.MsgReadWriter // Wrapped message stream to meter
version int // Protocol version to select correct meters
}
// newMeteredMsgWriter wraps a p2p MsgReadWriter with metering support. If the
// metrics system is disabled, this function returns the original object.
func newMeteredMsgWriter(rw p2p.MsgReadWriter) p2p.MsgReadWriter {
if !metrics.Enabled {
return rw
}
return &meteredMsgReadWriter{MsgReadWriter: rw}
}
// Init sets the protocol version used by the stream to know which meters to
// increment in case of overlapping message ids between protocol versions.
func (rw *meteredMsgReadWriter) Init(version int) {
rw.version = version
}
func (rw *meteredMsgReadWriter) ReadMsg() (p2p.Msg, error) {
// Read the message and short circuit in case of an error
msg, err := rw.MsgReadWriter.ReadMsg()
if err != nil {
return msg, err
}
// Account for the data traffic
packets, traffic := miscInPacketsMeter, miscInTrafficMeter
switch {
case msg.Code == BlockHeadersMsg:
packets, traffic = reqHeaderInPacketsMeter, reqHeaderInTrafficMeter
case msg.Code == BlockBodiesMsg:
packets, traffic = reqBodyInPacketsMeter, reqBodyInTrafficMeter
case rw.version >= man63 && msg.Code == NodeDataMsg:
packets, traffic = reqStateInPacketsMeter, reqStateInTrafficMeter
case rw.version >= man63 && msg.Code == ReceiptsMsg:
packets, traffic = reqReceiptInPacketsMeter, reqReceiptInTrafficMeter
case msg.Code == NewBlockHashesMsg:
packets, traffic = propHashInPacketsMeter, propHashInTrafficMeter
case msg.Code == NewBlockMsg:
packets, traffic = propBlockInPacketsMeter, propBlockInTrafficMeter
case msg.Code == TxMsg:
packets, traffic = propTxnInPacketsMeter, propTxnInTrafficMeter
}
packets.Mark(1)
traffic.Mark(int64(msg.Size))
return msg, err
}
func (rw *meteredMsgReadWriter) WriteMsg(msg p2p.Msg) error {
// Account for the data traffic
packets, traffic := miscOutPacketsMeter, miscOutTrafficMeter
switch {
case msg.Code == BlockHeadersMsg:
packets, traffic = reqHeaderOutPacketsMeter, reqHeaderOutTrafficMeter
case msg.Code == BlockBodiesMsg:
packets, traffic = reqBodyOutPacketsMeter, reqBodyOutTrafficMeter
case rw.version >= man63 && msg.Code == NodeDataMsg:
packets, traffic = reqStateOutPacketsMeter, reqStateOutTrafficMeter
case rw.version >= man63 && msg.Code == ReceiptsMsg:
packets, traffic = reqReceiptOutPacketsMeter, reqReceiptOutTrafficMeter
case msg.Code == NewBlockHashesMsg:
packets, traffic = propHashOutPacketsMeter, propHashOutTrafficMeter
case msg.Code == NewBlockMsg:
packets, traffic = propBlockOutPacketsMeter, propBlockOutTrafficMeter
case msg.Code == TxMsg:
packets, traffic = propTxnOutPacketsMeter, propTxnOutTrafficMeter
}
packets.Mark(1)
traffic.Mark(int64(msg.Size))
// Send the packet to the p2p layer
return rw.MsgReadWriter.WriteMsg(msg)
}