forked from keimoon/gore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
conn.go
154 lines (140 loc) · 3.02 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package gore
import (
"bufio"
"net"
"sync"
"time"
)
const (
connStateNotConnected = iota
connStateConnected
connStateReconnecting
)
// Conn holds a persistent connection to a redis server
type Conn struct {
address string
tcpConn net.Conn
state int
mutex sync.Mutex
rb *bufio.Reader
wb *bufio.Writer
sentinel bool
RequestTimeout time.Duration
isClosed bool
password string
}
// Dial opens a TCP connection with a redis server.
func Dial(address string) (*Conn, error) {
conn := &Conn{
RequestTimeout: time.Duration(Config.RequestTimeout) * time.Second,
}
conn.mutex.Lock()
defer conn.mutex.Unlock()
err := conn.connect(address, 0)
return conn, err
}
// DialTimeout opens a TCP connection with a redis server with a connection timeout
func DialTimeout(address string, timeout time.Duration) (*Conn, error) {
conn := &Conn{
RequestTimeout: time.Duration(Config.RequestTimeout) * time.Second,
}
conn.mutex.Lock()
defer conn.mutex.Unlock()
err := conn.connect(address, timeout)
return conn, err
}
// Auth makes authentication with redis server
func (c *Conn) Auth(password string) error {
c.password = password
if c.password == "" {
return nil
}
rep, err := NewCommand("AUTH", password).Run(c)
if err != nil {
return err
}
if !rep.IsOk() {
return ErrAuth
}
return nil
}
// Close closes the connection
func (c *Conn) Close() error {
c.mutex.Lock()
defer c.mutex.Unlock()
c.isClosed = true
if c.state == connStateNotConnected || c.tcpConn == nil {
return nil
}
c.state = connStateNotConnected
return c.tcpConn.Close()
}
// IsConnected returns true if connection is okay
func (c *Conn) IsConnected() bool {
return c.state == connStateConnected
}
// GetAddress returns connection address
func (c *Conn) GetAddress() string {
return c.address
}
// Lock locks the whole connection
func (c *Conn) Lock() {
c.mutex.Lock()
}
// Unlock unlocks the whole connection
func (c *Conn) Unlock() {
c.mutex.Unlock()
}
func (c *Conn) connect(address string, timeout time.Duration) error {
if c.state == connStateConnected {
return nil
}
var err error
c.address = address
if timeout == 0 {
c.tcpConn, err = net.Dial("tcp", address)
} else {
c.tcpConn, err = net.DialTimeout("tcp", address, timeout)
}
if err == nil {
c.state = connStateConnected
c.rb = bufio.NewReader(c.tcpConn)
c.wb = bufio.NewWriter(c.tcpConn)
}
return err
}
func (c *Conn) fail() {
if !c.sentinel {
c.mutex.Lock()
if c.state == connStateReconnecting {
c.mutex.Unlock()
return
}
c.tcpConn.Close()
c.state = connStateReconnecting
c.mutex.Unlock()
go c.reconnect()
}
}
func (c *Conn) reconnect() {
sleepTime := Config.ReconnectTime
for {
c.mutex.Lock()
if c.isClosed {
c.mutex.Unlock()
break
}
if err := c.connect(c.address, 0); err == nil {
c.mutex.Unlock()
break
}
c.mutex.Unlock()
time.Sleep(time.Duration(sleepTime) * time.Second)
if sleepTime < 30 {
sleepTime += 2
}
}
if c.password != "" {
c.Auth(c.password)
}
}