forked from pingcap/br
-
Notifications
You must be signed in to change notification settings - Fork 0
/
glue.go
172 lines (151 loc) · 4.75 KB
/
glue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
package gluetidb
import (
"bytes"
"context"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
pd "github.com/tikv/pd/client"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/gluetikv"
)
const (
defaultCapOfCreateTable = 512
defaultCapOfCreateDatabase = 64
brComment = `/*from(br)*/`
)
// New makes a new tidb glue.
func New() Glue {
log.Debug("enabling no register config")
config.UpdateGlobal(func(conf *config.Config) {
conf.SkipRegisterToDashboard = true
})
return Glue{}
}
// Glue is an implementation of glue.Glue using a new TiDB session.
type Glue struct {
tikvGlue gluetikv.Glue
}
type tidbSession struct {
se session.Session
}
// GetDomain implements glue.Glue.
func (Glue) GetDomain(store kv.Storage) (*domain.Domain, error) {
se, err := session.CreateSession(store)
if err != nil {
return nil, errors.Trace(err)
}
dom, err := session.GetDomain(store)
if err != nil {
return nil, errors.Trace(err)
}
// create stats handler for backup and restore.
err = dom.UpdateTableStatsLoop(se)
if err != nil {
return nil, errors.Trace(err)
}
return dom, nil
}
// CreateSession implements glue.Glue.
func (Glue) CreateSession(store kv.Storage) (glue.Session, error) {
se, err := session.CreateSession(store)
if err != nil {
return nil, errors.Trace(err)
}
tiSession := &tidbSession{
se: se,
}
return tiSession, nil
}
// Open implements glue.Glue.
func (g Glue) Open(path string, option pd.SecurityOption) (kv.Storage, error) {
return g.tikvGlue.Open(path, option)
}
// OwnsStorage implements glue.Glue.
func (Glue) OwnsStorage() bool {
return true
}
// StartProgress implements glue.Glue.
func (g Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress {
return g.tikvGlue.StartProgress(ctx, cmdName, total, redirectLog)
}
// Record implements glue.Glue.
func (g Glue) Record(name string, value uint64) {
g.tikvGlue.Record(name, value)
}
// GetVersion implements glue.Glue.
func (g Glue) GetVersion() string {
return g.tikvGlue.GetVersion()
}
// Execute implements glue.Session.
func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
_, err := gs.se.ExecuteInternal(ctx, sql)
return errors.Trace(err)
}
// CreateDatabase implements glue.Session.
func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
d := domain.GetDomain(gs.se).DDL()
query, err := gs.showCreateDatabase(schema)
if err != nil {
return errors.Trace(err)
}
gs.se.SetValue(sessionctx.QueryString, query)
schema = schema.Clone()
if len(schema.Charset) == 0 {
schema.Charset = mysql.DefaultCharset
}
return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore, true)
}
// CreateTable implements glue.Session.
func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error {
d := domain.GetDomain(gs.se).DDL()
query, err := gs.showCreateTable(table)
if err != nil {
return errors.Trace(err)
}
gs.se.SetValue(sessionctx.QueryString, query)
// Clone() does not clone partitions yet :(
table = table.Clone()
if table.Partition != nil {
newPartition := *table.Partition
newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...)
table.Partition = &newPartition
}
return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore, true)
}
// Close implements glue.Session.
func (gs *tidbSession) Close() {
gs.se.Close()
}
// showCreateTable shows the result of SHOW CREATE TABLE from a TableInfo.
func (gs *tidbSession) showCreateTable(tbl *model.TableInfo) (string, error) {
table := tbl.Clone()
table.AutoIncID = 0
result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateTable))
// this can never fail.
_, _ = result.WriteString(brComment)
if err := executor.ConstructResultOfShowCreateTable(gs.se, tbl, autoid.Allocators{}, result); err != nil {
return "", errors.Trace(err)
}
return result.String(), nil
}
// showCreateDatabase shows the result of SHOW CREATE DATABASE from a dbInfo.
func (gs *tidbSession) showCreateDatabase(db *model.DBInfo) (string, error) {
result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateDatabase))
// this can never fail.
_, _ = result.WriteString(brComment)
if err := executor.ConstructResultOfShowCreateDatabase(gs.se, db, true, result); err != nil {
return "", errors.Trace(err)
}
return result.String(), nil
}