This repository has been archived by the owner on Jul 19, 2022. It is now read-only.
forked from influxdata/influxdb
/
compile.go
877 lines (778 loc) · 28.3 KB
/
compile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
package query
import (
"errors"
"fmt"
"strings"
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxql"
)
// CompileOptions are the customization options for the compiler.
type CompileOptions struct {
// Now is the reference time used by the compiler, for example when
// evaluating now() in conditions and in time() dimension offsets.
// newCompiler replaces a zero value with the current UTC time.
Now time.Time
}
// Statement is a compiled query statement.
// Compile returns a Statement for a select statement that passed validation.
type Statement interface {
// Prepare prepares the statement by mapping shards and finishing the creation
// of the query plan.
Prepare(shardMapper ShardMapper, opt SelectOptions) (PreparedStatement, error)
}
// compiledStatement represents a select statement that has undergone some initial processing to
// determine if it is valid and to have some initial modifications done on the AST.
// Compile returns it as a Statement; Prepare is its implementation of that interface.
type compiledStatement struct {
// Condition is the condition used for accessing data.
Condition influxql.Expr
// TimeRange is the TimeRange for selecting data.
TimeRange influxql.TimeRange
// Interval holds the time grouping interval.
Interval Interval
// InheritedInterval marks if the interval was inherited from a parent.
// If this is set, then an interval that was inherited will not cause
// a query that shouldn't have an interval to fail.
InheritedInterval bool
// Ascending is true if the time ordering is ascending.
Ascending bool
// FunctionCalls holds a reference to the call expression of every function
// call that has been encountered.
FunctionCalls []*influxql.Call
// OnlySelectors is set to true when there are no aggregate functions.
// It starts out true and is cleared as aggregates are encountered.
OnlySelectors bool
// HasDistinct is set when the distinct() function is encountered.
HasDistinct bool
// FillOption contains the fill option for aggregates.
FillOption influxql.FillOption
// TopBottomFunction is set to top or bottom when one of those functions are
// used in the statement.
TopBottomFunction string
// HasAuxiliaryFields is true when the statement requires auxiliary fields,
// i.e. it selects a bare field, a wildcard, or a regular expression.
HasAuxiliaryFields bool
// Fields holds all of the fields that will be used.
Fields []*compiledField
// TimeFieldName stores the name of the time field's column.
// The column names generated by the compiler will not conflict with
// this name.
TimeFieldName string
// Limit is the number of rows per series this query should be limited to.
Limit int
// HasTarget is true if this query is being written into a target.
HasTarget bool
// Options holds the configured compiler options.
Options CompileOptions
// stmt is the clone of the compiled select statement that Compile stores
// after applying its rewrites; Prepare works from this copy.
stmt *influxql.SelectStatement
}
// newCompiler returns a compiledStatement in its initial state: only
// selectors seen so far, the time column named "time", and the given options.
// A zero opt.Now is replaced with the current UTC time.
func newCompiler(opt CompileOptions) *compiledStatement {
	cs := new(compiledStatement)
	cs.OnlySelectors = true
	cs.TimeFieldName = "time"
	cs.Options = opt
	if cs.Options.Now.IsZero() {
		cs.Options.Now = time.Now().UTC()
	}
	return cs
}
// Compile validates stmt and returns a Statement that can be prepared for
// execution. The returned statement holds a rewritten clone of stmt; an error
// is returned if preprocessing or field compilation rejects the statement.
func Compile(stmt *influxql.SelectStatement, opt CompileOptions) (Statement, error) {
	compiler := newCompiler(opt)

	err := compiler.preprocess(stmt)
	if err == nil {
		err = compiler.compile(stmt)
	}
	if err != nil {
		return nil, err
	}

	// All rewrites are applied to a clone that carries the resolved time
	// alias and condition.
	rewritten := stmt.Clone()
	rewritten.TimeAlias = compiler.TimeFieldName
	rewritten.Condition = compiler.Condition

	// DISTINCT becomes a call, "time" is dropped from the field list, and
	// regex conditions that could make use of the index are rewritten.
	rewritten.RewriteDistinct()
	rewritten.RewriteTimeFields()
	rewritten.RewriteRegexConditions()

	compiler.stmt = rewritten
	return compiler, nil
}
// preprocess records the statement-wide attributes of stmt on c: ordering,
// limit, target, condition, time range, grouping interval, and fill option.
// Missing time bounds are filled in once the interval is known.
func (c *compiledStatement) preprocess(stmt *influxql.SelectStatement) error {
	c.Ascending = stmt.TimeAscending()
	c.Limit = stmt.Limit
	c.HasTarget = stmt.Target != nil

	nowValuer := &influxql.NowValuer{Now: c.Options.Now, Location: stmt.Location}
	cond, timeRange, err := influxql.ConditionExpr(stmt.Condition, nowValuer)
	if err != nil {
		return err
	}
	c.Condition, c.TimeRange = cond, timeRange

	// The dimensions are validated here and yield the interval, if any.
	if err := c.compileDimensions(stmt); err != nil {
		return err
	}
	c.FillOption = stmt.Fill

	// An unset lower bound always defaults to the minimum time.
	if c.TimeRange.Min.IsZero() {
		c.TimeRange.Min = time.Unix(0, influxql.MinTime).UTC()
	}
	if !c.TimeRange.Max.IsZero() {
		return nil
	}

	// An unset upper bound depends on the interval: aggregate queries stop
	// at now() for backwards compatibility and usability, everything else
	// runs to the maximum time.
	if c.Interval.IsZero() {
		c.TimeRange.Max = time.Unix(0, influxql.MaxTime).UTC()
	} else {
		c.TimeRange.Max = c.Options.Now
	}
	return nil
}
// compile compiles and validates the fields of stmt and then compiles every
// subquery found among its sources.
func (c *compiledStatement) compile(stmt *influxql.SelectStatement) error {
	err := c.compileFields(stmt)
	if err == nil {
		err = c.validateFields()
	}
	if err != nil {
		return err
	}

	// Subqueries come last because they may need state inherited from the
	// enclosing statement.
	for _, src := range stmt.Sources {
		sub, isSubquery := src.(*influxql.SubQuery)
		if !isSubquery {
			continue
		}
		if err := c.subquery(sub.Statement); err != nil {
			return err
		}
	}
	return nil
}
// compileFields compiles every selected field of stmt into c.Fields.
//
// A bare "time" field is never added, since time is selected automatically.
// Its alias, if present, becomes the name of the time column, so
// SELECT time AS timestamp, max(value) FROM cpu reports the time column as
// "timestamp".
func (c *compiledStatement) compileFields(stmt *influxql.SelectStatement) error {
	c.Fields = make([]*compiledField, 0, len(stmt.Fields))
	for i := range stmt.Fields {
		f := stmt.Fields[i]

		ref, isRef := f.Expr.(*influxql.VarRef)
		if isRef && ref.Val == "time" {
			if alias := f.Alias; alias != "" {
				c.TimeFieldName = alias
			}
			continue
		}

		// The field is recorded before it is compiled, so it stays in the
		// list even when compilation fails.
		cf := &compiledField{global: c, Field: f, AllowWildcard: true}
		c.Fields = append(c.Fields, cf)
		if err := cf.compileExpr(f.Expr); err != nil {
			return err
		}
	}
	return nil
}
// compiledField is the compilation state for a single selected field.
type compiledField struct {
// This holds the global state from the compiled statement.
global *compiledStatement
// Field is the top level field that is being compiled.
Field *influxql.Field
// AllowWildcard is set to true if a wildcard or regular expression is allowed.
// compileExpr clears it once a binary expression is encountered.
AllowWildcard bool
}
// compileExpr validates expr and records what it requires on the shared
// compiledStatement: auxiliary fields for bare references, wildcards and
// regexes, and a function-call entry for every call. Function calls are
// forwarded to the validator for that function. Expression types that are
// not handled yield an "unimplemented" error.
func (c *compiledField) compileExpr(expr influxql.Expr) error {
	switch e := expr.(type) {
	case *influxql.VarRef:
		// A bare variable reference will require auxiliary fields.
		c.global.HasAuxiliaryFields = true
		return nil

	case *influxql.Wildcard:
		// Wildcards use auxiliary fields; at least one expansion is assumed.
		// The flag is set even when the wildcard is then rejected.
		c.global.HasAuxiliaryFields = true
		if c.AllowWildcard {
			return nil
		}
		return errors.New("unable to use wildcard in a binary expression")

	case *influxql.RegexLiteral:
		if c.AllowWildcard {
			c.global.HasAuxiliaryFields = true
			return nil
		}
		return errors.New("unable to use regex in a binary expression")

	case *influxql.Call:
		// Every call is registered before it is validated.
		c.global.FunctionCalls = append(c.global.FunctionCalls, e)
		args := e.Args
		switch e.Name {
		case "percentile":
			return c.compilePercentile(args)
		case "sample":
			return c.compileSample(args)
		case "distinct":
			return c.compileDistinct(args)
		case "top", "bottom":
			return c.compileTopBottom(e)
		case "derivative":
			return c.compileDerivative(args, false)
		case "non_negative_derivative":
			return c.compileDerivative(args, true)
		case "difference":
			return c.compileDifference(args, false)
		case "non_negative_difference":
			return c.compileDifference(args, true)
		case "cumulative_sum":
			return c.compileCumulativeSum(args)
		case "moving_average":
			return c.compileMovingAverage(args)
		case "elapsed":
			return c.compileElapsed(args)
		case "integral":
			return c.compileIntegral(args)
		case "holt_winters":
			return c.compileHoltWinters(args, false)
		case "holt_winters_with_fit":
			return c.compileHoltWinters(args, true)
		default:
			return c.compileFunction(e)
		}

	case *influxql.Distinct:
		call := e.NewCall()
		c.global.FunctionCalls = append(c.global.FunctionCalls, call)
		return c.compileDistinct(call.Args)

	case *influxql.BinaryExpr:
		// Disallow wildcards in binary expressions. RewriteFields, which
		// expands wildcards, is too complicated if they are allowed inside
		// of expressions.
		c.AllowWildcard = false

		// Literal operands need no compilation, so only the non-literal
		// side(s) are visited.
		_, lhsIsLiteral := e.LHS.(influxql.Literal)
		_, rhsIsLiteral := e.RHS.(influxql.Literal)
		switch {
		case lhsIsLiteral && rhsIsLiteral:
			return errors.New("cannot perform a binary expression on two literals")
		case lhsIsLiteral:
			return c.compileExpr(e.RHS)
		case rhsIsLiteral:
			return c.compileExpr(e.LHS)
		}
		if err := c.compileExpr(e.LHS); err != nil {
			return err
		}
		return c.compileExpr(e.RHS)

	case *influxql.ParenExpr:
		return c.compileExpr(e.Expr)

	default:
		return errors.New("unimplemented")
	}
}
// compileSymbol checks that field, the argument handed to the function called
// name, is a variable reference, a wildcard, or a regular expression.
// Wildcards and regular expressions are rejected once the field no longer
// allows them; when accepted they clear the statement's OnlySelectors flag.
func (c *compiledField) compileSymbol(name string, field influxql.Expr) error {
	switch field.(type) {
	case *influxql.VarRef:
		return nil
	case *influxql.Wildcard:
		if c.AllowWildcard {
			c.global.OnlySelectors = false
			return nil
		}
		return fmt.Errorf("unsupported expression with wildcard: %s()", name)
	case *influxql.RegexLiteral:
		if c.AllowWildcard {
			c.global.OnlySelectors = false
			return nil
		}
		return fmt.Errorf("unsupported expression with regex field: %s()", name)
	}
	return fmt.Errorf("expected field argument in %s()", name)
}
// compileFunction validates a call to one of the single-argument functions
// and records whether it is a selector or an aggregate. Unknown function
// names are rejected. count() may additionally wrap a distinct() call.
func (c *compiledField) compileFunction(expr *influxql.Call) error {
	name := expr.Name
	switch name {
	case "count", "sum", "mean", "median", "mode", "stddev", "spread":
		// These functions are not considered selectors.
		c.global.OnlySelectors = false
	case "max", "min", "first", "last":
		// Selectors; top/bottom are handled separately since they are not
		// typical functions.
	default:
		return fmt.Errorf("undefined function %s()", name)
	}

	if got := len(expr.Args); got != 1 {
		return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", name, 1, got)
	}
	arg := expr.Args[0]

	// count() accepts distinct() as its argument, in either AST form.
	if name == "count" {
		switch inner := arg.(type) {
		case *influxql.Call:
			if inner.Name == "distinct" {
				return c.compileDistinct(inner.Args)
			}
		case *influxql.Distinct:
			return c.compileDistinct(inner.NewCall().Args)
		}
	}
	return c.compileSymbol(name, arg)
}
// compilePercentile validates the arguments of percentile(): exactly two,
// the second an integer or number literal and the first a field, wildcard,
// or regular expression.
func (c *compiledField) compilePercentile(args []influxql.Expr) error {
	if got := len(args); got != 2 {
		return fmt.Errorf("invalid number of arguments for percentile, expected %d, got %d", 2, got)
	}

	_, isInteger := args[1].(*influxql.IntegerLiteral)
	_, isNumber := args[1].(*influxql.NumberLiteral)
	if !isInteger && !isNumber {
		return fmt.Errorf("expected float argument in percentile()")
	}
	return c.compileSymbol("percentile", args[0])
}
// compileSample validates the arguments of sample(): exactly two, the second
// a positive integer literal and the first a field, wildcard, or regular
// expression.
func (c *compiledField) compileSample(args []influxql.Expr) error {
	if exp, got := 2, len(args); got != exp {
		return fmt.Errorf("invalid number of arguments for sample, expected %d, got %d", exp, got)
	}

	switch arg1 := args[1].(type) {
	case *influxql.IntegerLiteral:
		// A sample size of 1 is accepted, so the message states the bound
		// that is actually enforced.
		if arg1.Val <= 0 {
			return fmt.Errorf("sample window must be greater than 0, got %d", arg1.Val)
		}
	default:
		return fmt.Errorf("expected integer argument in sample()")
	}
	return c.compileSymbol("sample", args[0])
}
// compileDerivative validates derivative() and non_negative_derivative().
// The call takes one or two arguments; the optional second one must be a
// positive duration. The first argument is either a function call, which
// requires a GROUP BY interval, or a field, wildcard, or regular expression,
// which is only allowed without one.
func (c *compiledField) compileDerivative(args []influxql.Expr, isNonNegative bool) error {
	fn := "derivative"
	if isNonNegative {
		fn = "non_negative_derivative"
	}

	const minArgs, maxArgs = 1, 2
	if n := len(args); n < minArgs || n > maxArgs {
		return fmt.Errorf("invalid number of arguments for %s, expected at least %d but no more than %d, got %d", fn, minArgs, maxArgs, n)
	}

	// Check the duration argument, if one was given.
	if len(args) == 2 {
		unit, isDuration := args[1].(*influxql.DurationLiteral)
		if !isDuration {
			return fmt.Errorf("second argument to %s must be a duration, got %T", fn, args[1])
		}
		if unit.Val <= 0 {
			return fmt.Errorf("duration argument must be positive, got %s", influxql.FormatDuration(unit.Val))
		}
	}
	c.global.OnlySelectors = false

	grouped := !c.global.Interval.IsZero()
	if call, isCall := args[0].(*influxql.Call); isCall {
		if !grouped {
			return fmt.Errorf("%s aggregate requires a GROUP BY interval", fn)
		}
		return c.compileExpr(call)
	}
	if grouped {
		return fmt.Errorf("aggregate function required inside the call to %s", fn)
	}
	return c.compileSymbol(fn, args[0])
}
// compileElapsed validates elapsed(). The call takes one or two arguments;
// the optional second one must be a positive duration. The first argument is
// either a function call, which requires a GROUP BY interval, or a field,
// wildcard, or regular expression, which is only allowed without one.
func (c *compiledField) compileElapsed(args []influxql.Expr) error {
	const minArgs, maxArgs = 1, 2
	if n := len(args); n < minArgs || n > maxArgs {
		return fmt.Errorf("invalid number of arguments for elapsed, expected at least %d but no more than %d, got %d", minArgs, maxArgs, n)
	}

	// Check the duration argument, if one was given.
	if len(args) == 2 {
		unit, isDuration := args[1].(*influxql.DurationLiteral)
		if !isDuration {
			return fmt.Errorf("second argument to elapsed must be a duration, got %T", args[1])
		}
		if unit.Val <= 0 {
			return fmt.Errorf("duration argument must be positive, got %s", influxql.FormatDuration(unit.Val))
		}
	}
	c.global.OnlySelectors = false

	grouped := !c.global.Interval.IsZero()
	if call, isCall := args[0].(*influxql.Call); isCall {
		if !grouped {
			return fmt.Errorf("elapsed aggregate requires a GROUP BY interval")
		}
		return c.compileExpr(call)
	}
	if grouped {
		return fmt.Errorf("aggregate function required inside the call to elapsed")
	}
	return c.compileSymbol("elapsed", args[0])
}
// compileDifference validates difference() and non_negative_difference().
// Exactly one argument is expected: either a function call, which requires a
// GROUP BY interval, or a field, wildcard, or regular expression, which is
// only allowed without one.
func (c *compiledField) compileDifference(args []influxql.Expr, isNonNegative bool) error {
	var fn string
	if isNonNegative {
		fn = "non_negative_difference"
	} else {
		fn = "difference"
	}

	if len(args) != 1 {
		return fmt.Errorf("invalid number of arguments for %s, expected 1, got %d", fn, len(args))
	}
	c.global.OnlySelectors = false

	grouped := !c.global.Interval.IsZero()
	if call, isCall := args[0].(*influxql.Call); isCall {
		if !grouped {
			return fmt.Errorf("%s aggregate requires a GROUP BY interval", fn)
		}
		return c.compileExpr(call)
	}
	if grouped {
		return fmt.Errorf("aggregate function required inside the call to %s", fn)
	}
	return c.compileSymbol(fn, args[0])
}
// compileCumulativeSum validates cumulative_sum(). Exactly one argument is
// expected: either a function call, which requires a GROUP BY interval, or a
// field, wildcard, or regular expression, which is only allowed without one.
func (c *compiledField) compileCumulativeSum(args []influxql.Expr) error {
	if len(args) != 1 {
		return fmt.Errorf("invalid number of arguments for cumulative_sum, expected 1, got %d", len(args))
	}
	c.global.OnlySelectors = false

	grouped := !c.global.Interval.IsZero()
	if call, isCall := args[0].(*influxql.Call); isCall {
		if !grouped {
			return fmt.Errorf("cumulative_sum aggregate requires a GROUP BY interval")
		}
		return c.compileExpr(call)
	}
	if grouped {
		return fmt.Errorf("aggregate function required inside the call to cumulative_sum")
	}
	return c.compileSymbol("cumulative_sum", args[0])
}
// compileMovingAverage validates moving_average(). Exactly two arguments are
// expected; the second must be an integer literal greater than 1. The first
// is either a function call, which requires a GROUP BY interval, or a field,
// wildcard, or regular expression, which is only allowed without one.
func (c *compiledField) compileMovingAverage(args []influxql.Expr) error {
	if n := len(args); n != 2 {
		return fmt.Errorf("invalid number of arguments for moving_average, expected 2, got %d", n)
	}

	window, isInteger := args[1].(*influxql.IntegerLiteral)
	if !isInteger {
		return fmt.Errorf("second argument for moving_average must be an integer, got %T", args[1])
	}
	if window.Val <= 1 {
		return fmt.Errorf("moving_average window must be greater than 1, got %d", window.Val)
	}
	c.global.OnlySelectors = false

	grouped := !c.global.Interval.IsZero()
	if call, isCall := args[0].(*influxql.Call); isCall {
		if !grouped {
			return fmt.Errorf("moving_average aggregate requires a GROUP BY interval")
		}
		return c.compileExpr(call)
	}
	if grouped {
		return fmt.Errorf("aggregate function required inside the call to moving_average")
	}
	return c.compileSymbol("moving_average", args[0])
}
// compileIntegral validates integral(). The call takes one or two arguments;
// the optional second one must be a positive duration, and the first must be
// a field, wildcard, or regular expression.
func (c *compiledField) compileIntegral(args []influxql.Expr) error {
	const minArgs, maxArgs = 1, 2
	if n := len(args); n < minArgs || n > maxArgs {
		return fmt.Errorf("invalid number of arguments for integral, expected at least %d but no more than %d, got %d", minArgs, maxArgs, n)
	}

	if len(args) == 2 {
		unit, isDuration := args[1].(*influxql.DurationLiteral)
		if !isDuration {
			return errors.New("second argument must be a duration")
		}
		if unit.Val <= 0 {
			return fmt.Errorf("duration argument must be positive, got %s", influxql.FormatDuration(unit.Val))
		}
	}

	// integral() is an aggregate, so the statement is no longer selector-only.
	c.global.OnlySelectors = false
	return c.compileSymbol("integral", args[0])
}
// compileHoltWinters validates holt_winters() and holt_winters_with_fit().
// Exactly three arguments are expected: a function call, an integer literal
// greater than 0, and a non-negative integer literal. A GROUP BY interval is
// required.
func (c *compiledField) compileHoltWinters(args []influxql.Expr, withFit bool) error {
	fn := "holt_winters"
	if withFit {
		fn = "holt_winters_with_fit"
	}

	if len(args) != 3 {
		return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", fn, 3, len(args))
	}

	switch second, isInteger := args[1].(*influxql.IntegerLiteral); {
	case !isInteger:
		return fmt.Errorf("expected integer argument as second arg in %s", fn)
	case second.Val <= 0:
		return fmt.Errorf("second arg to %s must be greater than 0, got %d", fn, second.Val)
	}

	switch third, isInteger := args[2].(*influxql.IntegerLiteral); {
	case !isInteger:
		return fmt.Errorf("expected integer argument as third arg in %s", fn)
	case third.Val < 0:
		return fmt.Errorf("third arg to %s cannot be negative, got %d", fn, third.Val)
	}
	c.global.OnlySelectors = false

	inner, isCall := args[0].(*influxql.Call)
	if !isCall {
		return fmt.Errorf("must use aggregate function with %s", fn)
	}
	if c.global.Interval.IsZero() {
		return fmt.Errorf("%s aggregate requires a GROUP BY interval", fn)
	}
	return c.compileExpr(inner)
}
// compileDistinct validates the arguments of distinct(): exactly one, which
// must be a field reference. On success the statement is marked as containing
// distinct() and as no longer selector-only.
func (c *compiledField) compileDistinct(args []influxql.Expr) error {
	switch n := len(args); {
	case n == 0:
		return errors.New("distinct function requires at least one argument")
	case n != 1:
		return errors.New("distinct function can only have one argument")
	}

	if _, isRef := args[0].(*influxql.VarRef); !isRef {
		return errors.New("expected field argument in distinct()")
	}
	c.global.HasDistinct, c.global.OnlySelectors = true, false
	return nil
}
// compileTopBottom validates a top() or bottom() call.
//
// The call needs at least two arguments: the first must be a field reference
// and the last a positive integer limit that does not exceed the statement's
// LIMIT (when one is set). Every argument in between must be a field or tag
// reference; unless the query writes into a target, each of them is added to
// the statement's field list and compiled. Only one top()/bottom() call is
// allowed per statement.
func (c *compiledField) compileTopBottom(call *influxql.Call) error {
	if c.global.TopBottomFunction != "" {
		return fmt.Errorf("selector function %s() cannot be combined with other functions", c.global.TopBottomFunction)
	}

	if exp, got := 2, len(call.Args); got < exp {
		return fmt.Errorf("invalid number of arguments for %s, expected at least %d, got %d", call.Name, exp, got)
	}

	lastArg := call.Args[len(call.Args)-1]
	limit, ok := lastArg.(*influxql.IntegerLiteral)
	if !ok {
		return fmt.Errorf("expected integer as last argument in %s(), found %s", call.Name, lastArg)
	} else if limit.Val <= 0 {
		return fmt.Errorf("limit (%d) in %s function must be at least 1", limit.Val, call.Name)
	} else if c.global.Limit > 0 && limit.Val > int64(c.global.Limit) {
		// Compare in int64: narrowing limit.Val to int would truncate large
		// values where int is 32 bits wide and let them slip past this check.
		return fmt.Errorf("limit (%d) in %s function can not be larger than the LIMIT (%d) in the select statement", limit.Val, call.Name, c.global.Limit)
	}

	if _, ok := call.Args[0].(*influxql.VarRef); !ok {
		return fmt.Errorf("expected first argument to be a field in %s(), found %s", call.Name, call.Args[0])
	}

	if len(call.Args) > 2 {
		for _, v := range call.Args[1 : len(call.Args)-1] {
			ref, ok := v.(*influxql.VarRef)
			if !ok {
				return fmt.Errorf("only fields or tags are allowed in %s(), found %s", call.Name, v)
			}

			// Add a field for each of the listed dimensions when not writing the results.
			if !c.global.HasTarget {
				field := &compiledField{
					global: c.global,
					Field:  &influxql.Field{Expr: ref},
				}
				c.global.Fields = append(c.global.Fields, field)
				if err := field.compileExpr(ref); err != nil {
					return err
				}
			}
		}
	}

	// Recorded last, so a rejected call does not mark the statement.
	c.global.TopBottomFunction = call.Name
	return nil
}
// compileDimensions validates the GROUP BY dimensions of stmt and records the
// time() grouping interval and offset, if any, on c.Interval.
//
// Tag references, wildcards, regular expressions, and a time() call are the
// only dimensions accepted. The optional second argument of time() may be a
// duration, a time literal, a date-time string, or now(); it is reduced to an
// offset within one interval.
func (c *compiledStatement) compileDimensions(stmt *influxql.SelectStatement) error {
	for _, d := range stmt.Dimensions {
		switch expr := d.Expr.(type) {
		case *influxql.VarRef:
			if strings.ToLower(expr.Val) == "time" {
				return errors.New("time() is a function and expects at least one argument")
			}
		case *influxql.Call:
			// Ensure the call is time() and it has one or two arguments, the
			// first being a duration. If an interval was already recorded by
			// an earlier time() dimension, reject the statement.
			if expr.Name != "time" {
				return errors.New("only time() calls allowed in dimensions")
			} else if got := len(expr.Args); got < 1 || got > 2 {
				return errors.New("time dimension expected 1 or 2 arguments")
			} else if lit, ok := expr.Args[0].(*influxql.DurationLiteral); !ok {
				return errors.New("time dimension must have duration argument")
			} else if c.Interval.Duration != 0 {
				return errors.New("multiple time dimensions not allowed")
			} else {
				c.Interval.Duration = lit.Val
				if len(expr.Args) == 2 {
					switch lit := expr.Args[1].(type) {
					case *influxql.DurationLiteral:
						// A zero interval would make the modulo divide by
						// zero and panic. Leave the offset at zero in that
						// case, which is also what the Truncate-based
						// branches below yield for a zero interval.
						if c.Interval.Duration != 0 {
							c.Interval.Offset = lit.Val % c.Interval.Duration
						}
					case *influxql.TimeLiteral:
						c.Interval.Offset = lit.Val.Sub(lit.Val.Truncate(c.Interval.Duration))
					case *influxql.Call:
						if lit.Name != "now" {
							return errors.New("time dimension offset function must be now()")
						} else if len(lit.Args) != 0 {
							return errors.New("time dimension offset now() function requires no arguments")
						}
						now := c.Options.Now
						c.Interval.Offset = now.Sub(now.Truncate(c.Interval.Duration))
					case *influxql.StringLiteral:
						// If literal looks like a date time then parse it as a time literal.
						if lit.IsTimeLiteral() {
							t, err := lit.ToTimeLiteral(stmt.Location)
							if err != nil {
								return err
							}
							c.Interval.Offset = t.Val.Sub(t.Val.Truncate(c.Interval.Duration))
						} else {
							return errors.New("time dimension offset must be duration or now()")
						}
					default:
						return errors.New("time dimension offset must be duration or now()")
					}
				}
			}
		case *influxql.Wildcard:
		case *influxql.RegexLiteral:
		default:
			return errors.New("only time and tag dimensions allowed")
		}
	}
	return nil
}
// validateFields checks that the compiled fields are compatible with one
// another and with the statement's fill option and grouping interval.
// It runs once all fields have been compiled.
func (c *compiledStatement) validateFields() error {
	// At least one non-time field must have been selected.
	if len(c.Fields) == 0 {
		return errors.New("at least 1 non-time field must be queried")
	}

	calls := len(c.FunctionCalls)
	switch {
	case calls > 1 && c.TopBottomFunction != "":
		// top/bottom must be the only call in the statement.
		return fmt.Errorf("selector function %s() cannot be combined with other functions", c.TopBottomFunction)
	case calls == 0:
		// Without any function, these fill options and a non-inherited
		// GROUP BY interval are rejected.
		if c.FillOption == influxql.NoFill {
			return errors.New("fill(none) must be used with a function")
		}
		if c.FillOption == influxql.LinearFill {
			return errors.New("fill(linear) must be used with a function")
		}
		if !(c.Interval.IsZero() || c.InheritedInterval) {
			return errors.New("GROUP BY requires at least one aggregate function")
		}
	}

	// distinct() must be the only function and cannot be mixed with fields.
	if c.HasDistinct && (calls != 1 || c.HasAuxiliaryFields) {
		return errors.New("aggregate function distinct() cannot be combined with other functions or fields")
	}

	// Auxiliary fields are only allowed in a raw query or next to a single
	// selector.
	if !c.HasAuxiliaryFields {
		return nil
	}
	if !c.OnlySelectors {
		return fmt.Errorf("mixing aggregate and non-aggregate queries is not supported")
	}
	if calls > 1 {
		return fmt.Errorf("mixing multiple selector functions with tags or fields is not supported")
	}
	return nil
}
// subquery compiles and validates stmt as a subquery of c. A fresh compiler
// is created with the parent's options; ordering, time range, and (when the
// subquery has none of its own) the grouping interval are taken from c.
func (c *compiledStatement) subquery(stmt *influxql.SelectStatement) error {
	sub := newCompiler(c.Options)
	if err := sub.preprocess(stmt); err != nil {
		return err
	}

	// Substitute now() into the subquery's condition in place. The reduced
	// expression replaces stmt.Condition; nothing else is stored from it.
	nowValuer := &influxql.NowValuer{Now: c.Options.Now, Location: stmt.Location}
	stmt.Condition = influxql.Reduce(stmt.Condition, nowValuer)

	// An explicit sort order on the subquery must match the parent's.
	if sub.Ascending != c.Ascending && len(stmt.SortFields) != 0 {
		return errors.New("subqueries must be ordered in the same direction as the query itself")
	}
	sub.Ascending = c.Ascending

	// Intersecting with the parent's range makes a subquery without its own
	// time range inherit the parent's.
	sub.TimeRange = sub.TimeRange.Intersect(c.TimeRange)

	subGrouped := !sub.Interval.IsZero()

	// A null fill is turned into no fill so no time is wasted on null values
	// with a redundant fill iterator.
	if subGrouped && sub.FillOption == influxql.NullFill {
		sub.FillOption = influxql.NoFill
	}

	// Inherit the parent's grouping interval if the subquery has none.
	if !subGrouped && !c.Interval.IsZero() {
		sub.Interval = c.Interval
		sub.InheritedInterval = true
	}
	return sub.compile(stmt)
}
// Prepare maps the statement's sources to shards, rewrites wildcard fields
// against those shards, and builds the iterator options. It returns a
// preparedStatement holding the rewritten statement, the options, the mapped
// shards, and the column names.
//
// When sopt.MaxBucketsN is positive and the query is not raw, an unbounded
// minimum time narrows the range used for shard mapping, and a bounded one is
// rejected if it would span more than MaxBucketsN buckets.
//
// The mapped shards are closed on every error path after MapShards succeeds.
func (c *compiledStatement) Prepare(shardMapper ShardMapper, sopt SelectOptions) (PreparedStatement, error) {
// If this is a query with a grouping, there is a bucket limit, and the minimum time has not been specified,
// we need to limit the possible time range that can be used when mapping shards but not when actually executing
// the select statement. Determine the shard time range here.
timeRange := c.TimeRange
if sopt.MaxBucketsN > 0 && !c.stmt.IsRawQuery && timeRange.MinTimeNano() == influxql.MinTime {
interval, err := c.stmt.GroupByInterval()
if err != nil {
return nil, err
}
offset, err := c.stmt.GroupByOffset()
if err != nil {
return nil, err
}
if interval > 0 {
// Determine the last bucket using the end time.
opt := IteratorOptions{
Interval: Interval{
Duration: interval,
Offset: offset,
},
}
last, _ := opt.Window(c.TimeRange.MaxTimeNano() - 1)
// Determine the time difference using the number of buckets.
// Determine the maximum difference between the buckets based on the end time.
// NOTE(review): the clamp to models.MinNanoTime is taken when MORE than
// MaxBucketsN intervals fit between the minimum time and the last bucket,
// while the subtraction runs when fewer fit. A clamp would normally be
// expected in the opposite case — confirm this is intentional.
maxDiff := last - models.MinNanoTime
if maxDiff/int64(interval) > int64(sopt.MaxBucketsN) {
timeRange.Min = time.Unix(0, models.MinNanoTime)
} else {
timeRange.Min = time.Unix(0, last-int64(interval)*int64(sopt.MaxBucketsN-1))
}
}
}
// Create an iterator creator based on the shards in the cluster.
// Only the shard mapping uses the narrowed timeRange; the iterator options
// below are built from c.TimeRange.
shards, err := shardMapper.MapShards(c.stmt.Sources, timeRange, sopt)
if err != nil {
return nil, err
}
// Rewrite wildcards, if any exist.
stmt, err := c.stmt.RewriteFields(shards)
if err != nil {
shards.Close()
return nil, err
}
// Determine base options for iterators.
opt, err := newIteratorOptionsStmt(stmt, sopt)
if err != nil {
shards.Close()
return nil, err
}
opt.StartTime, opt.EndTime = c.TimeRange.MinTimeNano(), c.TimeRange.MaxTimeNano()
opt.Ascending = c.Ascending
// With an explicit minimum time the bucket count can be computed exactly,
// so the limit is enforced here instead of by narrowing the time range.
if sopt.MaxBucketsN > 0 && !stmt.IsRawQuery && c.TimeRange.MinTimeNano() > influxql.MinTime {
interval, err := stmt.GroupByInterval()
if err != nil {
shards.Close()
return nil, err
}
if interval > 0 {
// Determine the start and end time matched to the interval (may not match the actual times).
first, _ := opt.Window(opt.StartTime)
last, _ := opt.Window(opt.EndTime - 1)
// Determine the number of buckets by finding the time span and dividing by the interval.
buckets := (last - first + int64(interval)) / int64(interval)
if int(buckets) > sopt.MaxBucketsN {
shards.Close()
return nil, fmt.Errorf("max-select-buckets limit exceeded: (%d/%d)", buckets, sopt.MaxBucketsN)
}
}
}
columns := stmt.ColumnNames()
return &preparedStatement{
stmt: stmt,
opt: opt,
ic: shards,
columns: columns,
}, nil
}