forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 1
/
mysqlctl.go
329 lines (286 loc) · 12.5 KB
/
mysqlctl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"flag"
"fmt"
"net/url"
"os"
"os/signal"
"strings"
"syscall"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl"
)
// Global command-line flags shared by every mysqlctl action, plus state
// derived from them in main.
var (
// port is combined with "localhost" in main to build tabletAddr.
port = flag.Int("port", 6612, "vtocc port")
// mysqlPort is passed to mysqlctl.NewMycnf in main.
mysqlPort = flag.Int("mysql_port", 3306, "mysql port")
// tabletUid is converted to uint32 and passed to mysqlctl.NewMycnf in main.
tabletUid = flag.Uint("tablet_uid", 41983, "tablet uid")
// mysqlSocket, when non-empty, overrides the SocketFile of the generated Mycnf.
mysqlSocket = flag.String("mysql_socket", "", "path to the mysql socket")
// follow makes the init and start actions call waitForSignal after the
// server has been started.
follow = flag.Bool("follow", false, "For init or start actions, keep mysqlctl running as long as the underlying server is running. If mysqlctl is told to stop, it stops the server.")
// tabletAddr is set in main to "localhost:<port>" and handed to the
// snapshot and multisnapshot calls.
tabletAddr string
)
// waitForSignal blocks until either mysqld reports that it is done or this
// process receives SIGINT or SIGTERM. When a signal arrives it asks mysqld to
// shut down, passing waitTime through to mysqld.Shutdown, and logs any error
// that shutdown returns.
func waitForSignal(mysqld *mysqlctl.Mysqld, waitTime time.Duration) {
	log.Infof("waiting for signal or server shutdown...")

	// The signal package never blocks when delivering to the channel, so an
	// unbuffered channel can lose a signal that arrives before the select
	// below is ready to receive. A buffer of one is enough for notification.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

	select {
	case <-mysqld.Done():
		log.Infof("server shut down on its own")
	case <-sig:
		log.Infof("signal received, shutting down server")
		// Report a failed shutdown instead of dropping the error; keep it
		// non-fatal so the caller's deferred cleanup still runs.
		if err := mysqld.Shutdown(true, waitTime); err != nil {
			log.Errorf("failed shutdown mysql: %v", err)
		}
	}
}
// initCmd implements the "init" action. It parses the action's flags from
// args, calls mysqld.Init with them, and exits the process if that fails.
// When the global -follow flag is set it then waits in waitForSignal.
func initCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) {
	var (
		startupWait = subFlags.Duration("wait_time", mysqlctl.MysqlWaitTime, "how long to wait for startup")
		archiveName = subFlags.String("bootstrap_archive", "mysql-db-dir.tbz", "name of bootstrap archive within vitess/data/bootstrap directory")
		noSchema    = subFlags.Bool("skip_schema", false, "don't apply initial schema")
	)
	subFlags.Parse(args)

	err := mysqld.Init(*startupWait, *archiveName, *noSchema)
	if err != nil {
		log.Fatalf("failed init mysql: %v", err)
	}

	if !*follow {
		return
	}
	waitForSignal(mysqld, *startupWait)
}
// multisnapshotCmd implements the "multisnapshot" action. It expects exactly
// two positional arguments, <db name> and <key name>, parses the shard spec
// and the optional table include/exclude lists, validates -key_type against
// key.AllKeyspaceIdTypes, and then calls mysqld.CreateMultiSnapshot. Any
// failure terminates the process; on success the manifest locations are
// logged.
func multisnapshotCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) {
	concurrency := subFlags.Int("concurrency", 8, "how many compression jobs to run simultaneously")
	spec := subFlags.String("spec", "-", "shard specification")
	tablesString := subFlags.String("tables", "", "dump only this comma separated list of regexp for tables")
	excludeTablesString := subFlags.String("exclude_tables", "", "do not dump this comma separated list of regexp for tables")
	skipSlaveRestart := subFlags.Bool("skip_slave_restart", false, "after the snapshot is done, do not restart slave replication")
	maximumFilesize := subFlags.Uint64("maximum_file_size", 128*1024*1024, "the maximum size for an uncompressed data file")
	keyType := subFlags.String("key_type", "uint64", "type of the key column")
	subFlags.Parse(args)

	if subFlags.NArg() != 2 {
		log.Fatalf("action multisnapshot requires <db name> <key name>")
	}
	dbName, keyName := subFlags.Arg(0), subFlags.Arg(1)

	shards, err := key.ParseShardingSpec(*spec)
	if err != nil {
		log.Fatalf("multisnapshot failed: %v", err)
	}

	// An empty flag value means "no list" and must stay a nil slice rather
	// than the single empty element strings.Split would produce.
	splitList := func(csv string) []string {
		if csv == "" {
			return nil
		}
		return strings.Split(csv, ",")
	}
	tables := splitList(*tablesString)
	excludedTables := splitList(*excludeTablesString)

	kit := key.KeyspaceIdType(*keyType)
	if !key.IsKeyspaceIdTypeInList(kit, key.AllKeyspaceIdTypes) {
		log.Fatalf("invalid key_type")
	}

	filenames, err := mysqld.CreateMultiSnapshot(logutil.NewConsoleLogger(), shards, dbName, keyName, kit, tabletAddr, false, *concurrency, tables, excludedTables, *skipSlaveRestart, *maximumFilesize, nil)
	if err != nil {
		log.Fatalf("multisnapshot failed: %v", err)
	}
	log.Infof("manifest locations: %v", filenames)
}
// multiRestoreCmd implements the "multirestore" action. The positional
// arguments are the destination database name followed by one or more
// sources. -starts and -ends are comma separated lists that must each have
// exactly one entry per source; each pair is parsed into a key range. A
// source that carries neither a "vttp://" nor an "http://" prefix gets
// "vttp://" prepended before being parsed as a URL. Any validation or restore
// failure terminates the process.
func multiRestoreCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) {
	starts := subFlags.String("starts", "", "starts of the key range")
	ends := subFlags.String("ends", "", "ends of the key range")
	fetchRetryCount := subFlags.Int("fetch_retry_count", 3, "how many times to retry a failed transfer")
	concurrency := subFlags.Int("concurrency", 8, "how many concurrent db inserts to run simultaneously")
	fetchConcurrency := subFlags.Int("fetch_concurrency", 4, "how many files to fetch simultaneously")
	insertTableConcurrency := subFlags.Int("insert_table_concurrency", 4, "how many myisam tables to load into a single destination table simultaneously")
	strategy := subFlags.String("strategy", "", "which strategy to use for restore, can contain:\n"+
		" skipAutoIncrement(TTT): we won't add the AUTO_INCREMENT back to that table\n"+
		" delayPrimaryKey: we won't add the primary key until after the table is populated\n"+
		" delaySecondaryIndexes: we won't add the secondary indexes until after the table is populated\n"+
		" useMyIsam: create the table as MyISAM, then convert it to InnoDB after population\n"+
		" writeBinLogs: write all operations to the binlogs")
	subFlags.Parse(args)

	if subFlags.NArg() < 2 {
		log.Fatalf("multirestore requires <destination_dbname> <source_host>[/<source_dbname>]... %v", args)
	}
	dbName := subFlags.Arg(0)
	sourceArgs := subFlags.Args()[1:]

	// strings.Split on an empty string yields one empty element, so the
	// default flag values line up with exactly one source.
	startParts := strings.Split(*starts, ",")
	endParts := strings.Split(*ends, ",")
	if len(startParts) != len(endParts) || len(startParts) != len(sourceArgs) {
		log.Fatalf("Need as many starts and ends as source URLs")
	}

	keyRanges := make([]key.KeyRange, len(startParts))
	for i := range keyRanges {
		kr, err := key.ParseKeyRangeParts(startParts[i], endParts[i])
		if err != nil {
			log.Fatalf("Invalid start or end: %v", err)
		}
		keyRanges[i] = kr
	}

	sources := make([]*url.URL, 0, len(sourceArgs))
	for _, raw := range sourceArgs {
		if !(strings.HasPrefix(raw, "vttp://") || strings.HasPrefix(raw, "http://")) {
			raw = "vttp://" + raw
		}
		parsed, err := url.Parse(raw)
		if err != nil {
			log.Fatalf("incorrect source url: %v", err)
		}
		sources = append(sources, parsed)
	}

	err := mysqld.MultiRestore(logutil.NewConsoleLogger(), dbName, keyRanges, sources, nil, *concurrency, *fetchConcurrency, *insertTableConcurrency, *fetchRetryCount, *strategy)
	if err != nil {
		log.Fatalf("multirestore failed: %v", err)
	}
}
// restoreCmd implements the "restore" action. It requires a single positional
// argument naming a snapshot manifest file, reads that manifest, and passes it
// to mysqld.RestoreFromSnapshot. A failure in either step terminates the
// process with the same "restore failed" message.
func restoreCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) {
	var (
		dontWaitForSlaveStart = subFlags.Bool("dont_wait_for_slave_start", false, "won't wait for replication to start (useful when restoring from master server)")
		fetchConcurrency      = subFlags.Int("fetch_concurrency", 3, "how many files to fetch simultaneously")
		fetchRetryCount       = subFlags.Int("fetch_retry_count", 3, "how many times to retry a failed transfer")
	)
	subFlags.Parse(args)

	if subFlags.NArg() != 1 {
		log.Fatalf("Command restore requires <snapshot manifest file>")
	}

	manifest, err := mysqlctl.ReadSnapshotManifest(subFlags.Arg(0))
	if err != nil {
		log.Fatalf("restore failed: %v", err)
	}

	err = mysqld.RestoreFromSnapshot(logutil.NewConsoleLogger(), manifest, *fetchConcurrency, *fetchRetryCount, *dontWaitForSlaveStart, nil)
	if err != nil {
		log.Fatalf("restore failed: %v", err)
	}
}
// shutdownCmd implements the "shutdown" action: it parses -wait_time and
// calls mysqld.Shutdown, terminating the process if shutdown reports an error.
func shutdownCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) {
	shutdownWait := subFlags.Duration("wait_time", mysqlctl.MysqlWaitTime, "how long to wait for shutdown")
	subFlags.Parse(args)

	err := mysqld.Shutdown(true, *shutdownWait)
	if err != nil {
		log.Fatalf("failed shutdown mysql: %v", err)
	}
}
// snapshotCmd implements the "snapshot" action. It requires one positional
// argument, the database name, calls mysqld.CreateSnapshot, and logs the
// manifest location it returns. The two other non-error results of
// CreateSnapshot are discarded. Failure terminates the process.
func snapshotCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) {
	jobs := subFlags.Int("concurrency", 4, "how many compression jobs to run simultaneously")
	subFlags.Parse(args)

	if subFlags.NArg() != 1 {
		log.Fatalf("Command snapshot requires <db name>")
	}
	dbName := subFlags.Arg(0)

	manifestPath, _, _, err := mysqld.CreateSnapshot(logutil.NewConsoleLogger(), dbName, tabletAddr, false, *jobs, false, nil)
	if err != nil {
		log.Fatalf("snapshot failed: %v", err)
	}
	log.Infof("manifest location: %v", manifestPath)
}
// snapshotSourceStartCmd implements the "snapshotsourcestart" action. It
// requires one positional argument, the database name, and calls
// mysqld.CreateSnapshot with the same arguments snapshotCmd uses except that
// the boolean following the concurrency is true. On success it logs the
// manifest location together with the "slave start required" and "read only"
// values returned by CreateSnapshot. Failure terminates the process.
func snapshotSourceStartCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) {
	jobs := subFlags.Int("concurrency", 4, "how many checksum jobs to run simultaneously")
	subFlags.Parse(args)

	if subFlags.NArg() != 1 {
		log.Fatalf("Command snapshotsourcestart requires <db name>")
	}
	dbName := subFlags.Arg(0)

	manifestPath, needSlaveStart, isReadOnly, err := mysqld.CreateSnapshot(logutil.NewConsoleLogger(), dbName, tabletAddr, false, *jobs, true, nil)
	if err != nil {
		log.Fatalf("snapshot failed: %v", err)
	}

	log.Infof("manifest location: %v", manifestPath)
	log.Infof("slave start required: %v", needSlaveStart)
	log.Infof("read only: %v", isReadOnly)
}
// snapshotSourceEndCmd implements the "snapshotsourceend" action. It passes
// the -slave_start value and the negation of -read_write to
// mysqld.SnapshotSourceEnd, along with a fixed true and an empty string map,
// and terminates the process if that call fails.
func snapshotSourceEndCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) {
	var (
		restartSlave = subFlags.Bool("slave_start", false, "will restart replication")
		makeWritable = subFlags.Bool("read_write", false, "will make the server read-write")
	)
	subFlags.Parse(args)

	readOnly := !*makeWritable
	if err := mysqld.SnapshotSourceEnd(*restartSlave, readOnly, true, map[string]string{}); err != nil {
		log.Fatalf("snapshotsourceend failed: %v", err)
	}
}
// startCmd implements the "start" action: it parses -wait_time, calls
// mysqld.Start, and terminates the process on failure. When the global
// -follow flag is set it then waits in waitForSignal.
func startCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) {
	startupWait := subFlags.Duration("wait_time", mysqlctl.MysqlWaitTime, "how long to wait for startup")
	subFlags.Parse(args)

	err := mysqld.Start(*startupWait)
	if err != nil {
		log.Fatalf("failed start mysql: %v", err)
	}

	if !*follow {
		return
	}
	waitForSignal(mysqld, *startupWait)
}
// teardownCmd implements the "teardown" action: it parses -force and calls
// mysqld.Teardown with it, terminating the process if teardown fails. The
// failure message includes whether -force was given.
func teardownCmd(mysqld *mysqlctl.Mysqld, subFlags *flag.FlagSet, args []string) {
	forced := subFlags.Bool("force", false, "will remove the root directory even if mysqld shutdown fails")
	subFlags.Parse(args)

	err := mysqld.Teardown(*forced)
	if err != nil {
		log.Fatalf("failed teardown mysql (forced? %v): %v", *forced, err)
	}
}
// command describes one mysqlctl action: how it is named on the command line,
// the function that runs it, and the text shown for it in usage output.
type command struct {
// name is compared against the first positional argument in main.
name string
// method runs the action; it receives the Mysqld, a fresh FlagSet for the
// action's own flags, and the arguments that follow the action name.
method func(*mysqlctl.Mysqld, *flag.FlagSet, []string)
// params is the parameter synopsis printed after the name in usage output.
params string
// help is the description printed by the action's own usage function.
help string
}
// commands lists every action mysqlctl understands. main matches the first
// positional argument against each entry's name, and both the global and the
// per-action usage output print the params and help strings, so those strings
// must stay in step with the flags each handler actually defines.
var commands = []command{
	{"init", initCmd, "[-wait_time=20s] [-bootstrap_archive=mysql-db-dir.tbz] [-skip_schema]",
		"Initializes the directory structure and starts mysqld"},
	{"teardown", teardownCmd, "[-force]",
		"Shuts mysqld down, and removes the directory"},
	{"start", startCmd, "[-wait_time=20s]",
		"Starts mysqld on an already 'init'-ed directory"},
	{"shutdown", shutdownCmd, "[-wait_time=20s]",
		"Shuts down mysqld, does not remove any file"},
	{"snapshot", snapshotCmd,
		"[-concurrency=4] <db name>",
		"Takes a full snapshot, copying the innodb data files"},
	{"snapshotsourcestart", snapshotSourceStartCmd,
		"[-concurrency=4] <db name>",
		"Enters snapshot server mode (mysqld stopped, serving innodb data files)"},
	{"snapshotsourceend", snapshotSourceEndCmd,
		"[-slave_start] [-read_write]",
		"Gets out of snapshot server mode"},
	{"restore", restoreCmd,
		"[-fetch_concurrency=3] [-fetch_retry_count=3] [-dont_wait_for_slave_start] <snapshot manifest file>",
		"Restores a full snapshot"},
	// multiRestoreCmd defines no -force flag and its -concurrency default is
	// 8, so the synopsis advertises exactly that.
	{"multirestore", multiRestoreCmd,
		"[-concurrency=8] [-fetch_concurrency=4] [-insert_table_concurrency=4] [-fetch_retry_count=3] [-starts=start1,start2,...] [-ends=end1,end2,...] [-strategy=] <destination_dbname> <source_host>[/<source_dbname>]...",
		"Restores a snapshot from multiple hosts"},
	{"multisnapshot", multisnapshotCmd, "[-concurrency=8] [-spec='-'] [-tables=''] [-exclude_tables=''] [-skip_slave_restart] [-maximum_file_size=134217728] [-key_type=uint64] <db name> <key name>",
		"Makes a complete snapshot using 'select * into' commands."},
}
// main parses the global flags, builds the Mycnf and Mysqld objects from
// them, and dispatches to the handler whose name matches the first positional
// argument. An unknown (or missing) action terminates the process with an
// "invalid action" message.
func main() {
	defer logutil.Flush()

	// Global usage: the flag defaults followed by one line per action.
	flag.Usage = func() {
		fmt.Fprintf(os.Stderr, "Usage: %s [global parameters] command [command parameters]\n", os.Args[0])
		fmt.Fprintf(os.Stderr, "\nThe global optional parameters are:\n")
		flag.PrintDefaults()
		fmt.Fprintf(os.Stderr, "\nThe commands are listed below. Use '%s <command> -h' for more help.\n\n", os.Args[0])
		for _, c := range commands {
			fmt.Fprintf(os.Stderr, " %s", c.name)
			if c.params != "" {
				fmt.Fprintf(os.Stderr, " %s", c.params)
			}
			fmt.Fprintf(os.Stderr, "\n")
		}
		fmt.Fprintf(os.Stderr, "\n")
	}

	dbconfigs.RegisterFlags()
	flag.Parse()

	tabletAddr = fmt.Sprintf("%v:%v", "localhost", *port)

	mycnf := mysqlctl.NewMycnf(uint32(*tabletUid), *mysqlPort)
	if socket := *mysqlSocket; socket != "" {
		mycnf.SocketFile = socket
	}

	cfgs, err := dbconfigs.Init(mycnf.SocketFile)
	if err != nil {
		log.Fatalf("%v", err)
	}

	mysqld := mysqlctl.NewMysqld("Dba", mycnf, &cfgs.Dba, &cfgs.Repl)
	defer mysqld.Close()

	action := flag.Arg(0)
	for _, cmd := range commands {
		if cmd.name != action {
			continue
		}

		// Each action gets its own FlagSet whose usage shows that action's
		// synopsis and help text ahead of its flag defaults.
		subFlags := flag.NewFlagSet(action, flag.ExitOnError)
		subFlags.Usage = func() {
			fmt.Fprintf(os.Stderr, "Usage: %s %s %s\n\n", os.Args[0], cmd.name, cmd.params)
			fmt.Fprintf(os.Stderr, "%s\n\n", cmd.help)
			subFlags.PrintDefaults()
		}

		actionArgs := flag.Args()[1:]
		cmd.method(mysqld, subFlags, actionArgs)
		return
	}

	log.Fatalf("invalid action: %v", action)
}