forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
schema.go
254 lines (217 loc) · 6.65 KB
/
schema.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schema
// Yes, this sucks. It's a tiny tiny package that needs to be on its own
// It contains a data structure that's shared between sqlparser & tabletserver
import (
"fmt"
"sync"
"time"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/sqlparser"
querypb "vitess.io/vitess/go/vt/proto/query"
)
// Table types
const (
NoType = iota
Sequence
Message
)
// TypeNames allows to fetch a the type name for a table.
// Count must match the number of table types.
var TypeNames = []string{
"none",
"sequence",
"message",
}
// TableColumn contains info about a table's column.
type TableColumn struct {
Name sqlparser.ColIdent
Type querypb.Type
IsAuto bool
Default sqltypes.Value
}
// Table contains info about a table.
type Table struct {
Name sqlparser.TableIdent
Columns []TableColumn
Indexes []*Index
PKColumns []int
Type int
// SequenceInfo contains info for sequence tables.
SequenceInfo *SequenceInfo
// MessageInfo contains info for message tables.
MessageInfo *MessageInfo
// These vars can be accessed concurrently.
TableRows sync2.AtomicInt64
DataLength sync2.AtomicInt64
IndexLength sync2.AtomicInt64
DataFree sync2.AtomicInt64
MaxDataLength sync2.AtomicInt64
}
// SequenceInfo contains info specific to sequence tabels.
// It must be locked before accessing the values inside.
// If CurVal==LastVal, we have to cache new values.
// When the schema is first loaded, the values are all 0,
// which will trigger caching on first use.
type SequenceInfo struct {
sync.Mutex
NextVal int64
LastVal int64
}
// MessageInfo contains info specific to message tables.
type MessageInfo struct {
// IDPKIndex is the index of the ID column
// in PKvalues. This is used to extract the ID
// value for message tables to discard items
// from the cache.
IDPKIndex int
// Fields stores the field info to be
// returned for subscribers.
Fields []*querypb.Field
// AckWaitDuration specifies how long to wait after
// the message was first sent. The back-off doubles
// every attempt.
AckWaitDuration time.Duration
// PurgeAfterDuration specifies the time after which
// a successfully acked message can be deleted.
PurgeAfterDuration time.Duration
// BatchSize specifies the max number of events to
// send per response.
BatchSize int
// CacheSize specifies the number of messages to keep
// in cache. Anything that cannot fit in the cache
// is sent as best effort.
CacheSize int
// PollInterval specifies the polling frequency to
// look for messages to be sent.
PollInterval time.Duration
}
// NewTable creates a new Table.
func NewTable(name string) *Table {
return &Table{
Name: sqlparser.NewTableIdent(name),
}
}
// Done must be called after columns and indexes are added to
// the table. It will build additional metadata like PKColumns.
func (ta *Table) Done() {
if !ta.HasPrimary() {
return
}
pkIndex := ta.Indexes[0]
ta.PKColumns = make([]int, len(pkIndex.Columns))
for i, pkCol := range pkIndex.Columns {
ta.PKColumns[i] = ta.FindColumn(pkCol)
}
}
// AddColumn adds a column to the Table.
func (ta *Table) AddColumn(name string, columnType querypb.Type, defval sqltypes.Value, extra string) {
index := len(ta.Columns)
ta.Columns = append(ta.Columns, TableColumn{Name: sqlparser.NewColIdent(name)})
ta.Columns[index].Type = columnType
if extra == "auto_increment" {
ta.Columns[index].IsAuto = true
// Ignore default value, if any
return
}
if defval.IsNull() {
return
}
// Schema values are trusted.
ta.Columns[index].Default = sqltypes.MakeTrusted(ta.Columns[index].Type, defval.Raw())
}
// FindColumn finds a column in the table. It returns the index if found.
// Otherwise, it returns -1.
func (ta *Table) FindColumn(name sqlparser.ColIdent) int {
for i, col := range ta.Columns {
if col.Name.Equal(name) {
return i
}
}
return -1
}
// GetPKColumn returns the pk column specified by the index.
func (ta *Table) GetPKColumn(index int) *TableColumn {
return &ta.Columns[ta.PKColumns[index]]
}
// AddIndex adds an index to the table.
func (ta *Table) AddIndex(name string, unique bool) (index *Index) {
index = NewIndex(name, unique)
ta.Indexes = append(ta.Indexes, index)
return index
}
// SetMysqlStats receives the values found in the mysql information_schema.tables table
func (ta *Table) SetMysqlStats(tr, dl, il, df, mdl sqltypes.Value) {
v, _ := sqltypes.ToInt64(tr)
ta.TableRows.Set(v)
v, _ = sqltypes.ToInt64(dl)
ta.DataLength.Set(v)
v, _ = sqltypes.ToInt64(il)
ta.IndexLength.Set(v)
v, _ = sqltypes.ToInt64(df)
ta.DataFree.Set(v)
v, _ = sqltypes.ToInt64(mdl)
ta.MaxDataLength.Set(v)
}
// HasPrimary returns true if the table has a primary key.
func (ta *Table) HasPrimary() bool {
return len(ta.Indexes) != 0 && ta.Indexes[0].Name.EqualString("primary")
}
// UniqueIndexes returns the number of unique indexes on the table
func (ta *Table) UniqueIndexes() int {
unique := 0
for _, idx := range ta.Indexes {
if idx.Unique {
unique++
}
}
return unique
}
// Index contains info about a table index.
type Index struct {
Name sqlparser.ColIdent
Unique bool
// Columns are the columns comprising the index.
Columns []sqlparser.ColIdent
// Cardinality[i] is the number of distinct values of Columns[i] in the
// table.
Cardinality []uint64
}
// NewIndex creates a new Index.
func NewIndex(name string, unique bool) *Index {
return &Index{Name: sqlparser.NewColIdent(name), Unique: unique}
}
// AddColumn adds a column to the index.
func (idx *Index) AddColumn(name string, cardinality uint64) {
idx.Columns = append(idx.Columns, sqlparser.NewColIdent(name))
if cardinality == 0 {
cardinality = uint64(len(idx.Cardinality) + 1)
}
idx.Cardinality = append(idx.Cardinality, cardinality)
}
// FindColumn finds a column in the index. It returns the index if found.
// Otherwise, it returns -1.
func (idx *Index) FindColumn(name sqlparser.ColIdent) int {
for i, colName := range idx.Columns {
if colName.Equal(name) {
return i
}
}
return -1
}
// String() pretty-prints TableColumn into a string.
func (c *TableColumn) String() string {
return fmt.Sprintf("{Name: '%v', Type: %v}", c.Name, c.Type)
}