-
Notifications
You must be signed in to change notification settings - Fork 47
/
statcollector.go
124 lines (103 loc) · 2.87 KB
/
statcollector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT License was not distributed with this
// file, you can obtain one at https://opensource.org/licenses/MIT.
//
// Copyright (c) DUSK NETWORK. All rights reserved.
package diagnostics
import (
"bytes"
"encoding/hex"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
"time"
j "encoding/json"
cfg "github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
"github.com/dusk-network/dusk-crypto/hash"
)
// enableCollecting is true when the DUSK_ENABLE_TPS_TEST environment variable
// is present at process start-up. Only presence matters: the variable's value
// is ignored, so even an empty value enables collecting.
var enableCollecting = func() bool {
	_, present := os.LookupEnv("DUSK_ENABLE_TPS_TEST")
	return present
}()
// serviceURL is the address of the external network-data-collector; it is a
// fixed local endpoint and is not configurable at runtime.
const serviceURL = "http://localhost:1337/rpc"
// RegisterWireMsg reports a received wire message to the external
// network-monitor service without blocking the caller.
//
// It is a no-op unless collecting is enabled (see enableCollecting). When
// enabled, the receive time is captured synchronously and the remaining work
// runs in a separate goroutine, which sends a JSON-RPC note carrying:
//   - a message ID of the form <networkType>_<topic>_<hex xxhash of rawdata>,
//   - the node address (the configured network port),
//   - the receive timestamp in Unix nanoseconds.
//
// networkType names the network the message arrived on (Kadcast and Gossip
// are the expected values). rawdata is the wire message; its leading bytes
// must encode the topic. Messages whose topic cannot be extracted, or whose
// topic is not accepted by canRegister, are silently skipped.
//
// NOTE(review): rawdata is read by the goroutine after this function returns,
// so the caller must not modify the slice afterwards — confirm at call sites.
func RegisterWireMsg(networkType string, rawdata []byte) {
	// Bail out first so the disabled path costs a single branch.
	if !enableCollecting {
		return
	}

	// Record the receive time before starting the sender goroutine so that
	// scheduling delay does not skew the timestamp.
	recvAt := time.Now().UnixNano()

	go func() {
		// Extract the wire message type from the message blob.
		category, err := topics.Extract(bytes.NewBuffer(rawdata))
		if err != nil {
			return
		}

		// Check if this wire message type should be registered.
		if !canRegister(category) {
			return
		}

		digest, err := hash.Xxhash(rawdata)
		if err != nil {
			// Diagnostics are best-effort: a hashing failure must not bring
			// the node down, so report it and drop the note.
			log.Printf("diagnostics: hashing wire message: %v", err)
			return
		}

		// Notify monitoring.
		addr := cfg.Get().Network.Port
		msgID := networkType + "_" + category.String() + "_" + hex.EncodeToString(digest)
		sendNote(msgID, addr, uint64(recvAt))
	}()
}
// NoteArgs holds the arguments of a single note sent to the collector.
//
// The struct carries no json tags, so encoding/json uses the Go field names
// verbatim as keys; renaming a field (including Recv_at) changes the wire
// format.
type NoteArgs struct {
	// MsgID identifies the reported wire message.
	MsgID string
	// Addr identifies the reporting node.
	Addr string
	// MsgType is part of the payload; sendNote in this file leaves it empty.
	MsgType string
	// Recv_at is the receive timestamp of the message.
	Recv_at uint64
}
// NoteRequest is the JSON-RPC request envelope posted to the collector.
type NoteRequest struct {
	// Method is the fully qualified RPC method name.
	Method string `json:"method"`
	// Params is the list of argument objects for the call.
	Params []NoteArgs `json:"params"`
	// ID is the request identifier.
	ID string `json:"id"`
}
// Say turns the receiver into a "NoteService.Say" call whose only parameter
// is p and whose request ID is "1", overwriting any previous contents. It
// returns the receiver so the call can be chained.
func (req *NoteRequest) Say(p NoteArgs) *NoteRequest {
	*req = NoteRequest{
		Method: "NoteService.Say",
		Params: []NoteArgs{p},
		ID:     "1",
	}
	return req
}
// sendNote posts a single "NoteService.Say" JSON-RPC request to serviceURL.
//
// msgID identifies the wire message, addr identifies this node and recv_at is
// the receive timestamp. Delivery is best-effort: any failure (encoding,
// transport, non-2xx status, reading the response) is logged and the note is
// dropped, so a missing or slow collector can never terminate or stall the
// node.
func sendNote(msgID, addr string, recv_at uint64) {
	req := &NoteRequest{}

	payload, err := j.Marshal(req.Say(NoteArgs{MsgID: msgID, Addr: addr, Recv_at: recv_at}))
	if err != nil {
		log.Printf("diagnostics: encoding note %q: %v", msgID, err)
		return
	}

	log.Printf("json %s", payload)

	// A zero-Transport client still shares http.DefaultTransport, so
	// connections are pooled across calls; only the timeout is added.
	client := http.Client{Timeout: 5 * time.Second}

	res, err := client.Post(serviceURL, "application/json;charset=UTF-8", strings.NewReader(string(payload)))
	if err != nil {
		log.Printf("diagnostics: posting note %q: %v", msgID, err)
		return
	}

	defer func() {
		// Nothing useful can be done about a close error on a drained body.
		_ = res.Body.Close()
	}()

	if res.StatusCode < 200 || res.StatusCode > 299 {
		log.Printf("diagnostics: posting note %q: unexpected status %s", msgID, res.Status)
	}

	// Drain the body so the transport can reuse the connection.
	if _, err := ioutil.ReadAll(res.Body); err != nil {
		log.Printf("diagnostics: reading response for note %q: %v", msgID, err)
	}
}
// canRegister reports whether wire messages of the given topic are forwarded
// to the collector. Only topics.Tx qualifies; topics.Block was left disabled
// (commented out) by the original author.
func canRegister(category topics.Topic) bool {
	return category == topics.Tx
}