forked from rethinkdb/rethinkdb-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
session.go
140 lines (115 loc) · 2.84 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package gorethink
import (
"sync"
"time"
p "github.com/dancannon/gorethink/ql2"
)
type Session struct {
opts ConnectOpts
pool *Pool
// Response cache, used for batched responses
sync.Mutex
closed bool
token int64
}
type ConnectOpts struct {
Address string `gorethink:"address,omitempty"`
Database string `gorethink:"database,omitempty"`
AuthKey string `gorethink:"authkey,omitempty"`
Timeout time.Duration `gorethink:"timeout,omitempty"`
MaxIdle int `gorethink:"max_idle,omitempty"`
MaxOpen int `gorethink:"max_open,omitempty"`
}
func (o *ConnectOpts) toMap() map[string]interface{} {
return optArgsToMap(o)
}
// Connect creates a new database session.
//
// Supported arguments include Address, Database, Timeout, Authkey. Pool
// options include MaxIdle, MaxOpen.
//
// By default maxIdle and maxOpen are set to 1: passing values greater
// than the default (e.g. MaxIdle: "10", MaxOpen: "20") will provide a
// pool of re-usable connections.
//
// Basic connection example:
//
// var session *r.Session
// session, err := r.Connect(r.ConnectOpts{
// Address: "localhost:28015",
// Database: "test",
// AuthKey: "14daak1cad13dj",
// })
func Connect(opts ConnectOpts) (*Session, error) {
// Connect
s := &Session{
opts: opts,
}
err := s.Reconnect()
if err != nil {
return nil, err
}
return s, nil
}
type CloseOpts struct {
NoReplyWait bool `gorethink:"noreplyWait,omitempty"`
}
func (o *CloseOpts) toMap() map[string]interface{} {
return optArgsToMap(o)
}
// Reconnect closes and re-opens a session.
func (s *Session) Reconnect(optArgs ...CloseOpts) error {
var err error
if err = s.Close(optArgs...); err != nil {
return err
}
s.pool, err = NewPool(&s.opts)
if err != nil {
return err
}
// Ping connection to check it is valid
err = s.pool.Ping()
if err != nil {
return err
}
s.closed = false
return nil
}
// Close closes the session
func (s *Session) Close(optArgs ...CloseOpts) error {
if s.closed {
return nil
}
if len(optArgs) >= 1 {
if optArgs[0].NoReplyWait {
s.NoReplyWait()
}
}
if s.pool != nil {
s.pool.Close()
}
s.pool = nil
s.closed = true
return nil
}
// SetMaxIdleConns sets the maximum number of connections in the idle
// connection pool.
func (s *Session) SetMaxIdleConns(n int) {
s.pool.SetMaxIdleConns(n)
}
// SetMaxOpenConns sets the maximum number of open connections to the database.
func (s *Session) SetMaxOpenConns(n int) {
s.pool.SetMaxOpenConns(n)
}
// NoReplyWait ensures that previous queries with the noreply flag have been
// processed by the server. Note that this guarantee only applies to queries
// run on the given connection
func (s *Session) NoReplyWait() error {
return s.pool.Exec(Query{
Type: p.Query_NOREPLY_WAIT,
})
}
// Use changes the default database used
func (s *Session) Use(database string) {
s.opts.Database = database
}