forked from influxdata/influxdb
/
merge_channel_processor.go
158 lines (140 loc) · 4.75 KB
/
merge_channel_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package coordinator
import (
"errors"
"fmt"
"code.google.com/p/log4go"
"github.com/influxdb/influxdb/common"
"github.com/influxdb/influxdb/engine"
"github.com/influxdb/influxdb/protocol"
)
// This struct is responsible for merging responses from multiple
// response channels and controlling the concurrency of querying by
// giving away `concurrency' channels at any given time. This is used
// in the coordinator to merge the responses received from different
// shards, which could be remote or local.
type MergeChannelProcessor struct {
next engine.Processor
c chan (<-chan *protocol.Response)
e chan error
}
// Return a new MergeChannelProcessor that will yield to `next'
func NewMergeChannelProcessor(next engine.Processor, concurrency int) *MergeChannelProcessor {
p := &MergeChannelProcessor{
next: next,
e: make(chan error, concurrency),
c: make(chan (<-chan *protocol.Response), concurrency),
}
// Fill `p.e' with `concurrency' nils, see NextChannel() for an
// explanation of why we do this.
for i := 0; i < concurrency; i++ {
p.e <- nil
}
return p
}
// Closes MergeChannelProcessor, this method has to make sure that all
// responses are received from the response channels. This is
// important since the protobuf client may block trying to insert a
// new response which will cause the entire node to stop receiving
// remote responses.
func (p *MergeChannelProcessor) Close() (err error) {
// Close the channels' channel. This will cause NextChannel() to
// panic if it was called after Close() is called and will cause
// ProcessChannels() to return since there are no more channels to
// read.
close(p.c)
// Read all errors reported by the ProcessChannels(), this for loop
// will not end until ProcessChannels() returns and `p.e' is closed
for e := range p.e {
if e != nil {
err = e
}
}
// At this point ProcessChannels() has returned and NextChannel()
// cannot be called. Go over all channels that were returned and
// make sure we read all responses on those channels.
for c := range p.c {
nextChannel:
for r := range c {
switch rt := r.GetType(); rt {
case protocol.Response_END_STREAM,
protocol.Response_HEARTBEAT:
break nextChannel
case protocol.Response_ERROR:
// If there were no errors from before, return this error
if err == nil {
err = errors.New(r.GetErrorMessage())
}
break nextChannel
case protocol.Response_QUERY:
continue // ignore the response, we're closing
default:
panic(fmt.Errorf("Unexpected response type: %s", rt))
}
}
}
return err
}
// Returns a new channel with buffer size `bs'. This method will block
// until there are channels available to return. Remember
// MergeChannelProcessor controls the concurrency of the query by
// guaranteeing no more than `concurrency' channels are given away at
// any given time.
func (p *MergeChannelProcessor) NextChannel(bs int) (chan<- *protocol.Response, error) {
// `p.e' serves two purpose in MergeChannelProcessor. To return
// errors received in ProcessChannels, and to control the
// concurrency. Initially `p.e' has `concurrency' nils in it which
// will allow the first `concurrency' calls to NextChannel() to
// return without any errors and without blocking. Successive calls
// to NextChannel() will wait until a nil or an error is inserted in
// `p.e'.
err := <-p.e
if err != nil {
return nil, err
}
c := make(chan *protocol.Response, bs)
p.c <- c
return c, nil
}
func (p *MergeChannelProcessor) String() string {
return fmt.Sprintf("MergeChannelProcessor (%d)", cap(p.e))
}
// Start processing the channels that are yielded by NextChannel() and
// yield the Series in the responses to the next Process.
func (p *MergeChannelProcessor) ProcessChannels() {
defer close(p.e)
for channel := range p.c {
if p.processChannel(channel) {
return
}
}
}
// Process responses from the given channel. Returns true if
// processing should stop for other channels. False otherwise.
func (p *MergeChannelProcessor) processChannel(channel <-chan *protocol.Response) bool {
for response := range channel {
log4go.Debug("%s received %s", p, response)
switch rt := response.GetType(); rt {
// all these types end the stream
case protocol.Response_HEARTBEAT,
protocol.Response_END_STREAM:
p.e <- nil
return false
case protocol.Response_ERROR:
err := common.NewQueryError(common.InvalidArgument, response.GetErrorMessage())
p.e <- err
return false
case protocol.Response_QUERY:
for _, s := range response.MultiSeries {
log4go.Debug("Yielding to %s: %s", p.next.Name(), s)
_, err := p.next.Yield(s)
if err != nil {
p.e <- err
return true
}
}
default:
panic(fmt.Errorf("Unknown response type: %s", rt))
}
}
panic(errors.New("Reached end of method"))
}