/
querystreamer.go
175 lines (141 loc) · 3.96 KB
/
querystreamer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package gocbcore
import (
"encoding/json"
"errors"
"io"
"sync"
)
// queryStreamer allows access to the results of a N1QL query.
// It pulls rows out of an underlying stream through a rowStreamer and keeps
// either the trailing meta-data or the error that ended the stream.
type queryStreamer struct {
// metaDataBytes holds the bytes returned by streamer.Finalize; it is stored
// by finishWithoutError and handed out by MetaData.
metaDataBytes []byte
// err is the error recorded by finishWithError and reported by Err and Close.
err error
// lock is held by Err and Close while they read err and stream.
lock sync.Mutex
// stream is the io.ReadCloser that was handed to newRowStreamer; the finish
// methods close it and then set it to nil.
stream io.ReadCloser
// streamer yields the rows; the finish methods set it to nil, after which
// NextRow returns nil.
streamer *rowStreamer
}
// newQueryStreamer builds a queryStreamer that reads rows from stream, using
// rowsAttrib as the attribute handed to newRowStreamer.
//
// If the row streamer cannot be created, stream is closed (a failure to close
// is only logged at debug level) and the creation error is returned.
func newQueryStreamer(stream io.ReadCloser, rowsAttrib string) (*queryStreamer, error) {
	rows, err := newRowStreamer(stream, rowsAttrib)
	if err != nil {
		// Nobody else will ever see the stream, so release it here.
		if closeErr := stream.Close(); closeErr != nil {
			logDebugf("query stream close failed after error: %s", closeErr)
		}
		return nil, err
	}

	qs := &queryStreamer{}
	qs.stream = stream
	qs.streamer = rows
	return qs, nil
}
// NextRow returns the next row from the results. It returns nil once the rows
// are exhausted, once an error has ended the stream, or when the streamer has
// already been released by an earlier call.
func (r *queryStreamer) NextRow() []byte {
	if r.streamer == nil {
		return nil
	}

	row, err := r.streamer.NextRowBytes()
	switch {
	case err != nil:
		// Reading failed: record the error and tear the stream down.
		r.finishWithError(err)
		return nil
	case row == nil:
		// No rows left: finalize so the meta-data gets collected.
		r.finishWithoutError()
		return nil
	}
	return row
}
// Err returns the error recorded for the stream, or nil if none has been
// recorded. The field is read while holding the streamer's lock.
func (r *queryStreamer) Err() error {
	r.lock.Lock()
	defer r.lock.Unlock()
	return r.err
}
// EarlyMetadata returns the value (or nil) of an attribute from a query metadata before the query has completed.
//
// The row streamer is released (set to nil) as soon as the rows are exhausted
// or an error ends the stream; in that state there is nothing left to query,
// so nil is returned instead of calling into a nil streamer.
func (r *queryStreamer) EarlyMetadata(key string) json.RawMessage {
	if r.streamer == nil {
		return nil
	}
	return r.streamer.EarlyAttrib(key)
}
// finishWithoutError is invoked once the rows have run out. It finalizes the
// row streamer to obtain the meta-data, releases the streamer, closes the
// underlying stream and finally stores the meta-data.
//
// If finalizing fails, the failure is routed through finishWithError and no
// meta-data is stored. A failure to close the stream is only logged.
func (r *queryStreamer) finishWithoutError() {
	meta, finalizeErr := r.streamer.Finalize()
	if finalizeErr != nil {
		r.finishWithError(finalizeErr)
		return
	}

	// A finalized streamer must not be used again.
	r.streamer = nil

	// Nothing more will be read, so the stream can be released.
	if closeErr := r.stream.Close(); closeErr != nil {
		logWarnf("query stream close failed after meta-data: %s", closeErr)
	}

	// Drop the reference to the closed stream under the lock.
	r.lock.Lock()
	r.stream = nil
	r.lock.Unlock()

	r.metaDataBytes = meta
}
// finishWithError records err as the error that ended the stream, invalidates
// the row streamer and closes the underlying stream.
//
// err and stream are updated while holding r.lock because Err and Close read
// those fields under that lock. The stream reference is cleared inside the
// critical section and the captured stream is closed afterwards, so the lock
// is never held across the Close call.
func (r *queryStreamer) finishWithError(err error) {
	r.lock.Lock()
	// Lets record the error that happened
	r.err = err
	// The stream itself is no longer valid once it has been handed off for closing
	stream := r.stream
	r.stream = nil
	r.lock.Unlock()

	// Our streamer is invalidated as soon as an error occurs
	r.streamer = nil

	if stream == nil {
		// Nothing left to close.
		return
	}

	// Lets close the underlying stream
	if closeErr := stream.Close(); closeErr != nil {
		// We log this at debug level, but its almost always going to be an
		// error since thats the most likely reason we are in finishWithError
		logDebugf("query stream close failed after error: %s", closeErr)
	}
}
// Close closes the underlying stream if it is still open.
//
// An error recorded earlier on the stream takes precedence and is returned
// on every call. Otherwise, if the stream has already been released, nil is
// returned; if not, the result of closing the stream is returned.
func (r *queryStreamer) Close() error {
	if prior := r.Err(); prior != nil {
		return prior
	}

	// Take a snapshot of the stream reference under the lock.
	r.lock.Lock()
	body := r.stream
	r.lock.Unlock()

	if body != nil {
		return body.Close()
	}
	// Already closed with no recorded error, so nothing went wrong.
	return nil
}
// One returns the first row of the results.
// It reads through all remaining rows before returning, as such this should
// only be used for very small resultsets - ideally of, at most, length 1.
//
// If there is no first row, the recorded stream error is returned via Close
// when one exists, otherwise a "no rows available" error is returned. If an
// error is recorded while the remaining rows are read, that error is returned
// instead of the row.
func (r *queryStreamer) One() ([]byte, error) {
	first := r.NextRow()
	if first == nil {
		if r.Err() != nil {
			return nil, r.Close()
		}
		return nil, errors.New("no rows available")
	}

	// Consume whatever rows are left so the stream runs to completion.
	for {
		if r.NextRow() == nil {
			break
		}
	}

	// A failure while draining invalidates the row we already hold.
	if err := r.Err(); err != nil {
		return nil, err
	}
	return first, nil
}
// MetaData returns the meta-data bytes collected when the stream finished.
//
// It returns an error while the row streamer is still active (the rows have
// not been exhausted and no error has ended the stream), and also when no
// meta-data was stored.
func (r *queryStreamer) MetaData() ([]byte, error) {
	switch {
	case r.streamer != nil:
		return nil, errors.New("the result must be closed before accessing the meta-data")
	case r.metaDataBytes == nil:
		return nil, errors.New("an error occurred during querying which has made the meta-data unavailable")
	default:
		return r.metaDataBytes, nil
	}
}