/
mapping.go
264 lines (249 loc) · 9.4 KB
/
mapping.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"fmt"
"strconv"
"strings"
"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl"
)
// GetSpannerTable returns the Spanner table name for the source table
// identified by tableId. Source DB table names can be essentially any
// string, whereas Spanner table names must use a limited character set, so
// the name may have to be rewritten. The result must satisfy:
//   - it is a legal Spanner name
//   - it doesn't clash with other Spanner table names
//   - the same table always gets the same name.
//
// If conv.SpSchema already holds an entry for tableId, that entry's name is
// returned unchanged. Otherwise the name is derived from the source table
// name via getSpannerValidName, which also records it in conv.UsedNames.
// conv.UsedNames tracks Spanner names already taken by tables, foreign key
// constraints and indexes; Spanner requires all of these to be distinct, and
// they must not differ only in case.
//
// An error is returned only when tableId is empty.
func GetSpannerTable(conv *Conv, tableId string) (string, error) {
	if tableId == "" {
		return "", fmt.Errorf("bad parameter: table-id string is empty")
	}
	existing, ok := conv.SpSchema[tableId]
	if ok {
		return existing.Name, nil
	}
	srcName := conv.SrcSchema[tableId].Name
	spName := getSpannerValidName(conv, srcName)
	if spName == srcName {
		return spName, nil
	}
	// The name had to change: report the mapping on both output channels.
	VerbosePrintf("Mapping source DB table %s to Spanner table %s\n", srcName, spName)
	logger.Log.Debug(fmt.Sprintf("Mapping source DB table %s to Spanner table %s\n", srcName, spName))
	return spName, nil
}
// GetSpannerCol maps the source column colId of table tableId to a legal
// Spanner column name. Source DB column names can be essentially any string,
// whereas Spanner column names must use a limited character set, so the name
// may have to be rewritten. The result must satisfy:
//   - it is a legal Spanner name
//   - it doesn't clash with other column names in the same table
//   - the same column always gets the same name.
//
// spColDef holds the Spanner column definitions already created for the
// table, keyed by column id. If it contains colId, that column's existing
// name is returned. Otherwise the source name is passed through FixName and,
// if the result clashes with a name already in spColDef, a numeric suffix is
// appended until it is unique.
//
// The clash check ignores case: Spanner treats identifiers
// case-insensitively, so two columns whose names differ only in case cannot
// coexist in one table. This matches how getSpannerValidName handles table,
// foreign key and index names.
//
// An error is returned only when tableId or colId is empty.
func GetSpannerCol(conv *Conv, tableId, colId string, spColDef map[string]ddl.ColumnDef) (string, error) {
	if tableId == "" {
		return "", fmt.Errorf("bad parameter: table id string is empty")
	}
	if colId == "" {
		return "", fmt.Errorf("bad parameter: column id string is empty")
	}
	if spCol, found := spColDef[colId]; found {
		return spCol.Name, nil
	}
	srcTable := conv.SrcSchema[tableId]
	srcColName := srcTable.ColDefs[colId].Name
	// Only the fixed-up name is needed here; FixName's second result is unused.
	spColName, _ := FixName(srcColName)
	// Index existing names by their lower-cased form so lookups are
	// case-insensitive.
	usedColNames := make(map[string]bool, len(spColDef))
	for _, spCol := range spColDef {
		usedColNames[strings.ToLower(spCol.Name)] = true
	}
	if usedColNames[strings.ToLower(spColName)] {
		// spColName has been used before i.e. FixName caused a collision.
		// Add unique postfix: start from the number of cols in this table so
		// far. That suffix may itself be taken, so keep counting up.
		for id := len(spColDef); ; id++ {
			c := spColName + "_" + strconv.Itoa(id)
			if !usedColNames[strings.ToLower(c)] {
				spColName = c
				break
			}
		}
	}
	if spColName != srcColName {
		VerbosePrintf("Mapping source DB col %s (table %s) to Spanner col %s\n", srcColName, srcTable.Name, spColName)
		logger.Log.Debug(fmt.Sprintf("Mapping source DB col %s (table %s) to Spanner col %s\n", srcColName, srcTable.Name, spColName))
	}
	return spColName, nil
}
// GetSpannerCols translates a list of source column names of table tableId
// into Spanner column names, preserving order. Each source name is first
// turned into a column id with GetColIdFromSrcName and then mapped with
// GetSpannerCol against the table's current Spanner column definitions.
// The first error encountered aborts the translation and is returned with a
// nil slice.
func GetSpannerCols(conv *Conv, tableId string, srcCols []string) ([]string, error) {
	var mapped []string
	for i := range srcCols {
		id, err := GetColIdFromSrcName(conv.SrcSchema[tableId].ColDefs, srcCols[i])
		if err != nil {
			return nil, err
		}
		name, err := GetSpannerCol(conv, tableId, id, conv.SpSchema[tableId].ColDefs)
		if err != nil {
			return nil, err
		}
		mapped = append(mapped, name)
	}
	return mapped, nil
}
// ToSpannerForeignKey maps a source foreign key name to a legal Spanner
// foreign key name.
//
// An empty srcFkName is returned as an empty string without further
// processing. For a non-empty name the result must be:
//   - a legal Spanner name
//   - distinct from other Spanner foreign key names.
//
// Note that foreign key constraint names in Spanner have to be globally unique
// (across the database). But in some source databases, such as PostgreSQL,
// they only have to be unique for a table. Hence each source constraint name
// is mapped to a unique Spanner constraint name via getSpannerValidName.
func ToSpannerForeignKey(conv *Conv, srcFkName string) string {
	if srcFkName != "" {
		return getSpannerValidName(conv, srcFkName)
	}
	return ""
}
// ToSpannerIndexName maps a source index name to a legal Spanner index name.
// The result must be:
//   - a legal Spanner name
//   - distinct from other Spanner index names.
//
// Note that index names in Spanner have to be globally unique (across the
// database). But in some source databases, such as MySQL, they only have to
// be unique for a table. Hence each source index name is mapped to a unique
// Spanner index name via getSpannerValidName.
func ToSpannerIndexName(conv *Conv, srcIndexName string) string {
	spIndexName := getSpannerValidName(conv, srcIndexName)
	return spIndexName
}
// getSpannerValidName turns srcName into a Spanner name that is not yet
// present in conv.UsedNames and registers the result there.
//
// conv.UsedNames tracks Spanner names that have been used for table names,
// foreign key constraints and indexes. Spanner requires all of these to be
// distinct and they must not differ only in case, so names are stored and
// looked up in lower-cased form.
func getSpannerValidName(conv *Conv, srcName string) string {
	name, _ := FixName(srcName)
	key := strings.ToLower(name)
	if _, taken := conv.UsedNames[key]; taken {
		// The fixed name is already in use. Append a numeric postfix,
		// starting at the number of names used so far; that postfix may
		// itself be taken, so keep counting until a free one is found.
		for suffix := len(conv.UsedNames); ; suffix++ {
			candidate := name + "_" + strconv.Itoa(suffix)
			candidateKey := strings.ToLower(candidate)
			if _, taken := conv.UsedNames[candidateKey]; taken {
				continue
			}
			name, key = candidate, candidateKey
			break
		}
	}
	conv.UsedNames[key] = true
	return name
}
// ResolveRefs rewrites the foreign key constraints of every table in the
// Spanner schema so that their table and column references are resolved.
// Note: Spanner requires that DDL references match the case of the
// referenced object, but this is not so for many source databases.
//
// TODO: Expand ResolveRefs to primary keys and indexes.
func ResolveRefs(conv *Conv) {
	for id := range conv.SpSchema {
		entry := conv.SpSchema[id]
		entry.ForeignKeys = resolveFks(conv, id, entry.ForeignKeys)
		// Store the updated entry back under the same key.
		conv.SpSchema[id] = entry
	}
}
// resolveFks returns the subset of fks whose references could be resolved,
// with the referring columns, the referenced table and the referenced
// columns replaced by their resolved forms. table is the table the
// constraints belong to.
//
// A constraint that can't be resolved is dropped: the problem is reported
// through conv.Unexpected and the constraint's name is removed from
// conv.UsedNames.
func resolveFks(conv *Conv, table string, fks []ddl.Foreignkey) []ddl.Foreignkey {
	// discard reports msg and releases the dropped constraint's name.
	discard := func(fkName, msg string) {
		conv.Unexpected(msg)
		delete(conv.UsedNames, strings.ToLower(fkName))
	}

	var kept []ddl.Foreignkey
	for _, fk := range fks {
		colIds, err := resolveColRefs(conv, table, fk.ColIds)
		if err != nil {
			discard(fk.Name, fmt.Sprintf("Can't resolve Columns in foreign key constraint: %s", err))
			continue
		}
		referTableId, err := resolveTableRef(conv, fk.ReferTableId)
		if err != nil {
			discard(fk.Name, fmt.Sprintf("Can't resolve ReferTable in foreign key constraint: %s", err))
			continue
		}
		// The referenced columns are looked up in the resolved table.
		referColIds, err := resolveColRefs(conv, referTableId, fk.ReferColumnIds)
		if err != nil {
			discard(fk.Name, fmt.Sprintf("Can't resolve ReferColumnIds in foreign key constraint: %s", err))
			continue
		}
		fk.ColIds, fk.ReferTableId, fk.ReferColumnIds = colIds, referTableId, referColIds
		kept = append(kept, fk)
	}
	return kept
}
// resolveTableRef returns the key of conv.SpSchema that tableRef refers to.
// An exact match is preferred; failing that, the first key found that equals
// tableRef after lower-casing both is returned. An error is returned when no
// key matches.
func resolveTableRef(conv *Conv, tableRef string) (string, error) {
	_, exact := conv.SpSchema[tableRef]
	if exact {
		return tableRef, nil
	}
	// No exact match: fall back to a case-insensitive scan of the keys.
	want := strings.ToLower(tableRef)
	for key := range conv.SpSchema {
		if strings.ToLower(key) != want {
			continue
		}
		return key, nil
	}
	return "", fmt.Errorf("can't resolve table %v", tableRef)
}
// resolveColRefs resolves each entry of colRefs against the Spanner table
// that tableRef resolves to, returning the resolved entries in the same
// order. An entry that is a key of the table's ColDefs is kept as is;
// otherwise the first element of the table's ColIds that equals it after
// lower-casing both is used. An error is returned when the table or any
// column can't be resolved.
func resolveColRefs(conv *Conv, tableRef string, colRefs []string) ([]string, error) {
	table, err := resolveTableRef(conv, tableRef)
	if err != nil {
		return nil, err
	}
	var resolved []string
	for _, ref := range colRefs {
		if _, ok := conv.SpSchema[table].ColDefs[ref]; ok {
			resolved = append(resolved, ref)
			continue
		}
		// No exact match: fall back to a case-insensitive scan of the ids.
		want := strings.ToLower(ref)
		matched := false
		for _, id := range conv.SpSchema[table].ColIds {
			if strings.ToLower(id) == want {
				resolved = append(resolved, id)
				matched = true
				break
			}
		}
		if !matched {
			// The error names the reference as given, not the resolved table.
			return nil, fmt.Errorf("can't resolve column: table=%v, column=%v", tableRef, ref)
		}
	}
	return resolved, nil
}