forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
passthrough_engine.go
93 lines (81 loc) · 2.6 KB
/
passthrough_engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package engine
// The passthrough engine buffers points and passes them on without modifying them. It is used
// for queries that can't be aggregated locally, and for queries that don't need aggregation at
// all, such as deletes and drops.
import (
log "code.google.com/p/log4go"
"github.com/influxdb/influxdb/common"
"github.com/influxdb/influxdb/protocol"
)
// Passthrough is a Processor stage that buffers incoming series and hands
// them to the next Processor without altering the point data.
type Passthrough struct {
	// next receives every buffered series when it is flushed, and is closed
	// when this engine is closed.
	next Processor
	// series is the currently buffered series; nil until the first
	// non-empty series has been yielded to this engine.
	series *protocol.Series
	// maxPointsInResponse is the buffer-size threshold: once the buffered
	// series holds more points than this, it is flushed to next before
	// further points are accepted.
	maxPointsInResponse int
	// limiter is consulted on every incoming series, both before buffering
	// and to decide whether the caller should keep yielding.
	limiter *Limiter

	// Query statistics. None of these are updated by the methods in this
	// file. NOTE(review): presumably consumed by EXPLAIN queries — confirm.
	runStartTime, runEndTime  float64
	pointsRead, pointsWritten int64
	shardId                   int
	shardLocal                bool
}
// NewPassthroughEngine returns a Passthrough that forwards to next and
// flushes its buffer once it exceeds maxPointsInResponse points. It is a
// shorthand for NewPassthroughEngineWithLimit with a limit of 0.
// NOTE(review): a limit of 0 presumably means "no limit" — confirm against
// NewLimiter.
func NewPassthroughEngine(next Processor, maxPointsInResponse int) *Passthrough {
	const limit = 0
	return NewPassthroughEngineWithLimit(next, maxPointsInResponse, limit)
}
// NewPassthroughEngineWithLimit returns a Passthrough that forwards to next,
// flushes its buffer once it exceeds maxPointsInResponse points, and uses a
// Limiter built from limit to restrict the points it passes on.
//
// The buffered series starts out nil and all query statistics start at
// their zero values.
func NewPassthroughEngineWithLimit(next Processor, maxPointsInResponse, limit int) *Passthrough {
	return &Passthrough{
		next:                next,
		maxPointsInResponse: maxPointsInResponse,
		limiter:             NewLimiter(limit),
		// shardLocal stays false; that really doesn't matter if it is not
		// an EXPLAIN query. The remaining statistics fields are left at
		// their zero values as well.
	}
}
// Yield accepts seriesIncoming into the engine's buffer.
//
// The limiter is applied to seriesIncoming first; if no points remain
// afterwards, Yield returns (false, nil) without touching the buffer.
// Otherwise:
//   - with an empty buffer, seriesIncoming becomes the buffered series;
//   - if the buffered series has the same name and holds at most
//     maxPointsInResponse points, seriesIncoming is merged into it;
//   - in every other case (different name, or buffer over the threshold) the
//     buffered series is yielded to the next processor and seriesIncoming
//     replaces it.
//
// If the next processor returns !ok or an error, that result is returned
// as-is and the buffer is left unchanged. On success the boolean result is
// true unless the limiter reports that the limit for this series name has
// been hit.
func (self *Passthrough) Yield(seriesIncoming *protocol.Series) (bool, error) {
	log.Debug("PassthroughEngine YieldSeries %d", len(seriesIncoming.Points))

	// NOTE(review): presumably trims seriesIncoming.Points to the remaining
	// limit — confirm against Limiter.
	self.limiter.calculateLimitAndSlicePoints(seriesIncoming)
	if len(seriesIncoming.Points) == 0 {
		return false, nil
	}

	buffered := self.series
	switch {
	case buffered == nil:
		// Nothing buffered yet: start buffering with this series.
		self.series = seriesIncoming

	case buffered.GetName() == seriesIncoming.GetName() &&
		len(buffered.Points) <= self.maxPointsInResponse:
		// Same series and still under the threshold: keep accumulating.
		self.series = common.MergeSeries(buffered, seriesIncoming)

	default:
		// Either a different series arrived or the buffer grew past the
		// threshold: flush what we have, then start over with the new one.
		log.Debug("Yielding to %s: %s", self.next.Name(), buffered)
		ok, err := self.next.Yield(buffered)
		if !ok || err != nil {
			return ok, err
		}
		self.series = seriesIncoming
	}

	limitReached := self.limiter.hitLimit(seriesIncoming.GetName())
	return !limitReached, nil
}
// Close flushes any buffered series that has a non-nil Name to the next
// processor and then closes the next processor, returning its Close error.
//
// If the final flush returns an error, that error is returned immediately
// and the next processor is not closed. The boolean result of the final
// flush is ignored.
func (self *Passthrough) Close() error {
	if pending := self.series; pending != nil && pending.Name != nil {
		log.Debug("Passthrough Yielding to %s: %s", self.next.Name(), pending)
		if _, err := self.next.Yield(pending); err != nil {
			return err
		}
	}
	return self.next.Close()
}
// Name returns the fixed identifier of this engine, "PassthroughEngine".
func (self *Passthrough) Name() string {
	const engineName = "PassthroughEngine"
	return engineName
}
// Next returns the processor that this engine yields its buffered series to.
func (self *Passthrough) Next() Processor {
	downstream := self.next
	return downstream
}