-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
130 lines (119 loc) · 2.65 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package libgrn
import (
"io"
"github.com/s-yata/grnci"
)
const (
maxIdleConns = 2 // Maximum number of idle connections
)
// Client is a thread-safe GQTP client or DB handle.
type Client struct {
addr *grnci.Address
baseConn *Conn
idleConns chan *Conn
}
// DialClient returns a new Client connected to a GQTP server.
// The expected address format is [scheme://][host][:port].
func DialClient(addr string) (*Client, error) {
a, err := grnci.ParseGQTPAddress(addr)
if err != nil {
return nil, err
}
conn, err := Dial(addr)
if err != nil {
return nil, err
}
conns := make(chan *Conn, maxIdleConns)
conns <- conn
return &Client{
addr: a,
idleConns: conns,
}, nil
}
// OpenClient opens an existing DB and returns a new Client.
func OpenClient(path string) (*Client, error) {
conn, err := Open(path)
if err != nil {
return nil, err
}
return &Client{
baseConn: conn,
}, nil
}
// CreateClient creates a new DB and returns a new Client.
func CreateClient(path string) (*Client, error) {
conn, err := Create(path)
if err != nil {
return nil, err
}
return &Client{
baseConn: conn,
}, nil
}
// Close closes the idle connections.
// Close should be called after all responses are closed.
// Otherwise, connections will be leaked.
func (c *Client) Close() error {
var err error
Loop:
for {
select {
case conn := <-c.idleConns:
if e := conn.Close(); e != nil && err == nil {
err = e
}
default:
break Loop
}
}
if c.baseConn != nil {
if e := c.baseConn.Close(); e != nil {
err = e
}
}
return err
}
// Exec sends a request and receives a response.
// It is the caller's responsibility to close the response.
func (c *Client) Exec(cmd string, body io.Reader) (grnci.Response, error) {
var conn *Conn
var err error
select {
case conn = <-c.idleConns:
default:
if c.addr != nil {
conn, err = Dial(c.addr.String())
if err != nil {
return nil, err
}
} else {
conn, err = c.baseConn.Dup()
if err != nil {
return nil, err
}
}
}
resp, err := conn.Exec(cmd, body)
if err != nil {
conn.Close()
return nil, err
}
resp.(*response).client = c
return resp, nil
}
// Invoke assembles cmd, params and body into a grnci.Request and calls Query.
func (c *Client) Invoke(cmd string, params map[string]interface{}, body io.Reader) (grnci.Response, error) {
req, err := grnci.NewRequest(cmd, params, body)
if err != nil {
return nil, err
}
return c.Query(req)
}
// Query calls Exec with req.GQTPRequest and returns the result.
func (c *Client) Query(req *grnci.Request) (grnci.Response, error) {
cmd, body, err := req.GQTPRequest()
if err != nil {
return nil, err
}
return c.Exec(cmd, body)
}