/
client.go
167 lines (147 loc) · 4.21 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package rpcserver
import (
"sync"
"time"
"github.com/decred/dcrd/rpcclient/v3"
)
const (
	// connectionRetryInterval is how long to pause between successive
	// attempts when automatically reconnecting to an RPC server.
	connectionRetryInterval = 5 * time.Second

	// disconnectCheckInterval is how long to pause between successive
	// checks for a disconnection.
	disconnectCheckInterval = 10 * time.Second
)
// Client wraps an rpcclient.Client together with the configuration needed to
// construct a replacement, and guards access to the wrapped client with a
// mutex.
//
// It should be noted that this is a temporary workaround for the fact that
// rpcclient does not return an error when autoreconnect is turned on but the
// client is disconnected. The permanent solution is to change the behaviour of
// rpcclient.
// TODO: Remove this file.
type Client struct {
	// mux must be held for every read or write of client.
	mux    sync.RWMutex
	client *rpcclient.Client

	// cfg and ntfnHandlers are kept so that autoReconnect can build a new
	// rpcclient with the same settings.
	cfg          *rpcclient.ConnConfig
	ntfnHandlers *rpcclient.NotificationHandlers

	// wg tracks the autoReconnect goroutine; stop tells it to exit.
	wg   sync.WaitGroup
	stop chan struct{}

	// connected is closed while the client is considered connected and is
	// replaced by an open (blocking) channel while a reconnect is pending.
	// connectedMux is held by Connected while it inspects or replaces the
	// channel.
	connectedMux sync.Mutex
	connected    chan struct{}
}
// Connected returns a receive-only view of the current connected channel. A
// closed channel means connected; an open one blocks until autoReconnect closes
// it after a successful reconnect.
//
// If the underlying rpcclient reports that it is disconnected while the
// current channel is still closed, the channel is first replaced with a fresh,
// open one so that callers start blocking.
func (c *Client) Connected() <-chan struct{} {
	c.connectedMux.Lock()
	defer c.connectedMux.Unlock()

	// Still connected: hand back whatever channel is current.
	if !c.RPCClient().Disconnected() {
		return c.connected
	}

	// Disconnected. Swap in a blocking channel unless one is already in
	// place (i.e. the current channel has not been closed).
	select {
	case <-c.connected:
		c.connected = make(chan struct{})
	default:
	}
	return c.connected
}
// IsConnected reports whether the client is currently connected, i.e. whether
// the channel returned by Connected is already closed. It never blocks.
func (c *Client) IsConnected() bool {
	isConnected := false
	select {
	case <-c.Connected():
		// A closed channel yields immediately.
		isConnected = true
	default:
		// The channel is still open, so a reconnect is pending.
	}
	return isConnected
}
// RPCClient returns the pointer to the underlying rpcclient that is current at
// the time of the call. The pointer is read under the read lock, so a
// concurrent swap by autoReconnect is never observed half-way.
func (c *Client) RPCClient() *rpcclient.Client {
	c.mux.RLock()
	current := c.client
	c.mux.RUnlock()
	return current
}
// NewClient creates a new Client and starts the automatic reconnection
// handler. It returns an error if the underlying rpcclient cannot be
// constructed.
func NewClient(cfg *rpcclient.ConnConfig, ntfnHandlers *rpcclient.NotificationHandlers) (*Client, error) {
	rpcClient, err := rpcclient.New(cfg, ntfnHandlers)
	if err != nil {
		return nil, err
	}

	// A closed connected channel indicates a successful connection, so the
	// channel starts out closed.
	connected := make(chan struct{})
	close(connected)

	c := &Client{
		client:       rpcClient,
		cfg:          cfg,
		ntfnHandlers: ntfnHandlers,
		stop:         make(chan struct{}),
		connected:    connected,
	}

	// Register the handler with the wait group before launching it.
	c.wg.Add(1)
	go c.autoReconnect()

	return c, nil
}
// Stop stops automatic reconnections and waits for the reconnect handler to
// exit. Afterwards the connected channel is left closed so that nothing keeps
// blocking on a reconnect that will never happen.
//
// Stop may be called more than once and from multiple goroutines; calls made
// after the handler has exited return without blocking.
func (c *Client) Stop() {
	// done is closed once autoReconnect has returned.
	done := make(chan struct{})
	go func() {
		c.wg.Wait()
		close(done)
	}()

	// The stop channel is unbuffered, so a plain send blocks forever when
	// the handler is already gone (an earlier or concurrent Stop). Give up
	// on the send as soon as the handler is known to have exited.
	select {
	case c.stop <- struct{}{}:
		// Wait for autoReconnect to stop.
		<-done
	case <-done:
	}

	// Stop blocking on connected if blocking, as we will never reconnect
	// again. The check and the close happen under connectedMux so that
	// concurrent callers cannot close the same channel twice.
	c.connectedMux.Lock()
	defer c.connectedMux.Unlock()
	select {
	case <-c.connected:
		// Already closed.
	default:
		close(c.connected)
	}
}
// autoReconnect waits for a disconnect or stop. Every disconnectCheckInterval
// it checks the connection; on disconnect it attempts to build a replacement
// rpcclient every connectionRetryInterval until it succeeds or a stop is
// received. After a successful reconnect the old client is swapped out and
// shut down, and the connected channel is closed to release waiters.
//
// This function must be run as a goroutine. It marks c.wg as done on return.
func (c *Client) autoReconnect() {
	defer c.wg.Done()

out:
	for {
		select {
		case <-c.stop:
			break out
		case <-time.After(disconnectCheckInterval):
			if c.IsConnected() {
				continue out
			}
			// Client is disconnected. Try to reconnect.
		}

	reconnect:
		for {
			// Do not start another attempt once asked to stop.
			select {
			case <-c.stop:
				break out
			default:
			}

			// Start a new client.
			client, err := rpcclient.New(c.cfg, c.ntfnHandlers)
			if err != nil {
				log.Warnf("Failed to connect to %s: %v",
					c.cfg.Host, err)
				log.Infof("Retrying connection to %s in "+
					"%s", c.cfg.Host, connectionRetryInterval)

				// Wait out the retry interval, but stay responsive
				// to a stop request so Stop is not held up by the
				// full interval.
				select {
				case <-c.stop:
					break out
				case <-time.After(connectionRetryInterval):
				}
				continue reconnect
			}

			// Switch the new client in for the old one. Only the
			// pointer swap happens under the write lock, so callers
			// of RPCClient are not stalled while the old client
			// shuts down.
			c.mux.Lock()
			old := c.client
			c.client = client
			c.mux.Unlock()

			// Properly shutdown old client.
			old.Shutdown()
			old.WaitForShutdown()

			// Close the connected channel so that all waiting
			// processes can continue.
			close(c.connected)
			log.Infof("Reestablished connection to RPC server %s",
				c.cfg.Host)

			// Break out of the reconnect loop back to wait for
			// disconnect again.
			break reconnect
		}
	}

	log.Tracef("RPC client reconnect handler done for %s", c.cfg.Host)
}