/
conn.go
129 lines (100 loc) · 2.22 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package pool
import (
"net"
"strconv"
"sync/atomic"
"time"
)
// noDeadline is the zero time.Time. setReadTimeout and setWriteTimeout pass
// it to the connection's deadline setters when no positive timeout is given;
// for net.Conn a zero deadline means the operation does not time out.
var noDeadline time.Time
// Conn bundles a net.Conn with the reader, write buffer and scratch buffer
// that the methods in this file use to exchange data over it.
type Conn struct {
	// netConn is the underlying network connection.
	netConn net.Conn

	// buf is the scratch buffer lent to rd by WithReader and to wb by
	// WithWriter, and taken back from them afterwards.
	buf []byte

	// rd is the reader attached to netConn.
	rd *Reader

	// rdLocked is set by LockReaderBuffer. While it is true, WithReader
	// neither installs buf into rd nor takes rd's buffer back.
	rdLocked bool

	// wb collects outgoing bytes that WithWriter then writes to netConn.
	wb *WriteBuffer

	// InitedAt is not written anywhere in this file.
	// NOTE(review): presumably the time the connection finished its
	// initialisation; confirm against the code that sets it.
	InitedAt time.Time

	// pooled is not referenced anywhere in this file.
	// NOTE(review): presumably marks connections owned by a pool; confirm.
	pooled bool

	// usedAt stores a time.Time; it is accessed through UsedAt and SetUsedAt.
	usedAt atomic.Value

	// ProcessId and SecretKey are not touched by any method in this file.
	// NOTE(review): their meaning is defined by the callers; confirm there.
	ProcessId int32
	SecretKey int32

	// _lastId is the counter behind NextId.
	_lastId int64
}
// NewConn builds a Conn around netConn.
//
// It allocates the scratch buffer, creates the reader (wrapping netConn in
// an elastic buffered reader) and the write buffer, attaches netConn through
// SetNetConn, and records the current time as the last-used timestamp.
func NewConn(netConn net.Conn) *Conn {
	cn := new(Conn)
	cn.buf = makeBuffer()
	cn.rd = NewReader(NewElasticBufReader(netConn))
	cn.wb = NewWriteBuffer()

	cn.SetNetConn(netConn)
	cn.SetUsedAt(time.Now())

	return cn
}
// UsedAt returns the timestamp most recently recorded with SetUsedAt.
//
// If no timestamp has been stored yet (for example on a Conn that was not
// created by NewConn), it returns the zero time.Time rather than panicking.
func (cn *Conn) UsedAt() time.Time {
	// Load returns nil before the first Store; the comma-ok assertion turns
	// that case into the zero time instead of an interface-conversion panic.
	tm, _ := cn.usedAt.Load().(time.Time)
	return tm
}
// SetUsedAt atomically records tm as the connection's last-used timestamp,
// which UsedAt reads back.
func (cn *Conn) SetUsedAt(tm time.Time) {
	cn.usedAt.Store(tm)
}
// RemoteAddr reports the remote address of the underlying network
// connection.
func (cn *Conn) RemoteAddr() net.Addr {
	return cn.netConn.RemoteAddr()
}
// SetNetConn makes netConn the underlying connection of cn and resets the
// reader so that it reads from netConn.
func (cn *Conn) SetNetConn(netConn net.Conn) {
	cn.netConn = netConn

	// Point the existing reader at the new connection.
	cn.rd.Reset(netConn)
}
// NetConn returns the underlying network connection as last set by
// SetNetConn.
func (cn *Conn) NetConn() net.Conn {
	return cn.netConn
}
// NextId advances the connection's internal counter by one and returns the
// new value formatted as a base-10 string. The first call on a fresh Conn
// returns "1".
//
// The counter is updated with a plain (non-atomic) increment.
func (cn *Conn) NextId() string {
	id := cn._lastId + 1
	cn._lastId = id
	return strconv.FormatInt(id, 10)
}
// setReadTimeout marks the connection as used now and sets the read
// deadline on the underlying connection.
//
// A positive timeout yields a deadline of now+timeout; any other value
// clears the deadline by passing noDeadline. The error from SetReadDeadline
// is returned unchanged.
func (cn *Conn) setReadTimeout(timeout time.Duration) error {
	now := time.Now()
	cn.SetUsedAt(now)

	deadline := noDeadline
	if timeout > 0 {
		deadline = now.Add(timeout)
	}
	return cn.netConn.SetReadDeadline(deadline)
}
// setWriteTimeout marks the connection as used now and sets the write
// deadline on the underlying connection.
//
// A positive timeout yields a deadline of now+timeout; any other value
// clears the deadline by passing noDeadline. The error from SetWriteDeadline
// is returned unchanged.
func (cn *Conn) setWriteTimeout(timeout time.Duration) error {
	now := time.Now()
	cn.SetUsedAt(now)

	deadline := noDeadline
	if timeout > 0 {
		deadline = now.Add(timeout)
	}
	return cn.netConn.SetWriteDeadline(deadline)
}
// LockReaderBuffer flags the reader's buffer as locked and hands the reader
// a freshly allocated buffer of its own. From then on WithReader no longer
// swaps the shared scratch buffer in and out of the reader.
func (cn *Conn) LockReaderBuffer() {
	cn.rdLocked = true

	dedicated := makeBuffer()
	cn.rd.ResetBuffer(dedicated)
}
// WithReader applies the read timeout, then runs fn with the connection's
// reader and returns fn's error.
//
// Unless the reader buffer is locked (see LockReaderBuffer), the shared
// scratch buffer is installed into the reader before fn runs and taken back
// from the reader once fn has returned.
func (cn *Conn) WithReader(timeout time.Duration, fn func(rd *Reader) error) error {
	// The error from setting the deadline is deliberately discarded.
	// NOTE(review): presumably a broken connection surfaces on the read
	// inside fn instead; confirm.
	_ = cn.setReadTimeout(timeout)

	if !cn.rdLocked {
		cn.rd.ResetBuffer(cn.buf)
	}

	err := fn(cn.rd)

	// rdLocked is re-read here on purpose: fn may have locked the buffer.
	if cn.rdLocked {
		return err
	}
	cn.buf = cn.rd.Buffer()
	return err
}
// WithWriter applies the write timeout, lets fn fill the connection's write
// buffer, and then writes the buffered bytes to the underlying connection.
//
// The write buffer borrows the shared scratch buffer for the duration of the
// call and hands it back afterwards. If fn fails, its error is returned;
// otherwise the error from the network write (if any) is returned.
func (cn *Conn) WithWriter(timeout time.Duration, fn func(wb *WriteBuffer) error) error {
	// The error from setting the deadline is deliberately discarded.
	// NOTE(review): presumably a broken connection surfaces on the Write
	// below instead; confirm.
	_ = cn.setWriteTimeout(timeout)

	cn.wb.ResetBuffer(cn.buf)
	fnErr := fn(cn.wb)

	// NOTE(review): whatever fn managed to buffer is written even when fn
	// returned an error; confirm that sending a partial payload is intended.
	_, writeErr := cn.netConn.Write(cn.wb.Bytes)
	cn.buf = cn.wb.Buffer()

	// fn's error takes precedence over the write error.
	if fnErr != nil {
		return fnErr
	}
	return writeErr
}
// Close closes the underlying network connection and returns the error, if
// any, reported by it.
func (cn *Conn) Close() error {
	return cn.netConn.Close()
}
// makeBuffer allocates a new zero-filled byte slice whose length and
// capacity are both 4096 bytes.
func makeBuffer() []byte {
	const defaultBufSize = 4 << 10 // 4096 bytes

	buf := make([]byte, defaultBufSize)
	return buf
}