forked from bpowers/radix
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
134 lines (105 loc) · 3.22 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package redis
//* Client
// Client manages the access to a database.
type Client struct {
config Config
pool *connPool
}
// NewClient creates a new Client.
func NewClient(config Config) *Client {
c := new(Client)
c.config = config
c.pool = newConnPool(&c.config)
return c
}
// Close closes all connections of the client.
func (c *Client) Close() {
c.pool.close()
}
func (c *Client) call(cmd Cmd, args ...interface{}) *Reply {
// Connection handling
conn, err := c.pool.pull()
if err != nil {
return &Reply{Type: ReplyError, Err: err}
}
defer c.pool.push(conn)
return conn.call(Cmd(cmd), args...)
}
// Call calls the given Redis command.
func (c *Client) Call(cmd string, args ...interface{}) *Reply {
return c.call(Cmd(cmd), args...)
}
func (c *Client) asyncCall(cmd Cmd, args ...interface{}) Future {
f := newFuture()
go func() {
f <- c.call(cmd, args...)
}()
return f
}
// AsyncCall calls the given Redis command asynchronously.
func (c *Client) AsyncCall(cmd string, args ...interface{}) Future {
return c.asyncCall(Cmd(cmd), args...)
}
// InfoMap calls the INFO command, parses and returns the results as a map[string]string or an error.
// Use Info method for fetching the unparsed INFO results.
func (c *Client) InfoMap() (map[string]string, error) {
// Connection handling
conn, err := c.pool.pull()
if err != nil {
return nil, err
}
defer c.pool.push(conn)
return conn.infoMap()
}
func (c *Client) multiCall(transaction bool, f func(*MultiCall)) *Reply {
// Connection handling
conn, err := c.pool.pull()
if err != nil {
return &Reply{Type: ReplyError, Err: err}
}
defer c.pool.push(conn)
return newMultiCall(transaction, conn).process(f)
}
// MultiCall executes the given MultiCall.
// Multicall reply is guaranteed to have the same number of sub-replies as calls, if it succeeds.
func (c *Client) MultiCall(f func(*MultiCall)) *Reply {
return c.multiCall(false, f)
}
// Transaction performs a simple transaction.
// Simple transaction is a multi command that is wrapped in a MULTI-EXEC block.
// For complex transactions with WATCH, UNWATCH or DISCARD commands use MultiCall.
// Transaction reply is guaranteed to have the same number of sub-replies as calls, if it succeeds.
func (c *Client) Transaction(f func(*MultiCall)) *Reply {
return c.multiCall(true, f)
}
// AsyncMultiCall calls an asynchronous MultiCall.
func (c *Client) AsyncMultiCall(mc func(*MultiCall)) Future {
f := newFuture()
go func() {
f <- c.MultiCall(mc)
}()
return f
}
// AsyncTransaction performs a simple asynchronous transaction.
func (c *Client) AsyncTransaction(mc func(*MultiCall)) Future {
f := newFuture()
go func() {
f <- c.Transaction(mc)
}()
return f
}
//* PubSub
// Subscription returns a new Subscription instance with the given message handler callback or
// an error. The message handler is called whenever a new message arrives.
// Subscriptions create their own dedicated connections,
// they do not pull connections from the connection pool.
func (c *Client) Subscription(msgHdlr func(msg *Message)) (*Subscription, *Error) {
if msgHdlr == nil {
panic(errmsg("message handler must not be nil"))
}
sub, err := newSubscription(&c.config, msgHdlr)
if err != nil {
return nil, err
}
return sub, nil
}