-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
242 lines (204 loc) · 6.26 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
package epaxos
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"
"github.com/bbengfort/epaxos/pb"
"github.com/bbengfort/x/peers"
"google.golang.org/grpc"
)
// DefaultRetries specifies the number of times to attempt a commit.
const DefaultRetries = 3
// NewClient creates a new ePaxos client to connect to a quorum.
func NewClient(remote string, options *Config) (client *Client, err error) {
// Create a new configuration from defaults, configuration file, and the
// environment; verify it and return any improperly configured errors.
config := new(Config)
if err = config.Load(); err != nil {
return nil, err
}
// Update the configuration with the specified options
if err = config.Update(options); err != nil {
return nil, err
}
// Create the client
client = &Client{config: config}
// Compute the identity
hostname, _ := config.GetName()
if hostname != "" {
client.identity = fmt.Sprintf("%s-%04X", hostname, rand.Intn(0x10000))
} else {
client.identity = fmt.Sprintf("%04X-%04X", rand.Intn(0x10000), rand.Intn(0x10000))
}
// Connect when client is created to capture any errors as early as possible.
// NOTE: connection errors still return the client for retry
if err = client.connect(remote); err != nil {
return client, err
}
return client, nil
}
// Client maintains network information embedded in the configuration to
// connect to an ePaxos consensus quorum and make propose requests.
type Client struct {
sync.RWMutex
config *Config // network details for connection
conn *grpc.ClientConn // grpc connection to dial an ePaxos server
client pb.EpaxosClient // grpc RPC interface
identity string // a unique identity for all clients
}
//===========================================================================
// Request API
//===========================================================================
// Get a value for a key (execute a read operation)
func (c *Client) Get(key string) ([]byte, error) {
rep, err := c.Propose(pb.AccessType_READ, key, nil)
if err != nil {
return nil, err
}
return rep.Value, nil
}
// Put a value for a key (execute a write operation)
func (c *Client) Put(key string, value []byte, execute bool) error {
var access pb.AccessType
if execute {
access = pb.AccessType_WRITEREAD
} else {
access = pb.AccessType_WRITE
}
_, err := c.Propose(access, key, value)
return err
}
// Del a value for a key (execute a delete operation)
func (c *Client) Del(key string) error {
_, err := c.Propose(pb.AccessType_DELETE, key, nil)
return err
}
// Propose an operation to be applied to the state store.
func (c *Client) Propose(access pb.AccessType, key string, value []byte) (rep *pb.ProposeReply, err error) {
// Create the request for the operation
req := &pb.ProposeRequest{
Identity: c.identity,
Op: &pb.Operation{
Type: access, Key: key, Value: value,
},
}
// Send the request
if rep, err = c.send(req, DefaultRetries); err != nil {
return nil, err
}
return rep, nil
}
// Send the propose request, handling retries.
func (c *Client) send(req *pb.ProposeRequest, retries int) (*pb.ProposeReply, error) {
// Don't attempt if there are no more retries.
if retries <= 0 {
return nil, ErrRetries
}
// Connect if not connected
if !c.isConnected() {
if err := c.connect(""); err != nil {
return nil, err
}
}
// Create the context
timeout, err := c.config.GetTimeout()
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
rep, err := c.client.Propose(ctx, req)
if err != nil {
if retries > 1 {
// If there is an error connecting to the current host, try another
if err = c.connect(""); err != nil {
return nil, err
}
return c.send(req, retries-1)
}
return nil, err
}
if !rep.Success {
// If there was an error, return the error, otherwise retry.
if rep.Error != "" {
return nil, errors.New(rep.Error)
}
return c.send(req, retries-1)
}
return rep, nil
}
//===========================================================================
// Connection Handlers
//===========================================================================
// Connect to the remote client using the specified timeout. If a remote is not
// specified (e.g. empty string) then a random replica is selected from the
// configuration to connect to, prioritizing any replica on the same host as the
// client.
func (c *Client) connect(remote string) (err error) {
// Close the connection if one is already open.
c.close()
// Get the peer by name or select peer from configuration.
var host *peers.Peer
if host, err = c.selectRemote(remote); err != nil {
return err
}
// Parse timeout from configuration
var timeout time.Duration
if timeout, err = c.config.GetTimeout(); err != nil {
return err
}
// Connect to the remote's address
addr := host.Endpoint(false)
if c.conn, err = grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(timeout)); err != nil {
return fmt.Errorf("could not connect to '%s': %s", addr, err)
}
// Create gRPC client and return
c.client = pb.NewEpaxosClient(c.conn)
return nil
}
// Close the connection to the remote host and clean up.
func (c *Client) close() (err error) {
defer func() {
c.conn = nil
c.client = nil
}()
if c.conn == nil {
return nil
}
return c.conn.Close()
}
// Find the remote by name, or select the remote on the same host as the client,
// otherwise return a random remote. Future versions should select a remote in the
// same region as the client.
func (c *Client) selectRemote(remote string) (*peers.Peer, error) {
if remote == "" {
if len(c.config.Peers) == 0 {
return nil, ErrNoNetwork
}
// Search for host by client name/local hostname
hostname, _ := c.config.GetName()
if hostname != "" {
for _, peer := range c.config.Peers {
if peer.Name == hostname || peer.Hostname == hostname {
return &peer, nil
}
}
}
// Return a random peer
idx := rand.Intn(len(c.config.Peers))
return &c.config.Peers[idx], nil
}
for _, peer := range c.config.Peers {
if peer.Name == remote {
return &peer, nil
}
}
return nil, fmt.Errorf("could not find remote '%s' in configuration", remote)
}
// Ensures a client and connection exist
func (c *Client) isConnected() bool {
return c.client != nil && c.conn != nil
}