-
Notifications
You must be signed in to change notification settings - Fork 0
/
analyticsquery_deferred.go
144 lines (120 loc) · 3.08 KB
/
analyticsquery_deferred.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package gocb
import (
"encoding/json"
"time"
"github.com/couchbaselabs/gocbcore8alpha"
)
// AnalyticsDeferredResultHandle allows access to the handle of a deferred Analytics query.
// It is used both to poll the state of the query (Status) and to read the
// result rows once they are available (Next, NextBytes, One).
//
// Experimental: This API is subject to change at any time.
type AnalyticsDeferredResultHandle interface {
// One assigns the first value from the results into valuePtr and closes the handle.
One(valuePtr interface{}) error
// Next decodes the next result into valuePtr, returning whether the read was successful.
Next(valuePtr interface{}) bool
// NextBytes returns the next result as raw bytes, or nil when none is available.
NextBytes() []byte
// Close marks the results as closed, returning any error that occurred while reading them.
Close() error
// Status performs a network call to the handle URI and returns the current status of the query.
Status() (string, error)
}
// analyticsDeferredResultHandle is the concrete state behind a deferred
// Analytics query handle; its methods in this file implement the operations
// of AnalyticsDeferredResultHandle.
type analyticsDeferredResultHandle struct {
// handleUri is the path requested with GET to poll the status or fetch the
// results; Status replaces it with the handle returned by the server.
handleUri string
// status is the most recently known query status; rows are only fetched
// once it equals "success".
status string
// rows holds the result rows and serves them to NextBytes after the fetch.
rows *analyticsRows
// err is the first error recorded on the handle; once set, reads return
// nothing and Close reports it.
err error
// provider executes the HTTP requests issued by this handle.
provider httpProvider
// hasResult is set once the results have been fetched, so the fetch
// happens at most once.
hasResult bool
// timeout is not read by any method visible in this file.
// NOTE(review): presumably intended for the deadline TODOs below - confirm.
timeout time.Duration
}
// Next decodes the next result row as JSON into valuePtr.
//
// It returns true only when a row was available and decoded successfully.
// It returns false when the handle already carries an error, when NextBytes
// yields no row, or when decoding fails; a decoding failure is recorded on
// the handle and later reported by Close.
func (r *analyticsDeferredResultHandle) Next(valuePtr interface{}) bool {
	if r.err != nil {
		return false
	}

	if raw := r.NextBytes(); raw != nil {
		r.err = json.Unmarshal(raw, valuePtr)
		return r.err == nil
	}

	return false
}
// NextBytes returns the next result row as raw bytes.
//
// It returns nil when the handle already carries an error or when the last
// known status is not "success". On the first call after the status became
// "success" it issues a GET against the handle URI to load the rows; if that
// request fails the error is recorded on the handle and nil is returned.
// TODO: how to deadline/timeout this?
func (r *analyticsDeferredResultHandle) NextBytes() []byte {
	switch {
	case r.err != nil:
		return nil
	case r.status != "success":
		// The query has not (as far as we know) completed, nothing to read.
		return nil
	}

	// Lazily fetch the rows exactly once.
	if !r.hasResult {
		fetchReq := &gocbcore.HttpRequest{
			Service: gocbcore.CbasService,
			Path:    r.handleUri,
			Method:  "GET",
		}
		if fetchErr := r.executeHandle(fetchReq, &r.rows.rows); fetchErr != nil {
			r.err = fetchErr
			return nil
		}
		r.hasResult = true
	}

	return r.rows.NextBytes()
}
// Close marks the results as closed, returning any errors that occurred during reading the results.
// It closes the underlying rows and then returns the error recorded on this
// handle (nil if none was recorded). Anything returned by the rows' own Close
// call is not inspected here.
func (r *analyticsDeferredResultHandle) Close() error {
r.rows.Close()
return r.err
}
// One reads the first result into valuePtr and closes the handle.
//
// If no result could be read, the handle is closed and any error recorded
// on it is returned. An empty result set without a recorded error currently
// yields a nil error. Once a result has been obtained, errors raised while
// closing are deliberately ignored.
func (r *analyticsDeferredResultHandle) One(valuePtr interface{}) error {
	if got := r.Next(valuePtr); !got {
		if closeErr := r.Close(); closeErr != nil {
			return closeErr
		}
		// return ErrNoResults
	}

	// Ignore any errors occurring after we already have our result.
	_ = r.Close()
	return nil
}
// Status triggers a network call to the handle URI, returning the current status of the long running query.
//
// On success the returned status is stored on the handle, and the handle URI
// is replaced by the one contained in the response, if any. A response that
// carries no handle leaves the current URI untouched so that the handle can
// still be polled afterwards.
//
// On failure the error is recorded on the handle (so later reads and Close
// report it) and returned together with an empty status.
// TODO: how to deadline/timeout this?
func (r *analyticsDeferredResultHandle) Status() (string, error) {
	req := &gocbcore.HttpRequest{
		Service: gocbcore.CbasService,
		Path:    r.handleUri,
		Method:  "GET",
	}

	// Decode into a value rather than a nil pointer: a JSON `null` body would
	// otherwise leave the pointer nil and panic on the field accesses below.
	var resp analyticsResponseHandle
	if err := r.executeHandle(req, &resp); err != nil {
		r.err = err
		return "", err
	}

	r.status = resp.Status
	// Never overwrite a usable handle with an empty one.
	if resp.Handle != "" {
		r.handleUri = resp.Handle
	}

	return r.status, nil
}
// executeHandle sends req through the handle's HTTP provider and decodes the
// JSON response body into valuePtr.
//
// It returns the error from the request or from decoding, if any. The
// response body is closed on every path once the request has succeeded; a
// failure to close it is only logged, never returned.
// TODO: how to deadline/timeout this?
func (r *analyticsDeferredResultHandle) executeHandle(req *gocbcore.HttpRequest, valuePtr interface{}) error {
	resp, err := r.provider.DoHttpRequest(req)
	if err != nil {
		return err
	}
	// Close the body even when decoding fails, otherwise the underlying
	// connection is leaked.
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			logDebugf("Failed to close socket (%s)", closeErr)
		}
	}()

	return json.NewDecoder(resp.Body).Decode(valuePtr)
}