/
wrapper.go
136 lines (111 loc) · 3.39 KB
/
wrapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package meles
import (
"github.com/elliotcourant/timber"
"github.com/hashicorp/raft"
"net"
"time"
)
// transportWrapper describes a single underlying transport that is exposed as
// two separate transports, one for Raft traffic and one for RPC traffic.
// Connections are handed to either side through the Forward* methods.
type transportWrapper interface {
	// Addr returns the network address of the underlying transport.
	Addr() net.Addr
	// Port returns the port number of the underlying transport.
	Port() int
	// NormalTransport returns the underlying transport as a plain net.Listener.
	NormalTransport() net.Listener

	// RaftTransport returns the transport whose Accept is fed by ForwardToRaft.
	RaftTransport() transportInterface
	// RpcTransport returns the transport whose Accept is fed by ForwardToRpc.
	RpcTransport() transportInterface

	// ForwardToRaft hands a connection (or an accept error) to the Raft transport.
	ForwardToRaft(net.Conn, error)
	// ForwardToRpc hands a connection (or an accept error) to the RPC transport.
	ForwardToRpc(net.Conn, error)

	// SetNodeID tags the wrapper with the given Raft server ID.
	SetNodeID(id raft.ServerID)
	// Close shuts the wrapper down.
	Close()
}
// accept carries the outcome of one accept operation, a connection and/or an
// error, through a transportWrapperItem's acceptChannel.
// The field order matters: SendAccept builds this value with a positional literal.
type accept struct {
// conn is the connection returned to the caller of Accept.
conn net.Conn
// err is the error returned to the caller of Accept alongside conn.
err error
}
// transportWrapperItem is one of the transports handed out by a
// transportWrapperBase. Accept is served from acceptChannel, while Addr, Dial
// and Close are delegated to listener.
type transportWrapperItem struct {
// listener is the underlying transport used by Addr, Dial and Close.
listener transportInterface
// acceptChannel delivers the values passed to SendAccept to callers of Accept.
acceptChannel chan accept
// logger receives the warnings emitted by Close and Dial.
logger timber.Logger
// closeCallback is invoked by Close before listener is closed.
closeCallback func()
}
// SendAccept publishes conn and err on the accept channel so that a caller of
// Accept receives them. The send blocks until the channel takes the value;
// the channels created by newTransportWrapperEx are unbuffered, so in that
// case it blocks until Accept is called.
func (t *transportWrapperItem) SendAccept(conn net.Conn, err error) {
	result := accept{conn: conn, err: err}
	t.acceptChannel <- result
}
// Accept blocks until a value is received from the accept channel and returns
// the connection and error that value carries.
func (t *transportWrapperItem) Accept() (net.Conn, error) {
	result := <-t.acceptChannel
	conn, err := result.conn, result.err
	return conn, err
}
// Close logs a warning, invokes the close callback and then closes the
// underlying listener, returning the listener's error.
//
// NOTE(review): the accept channel is not closed here (the close call below
// is commented out), so a goroutine blocked in Accept is not woken up by
// Close. Confirm that this is intended.
func (t *transportWrapperItem) Close() error {
	t.logger.Warningf("closing transport wrapper item")
	// close(t.acceptChannel)
	t.closeCallback()
	err := t.listener.Close()
	return err
}
// Addr reports the address of the underlying listener.
func (t *transportWrapperItem) Addr() net.Addr {
	addr := t.listener.Addr()
	return addr
}
// Dial logs a warning naming the target address and then dials it through the
// underlying listener, passing the timeout along unchanged.
func (t *transportWrapperItem) Dial(address raft.ServerAddress, timeout time.Duration) (net.Conn, error) {
	t.logger.Warningf("dialing [%s] via transport wrapper item", address)
	conn, err := t.listener.Dial(address, timeout)
	return conn, err
}
// transportWrapperBase is the transportWrapper implementation returned by
// newTransportWrapperEx. It owns one underlying transport and the two items
// that expose it for Raft and RPC use.
type transportWrapperBase struct {
	// transport is the underlying transport; both items use it as their listener.
	transport transportInterface

	// raftTransport and rpcTransport are the items returned by RaftTransport
	// and RpcTransport and fed by ForwardToRaft and ForwardToRpc respectively.
	raftTransport, rpcTransport *transportWrapperItem
}
// SetNodeID replaces the logger of the Raft item and then of the RPC item with
// a logger prefixed by the given server ID.
func (wrapper *transportWrapperBase) SetNodeID(id raft.ServerID) {
	prefix := string(id)
	items := []*transportWrapperItem{wrapper.raftTransport, wrapper.rpcTransport}
	for _, item := range items {
		item.logger = item.logger.Prefix(prefix)
	}
}
// newTransportWrapperFromListener converts a plain net.Listener with
// newTransportFromListener and builds a transportWrapper around the result.
func newTransportWrapperFromListener(listener net.Listener, logger timber.Logger) transportWrapper {
	return newTransportWrapperEx(newTransportFromListener(listener), logger)
}
// newTransportWrapperEx builds a transportWrapper on top of the given
// transport. The Raft and RPC items both use that same transport as their
// listener, share the given logger, report Close through the wrapper's
// closeCallback, and each get their own unbuffered accept channel.
func newTransportWrapperEx(listener transportInterface, logger timber.Logger) transportWrapper {
	base := &transportWrapperBase{transport: listener}

	// newItem creates one item bound to the shared transport and to base.
	newItem := func() *transportWrapperItem {
		return &transportWrapperItem{
			listener:      base.transport,
			acceptChannel: make(chan accept),
			logger:        logger,
			closeCallback: base.closeCallback,
		}
	}

	base.raftTransport = newItem()
	base.rpcTransport = newItem()
	return base
}
// ForwardToRaft passes conn and err to the Raft item via SendAccept, making
// them the result of that item's next Accept call.
func (wrapper *transportWrapperBase) ForwardToRaft(conn net.Conn, err error) {
	target := wrapper.raftTransport
	target.SendAccept(conn, err)
}
// ForwardToRpc passes conn and err to the RPC item via SendAccept, making
// them the result of that item's next Accept call.
func (wrapper *transportWrapperBase) ForwardToRpc(conn net.Conn, err error) {
	target := wrapper.rpcTransport
	target.SendAccept(conn, err)
}
// closeCallback is invoked by an item's Close. It only logs, through the RPC
// item's logger, that the callback was received.
func (wrapper *transportWrapperBase) closeCallback() {
	log := wrapper.rpcTransport.logger
	log.Verbosef("received close callback")
}
// RaftTransport returns the Raft item as a transportInterface.
func (wrapper *transportWrapperBase) RaftTransport() transportInterface {
	var raftSide transportInterface = wrapper.raftTransport
	return raftSide
}
// RpcTransport returns the RPC item as a transportInterface.
func (wrapper *transportWrapperBase) RpcTransport() transportInterface {
	var rpcSide transportInterface = wrapper.rpcTransport
	return rpcSide
}
// NormalTransport returns the underlying transport as a net.Listener.
func (wrapper *transportWrapperBase) NormalTransport() net.Listener {
	var underlying net.Listener = wrapper.transport
	return underlying
}
// Port returns the TCP port of the wrapper's address.
//
// When the address is already a *net.TCPAddr the port is read from it
// directly. Any other address is resolved from its string form as a TCP
// address. If there is no address, or it cannot be resolved, Port returns 0
// instead of dereferencing a nil result.
func (wrapper *transportWrapperBase) Port() int {
	switch addr := wrapper.Addr().(type) {
	case nil:
		return 0
	case *net.TCPAddr:
		// Guard against a typed nil pointer stored in the interface.
		if addr == nil {
			return 0
		}
		return addr.Port
	default:
		resolved, err := net.ResolveTCPAddr("tcp", addr.String())
		if err != nil {
			// ResolveTCPAddr returns a nil address together with the error.
			return 0
		}
		return resolved.Port
	}
}
// Addr reports the network address of the underlying transport.
func (wrapper *transportWrapperBase) Addr() net.Addr {
	addr := wrapper.transport.Addr()
	return addr
}
// Close closes the Raft item. For wrappers built by newTransportWrapperEx both
// items share the same underlying transport, so this also closes the
// transport used by the RPC item.
//
// Close has no error return, so a failure to close is logged as a warning
// rather than silently discarded.
func (wrapper *transportWrapperBase) Close() {
	if err := wrapper.raftTransport.Close(); err != nil {
		wrapper.raftTransport.logger.Warningf("could not close raft transport: %v", err)
	}
}