/
query_options.go
175 lines (136 loc) · 4.91 KB
/
query_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package gocb
import (
"context"
"strconv"
"strings"
"time"
"github.com/google/uuid"
)
// QueryScanConsistency indicates the level of data consistency desired for a query.
type QueryScanConsistency uint
const (
// QueryScanConsistencyNotBounded indicates no data consistency is required.
QueryScanConsistencyNotBounded QueryScanConsistency = iota + 1
// QueryScanConsistencyRequestPlus indicates that request-level data consistency is required.
QueryScanConsistencyRequestPlus
)
// QueryOptions represents the options available when executing a query.
type QueryOptions struct {
ScanConsistency QueryScanConsistency
ConsistentWith *MutationState
Profile QueryProfileMode
// ScanCap is the maximum buffered channel size between the indexer connectionManager and the query service for index scans.
ScanCap uint32
// PipelineBatch controls the number of items execution operators can batch for Fetch from the KV.
PipelineBatch uint32
// PipelineCap controls the maximum number of items each execution operator can buffer between various operators.
PipelineCap uint32
// ScanWait is how long the indexer is allowed to wait until it can satisfy ScanConsistency/ConsistentWith criteria.
ScanWait time.Duration
Readonly bool
// MaxParallelism is the maximum number of index partitions, for computing aggregation in parallel.
MaxParallelism uint32
// ClientContextID provides a unique ID for this query which can be used matching up requests between connectionManager and
// server. If not provided will be assigned a uuid value.
ClientContextID string
PositionalParameters []interface{}
NamedParameters map[string]interface{}
Metrics bool
// Raw provides a way to provide extra parameters in the request body for the query.
Raw map[string]interface{}
Adhoc bool
Timeout time.Duration
RetryStrategy RetryStrategy
// FlexIndex tells the query engine to use a flex index (utilizing the search service).
FlexIndex bool
// PreserveExpiry tells the query engine to preserve expiration values set on any documents modified by this query.
PreserveExpiry bool
ParentSpan RequestSpan
// Using a deadlined Context alongside a Timeout will cause the shorter of the two to cause cancellation, this
// also applies to global level timeouts.
// UNCOMMITTED: This API may change in the future.
Context context.Context
// AsTransaction indicates to run this query as a transaction, providing any additional transaction specific
// configuration.
// UNCOMMITTED: This API may change in the future.
AsTransaction *SingleQueryTransactionOptions
// Internal: This should never be used and is not supported.
Internal struct {
User string
Endpoint string
}
}
func (opts *QueryOptions) toMap() (map[string]interface{}, error) {
execOpts := make(map[string]interface{})
if opts.ScanConsistency != 0 && opts.ConsistentWith != nil {
return nil, makeInvalidArgumentsError("ScanConsistency and ConsistentWith must be used exclusively")
}
if opts.ScanConsistency != 0 {
if opts.ScanConsistency == QueryScanConsistencyNotBounded {
execOpts["scan_consistency"] = "not_bounded"
} else if opts.ScanConsistency == QueryScanConsistencyRequestPlus {
execOpts["scan_consistency"] = "request_plus"
} else {
return nil, makeInvalidArgumentsError("Unexpected consistency option")
}
}
if opts.ConsistentWith != nil {
execOpts["scan_consistency"] = "at_plus"
execOpts["scan_vectors"] = opts.ConsistentWith
}
if opts.Profile != "" {
execOpts["profile"] = opts.Profile
}
if opts.Readonly {
execOpts["readonly"] = opts.Readonly
}
if opts.PositionalParameters != nil && opts.NamedParameters != nil {
return nil, makeInvalidArgumentsError("Positional and named parameters must be used exclusively")
}
if opts.PositionalParameters != nil {
execOpts["args"] = opts.PositionalParameters
}
if opts.NamedParameters != nil {
for key, value := range opts.NamedParameters {
if !strings.HasPrefix(key, "$") {
key = "$" + key
}
execOpts[key] = value
}
}
if opts.ScanCap != 0 {
execOpts["scan_cap"] = strconv.FormatUint(uint64(opts.ScanCap), 10)
}
if opts.PipelineBatch != 0 {
execOpts["pipeline_batch"] = strconv.FormatUint(uint64(opts.PipelineBatch), 10)
}
if opts.PipelineCap != 0 {
execOpts["pipeline_cap"] = strconv.FormatUint(uint64(opts.PipelineCap), 10)
}
if opts.ScanWait > 0 {
execOpts["scan_wait"] = opts.ScanWait.String()
}
if opts.Raw != nil {
for k, v := range opts.Raw {
execOpts[k] = v
}
}
if opts.MaxParallelism > 0 {
execOpts["max_parallelism"] = strconv.FormatUint(uint64(opts.MaxParallelism), 10)
}
if !opts.Metrics {
execOpts["metrics"] = false
}
if opts.ClientContextID == "" {
execOpts["client_context_id"] = uuid.New()
} else {
execOpts["client_context_id"] = opts.ClientContextID
}
if opts.FlexIndex {
execOpts["use_fts"] = true
}
if opts.PreserveExpiry {
execOpts["preserve_expiry"] = true
}
return execOpts, nil
}