forked from vitessio/vitess
/
vertical_split_task.go
113 lines (99 loc) · 3.93 KB
/
vertical_split_task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package automation
import (
"strings"
automationpb "github.com/youtube/vitess/go/vt/proto/automation"
"github.com/youtube/vitess/go/vt/topo/topoproto"
)
// VerticalSplitTask is a cluster operation to split out specific tables of one
// keyspace to a different keyspace.
type VerticalSplitTask struct {
}
// Run is part of the Task interface.
func (t *VerticalSplitTask) Run(parameters map[string]string) ([]*automationpb.TaskContainer, string, error) {
// Required parameters.
// Example: source_keyspace
sourceKeyspace := parameters["source_keyspace"]
// Example: destination_keyspace
destKeyspace := parameters["dest_keyspace"]
// Example: 10-20
shards := strings.Split(parameters["shard_list"], ",")
// Example: table1,table2
tables := parameters["tables"]
// Example: localhost:15000
vtctldEndpoint := parameters["vtctld_endpoint"]
// Example: localhost:15001
vtworkerEndpoint := parameters["vtworker_endpoint"]
// Optional parameters.
// Example: 1
minHealthyRdonlyTablets := parameters["min_healthy_rdonly_tablets"]
var newTasks []*automationpb.TaskContainer
copySchemaTasks := NewTaskContainer()
for _, shard := range shards {
AddTask(copySchemaTasks, "CopySchemaShardTask", map[string]string{
"source_keyspace_and_shard": topoproto.KeyspaceShardString(sourceKeyspace, shard),
"dest_keyspace_and_shard": topoproto.KeyspaceShardString(destKeyspace, shard),
"vtctld_endpoint": vtctldEndpoint,
"tables": tables,
})
}
newTasks = append(newTasks, copySchemaTasks)
vSplitCloneTasks := NewTaskContainer()
for _, shard := range shards {
// TODO(mberlin): Add a semaphore as argument to limit the parallism.
AddTask(vSplitCloneTasks, "VerticalSplitCloneTask", map[string]string{
"dest_keyspace": destKeyspace,
"shard": shard,
"tables": tables,
"vtworker_endpoint": vtworkerEndpoint,
"min_healthy_rdonly_tablets": minHealthyRdonlyTablets,
})
}
newTasks = append(newTasks, vSplitCloneTasks)
// TODO(mberlin): When the framework supports nesting tasks, these wait tasks should be run before each SplitDiff.
waitTasks := NewTaskContainer()
for _, shard := range shards {
AddTask(waitTasks, "WaitForFilteredReplicationTask", map[string]string{
"keyspace": destKeyspace,
"shard": shard,
"max_delay": "30s",
"vtctld_endpoint": vtctldEndpoint,
})
}
newTasks = append(newTasks, waitTasks)
// TODO(mberlin): Run all SplitDiffTasks in parallel which do not use overlapping source shards for the comparison.
for _, shard := range shards {
vSplitDiffTask := NewTaskContainer()
AddTask(vSplitDiffTask, "VerticalSplitDiffTask", map[string]string{
"dest_keyspace": destKeyspace,
"shard": shard,
"vtworker_endpoint": vtworkerEndpoint,
"min_healthy_rdonly_tablets": minHealthyRdonlyTablets,
})
newTasks = append(newTasks, vSplitDiffTask)
}
for _, servedType := range []string{"rdonly", "replica", "master"} {
migrateServedTypesTasks := NewTaskContainer()
for _, shard := range shards {
AddTask(migrateServedTypesTasks, "MigrateServedFromTask", map[string]string{
"dest_keyspace": destKeyspace,
"shard": shard,
"type": servedType,
"vtctld_endpoint": vtctldEndpoint,
})
}
newTasks = append(newTasks, migrateServedTypesTasks)
}
return newTasks, "", nil
}
// RequiredParameters is part of the Task interface.
func (t *VerticalSplitTask) RequiredParameters() []string {
return []string{"source_keyspace", "dest_keyspace", "shard_list",
"tables", "vtctld_endpoint", "vtworker_endpoint"}
}
// OptionalParameters is part of the Task interface.
func (t *VerticalSplitTask) OptionalParameters() []string {
return []string{"min_healthy_rdonly_tablets"}
}