forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
batch.go
187 lines (168 loc) · 5.14 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package pipeline
import (
"bytes"
"reflect"
"time"
)
// A node that handles creating several child QueryNodes.
// Each call to `query` creates a child QueryNode that
// can be further configured. See QueryNode.
// The `batch` variable in batch tasks is an instance of
// a BatchNode.
//
// Example:
//     var errors = batch
//         |query('SELECT value from errors')
//         ...
//     var views = batch
//         |query('SELECT value from views')
//         ...
//
// Available Statistics:
//
//    * query_errors -- number of errors when querying
//    * connect_errors -- number of errors connecting to InfluxDB
//    * batches_queried -- number of batches returned from queries
//    * points_queried -- total number of points in batches
//
type BatchNode struct {
	// Embedded base node; newBatchNode configures it with desc "batch",
	// wants NoEdge and provides BatchEdge.
	node
}
// newBatchNode builds the root node of a batch task: it takes no input
// edge (NoEdge) and emits batch data (BatchEdge).
func newBatchNode() *BatchNode {
	bn := new(BatchNode)
	// Fields are promoted from the embedded node value.
	bn.desc = "batch"
	bn.wants = NoEdge
	bn.provides = BatchEdge
	return bn
}
// The query to execute. Must not contain a time condition
// in the `WHERE` clause or contain a `GROUP BY` clause.
// The time conditions are added dynamically according to the period, offset and schedule.
// The `GROUP BY` clause is added dynamically according to the dimensions
// passed to the `groupBy` method.
func (b *BatchNode) Query(q string) *QueryNode {
	child := newQueryNode()
	// Record the raw query text before attaching the node to this parent.
	child.QueryStr = q
	b.linkChild(child)
	// Hand back the new child so callers can keep configuring it.
	return child
}
// Do not add the source batch node to the dot output
// since it's not really an edge.
// tick:ignore
func (b *BatchNode) dot(buf *bytes.Buffer) {
	// Intentionally empty: nothing is written to buf for the batch source.
}
// A QueryNode defines a source and a schedule for
// processing batch data. The data is queried from
// an InfluxDB database and then passed into the data pipeline.
//
// Example:
//     batch
//         |query('''
//             SELECT mean("value")
//             FROM "telegraf"."default".cpu_usage_idle
//             WHERE "host" = 'serverA'
//         ''')
//             .period(1m)
//             .every(20s)
//             .groupBy(time(10s), 'cpu')
//         ...
//
// In the above example InfluxDB is queried every 20 seconds; the window of time returned
// spans 1 minute and is grouped into 10 second buckets.
type QueryNode struct {
	chainnode

	// The query text
	//tick:ignore
	QueryStr string

	// The period or length of time that will be queried from InfluxDB.
	Period time.Duration

	// How often to query InfluxDB.
	//
	// The Every property is mutually exclusive with the Cron property.
	Every time.Duration

	// Align start and end times with the Every value.
	// Does not apply if Cron is used.
	// tick:ignore
	AlignFlag bool `tick:"Align"`

	// Define a schedule using a cron syntax.
	//
	// The specific cron implementation is documented here:
	// https://github.com/gorhill/cronexpr#implementation
	//
	// The Cron property is mutually exclusive with the Every property.
	Cron string

	// How far back in time to query from the current time.
	//
	// For example, with an Offset of 2 hours and an Every of 5m,
	// Kapacitor will query InfluxDB every 5 minutes for the window of data from 2 hours ago.
	//
	// This applies to Cron schedules as well. If the cron specifies to run every Sunday at
	// 1 AM and the Offset is 1 hour, then at 1 AM on Sunday the data from 12 AM will be queried.
	Offset time.Duration

	// The list of dimensions for the group-by clause.
	//tick:ignore
	Dimensions []interface{} `tick:"GroupBy"`

	// Fill the data.
	// Options are:
	//
	//   - Any numerical value
	//   - null - exhibits the same behavior as the default
	//   - previous - reports the value of the previous window
	//   - none - suppresses timestamps and values where the value is null
	Fill interface{}

	// The name of a configured InfluxDB cluster.
	// If empty the default cluster will be used.
	Cluster string
}
// newQueryNode returns a QueryNode whose chain node is named "query"
// and both consumes and produces batch edges.
func newQueryNode() *QueryNode {
	return &QueryNode{
		chainnode: newBasicChainNode("query", BatchEdge, BatchEdge),
	}
}
// ChainMethods returns chaining methods whose names collide with QueryNode
// property methods. "GroupBy" is bound to the embedded chainnode's GroupBy,
// not to the QueryNode.GroupBy property method that sets Dimensions.
// NOTE(review): presumably consumed by the TICKscript evaluator so that
// `|groupBy` chaining still reaches the chain node — confirm against caller.
//tick:ignore
func (b *QueryNode) ChainMethods() map[string]reflect.Value {
	return map[string]reflect.Value{
		"GroupBy": reflect.ValueOf(b.chainnode.GroupBy),
	}
}
// Group the data by a set of dimensions.
// Can specify one time dimension.
//
// This property adds a `GROUP BY` clause to the query
// so all the normal behaviors when querying InfluxDB with a `GROUP BY` apply.
//
// Example:
//     batch
//         |query(...)
//             .groupBy(time(10s), 'tag1', 'tag2')
//             .align()
//
// A group by time offset is also possible.
//
// Example:
//     batch
//         |query(...)
//             .groupBy(time(10s, -5s), 'tag1', 'tag2')
//             .align()
//             .offset(5s)
//
// It is recommended to use QueryNode.Align and QueryNode.Offset in conjunction with
// group by time dimensions so that the time bounds match up with the group by intervals.
//
// NOTE: Since QueryNode.Offset is inherently a negative property the second "offset" argument to the "time" function is negative to match.
//
// tick:property
func (b *QueryNode) GroupBy(d ...interface{}) *QueryNode {
	// Replace (not append to) any previously configured dimensions.
	b.Dimensions = d
	return b
}
// Align start and stop times for queries with even boundaries of the QueryNode.Every property.
// Does not apply if using the QueryNode.Cron property.
// tick:property
func (b *QueryNode) Align() *QueryNode {
	b.AlignFlag = true
	return b
}