forked from vitessio/vitess
/
l2vtgategateway.go
252 lines (218 loc) · 7.76 KB
/
l2vtgategateway.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gateway
import (
"flag"
"fmt"
"log"
"sort"
"strings"
"sync"
"time"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/flagutil"
"github.com/youtube/vitess/go/vt/discovery"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/vt/vttablet/queryservice"
"github.com/youtube/vitess/go/vt/vttablet/tabletconn"
querypb "github.com/youtube/vitess/go/vt/proto/query"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// gatewayImplementationL2VTGate is the name this gateway implementation is
// registered under with RegisterCreator.
const gatewayImplementationL2VTGate = "l2vtgategateway"

// l2VTGateGatewayAddrs holds the values of the -l2vtgategateway_addrs flag.
// Each value is expected to look like "addr|keyspace|shard", where shard is a
// shard name or a key range; createL2VTGateGateway parses them.
var l2VTGateGatewayAddrs flagutil.StringListValue
func init() {
flag.Var(&l2VTGateGatewayAddrs, "l2vtgategateway_addrs", "Specifies a comma-separated list of 'addr|keyspace|shard_name or keyrange' values for l2vtgate locations")
RegisterCreator(gatewayImplementationL2VTGate, createL2VTGateGateway)
}
// l2VTGateConn describes one configured l2vtgate destination: which
// keyspace/shard (or key range) it serves and the connection used to reach it.
// All fields are filled in when the entry is created by addL2VTGateConn.
type l2VTGateConn struct {
	// addr is the address of the l2vtgate backend.
	addr string
	// keyspace is the keyspace served by this destination.
	keyspace string
	// shard is the canonical shard name returned by topo.ValidateShardName.
	shard string
	// keyRange is set only when shard also describes a KeyRange.
	keyRange *topodatapb.KeyRange
	// conn is the connection to addr; entries with the same addr share it.
	conn queryservice.QueryService
}
// l2VTGateGateway is the gateway object that sends each query to the l2vtgate
// destination configured for the query's keyspace/shard.
type l2VTGateGateway struct {
	// QueryService is embedded to expose the query API; createL2VTGateGateway
	// sets it to queryservice.Wrap(nil, lg.withRetry).
	queryservice.QueryService

	// retryCount is the number of extra attempts withRetry may make after a
	// retryable failure. It is set at construction time.
	retryCount int

	// mu guards every field declared below it.
	mu sync.RWMutex

	// connMap holds, per keyspace name, the configured l2vtgate destinations
	// in the order they were added.
	connMap map[string][]*l2VTGateConn

	// tabletConnMap maps a backend address to its connection, so several
	// keyspace/shard entries pointing at the same address share one connection.
	tabletConnMap map[string]queryservice.QueryService

	// statusAggregators is indexed by "<l2vtgate address>:<tablet type>".
	statusAggregators map[string]*TabletStatusAggregator
}
// createL2VTGateGateway builds an l2VTGateGateway from the
// -l2vtgategateway_addrs flag values. Its signature matches the creator
// registered in init; hc, topoServer, serv and cell are not used here.
//
// Each flag value must be "addr|keyspace|shard". A value with the wrong number
// of fields, or one rejected by addL2VTGateConn, aborts the process through
// log.Fatalf.
func createL2VTGateGateway(hc discovery.HealthCheck, topoServer topo.Server, serv topo.SrvTopoServer, cell string, retryCount int) Gateway {
	gw := &l2VTGateGateway{
		retryCount:        retryCount,
		connMap:           make(map[string][]*l2VTGateConn),
		tabletConnMap:     make(map[string]queryservice.QueryService),
		statusAggregators: make(map[string]*TabletStatusAggregator),
	}

	for _, value := range l2VTGateGatewayAddrs {
		fields := strings.Split(value, "|")
		if len(fields) != 3 {
			log.Fatalf("invalid l2vtgategateway_addrs parameter: %v", value)
		}
		addr, keyspace, shard := fields[0], fields[1], fields[2]
		if err := gw.addL2VTGateConn(addr, keyspace, shard); err != nil {
			log.Fatalf("error adding l2vtgategateway_addrs value %v: %v", value, err)
		}
	}

	// Calls on the embedded QueryService are funnelled through withRetry,
	// which picks the destination from the call's target.
	gw.QueryService = queryservice.Wrap(nil, gw.withRetry)
	return gw
}
// addL2VTGateConn registers addr as the l2vtgate destination for
// keyspace/shard. shard may be a shard name or a key range; it is
// canonicalized with topo.ValidateShardName. An existing connection to addr is
// reused; otherwise a new one is dialed with a timeout of 0.
//
// It returns an error when shard cannot be parsed, when keyspace already has
// an entry with the same canonical shard, or when dialing fails.
//
// NOTE(review): only exact shard duplicates are rejected. Overlapping key
// ranges for one keyspace are accepted, and getConn returns the first entry
// that matches.
func (lg *l2VTGateGateway) addL2VTGateConn(addr, keyspace, shard string) error {
	lg.mu.Lock()
	defer lg.mu.Unlock()

	canonicalShard, keyRange, err := topo.ValidateShardName(shard)
	if err != nil {
		return fmt.Errorf("error parsing shard name %v: %v", shard, err)
	}

	for _, existing := range lg.connMap[keyspace] {
		if existing.shard == canonicalShard {
			return fmt.Errorf("duplicate %v/%v entry", keyspace, shard)
		}
	}

	qs, found := lg.tabletConnMap[addr]
	if !found {
		// Per the original design, timeout=0 means "dial in the background".
		qs, err = tabletconn.GetDialer()(&topodatapb.Tablet{Hostname: addr}, 0)
		if err != nil {
			return err
		}
		lg.tabletConnMap[addr] = qs
	}

	lg.connMap[keyspace] = append(lg.connMap[keyspace], &l2VTGateConn{
		addr:     addr,
		keyspace: keyspace,
		shard:    canonicalShard,
		keyRange: keyRange,
		conn:     qs,
	})
	return nil
}
// WaitForTablets is part of the Gateway interface. This gateway only forwards
// to preconfigured l2vtgate addresses, so there is nothing to wait for: it
// returns nil right away without looking at ctx or tabletTypesToWait.
func (lg *l2VTGateGateway) WaitForTablets(ctx context.Context, tabletTypesToWait []topodatapb.TabletType) error {
	return nil
}
// StreamHealth is not implemented by this gateway. It overrides the embedded
// QueryService's StreamHealth and always returns an error without invoking
// callback. Returning an error rather than panicking means an unexpected
// caller gets a failed call instead of a crashed process.
// TODO(alainjobart): Maybe we should implement it?
func (lg *l2VTGateGateway) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
	return fmt.Errorf("l2vtgategateway: StreamHealth is not implemented")
}
// Close shuts down underlying connections.
// This function hides the inner implementation.
func (lg *l2VTGateGateway) Close(ctx context.Context) error {
lg.mu.Lock()
defer lg.mu.Unlock()
// This will wait for all on-going queries before returning.
for _, c := range lg.tabletConnMap {
c.Close(ctx)
}
lg.tabletConnMap = make(map[string]queryservice.QueryService)
lg.connMap = make(map[string][]*l2VTGateConn)
return nil
}
// CacheStatus returns one TabletCacheStatus per status aggregator (one per
// l2vtgate address and tablet type), sorted with sort.Sort. The snapshot is
// taken under the read lock; sorting happens after the lock is released.
func (lg *l2VTGateGateway) CacheStatus() TabletCacheStatusList {
	statuses := func() TabletCacheStatusList {
		lg.mu.RLock()
		defer lg.mu.RUnlock()
		out := make(TabletCacheStatusList, 0, len(lg.statusAggregators))
		for _, aggregator := range lg.statusAggregators {
			out = append(out, aggregator.GetCacheStatus())
		}
		return out
	}()
	sort.Sort(statuses)
	return statuses
}
// getConn returns the configured destination for keyspace/shard.
//
// shard is canonicalized with topo.ValidateShardName. A destination matches
// when its canonical shard equals the requested one, or when both are key
// ranges and the destination's range includes the requested one. Destinations
// are checked in the order they were added and the first match wins.
//
// It returns an error (including the parse failure) if shard is invalid, and
// an error if no destination matches.
func (lg *l2VTGateGateway) getConn(keyspace, shard string) (*l2VTGateConn, error) {
	// Parsing reads no gateway state, so it does not need the lock.
	canonical, kr, err := topo.ValidateShardName(shard)
	if err != nil {
		return nil, fmt.Errorf("invalid shard name %v: %v", shard, err)
	}

	lg.mu.RLock()
	defer lg.mu.RUnlock()
	for _, c := range lg.connMap[keyspace] {
		if c.shard == canonical {
			// Exact match (probably a non-sharded keyspace).
			return c, nil
		}
		if kr != nil && c.keyRange != nil && key.KeyRangeIncludes(c.keyRange, kr) {
			// The requested KeyRange is included in this destination's
			// KeyRange, so this is the destination we want.
			return c, nil
		}
	}
	return nil, fmt.Errorf("no configured destination for %v/%v", keyspace, shard)
}
// withRetry is the callback handed to queryservice.Wrap. It looks up the
// l2vtgate destination for target with getConn and runs inner against that
// destination's connection. The conn and name arguments are not used.
//
// inner returns its error and whether the call may be retried. While it asks
// for a retry, inner is called again, for at most retryCount+1 attempts in
// total. This block does not inspect inTransaction itself. It relies on inner
// not requesting a retry mid-transaction (assumption: confirm in
// queryservice.Wrap) and only forwards inTransaction to NewShardError. Every
// attempt is recorded with updateStats. The last attempt's error is passed
// through NewShardError.
func (lg *l2VTGateGateway) withRetry(ctx context.Context, target *querypb.Target, conn queryservice.QueryService, name string, inTransaction bool, inner func(context.Context, *querypb.Target, queryservice.QueryService) (error, bool)) error {
	dest, err := lg.getConn(target.Keyspace, target.Shard)
	if err != nil {
		return fmt.Errorf("no configured destination for %v/%v: %v", target.Keyspace, target.Shard, err)
	}

	attempts := lg.retryCount + 1
	for attempt := 0; attempt < attempts; attempt++ {
		start := time.Now()
		var retryable bool
		err, retryable = inner(ctx, target, dest.conn)
		lg.updateStats(dest, target.TabletType, start, err)
		if !retryable {
			break
		}
	}
	return NewShardError(err, target, nil, inTransaction)
}
// updateStats records one query attempt against conn for tabletType: the time
// elapsed since startTime and whether the attempt failed (err != nil).
func (lg *l2VTGateGateway) updateStats(conn *l2VTGateConn, tabletType topodatapb.TabletType, startTime time.Time, err error) {
	// Measure before fetching the aggregator so any lock wait inside
	// getStatsAggregator is not counted as query latency.
	elapsed := time.Since(startTime)
	aggr := lg.getStatsAggregator(conn, tabletType)
	// NOTE(review): the first argument is passed as ""; presumably it is a
	// per-tablet identifier with no equivalent for an l2vtgate pool. Confirm.
	aggr.UpdateQueryInfo("", tabletType, elapsed, err != nil)
}
// getStatsAggregator returns the TabletStatusAggregator for the pair
// (conn.addr, tabletType), creating and registering it on first use.
//
// The common path takes only the read lock. On a miss it takes the write lock
// and checks the map again, because another goroutine may have created the
// aggregator between the two lock acquisitions.
//
// NOTE(review): aggregators are keyed by address and tablet type only, so
// destinations sharing an address share one aggregator. That aggregator is
// labelled with the keyspace/shard of whichever conn created it.
func (lg *l2VTGateGateway) getStatsAggregator(conn *l2VTGateConn, tabletType topodatapb.TabletType) *TabletStatusAggregator {
	// Named aggrKey (not key) so it does not shadow the imported key package.
	aggrKey := fmt.Sprintf("%v:%v", conn.addr, topoproto.TabletTypeLString(tabletType))

	// Fast path: the aggregator already exists.
	lg.mu.RLock()
	aggr, ok := lg.statusAggregators[aggrKey]
	lg.mu.RUnlock()
	if ok {
		return aggr
	}

	// Slow path: re-check under the write lock before creating it.
	lg.mu.Lock()
	defer lg.mu.Unlock()
	aggr, ok = lg.statusAggregators[aggrKey]
	if ok {
		return aggr
	}
	aggr = NewTabletStatusAggregator(conn.keyspace, conn.shard, tabletType, aggrKey)
	lg.statusAggregators[aggrKey] = aggr
	return aggr
}