/
conn.go
184 lines (154 loc) · 3.87 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package mux
// This file implements Conn, which wraps a net.Conn and exchanges encoded frames (e.g. gob) over it.
import (
"io"
"net"
"strings"
"sync"
"time"
"github.com/doubledutch/lager"
)
// Conn defines a mux connection: typed frames are sent over it, and incoming
// frames are handed to the Receiver registered for their type.
type Conn interface {
	// Receive registers r as the receiver for frames of type t.
	Receive(t uint8, r Receiver)
	// Send encodes e into a frame of type t and writes it to the connection.
	Send(t uint8, e interface{}) error
	// Recv listens for frames and sends each one to the receiver registered
	// for its type.
	Recv()
	// Pool returns the pool used by the Conn.
	Pool() Pool
	// Shutdown closes the connection.
	Shutdown()
	// IsShutdown returns a channel callers can listen on to learn that this
	// connection has shut down.
	IsShutdown() chan struct{}
}
// Receiver defines an interface for receiving the payload of frames of one
// type.
type Receiver interface {
	// Receive is handed the Data bytes of a frame. In this file's conn
	// implementation a returned error is logged and the receive loop carries on.
	Receive(b []byte) error
	// Close lets the receiver clean up; this file's conn implementation calls
	// it from Shutdown.
	Close() error
}
// conn is a Conn using net.Conn for communication.
type conn struct {
	// conn is the underlying net.Conn, kept so that read deadlines can be set
	// on it in Recv and so that Shutdown can close it.
	conn net.Conn
	// sendEnc encodes a Send payload into the bytes that become Frame.Data.
	// It is a single shared buffer; sendLock serializes access to it.
	sendEnc  BufferEncoder
	sendLock sync.Mutex
	// enc writes Frames to the connection and dec reads Frames from it.
	enc Encoder
	dec Decoder
	// Receivers maps a frame type to the Receiver that handles it.
	Receivers map[uint8]Receiver
	// ShutdownCh is closed by Shutdown, letting users and the Recv loop
	// observe that the connection is shutting down.
	ShutdownCh chan struct{}
	// isShutdown makes repeated Shutdown calls a no-op.
	// NOTE(review): it is read and written without synchronization.
	isShutdown bool
	// timeout is the read deadline applied before each frame is decoded.
	timeout time.Duration
	// lgr is the logger taken from the Config passed to NewConn.
	lgr lager.Lager
	// pool is the Pool the encoders and decoder above were created from.
	pool Pool
}
// Frame represents the unit of transport on a Conn.
type Frame struct {
	// Type selects the Receiver the frame is handed to on the receiving side.
	Type uint8
	// Data is the already-encoded payload; it is what Receiver.Receive is
	// called with.
	Data []byte
}
// NewConn builds a Conn on top of netConn. The frame encoder and decoder and
// the payload buffer encoder are all obtained from pool; the read timeout and
// the logger come from config.
//
// If config.Verify reports an error, that error is returned and no Conn is
// created.
func NewConn(netConn net.Conn, pool Pool, config *Config) (Conn, error) {
	err := config.Verify()
	if err != nil {
		return nil, err
	}

	c := &conn{
		// Transport and where its codecs come from.
		conn: netConn,
		pool: pool,

		// Codecs: a buffer encoder for payloads, plus a decoder/encoder pair
		// bound directly to the connection.
		sendEnc:  pool.NewBufferEncoder(),
		sendLock: sync.Mutex{},
		dec:      pool.NewDecoder(netConn),
		enc:      pool.NewEncoder(netConn),

		// Dispatch table and shutdown signalling start out empty/open.
		Receivers:  make(map[uint8]Receiver),
		ShutdownCh: make(chan struct{}),

		// Settings taken from the verified config.
		timeout: config.Timeout,
		lgr:     config.Lager,
	}
	return c, nil
}
// Send encodes e, wraps the resulting bytes in a Frame of type t and writes
// that frame to the connection.
//
// If e cannot be encoded, the encoding error is returned and nothing is
// written to the connection. Otherwise the result of writing the frame is
// returned.
func (c *conn) Send(t uint8, e interface{}) error {
	// sendEnc is one shared buffer, so encoding into it and copying the bytes
	// back out must not interleave between concurrent callers.
	c.sendLock.Lock()
	err := c.sendEnc.Encode(e)
	var d []byte
	if err == nil {
		// Copy the bytes out: the buffer is reused by the next Send.
		d = make([]byte, c.sendEnc.Len())
		copy(d, c.sendEnc.Bytes())
	}
	// Reset on failure too, so partial output never leaks into a later frame.
	c.sendEnc.Reset()
	c.sendLock.Unlock()
	if err != nil {
		return err
	}

	f := Frame{
		Type: t,
		Data: d,
	}
	c.lgr.Debugf("Sending frame: %v\n", f)
	return c.enc.Encode(f)
}
// Receive installs r as the handler for incoming frames of type t. A receiver
// already registered for t is replaced. The registration is logged at debug
// level.
func (c *conn) Receive(t uint8, r Receiver) {
	c.Receivers[t] = r
	c.lgr.Debugf("Added receiver type %d\n", t)
}
// Recv reads frames from the connection in a loop and hands each frame's Data
// to the Receiver registered for the frame's Type. It blocks until the loop
// ends.
//
// The loop ends when:
//   - the decoder reports io.EOF, or an error whose text contains "closed" or
//     "reset by peer" (the expected way to stop);
//   - a read times out after ShutdownCh has been closed;
//   - any other decode error occurs, which is logged before returning.
//
// A read timeout while the connection is still up simply starts the next
// read. Frames with no registered receiver are dropped with a warning, and an
// error returned by a receiver is logged without stopping the loop.
func (c *conn) Recv() {
	for {
		var frame Frame
		// Bound every read so that a quiet connection still wakes up to check
		// for shutdown. The result is not checked here: a connection that
		// cannot take a deadline fails in Decode right below.
		c.conn.SetReadDeadline(time.Now().Add(c.timeout))
		err := c.dec.Decode(&frame)
		if err != nil {
			if err == io.EOF || strings.Contains(err.Error(), "closed") || strings.Contains(err.Error(), "reset by peer") {
				// This is the expected way for us to return
				c.lgr.Debugf("Recv loop disconnected from %s", c.conn.RemoteAddr())
				return
			}
			// Match any net.Error timeout, not just *net.OpError, so that
			// connections reporting deadlines with another error type keep
			// the loop alive as well. The asserted value gets its own name
			// so that the original err stays available for logging.
			if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
				select {
				case <-c.ShutdownCh:
					return
				default: // Keep listening
					continue
				}
			}
			c.lgr.Errorf("Unexpected error receiving frame: %s", err)
			return
		}

		c.lgr.Debugf("Received frame: %v\n", frame)
		r, ok := c.Receivers[frame.Type]
		if !ok {
			c.lgr.Warnf("dropping frame %d\n", frame.Type)
			continue
		}
		if err := r.Receive(frame.Data); err != nil {
			c.lgr.Errorf("Receive error %s while receiving %s", err, frame.Data)
		}
	}
}
// IsShutdown exposes the connection's shutdown channel. Shutdown closes this
// channel, so a receive from it unblocks once the connection has been shut
// down.
func (c *conn) IsShutdown() chan struct{} {
	return c.ShutdownCh
}
// Pool reports the Pool this connection was constructed with.
func (c *conn) Pool() Pool {
	return c.pool
}
// Shutdown closes the connection: it closes ShutdownCh to notify listeners,
// calls Close on every registered receiver and finally closes the underlying
// net.Conn. Errors returned by those Close calls are logged as warnings
// because Shutdown has no error result to report them through.
//
// A call made after an earlier Shutdown has run returns immediately. The
// isShutdown flag is not synchronized, so Shutdown must not be called from
// several goroutines at the same time.
func (c *conn) Shutdown() {
	if c.isShutdown {
		return
	}
	c.lgr.Infof("Shutting down")
	c.isShutdown = true

	// Notify that we're shutdown
	close(c.ShutdownCh)

	// Let receivers clean themselves up
	for t, r := range c.Receivers {
		if err := r.Close(); err != nil {
			c.lgr.Warnf("closing receiver for frame type %d: %s", t, err)
		}
	}

	// We're done with conn
	if err := c.conn.Close(); err != nil {
		c.lgr.Warnf("closing connection: %s", err)
	}
}