forked from dolthub/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
orchestrator.go
148 lines (129 loc) · 4.69 KB
/
orchestrator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tabletmanager
import (
"flag"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"strconv"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/timer"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
"github.com/youtube/vitess/go/vt/topo/topoproto"
)
// Command-line flags that configure the Orchestrator integration.

// orcAddr holds the value of -orc_api_url, the base URL of Orchestrator's
// HTTP API. An empty value disables the integration.
var orcAddr = flag.String("orc_api_url", "", "Address of Orchestrator's HTTP API (e.g. http://host:port/api/). Leave empty to disable Orchestrator integration.")

// orcTimeout holds the value of -orc_timeout, the timeout applied to each
// call made to Orchestrator's HTTP API. It defaults to 30 seconds.
var orcTimeout = flag.Duration("orc_timeout", 30*time.Second, "Timeout for calls to Orchestrator's HTTP API")

// orcInterval holds the value of -orc_discover_interval, the period between
// self-registration pings. The default of 0 means never.
var orcInterval = flag.Duration("orc_discover_interval", 0, "How often to ping Orchestrator's HTTP API endpoint to tell it we exist. 0 means never.")
// orcClient is a small client for the Orchestrator HTTP API.
type orcClient struct {
	// apiRoot is the parsed base URL of the API. Endpoint paths are appended
	// to a copy of it for every request.
	apiRoot *url.URL
	// httpClient is the HTTP client used to issue the API requests.
	httpClient *http.Client
}
// newOrcClient builds an orcClient from the -orc_api_url and -orc_timeout
// flags, so it must only be called once flags have been parsed.
//
// When -orc_api_url is empty, Orchestrator integration is disabled and both
// return values are nil. An error is returned if the flag value cannot be
// parsed as a URL.
func newOrcClient() (*orcClient, error) {
	addr := *orcAddr
	if addr == "" {
		// Integration is switched off; callers must handle the nil client.
		return nil, nil
	}

	root, err := url.Parse(addr)
	if err != nil {
		return nil, fmt.Errorf("can't parse -orc_api_url flag value (%v): %v", addr, err)
	}

	client := &orcClient{
		apiRoot:    root,
		httpClient: &http.Client{Timeout: *orcTimeout},
	}
	return client, nil
}
// DiscoverLoop repeatedly self-registers with Orchestrator by calling
// orc.Discover() until the process terminates; there is no other way to stop
// it. The Tablet is re-read from the given agent before every attempt.
// It returns immediately when -orc_discover_interval is 0.
// Usually this is launched as a background goroutine.
func (orc *orcClient) DiscoverLoop(agent *ActionAgent) {
	interval := *orcInterval
	if interval == 0 {
		// An interval of 0 means "never".
		return
	}
	log.Infof("Starting periodic Orchestrator self-registration: API URL = %v, interval = %v", *orcAddr, interval)

	// Jitter the interval by +/- 25% to reduce the potential for spikes.
	ticker := timer.NewRandTicker(interval, interval/4)

	// failing records whether the most recent attempt ended in an error.
	failing := false
	for {
		// The first attempt happens right away, before any tick.
		err := orc.Discover(agent.Tablet())
		nowFailing := err != nil

		// Log only on a transition between the success and failure states.
		switch {
		case nowFailing == failing:
			// State unchanged; stay quiet.
		case nowFailing:
			log.Warningf("Orchestrator self-registration attempt failed: %v", err)
		default:
			log.Infof("Orchestrator self-registration succeeded.")
		}
		failing = nowFailing

		// Block until the next tick.
		<-ticker.C
	}
}
// Discover makes one attempt to self-register the tablet's MySQL host and
// port with Orchestrator via the "discover" API endpoint.
// It returns an error if the MySQL port is not known or the API call fails.
func (orc *orcClient) Discover(tablet *topodatapb.Tablet) error {
	mysqlHost, mysqlPort, hostPortErr := mysqlHostPort(tablet)
	if hostPortErr != nil {
		return hostPortErr
	}
	// The response body is not needed; only the outcome matters.
	if _, apiErr := orc.apiGet("discover", mysqlHost, mysqlPort); apiErr != nil {
		return apiErr
	}
	return nil
}
// BeginMaintenance asks Orchestrator to leave the given tablet alone until
// EndMaintenance() is called, via the "begin-maintenance" API endpoint.
// The fixed path part "vitess" and the caller-supplied reason are sent after
// the tablet's MySQL host and port.
// It returns an error if the MySQL port is not known or the API call fails.
func (orc *orcClient) BeginMaintenance(tablet *topodatapb.Tablet, reason string) error {
	mysqlHost, mysqlPort, hostPortErr := mysqlHostPort(tablet)
	if hostPortErr != nil {
		return hostPortErr
	}
	// The response body is not needed; only the outcome matters.
	if _, apiErr := orc.apiGet("begin-maintenance", mysqlHost, mysqlPort, "vitess", reason); apiErr != nil {
		return apiErr
	}
	return nil
}
// EndMaintenance asks Orchestrator to lift the maintenance block on the given
// tablet, which should have been placed there by BeginMaintenance(), via the
// "end-maintenance" API endpoint.
// It returns an error if the MySQL port is not known or the API call fails.
func (orc *orcClient) EndMaintenance(tablet *topodatapb.Tablet) error {
	mysqlHost, mysqlPort, hostPortErr := mysqlHostPort(tablet)
	if hostPortErr != nil {
		return hostPortErr
	}
	// The response body is not needed; only the outcome matters.
	if _, apiErr := orc.apiGet("end-maintenance", mysqlHost, mysqlPort); apiErr != nil {
		return apiErr
	}
	return nil
}
// mysqlHostPort returns the tablet's hostname and its "mysql" port formatted
// as a decimal string, ready to be used as Orchestrator API path parts.
// It returns an error when the tablet's PortMap has no non-zero "mysql"
// entry, which may mean mysqld is not running yet.
func mysqlHostPort(tablet *topodatapb.Tablet) (host, port string, err error) {
	if n := int(tablet.PortMap["mysql"]); n != 0 {
		return tablet.Hostname, strconv.Itoa(n), nil
	}
	// A missing map entry reads as 0, so this covers both "absent" and "zero".
	return "", "", fmt.Errorf("MySQL port is unknown for tablet %v (mysqld may not be running yet)", topoproto.TabletAliasString(tablet.Alias))
}
// apiGet issues an HTTP GET against the given Orchestrator API endpoint and
// returns the response body.
//
// pathParts are joined onto the path of apiRoot with path.Join, which also
// cleans the result (for example collapsing repeated slashes). The final,
// assembled path will be URL-escaped, but things like '/' inside a path part
// can still confuse the HTTP API. We can't do anything about that because
// Orchestrator's API chose to put variable values in path elements rather
// than query arguments.
//
// An error is returned if the request fails, if the body cannot be read, or
// if the server answers with a non-2xx status. In the last case the error
// carries the status and the response body, and the returned body is nil.
func (orc *orcClient) apiGet(pathParts ...string) ([]byte, error) {
	// Append pathParts to a copy of the apiRoot URL so the shared root is
	// never modified.
	target := *orc.apiRoot
	fullPath := make([]string, 0, len(pathParts)+1)
	fullPath = append(fullPath, target.Path)
	fullPath = append(fullPath, pathParts...)
	target.Path = path.Join(fullPath...)

	// Note that target.String() will URL-escape the path we gave it above.
	resp, err := orc.httpClient.Get(target.String())
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Read the body even for error responses so it can be included in the
	// error message.
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	// A completed HTTP exchange is not a successful API call: treat any
	// status outside the 2xx range as a failure instead of returning its
	// body as if it were a valid result.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("unexpected HTTP status %v from Orchestrator API path %v: %s", resp.Status, target.Path, body)
	}
	return body, nil
}