/
storagehandler.go
101 lines (88 loc) · 2.78 KB
/
storagehandler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package eth
import (
"errors"
"github.com/DxChainNetwork/godx/log"
"github.com/DxChainNetwork/godx/p2p"
"github.com/DxChainNetwork/godx/storage"
"github.com/DxChainNetwork/godx/storage/storagehost"
)
// hostConfigMsgHandler serves a storage host config request coming from peer p.
//
// It first calls p.HostConfigProcessing; if that returns an error (the guard
// that limits how many config requests a peer can have in flight), the error
// is returned and no work is started. Otherwise the external config is
// retrieved from the storage host and sent back to the peer in a separate
// goroutine, and nil is returned immediately. The configMsg argument is not
// read by this handler.
func (pm *ProtocolManager) hostConfigMsgHandler(p *peer, configMsg p2p.Msg) error {
	// avoid multiple host config request calls attack
	// generate too many go routines and used all resources
	if err := p.HostConfigProcessing(); err != nil {
		return err
	}

	// Register the goroutine with the wait group BEFORE starting it. Calling
	// Add from inside the goroutine races with a concurrent pm.wg.Wait(),
	// which could return before the goroutine has been counted.
	pm.wg.Add(1)

	// start the go routine, handle the host config request
	// once done, release the channel
	go func() {
		defer pm.wg.Done()
		defer p.HostConfigProcessingDone()

		config := pm.eth.storageHost.RetrieveExternalConfig()
		if err := p.SendStorageHostConfig(config); err != nil {
			// the send failure is reported through the peer's error trigger
			// because the caller has already returned
			p.TriggerError(err)
		}
	}()

	return nil
}
// contractMsgHandler hands msg over to the peer's hostContractMsg channel
// without blocking.
//
// If the channel accepts the message right away, nil is returned. If the send
// cannot proceed immediately, an error is logged and returned instead of
// waiting.
func (pm *ProtocolManager) contractMsgHandler(p *peer, msg p2p.Msg) error {
	// non-blocking send: the empty default case makes the select fall through
	// when nobody can take the message at this moment
	select {
	case p.hostContractMsg <- msg:
		return nil
	default:
	}

	scheduleErr := errors.New("hostMsgSchedule error: message received before finishing the previous message handling")
	log.Error("error handling hostContractMsg", "err", scheduleErr.Error())
	return scheduleErr
}
// contractReqHandler runs a contract-related request handler for peer p in a
// separate goroutine.
//
// It first calls p.HostContractProcessing; if that returns an error, a
// "host busy" response is sent to the peer on a best-effort basis and the
// error is returned without starting any work. Otherwise handler is invoked
// with the storage host, the peer and the message, and nil is returned
// immediately; p.HostContractProcessingDone is called once handler returns.
func (pm *ProtocolManager) contractReqHandler(handler func(h *storagehost.StorageHost, sp storage.Peer, msg p2p.Msg), p *peer, msg p2p.Msg) error {
	// avoid continuously contract related requests attack
	// generate too many go routines and used all resources
	if err := p.HostContractProcessing(); err != nil {
		// error is ignored intentionally. If error occurred,
		// the client must wait until time out
		_ = p.SendHostBusyHandleRequestErr()
		return err
	}

	// Register the goroutine with the wait group BEFORE starting it. Calling
	// Add from inside the goroutine races with a concurrent pm.wg.Wait(),
	// which could return before the goroutine has been counted.
	pm.wg.Add(1)

	// start the go routine, handle the host contract request
	// once done, release the channel
	go func() {
		defer pm.wg.Done()
		defer p.HostContractProcessingDone()

		handler(pm.eth.storageHost, p, msg)
	}()

	return nil
}
// ethMsgHandler repeatedly drains the peer's eth message buffer.
//
// Each round fetches the current buffer contents, passes every message to
// handleEthMsg, and then blocks in waitEthStartIndicator. The function returns
// once waitEthStartIndicator reports an error.
func (pm *ProtocolManager) ethMsgHandler(p *peer) {
	for {
		// snapshot the buffered messages for this round; the first round
		// picks up whatever was buffered before the handler started
		pending := p.GetEthMsgBuffer()

		for _, ethMsg := range pending {
			handleErr := pm.handleEthMsg(p, ethMsg)
			if handleErr != nil {
				// a failed message is logged and reported to the peer, but
				// the remaining messages are still processed
				p.Log().Error("Ethereum handle message failed", "err", handleErr.Error())
				p.TriggerError(handleErr)
			}

			// presumably drops the handled message from the eth message
			// buffer — confirm against UpdateEthMsgBuffer
			p.UpdateEthMsgBuffer()
		}

		// wait for the start signal before fetching the buffer again;
		// stop handling when waiting fails
		if pm.waitEthStartIndicator(p) != nil {
			return
		}
	}
}
// waitEthStartIndicator blocks until either the peer's ethStartIndicator
// channel or the protocol manager's quitSync channel yields.
//
// It returns nil when ethStartIndicator fires, and an error when quitSync
// fires first.
func (pm *ProtocolManager) waitEthStartIndicator(p *peer) error {
	select {
	case <-pm.quitSync:
		return errors.New("protocol manager sync quit")
	case <-p.ethStartIndicator:
		// start signal received; fall through to the success return
	}
	return nil
}