forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
shard_mapper.go
263 lines (220 loc) · 6.2 KB
/
shard_mapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
package cluster
import (
"encoding/json"
"fmt"
"math/rand"
"net"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/services/meta"
"github.com/influxdb/influxdb/tsdb"
)
// ShardMapper hands out tsdb.Mapper instances for shards. A shard owned by
// the local node is served from TSDBStore; any other shard is served through
// a RemoteMapper connected to one of the shard's owning nodes.
type ShardMapper struct {
	// ForceRemoteMapping makes every shard be treated as remote, even when
	// the local node owns it. Useful for testing.
	ForceRemoteMapping bool

	// Node identifies the local node; its ID is checked against shard owners.
	Node *influxdb.Node

	// MetaClient resolves a node ID to that data node's information.
	MetaClient interface {
		DataNode(id uint64) (ni *meta.NodeInfo, err error)
	}

	// TSDBStore creates mappers for shards that live on the local node.
	TSDBStore interface {
		CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error)
	}

	// timeout is the deadline window set on connections to remote nodes.
	timeout time.Duration

	// pool is the client pool created by NewShardMapper.
	pool *clientPool
}
// NewShardMapper builds a ShardMapper that can map both local and remote
// shards. It starts with a fresh client pool, and timeout is stored as the
// deadline window applied to remote connections.
func NewShardMapper(timeout time.Duration) *ShardMapper {
	mapper := new(ShardMapper)
	mapper.pool = newClientPool()
	mapper.timeout = timeout
	return mapper
}
// CreateMapper returns a Mapper for the given shard.
//
// When the local node does not own the shard, or ForceRemoteMapping is set,
// a connection is opened to a pseudo-randomly chosen owner and a RemoteMapper
// using that connection is returned. Otherwise the mapper comes from the
// local TSDBStore.
//
// An error is returned if a remote shard has no owners, if the owner cannot
// be dialed, if the connection deadline cannot be set, or if the local store
// fails to create the mapper.
func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) {
	// ForceRemoteMapping is checked first so that s.Node is only consulted
	// when ownership actually decides the outcome.
	if s.ForceRemoteMapping || !sh.OwnedBy(s.Node.ID) {
		// rand.Intn panics when its argument is not positive, so a shard
		// without owners must be rejected explicitly.
		if len(sh.Owners) == 0 {
			return nil, fmt.Errorf("shard %d has no owners", sh.ID)
		}

		// Pick a node in a pseudo-random manner.
		owner := sh.Owners[rand.Intn(len(sh.Owners))]
		conn, err := s.dial(owner.NodeID)
		if err != nil {
			return nil, err
		}

		// The connection is not handed to anyone on failure, so it has to be
		// closed here to avoid leaking it.
		if err := conn.SetDeadline(time.Now().Add(s.timeout)); err != nil {
			conn.Close()
			return nil, err
		}

		return NewRemoteMapper(conn, sh.ID, stmt, chunkSize), nil
	}

	// If it is local then return the mapper from the store.
	m, err := s.TSDBStore.CreateMapper(sh.ID, stmt, chunkSize)
	if err != nil {
		return nil, err
	}
	return m, nil
}
// dial looks up the data node with the given ID through MetaClient, opens a
// TCP connection to its TCPHost, and writes the cluster multiplexing header
// byte so the connection is ready for cluster requests.
//
// The connection is closed before returning if the header cannot be written.
func (s *ShardMapper) dial(nodeID uint64) (net.Conn, error) {
	ni, err := s.MetaClient.DataNode(nodeID)
	if err != nil {
		return nil, err
	}
	// NOTE(review): defensive guard in case the meta client reports an
	// unknown node as (nil, nil) — confirm against the MetaClient contract.
	if ni == nil {
		return nil, fmt.Errorf("data node %d not found", nodeID)
	}

	conn, err := net.Dial("tcp", ni.TCPHost)
	if err != nil {
		return nil, err
	}

	// Write the cluster multiplexing header byte. Without it the connection
	// is useless, so a failed write closes the connection and is reported.
	if _, err := conn.Write([]byte{MuxHeader}); err != nil {
		conn.Close()
		return nil, err
	}

	return conn, nil
}
// RemoteMapper implements the tsdb.Mapper interface on top of a network
// connection to another node: it sends the query for one shard and decodes
// the stream of responses that comes back.
type RemoteMapper struct {
	// shardID, stmt and chunkSize are sent to the remote node by Open.
	shardID   uint64
	stmt      influxql.Statement
	chunkSize int

	// tagsets and fields are copied from the first response read by Open.
	tagsets []string
	fields  []string

	// conn is the connection used for the request and all responses.
	conn net.Conn

	// bufferedResponse holds the response read by Open until the first call
	// to NextChunk consumes it.
	bufferedResponse *MapShardResponse

	// unmarshallers are the mapping-specific unmarshal functions, one per
	// function call of a SELECT statement, in call order.
	unmarshallers []tsdb.UnmarshalFunc
}
// NewRemoteMapper wraps an already-established connection in a RemoteMapper
// for the given shard, statement and chunk size. Nothing is sent on the
// connection until Open is called.
func NewRemoteMapper(c net.Conn, shardID uint64, stmt influxql.Statement, chunkSize int) *RemoteMapper {
	rm := new(RemoteMapper)
	rm.conn = c
	rm.shardID = shardID
	rm.stmt = stmt
	rm.chunkSize = chunkSize
	return rm
}
// Open sends the map request for this mapper's shard over the connection and
// reads the first response, which is kept buffered for the first NextChunk
// call. Tag sets and fields are taken from that response, and for SELECT
// statements one unmarshal function is prepared per function call.
//
// If any step fails the connection is closed and the error is returned. A
// response carrying a non-zero code is reported as an error.
func (r *RemoteMapper) Open() (err error) {
	// The named result lets this deferred check observe whichever error the
	// function ends up returning.
	defer func() {
		if err != nil {
			r.conn.Close()
		}
	}()

	// Assemble the request describing what the remote node should map.
	var req MapShardRequest
	req.SetShardID(r.shardID)
	req.SetQuery(r.stmt.String())
	req.SetChunkSize(int32(r.chunkSize))

	// Encode the request and put it on the wire.
	payload, err := req.MarshalBinary()
	if err != nil {
		return err
	}
	if werr := WriteTLV(r.conn, mapShardRequestMessage, payload); werr != nil {
		return werr
	}

	// Wait for the first response.
	_, payload, err = ReadTLV(r.conn)
	if err != nil {
		return err
	}

	// Decode it; the response stays buffered for NextChunk.
	r.bufferedResponse = &MapShardResponse{}
	if uerr := r.bufferedResponse.UnmarshalBinary(payload); uerr != nil {
		return uerr
	}
	if r.bufferedResponse.Code() != 0 {
		return fmt.Errorf("error code %d: %s", r.bufferedResponse.Code(), r.bufferedResponse.Message())
	}

	// The first response carries the tag sets and field names.
	r.tagsets = r.bufferedResponse.TagSets()
	r.fields = r.bufferedResponse.Fields()

	// Only SELECT statements have function calls that need unmarshallers.
	sel, isSelect := r.stmt.(*influxql.SelectStatement)
	if !isSelect {
		return nil
	}
	for _, call := range sel.FunctionCalls() {
		fn, ierr := tsdb.InitializeUnmarshaller(call)
		if ierr != nil {
			return ierr
		}
		r.unmarshallers = append(r.unmarshallers, fn)
	}

	return nil
}
// TagSets returns the tag sets taken from the first response read by Open.
func (r *RemoteMapper) TagSets() []string {
	tagsets := r.tagsets
	return tagsets
}
// Fields returns the field names taken from the first response read by Open.
func (r *RemoteMapper) Fields() []string {
	fields := r.fields
	return fields
}
// NextChunk returns the next chunk of mapper output received from the remote
// node, as a *tsdb.MapperOutput.
//
// The response buffered by Open is consumed first; afterwards each call reads
// one more response from the connection. A response without data yields
// (nil, nil). An error is returned when reading or decoding fails, when the
// response carries a non-zero code, when a decoded value entry is null, or
// when the response contains more aggregate values than there are
// unmarshallers prepared by Open.
func (r *RemoteMapper) NextChunk() (chunk interface{}, err error) {
	var response *MapShardResponse
	if r.bufferedResponse != nil {
		response = r.bufferedResponse
		r.bufferedResponse = nil
	} else {
		response = &MapShardResponse{}

		// Read the response.
		_, buf, err := ReadTLV(r.conn)
		if err != nil {
			return nil, err
		}

		// Unmarshal response.
		if err := response.UnmarshalBinary(buf); err != nil {
			return nil, err
		}

		if response.Code() != 0 {
			return nil, fmt.Errorf("error code %d: %s", response.Code(), response.Message())
		}
	}

	if response.Data() == nil {
		return nil, nil
	}

	moj := &tsdb.MapperOutputJSON{}
	if err := json.Unmarshal(response.Data(), moj); err != nil {
		return nil, err
	}
	mvj := []*tsdb.MapperValueJSON{}
	if err := json.Unmarshal(moj.Values, &mvj); err != nil {
		return nil, err
	}

	// Prep the non-JSON version of Mapper output.
	mo := &tsdb.MapperOutput{
		Name:      moj.Name,
		Tags:      moj.Tags,
		Fields:    moj.Fields,
		CursorKey: moj.CursorKey,
	}

	// A JSON null element decodes to a nil pointer, hence the nil check
	// before looking at AggData.
	if len(mvj) == 1 && mvj[0] != nil && len(mvj[0].AggData) > 0 {
		// The MapperValue is carrying aggregate data, so run it through the
		// custom unmarshallers for the map functions through which the data
		// was mapped.
		aggData := mvj[0].AggData

		// The data comes from another node; refuse it rather than index past
		// the unmarshallers that Open prepared.
		if len(aggData) > len(r.unmarshallers) {
			return nil, fmt.Errorf("received %d aggregate values but only %d unmarshallers are available",
				len(aggData), len(r.unmarshallers))
		}

		aggValues := make([]interface{}, 0, len(aggData))
		for i, b := range aggData {
			v, err := r.unmarshallers[i](b)
			if err != nil {
				return nil, err
			}
			aggValues = append(aggValues, v)
		}
		mo.Values = []*tsdb.MapperValue{&tsdb.MapperValue{
			Time:  mvj[0].Time,
			Value: aggValues,
			Tags:  mvj[0].Tags,
		}}
		return mo, nil
	}

	// Must be raw data instead.
	for i, v := range mvj {
		if v == nil {
			return nil, fmt.Errorf("mapper value %d is null", i)
		}
		var rawValue interface{}
		if err := json.Unmarshal(v.RawData, &rawValue); err != nil {
			return nil, err
		}
		mo.Values = append(mo.Values, &tsdb.MapperValue{
			Time:  v.Time,
			Value: rawValue,
			Tags:  v.Tags,
		})
	}

	return mo, nil
}
// Close closes the connection to the remote node. The result of closing the
// connection is discarded because the method has no return value.
func (r *RemoteMapper) Close() {
	_ = r.conn.Close()
}