forked from dolthub/go-mysql-server
/
table_copier.go
165 lines (131 loc) · 4.76 KB
/
table_copier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package plan
import (
"fmt"
"github.com/gabereiser/go-mysql-server/sql"
"github.com/gabereiser/go-mysql-server/sql/mysql_db"
"github.com/gabereiser/go-mysql-server/sql/types"
)
// TableCopier is a supporting node that allows for the optimization of copying tables. It should be used in two cases.
// 1) CREATE TABLE SELECT *
// 2) INSERT INTO SELECT * where the inserted table is empty. // TODO: Implement this optimization
type TableCopier struct {
source sql.Node
destination sql.Node
db sql.Database
options CopierProps
}
var _ sql.Databaser = (*TableCopier)(nil)
var _ sql.Node = (*TableCopier)(nil)
type CopierProps struct {
replace bool
ignore bool
}
func NewTableCopier(db sql.Database, createTableNode sql.Node, source sql.Node, prop CopierProps) *TableCopier {
return &TableCopier{
source: source,
destination: createTableNode,
db: db,
options: prop,
}
}
func (tc *TableCopier) WithDatabase(db sql.Database) (sql.Node, error) {
ntc := *tc
ntc.db = db
return &ntc, nil
}
func (tc *TableCopier) Database() sql.Database {
return tc.db
}
func (tc *TableCopier) RowIter(ctx *sql.Context, row sql.Row) (sql.RowIter, error) {
if _, ok := tc.destination.(*CreateTable); ok {
return tc.processCreateTable(ctx, row)
}
drt, ok := tc.destination.(*ResolvedTable)
if !ok {
return nil, fmt.Errorf("TableCopier only accepts CreateTable or ResolvedTable as the destination")
}
return tc.copyTableOver(ctx, tc.source.Schema()[0].Source, drt.Name())
}
func (tc *TableCopier) processCreateTable(ctx *sql.Context, row sql.Row) (sql.RowIter, error) {
ct := tc.destination.(*CreateTable)
_, err := ct.RowIter(ctx, row)
if err != nil {
return sql.RowsToRowIter(), err
}
table, tableExists, err := tc.db.GetTableInsensitive(ctx, ct.Name())
if err != nil {
return sql.RowsToRowIter(), err
}
if !tableExists {
return sql.RowsToRowIter(), fmt.Errorf("error: Newly created table does not exist")
}
if tc.createTableSelectCanBeCopied(table) {
return tc.copyTableOver(ctx, tc.source.Schema()[0].Source, table.Name())
}
// TODO: Improve parsing for CREATE TABLE SELECT to allow for IGNORE/REPLACE and custom specs
ii := NewInsertInto(tc.db, NewResolvedTable(table, tc.db, nil), tc.source, tc.options.replace, nil, nil, tc.options.ignore)
// Wrap the insert into a row update accumulator
roa := NewRowUpdateAccumulator(ii, UpdateTypeInsert)
return roa.RowIter(ctx, row)
}
// createTableSelectCanBeCopied determines whether the newly created table's data can just be copied from the source table
func (tc *TableCopier) createTableSelectCanBeCopied(tableNode sql.Table) bool {
// The differences in LIMIT between integrators prevent us from using a copy
if _, ok := tc.source.(*Limit); ok {
return false
}
// If the DB does not implement the TableCopierDatabase interface we cannot copy over the table.
if privDb, ok := tc.db.(mysql_db.PrivilegedDatabase); ok {
if _, ok := privDb.Unwrap().(sql.TableCopierDatabase); !ok {
return false
}
} else if _, ok := tc.db.(sql.TableCopierDatabase); !ok {
return false
}
// If there isn't a match in schema we cannot do a direct copy.
sourceSchema := tc.source.Schema()
tableNodeSchema := tableNode.Schema()
if len(sourceSchema) != len(tableNodeSchema) {
return false
}
for i, sn := range sourceSchema {
if sn.Name != tableNodeSchema[i].Name {
return false
}
}
return true
}
// copyTableOver is used when we can guarantee the destination table will have the same data as the source table.
func (tc *TableCopier) copyTableOver(ctx *sql.Context, sourceTable string, destinationTable string) (sql.RowIter, error) {
db, ok := tc.db.(sql.TableCopierDatabase)
if !ok {
return sql.RowsToRowIter(), sql.ErrTableCopyingNotSupported.New()
}
rowsUpdated, err := db.CopyTableData(ctx, sourceTable, destinationTable)
if err != nil {
return sql.RowsToRowIter(), err
}
return sql.RowsToRowIter([]sql.Row{{types.OkResult{RowsAffected: rowsUpdated, InsertID: 0, Info: nil}}}...), nil
}
func (tc *TableCopier) Schema() sql.Schema {
return tc.destination.Schema()
}
func (tc *TableCopier) Children() []sql.Node {
return nil
}
func (tc *TableCopier) WithChildren(...sql.Node) (sql.Node, error) {
return tc, nil
}
// CheckPrivileges implements the interface sql.Node.
func (tc *TableCopier) CheckPrivileges(ctx *sql.Context, opChecker sql.PrivilegedOperationChecker) bool {
//TODO: add a new branch when the INSERT optimization is added
return opChecker.UserHasPrivileges(ctx,
sql.NewPrivilegedOperation(tc.db.Name(), "", "", sql.PrivilegeType_Create)) &&
tc.source.CheckPrivileges(ctx, opChecker)
}
func (tc *TableCopier) Resolved() bool {
return tc.source.Resolved()
}
func (tc *TableCopier) String() string {
return fmt.Sprintf("TABLE_COPY SRC: %s into DST: %s", tc.source, tc.destination)
}