forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
schemamanager.go
155 lines (134 loc) · 4.55 KB
/
schemamanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schemamanager
import (
"encoding/json"
"fmt"
"time"
"golang.org/x/net/context"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
)
const (
// SchemaChangeDirName is the key in the ControllerFactory params map
// whose value specifies the schema change directory.
SchemaChangeDirName = "schema_change_dir"
// SchemaChangeUser is the key in the ControllerFactory params map
// whose value specifies the user who submits this schema change.
SchemaChangeUser = "schema_change_user"
)
// ControllerFactory takes a set of string params and constructs a Controller
// instance, or returns an error if it cannot.
type ControllerFactory func(params map[string]string) (Controller, error)
var (
// controllerFactories maps a registered name to its ControllerFactory.
// Entries are added by RegisterControllerFactory and looked up by
// GetControllerFactory.
controllerFactories = make(map[string]ControllerFactory)
)
// Controller is responsible for getting the schema change for a
// certain keyspace and for handling the various events that happen during
// a schema change. Run invokes the methods below in a fixed sequence.
type Controller interface {
// Open is the first call Run makes; if it returns an error Run stops
// and returns that error.
Open(ctx context.Context) error
// Read returns the SQL statements of the schema change. Run returns
// without executing anything when the returned slice is empty.
Read(ctx context.Context) (sqls []string, err error)
// Close is deferred by Run once Open has succeeded.
Close()
// Keyspace returns the keyspace name that Run passes to Executor.Open.
Keyspace() string
// OnReadSuccess is called by Run after Read returns without error.
OnReadSuccess(ctx context.Context) error
// OnReadFail is called by Run with the error returned by Read.
OnReadFail(ctx context.Context, err error) error
// OnValidationSuccess is called by Run after Executor.Validate succeeds;
// a non-nil error makes Run return before executing the statements.
OnValidationSuccess(ctx context.Context) error
// OnValidationFail is called by Run with the error returned by
// Executor.Validate.
OnValidationFail(ctx context.Context, err error) error
// OnExecutorComplete is called by Run with the result of
// Executor.Execute; a non-nil error is returned from Run as is.
OnExecutorComplete(ctx context.Context, result *ExecuteResult) error
}
// Executor applies schema changes to the underlying system.
type Executor interface {
// Open prepares the executor for the given keyspace. Run passes the
// value returned by Controller.Keyspace.
Open(ctx context.Context, keyspace string) error
// Validate checks the statements before they are executed; Run does not
// call Execute when Validate returns an error.
Validate(ctx context.Context, sqls []string) error
// Execute applies the statements and reports the outcome as an
// ExecuteResult rather than as an error.
Execute(ctx context.Context, sqls []string) *ExecuteResult
// Close is deferred by Run once Open has succeeded.
Close()
}
// ExecuteResult contains information about schema management state.
// Run treats a result as failed when ExecutorErr is non-empty or
// FailedShards is non-empty, and then embeds the JSON encoding of the
// whole struct in the returned error.
type ExecuteResult struct {
// FailedShards lists the shards that failed, with their error text.
FailedShards []ShardWithError
// SuccessShards lists the per-shard results of shards that succeeded.
SuccessShards []ShardResult
// CurSQLIndex is presumably the index into Sqls of the statement being
// processed — TODO confirm against the Executor implementation.
CurSQLIndex int
// Sqls holds SQL statements; presumably the ones handed to Execute —
// TODO confirm against the Executor implementation.
Sqls []string
// ExecutorErr is an error message from the executor; empty means no
// executor-level error.
ExecutorErr string
// TotalTimeSpent is a duration; presumably the wall time of the whole
// execution — TODO confirm against the Executor implementation.
TotalTimeSpent time.Duration
}
// ShardWithError contains information about why a shard failed to execute
// the given SQL.
type ShardWithError struct {
// Shard names the shard that failed.
Shard string
// Err is the failure description, kept as a string.
Err string
}
// ShardResult contains SQL execution information for a particular shard.
type ShardResult struct {
// Shard names the shard this result belongs to.
Shard string
// Result is the query result reported for the shard.
Result *querypb.QueryResult
// Position is a replication position that is guaranteed to be after the
// schema change was applied. It can be used to wait for replicas to receive
// the schema change via replication.
Position string
}
// Run applies schema changes on Vitess through VtGate.
//
// It opens the controller, reads the SQL statements from it, and, if there
// are any, opens the executor for the controller's keyspace, validates the
// statements and executes them. The controller is notified after each stage
// through its On* callbacks.
//
// Run returns nil when there is nothing to apply or when execution finished
// without an executor error and without failed shards. Otherwise it returns
// the first error encountered: an Open/Read/Validate error, an error from
// OnValidationSuccess or OnExecutorComplete, or an error describing the
// failed ExecuteResult. Errors returned by OnReadFail, OnReadSuccess and
// OnValidationFail do not change the outcome; they are only logged.
func Run(ctx context.Context, controller Controller, executor Executor) error {
	if err := controller.Open(ctx); err != nil {
		log.Errorf("failed to open data sourcer: %v", err)
		return err
	}
	defer controller.Close()

	sqls, err := controller.Read(ctx)
	if err != nil {
		log.Errorf("failed to read data from data sourcer: %v", err)
		// The read error is what the caller must see; a failing
		// notification is logged instead of being silently dropped.
		if cbErr := controller.OnReadFail(ctx, err); cbErr != nil {
			log.Errorf("OnReadFail callback failed: %v", cbErr)
		}
		return err
	}
	// A failing success notification does not abort the schema change.
	if cbErr := controller.OnReadSuccess(ctx); cbErr != nil {
		log.Errorf("OnReadSuccess callback failed: %v", cbErr)
	}
	if len(sqls) == 0 {
		// Nothing to apply.
		return nil
	}

	keyspace := controller.Keyspace()
	if err := executor.Open(ctx, keyspace); err != nil {
		log.Errorf("failed to open executor: %v", err)
		return err
	}
	defer executor.Close()

	if err := executor.Validate(ctx, sqls); err != nil {
		log.Errorf("validation fail: %v", err)
		// As with OnReadFail, the validation error takes precedence.
		if cbErr := controller.OnValidationFail(ctx, err); cbErr != nil {
			log.Errorf("OnValidationFail callback failed: %v", cbErr)
		}
		return err
	}
	if err := controller.OnValidationSuccess(ctx); err != nil {
		return err
	}

	// NOTE(review): result is dereferenced below, so Execute is assumed
	// never to return nil — confirm against the Executor implementations.
	result := executor.Execute(ctx, sqls)
	if err := controller.OnExecutorComplete(ctx, result); err != nil {
		return err
	}
	if result.ExecutorErr != "" || len(result.FailedShards) > 0 {
		out, marshalErr := json.MarshalIndent(result, "", " ")
		if marshalErr != nil {
			// Still report the failure, with a plain dump of the result.
			return fmt.Errorf("schema change failed, ExecuteResult could not be encoded as JSON (%v): %+v", marshalErr, result)
		}
		return fmt.Errorf("schema change failed, ExecuteResult: %v", string(out))
	}
	return nil
}
// RegisterControllerFactory stores factory in the package registry under
// name. Registering a name that is already present is treated as a
// programming error and panics.
func RegisterControllerFactory(name string, factory ControllerFactory) {
	_, alreadyRegistered := controllerFactories[name]
	if alreadyRegistered {
		panic(fmt.Sprintf("register a registered key: %s", name))
	}
	controllerFactories[name] = factory
}
// GetControllerFactory looks up the ControllerFactory registered under name.
// It returns a nil factory and an error when no factory has been registered
// with that name.
func GetControllerFactory(name string) (ControllerFactory, error) {
	if registered, found := controllerFactories[name]; found {
		return registered, nil
	}
	return nil, fmt.Errorf("there is no data sourcer factory with name: %s", name)
}