forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
meta_executor.go
171 lines (145 loc) · 4.12 KB
/
meta_executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package cluster
import (
"fmt"
"log"
"net"
"os"
"sync"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/services/meta"
)
const (
	// metaExecutorWriteTimeout is the default value of MetaExecutor.timeout
	// (set by NewMetaExecutor). executeOnNode uses that timeout for both the
	// write and the read deadline of each request, and dial passes it to
	// NewBoundedPool.
	metaExecutorWriteTimeout = time.Second * 5

	// metaExecutorMaxWriteConnections is the default value of
	// MetaExecutor.maxConnections, which dial hands to NewBoundedPool
	// (presumably as the per-node pool size limit).
	metaExecutorMaxWriteConnections = 10
)
// MetaExecutor runs InfluxQL meta statements on the other data nodes of the
// cluster. Create one with NewMetaExecutor; Node and MetaClient are not set by
// the constructor and must be assigned before ExecuteStatement is called.
type MetaExecutor struct {
	// mu is not referenced by any method in this file.
	mu sync.RWMutex

	// Remote-connection settings: the I/O timeout applied to each request,
	// the shared pool of per-node connection pools, and the maxConnections
	// value that dial passes to NewBoundedPool.
	timeout        time.Duration
	pool           *clientPool
	maxConnections int

	// Logger is the executor's logger; NewMetaExecutor points it at stderr
	// with a "[meta-executor] " prefix.
	Logger *log.Logger

	// Node describes the local node. ExecuteStatement skips the data node
	// whose ID equals Node.ID.
	Node *influxdb.Node

	// nodeExecutor performs the per-node request. NewMetaExecutor sets it to
	// the executor itself; presumably it exists so tests can substitute a stub.
	nodeExecutor interface {
		executeOnNode(stmt influxql.Statement, database string, node *meta.NodeInfo) error
	}

	// MetaClient supplies cluster membership. ExecuteStatement calls
	// DataNodes; dial hands this client to the connection factory
	// (presumably for DataNode lookups — confirm in connFactory).
	MetaClient interface {
		DataNode(id uint64) (ni *meta.NodeInfo, err error)
		DataNodes() ([]meta.NodeInfo, error)
	}
}
// NewMetaExecutor builds a MetaExecutor with the following defaults:
//   - the default I/O timeout (metaExecutorWriteTimeout);
//   - a fresh client pool;
//   - the default connection limit (metaExecutorMaxWriteConnections);
//   - a logger writing to stderr with the "[meta-executor] " prefix.
//
// The caller still has to assign Node and MetaClient.
func NewMetaExecutor() *MetaExecutor {
	e := new(MetaExecutor)
	e.timeout = metaExecutorWriteTimeout
	e.pool = newClientPool()
	e.maxConnections = metaExecutorMaxWriteConnections
	e.Logger = log.New(os.Stderr, "[meta-executor] ", log.LstdFlags)

	// Unless replaced, per-node requests are carried out by the executor itself.
	e.nodeExecutor = e
	return e
}
// remoteNodeError wraps an error returned while executing a statement on a
// remote data node, recording which node produced it.
type remoteNodeError struct {
	id  uint64 // ID of the node whose execution failed
	err error  // the underlying failure for that node
}

// Error reports the failing node ID together with the underlying error.
func (e remoteNodeError) Error() string {
	return fmt.Sprintf("partial success, node %d may be down (%s)", e.id, e.err)
}

// Unwrap returns the underlying per-node error. errors.Is and errors.As use
// it to inspect the cause through this wrapper; without it the cause is
// hidden behind remoteNodeError.
func (e remoteNodeError) Unwrap() error {
	return e.err
}
// ExecuteStatement executes a single InfluxQL statement against database on
// every data node in the cluster except the local one (m.Node). The remote
// calls run concurrently, and ExecuteStatement waits for all of them to finish.
//
// An error from MetaClient.DataNodes is returned unchanged. If one or more
// remote executions fail, one of those failures is returned wrapped in a
// remoteNodeError naming the node; which failure is chosen depends on
// completion order. It returns nil when there are no data nodes or when every
// remote call succeeds.
func (m *MetaExecutor) ExecuteStatement(stmt influxql.Statement, database string) error {
	// Get a list of all nodes the query needs to be executed on.
	nodes, err := m.MetaClient.DataNodes()
	if err != nil {
		return err
	}
	if len(nodes) == 0 {
		return nil
	}

	// Size the buffer for every node, not len(nodes)-1. Nothing here
	// guarantees the local node appears in the list. When it doesn't, every
	// goroutine may need to report an error, and an undersized buffer would
	// leave the last sender blocked forever, deadlocking wg.Wait below.
	errs := make(chan error, len(nodes))

	var wg sync.WaitGroup
	for _, node := range nodes {
		if node.ID == m.Node.ID {
			continue // Don't execute the statement on ourselves.
		}
		wg.Add(1)
		// node is passed as an argument so each goroutine works on its own copy.
		go func(node meta.NodeInfo) {
			defer wg.Done()
			if err := m.nodeExecutor.executeOnNode(stmt, database, &node); err != nil {
				errs <- remoteNodeError{id: node.ID, err: err}
			}
		}(node)
	}

	// Wait for every remote node to respond (or fail) before reporting.
	wg.Wait()

	select {
	case err := <-errs:
		return err
	default:
		return nil
	}
}
// executeOnNode sends stmt, to be run against database, to the remote data
// node described by node over a pooled connection, then waits for the reply.
//
// Dial, marshal, transport and unmarshal errors are returned as-is. A reply
// with a non-zero code becomes an error carrying that code and the remote
// message. If the write or the read fails, the connection is marked unusable
// before it is returned (presumably so the pool discards it rather than
// reusing it).
//
// It panics if dial hands back anything other than a *pooledConn.
func (m *MetaExecutor) executeOnNode(stmt influxql.Statement, database string, node *meta.NodeInfo) error {
	rawConn, err := m.dial(node.ID)
	if err != nil {
		return err
	}
	pc, isPooled := rawConn.(*pooledConn)
	if !isPooled {
		panic("wrong connection type in MetaExecutor")
	}
	// "Closing" a pooled connection hands it back to the pool.
	defer pc.Close()

	var req ExecuteStatementRequest
	req.SetStatement(stmt.String())
	req.SetDatabase(database)

	payload, err := req.MarshalBinary()
	if err != nil {
		return err
	}

	// Send the request under a write deadline.
	pc.SetWriteDeadline(time.Now().Add(m.timeout))
	if werr := WriteTLV(pc, executeStatementRequestMessage, payload); werr != nil {
		pc.MarkUnusable()
		return werr
	}

	// Wait for the reply under a read deadline; the message type byte is ignored.
	pc.SetReadDeadline(time.Now().Add(m.timeout))
	_, reply, err := ReadTLV(pc)
	if err != nil {
		pc.MarkUnusable()
		return err
	}

	var resp ExecuteStatementResponse
	if uerr := resp.UnmarshalBinary(reply); uerr != nil {
		return uerr
	}
	if resp.Code() != 0 {
		return fmt.Errorf("error code %d: %s", resp.Code(), resp.Message())
	}
	return nil
}
// dial returns a connection to the data node nodeID, obtained from the
// executor's client pool. If no pool exists yet for that node, dial first
// creates one:
//   - NewBoundedPool is called with m.maxConnections and m.timeout;
//   - the pool's factory is a connFactory given m.pool, m.timeout and
//     m.MetaClient;
//   - the new pool is registered via setPool.
//
// Errors from pool creation or from fetching the connection are returned
// unchanged.
//
// NOTE(review): the getPool/setPool sequence is a non-atomic check-then-act
// and m.mu is not taken. Two concurrent first dials to the same node could
// each build a pool, with the later setPool replacing the earlier one.
// Confirm whether clientPool tolerates this.
func (m *MetaExecutor) dial(nodeID uint64) (net.Conn, error) {
	if _, exists := m.pool.getPool(nodeID); exists {
		return m.pool.conn(nodeID)
	}

	factory := &connFactory{
		nodeID:     nodeID,
		clientPool: m.pool,
		timeout:    m.timeout,
		metaClient: m.MetaClient,
	}
	nodePool, err := NewBoundedPool(1, m.maxConnections, m.timeout, factory.dial)
	if err != nil {
		return nil, err
	}
	m.pool.setPool(nodeID, nodePool)

	return m.pool.conn(nodeID)
}