/
muxhandler.go
129 lines (121 loc) · 3.74 KB
/
muxhandler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package peering
import (
	"fmt"
	"net"
	"sync"

	"github.com/MadBase/MadNet/constants"
	"github.com/MadBase/MadNet/interfaces"
	"github.com/MadBase/MadNet/logging"
	pb "github.com/MadBase/MadNet/proto"
	"github.com/MadBase/MadNet/types"
	"github.com/MadBase/MadNet/utils"
	"github.com/sirupsen/logrus"
)
// MuxHandler allows a P2PMuxConn to be converted into a bidirectional grpc
// client and server connection. The server side connection is injected into an
// existing grpc server running the P2P service and the client is bound against
// the P2P service.
type MuxHandler struct {
// closeOnce guards Close so the shutdown sequence runs at most once.
closeOnce sync.Once
// ch binds the client side of a mux connection to a grpc client connection.
ch *clientHandler
// sh receives the server side of a mux connection.
sh *ServerHandler
// logger is passed to utils.DebugTrace whenever an error is traced.
logger *logrus.Logger
}
// Close shuts down the client handler and then the server handler. The
// shutdown sequence runs at most once, however many times Close is called.
// An error returned by the server handler is handed to utils.DebugTrace and
// is not propagated: Close always returns nil.
func (rpcm *MuxHandler) Close() error {
	rpcm.closeOnce.Do(func() {
		rpcm.ch.Close()
		if closeErr := rpcm.sh.Close(); closeErr != nil {
			utils.DebugTrace(rpcm.logger, closeErr)
		}
	})
	return nil
}
// HandleConnection binds the P2PMuxConn to a grpc client and server.
// Internally this method uses the Initiator() method to determine if it should
// run the client or server side handshake. The only object returned is the
// P2PClient. The server side connection is handed off to the grpc server.
// Both the client and the server side connections may be shut down using the
// original P2PMuxConn Close method.
//
// An initiator value that is neither types.SelfInitiatedConnection nor
// types.PeerInitiatedConnection yields a nil client and a non-nil error
// instead of a panic, so a single malformed connection cannot take down the
// whole process.
func (rpcm *MuxHandler) HandleConnection(conn interfaces.P2PMuxConn) (interfaces.P2PClient, error) {
	switch initiator := conn.Initiator(); initiator {
	case types.SelfInitiatedConnection:
		return rpcm.gRPCclientHandler(conn)
	case types.PeerInitiatedConnection:
		return rpcm.gRPCserverHandler(conn)
	default:
		// Report the unexpected value the same way every other failure in
		// this file is reported: trace it, then return it to the caller.
		err := fmt.Errorf("unknown initiator %v in HandleConnection", initiator)
		utils.DebugTrace(rpcm.logger, err)
		return nil, err
	}
}
// gRPCserverHandler handles a connection for which conn.Initiator() reported
// types.PeerInitiatedConnection. It first binds a grpc client to the client
// side of the mux and only then submits the server side of the mux to the
// grpc server, the reverse of the order used by gRPCclientHandler.
// On any failure the error is traced and returned with a nil client.
func (rpcm *MuxHandler) gRPCserverHandler(conn interfaces.P2PMuxConn) (interfaces.P2PClient, error) {
	// bind a client
	rpcclientconn, err := rpcm.ch.HandleConnection(conn.ClientConn())
	if err != nil {
		utils.DebugTrace(rpcm.logger, err)
		return nil, err
	}
	client := pb.NewP2PClient(rpcclientconn)

	// submit connection to server
	// NOTE(review): if this fails, rpcclientconn is dropped without being
	// closed here; presumably the caller tears it down by closing conn.
	// TODO confirm.
	if err := rpcm.sh.HandleConnection(conn.ServerConn()); err != nil {
		utils.DebugTrace(rpcm.logger, err)
		return nil, err
	}

	return rpcm.wrapClient(conn, client)
}

// gRPCclientHandler handles a connection for which conn.Initiator() reported
// types.SelfInitiatedConnection. It first submits the server side of the mux
// to the grpc server and only then binds a grpc client to the client side of
// the mux, the reverse of the order used by gRPCserverHandler.
// On any failure the error is traced and returned with a nil client.
func (rpcm *MuxHandler) gRPCclientHandler(conn interfaces.P2PMuxConn) (interfaces.P2PClient, error) {
	// submit connection to server
	if err := rpcm.sh.HandleConnection(conn.ServerConn()); err != nil {
		utils.DebugTrace(rpcm.logger, err)
		return nil, err
	}

	// bind a client
	rpcclientconn, err := rpcm.ch.HandleConnection(conn.ClientConn())
	if err != nil {
		utils.DebugTrace(rpcm.logger, err)
		return nil, err
	}

	return rpcm.wrapClient(conn, pb.NewP2PClient(rpcclientconn))
}

// wrapClient builds the p2PClient that both handshake paths return. It wraps
// raw together with conn and conn.NodeAddr(), then attaches the consensus and
// transaction message queues, each sized from the matching values in the
// constants package. The client logs through the
// constants.LoggerPeerMan logger rather than through rpcm.logger.
// If either queue cannot be created the error is traced and returned with a
// nil client.
func (rpcm *MuxHandler) wrapClient(conn interfaces.P2PMuxConn, raw pb.P2PClient) (interfaces.P2PClient, error) {
	c := &p2PClient{
		logger:       logging.GetLogger(constants.LoggerPeerMan),
		P2PClientRaw: raw,
		nodeAddr:     conn.NodeAddr(),
		conn:         conn,
	}

	var err error
	c.consensusQueue, err = newMsgQueue(constants.ConsensusMsgQSize, constants.ConsensusMsgQWorkers, c)
	if err != nil {
		utils.DebugTrace(rpcm.logger, err)
		return nil, err
	}
	// NOTE(review): if the second queue fails, the consensus queue created
	// above is abandoned without any explicit teardown — confirm whether
	// newMsgQueue holds resources that need releasing.
	c.txQueue, err = newMsgQueue(constants.TxMsgQSize, constants.TxMsgQWorkers, c)
	if err != nil {
		utils.DebugTrace(rpcm.logger, err)
		return nil, err
	}
	return c, nil
}
// NewMuxServerHandler builds the MuxHandler used to tunnel grpc over
// P2PMuxConn objects. The server handler is created from logger, addr and
// service; the client handler takes no arguments. logger is also kept on the
// returned MuxHandler for error tracing.
func NewMuxServerHandler(logger *logrus.Logger, addr net.Addr, service interfaces.P2PServer) *MuxHandler {
	handler := &MuxHandler{logger: logger}
	// Create the server handler before the client handler, as the original did.
	handler.sh = newP2PServerHandler(logger, addr, service)
	handler.ch = newClientHandler()
	return handler
}