forked from christopherhesse/rethinkgo
/
rows.go
283 lines (253 loc) · 7.27 KB
/
rows.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
package rethinkgo
import (
"code.google.com/p/goprotobuf/proto"
"errors"
"fmt"
"io"
"reflect"
p "github.com/christopherhesse/rethinkgo/ql2"
)
// Rows is an iterator to move through the rows returned by the database. Call
// rows.Next() in a loop to advance to the next row and rows.Scan(&dest) to
// scan the current row into the variable `dest`. rows.Next() returns false
// when there is an error or there are no more rows left.
//
// There are three methods on the rows object that can be used to avoid
// iterating in this manner. These three methods correspond to the return types
// of a query:
//
// .Exec() for an empty response:
//
//     err := r.Db("marvel").TableCreate("heroes").Run(session).Exec()
//
// .One(&dest) for a response that always returns a single result:
//
//     var response string
//     err := r.Table("heroes").Get("Omega Red", "name").Run(session).One(&response)
//
// .All(&dest) for a list of results:
//
//     var response []string
//     err := r.Db("marvel").TableList().Run(session).All(&response)
//
// .All() may perform multiple network requests to get all of the results of
// the query. Use .Limit() if you only need a certain number.
//
// .One() and .All() return ErrWrongResponseType if used on a query response
// that does not match the expected type.
type Rows struct {
// session supplies the query timeout and the pool the connection is
// returned to.
session *Session
// conn is the connection kept for stream responses; Close treats a nil
// conn as "nothing to release".
conn *connection
// closed is set once the iterator has been closed; Next returns false
// from then on.
closed bool
// buffer holds rows already received from the server but not yet consumed.
buffer []*p.Datum
// current is the row most recently advanced to by Next and read by Scan.
current *p.Datum
// complete is true when the server has no further batches for this query.
complete bool
// lasterr is the last error encountered; io.EOF marks normal exhaustion
// and is hidden by Err.
lasterr error
// token identifies the query in CONTINUE and STOP requests.
token int64
// responseType is the response type checked by One and All.
responseType p.Response_ResponseType
}
// continueQuery sends a CONTINUE query for this iterator's token and replaces
// rows.buffer with the batch the server returns.
//
// A SUCCESS_PARTIAL response means more batches are available. A
// SUCCESS_SEQUENCE response is the final batch: rows.complete is set and the
// connection is returned to the session's pool. Any other response type is
// reported as an error, as is a failure to execute the query.
func (rows *Rows) continueQuery() error {
	queryProto := &p.Query{
		Type:  p.Query_CONTINUE.Enum(),
		Token: proto.Int64(rows.token),
	}
	buffer, responseType, err := rows.conn.executeQuery(queryProto, rows.session.timeout)
	if err != nil {
		return err
	}

	switch responseType {
	case p.Response_SUCCESS_PARTIAL:
		// continuation of a stream of rows
		rows.buffer = buffer
	case p.Response_SUCCESS_SEQUENCE:
		// end of a stream of rows, there's no more after this
		rows.buffer = buffer
		rows.complete = true
		// The connection is no longer needed, so hand it back to the pool now.
		// Do NOT call rows.Close() here: Close marks the iterator as closed,
		// and Next refuses to yield from a closed iterator, which would drop
		// every row of this final batch except the first. Clearing rows.conn
		// makes a later Close() skip the release step.
		rows.session.putConn(rows.conn)
		rows.conn = nil
	default:
		return fmt.Errorf("rethinkdb: Unexpected response type: %v", responseType)
	}
	return nil
}
// Next moves the iterator forward by one document. It returns false if there
// are no more rows or an error has occurred (use .Err() to get the last
// error). When it returns true, the new current row can be read with .Scan().
//
// Example usage:
//
//     rows := r.Table("heroes").Run(session)
//     for rows.Next() {
//         var hero interface{}
//         rows.Scan(&hero)
//         fmt.Println("hero:", hero)
//     }
//     if rows.Err() != nil {
//         ...
//     }
func (rows *Rows) Next() bool {
	if rows.closed || rows.lasterr != nil {
		return false
	}

	// Keep fetching until a row is buffered. A loop is needed (rather than a
	// single fetch) because a batch may come back empty; returning true in
	// that case would leave rows.current pointing at the previous row.
	for len(rows.buffer) == 0 {
		if rows.complete {
			// no more rows left to fetch: normal end of iteration
			rows.lasterr = io.EOF
			rows.closed = true
			return false
		}
		if err := rows.continueQuery(); err != nil {
			// io.EOF in lasterr is reserved for normal exhaustion and is
			// hidden by Err(); a connection that ended mid-stream must not be
			// mistaken for that.
			if err == io.EOF {
				err = io.ErrUnexpectedEOF
			}
			rows.lasterr = err
			return false
		}
	}

	rows.current = rows.buffer[0]
	rows.buffer = rows.buffer[1:]
	return true
}
// Scan decodes the current row (the one most recently selected by .Next())
// into dest, which must be passed by reference.
//
// NOTE: the destination is not cleared before the row is written into it
// (decoding is done with json.Unmarshal internally), so pass a fresh
// destination, or reset it yourself, before each call to .Scan(&dest).
func (rows *Rows) Scan(dest interface{}) error {
	row := rows.current
	return datumUnmarshal(row, dest)
}
// Err reports the last error the iterator ran into, such as a network error
// while talking to the database server or an error while parsing. Reaching
// the end of the rows is not an error, so nil is returned in that case.
//
// Example usage:
//
//     err := r.Table("heroes").Run(session).Err()
func (rows *Rows) Err() error {
	// io.EOF is the internal marker for normal termination of the iterator
	if err := rows.lasterr; err != io.EOF {
		return err
	}
	return nil
}
// All gets all results from the iterator into a reference to a slice. It
// may perform multiple network requests to the server until it has retrieved
// all results. The iterator is closed when All returns.
//
// `slice` must be a pointer to a slice; it is only assigned once every row
// has been scanned successfully. ErrWrongResponseType is returned if the
// response is not a sequence of rows.
//
// Example usage:
//
//     var result []interface{}
//     err := r.Table("heroes").Run(session).All(&result)
func (rows *Rows) All(slice interface{}) error {
	defer rows.Close()
	if err := rows.Err(); err != nil {
		return err
	}

	slicePointerValue := reflect.ValueOf(slice)
	if slicePointerValue.Kind() != reflect.Ptr {
		return errors.New("rethinkdb: `slice` should be a pointer to a slice")
	}

	sliceValue := slicePointerValue.Elem()
	if sliceValue.Kind() != reflect.Slice {
		return errors.New("rethinkdb: A slice type must be provided")
	}

	if rows.responseType != p.Response_SUCCESS_PARTIAL && rows.responseType != p.Response_SUCCESS_SEQUENCE {
		return ErrWrongResponseType{}
	}

	// create a new slice to hold the results, sized for the rows that are
	// already buffered
	elemType := sliceValue.Type().Elem()
	newSliceValue := reflect.MakeSlice(sliceValue.Type(), 0, len(rows.buffer))

	for rows.Next() {
		// Scan does not clear its destination, so every row gets a freshly
		// allocated element. Reusing one element would let fields from an
		// earlier row leak into later ones, and for map or pointer element
		// types every entry of the result would alias the same object.
		elemValue := reflect.New(elemType)
		if err := rows.Scan(elemValue.Interface()); err != nil {
			return err
		}
		newSliceValue = reflect.Append(newSliceValue, elemValue.Elem())
	}

	if err := rows.Err(); err != nil {
		return err
	}

	sliceValue.Set(newSliceValue)
	return nil
}
// One gets the first result from a query response and scans it into `row`,
// which must be passed by reference. The iterator is closed when One returns.
//
// It returns ErrWrongResponseType if the response is not a single-value
// (atom) response, and ErrNoSuchRow if there is no row left to read.
//
// Example usage:
//
//     var result interface{}
//     err := r.Table("villains").Get("Galactus", "name").Run(session).One(&result)
func (rows *Rows) One(row interface{}) error {
	defer rows.Close()
	if err := rows.Err(); err != nil {
		return err
	}

	if rows.responseType != p.Response_SUCCESS_ATOM {
		return ErrWrongResponseType{}
	}

	// Only scan when Next actually produced a row; otherwise Scan would read
	// a nil or stale current row.
	if !rows.Next() {
		if err := rows.Err(); err != nil {
			return err
		}
		return ErrNoSuchRow{}
	}

	return rows.Scan(row)
}
// Exec is meant for queries whose result you want to ignore, such as creating
// a table. It closes the iterator and reports the error recorded on it, if
// any.
//
// Example usage:
//
//     err := r.TableCreate("villains").Run(session).Exec()
func (rows *Rows) Exec() error {
	defer rows.Close()
	return rows.Err()
}
// Close releases the connection associated with this iterator, if there is
// one. The usual pattern is to defer rows.Close() right after obtaining a Rows
// iterator; it is not needed with .Exec(), .One(), or .All(). Calling Close on
// an iterator that is already closed does nothing and returns nil.
//
// Only stream responses will have an associated connection.
//
// Example usage:
//
//     rows := r.Table("villains").Run(session)
//     defer rows.Close()
//
//     for rows.Next() {
//         var result interface{}
//         rows.Scan(&result)
//         fmt.Println("result:", result)
//     }
func (rows *Rows) Close() (err error) {
	if rows.closed {
		return nil
	}

	// a non-nil connection means this is a stream response
	if conn := rows.conn; conn != nil {
		if !rows.complete {
			// closed before all results were retrieved: tell the server to
			// discard whatever is left
			stopQuery := &p.Query{
				Type:  p.Query_STOP.Enum(),
				Token: proto.Int64(rows.token),
			}
			_, _, err = conn.executeQuery(stopQuery, rows.session.timeout)
		}
		// hand the connection back to the pool
		rows.session.putConn(conn)
	}

	rows.closed = true
	return err
}