/
infoschema.go
654 lines (627 loc) · 23.3 KB
/
infoschema.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"fmt"
"math/bits"
"reflect"
"sort"
"strconv"
"strings"
"time"
"cloud.google.com/go/civil"
sp "cloud.google.com/go/spanner"
_ "github.com/lib/pq" // we will use database/sql package instead of using this package directly
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
"github.com/GoogleCloudPlatform/spanner-migration-tool/schema"
"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/common"
"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl"
"github.com/GoogleCloudPlatform/spanner-migration-tool/streaming"
)
// InfoSchemaImpl is the PostgreSQL-specific implementation of InfoSchema.
type InfoSchemaImpl struct {
// Db is the database handle on which all schema and data queries are run.
Db *sql.DB
// SourceProfile describes the source connection; the streaming
// configuration is read from it when change data capture is started.
SourceProfile profiles.SourceProfile
// TargetProfile describes the Spanner target (project and database name).
TargetProfile profiles.TargetProfile
// IsSchemaUnique is written by populateSchemaIsUnique and read by
// GetTableName. It is a pointer so that methods with value receivers
// update and observe the same flag; it must be non-nil before either
// of those methods is called.
IsSchemaUnique *bool
}
// populateSchemaIsUnique stores, through the shared IsSchemaUnique pointer,
// whether all entries of schemaAndNames live in exactly one schema. An empty
// list yields false.
func (isi InfoSchemaImpl) populateSchemaIsUnique(schemaAndNames []common.SchemaAndName) {
	distinct := make(map[string]struct{})
	for i := range schemaAndNames {
		distinct[schemaAndNames[i].Schema] = struct{}{}
	}
	*isi.IsSchemaUnique = len(distinct) == 1
}
// StartChangeDataCapture is used for automatic triggering of a Datastream job
// when performing a streaming migration. It reads the streaming configuration
// named by the source profile, calls streaming.CreatePubsubResources and
// streaming.StartDatastream, and returns the resulting configuration in a map
// under the "streamingCfg" key.
//
// A failure to determine the list of included source tables is not fatal: it
// is reported through conv.Unexpected and the (possibly empty) list that was
// returned is passed on. Every other failure is returned as an error with a
// nil map.
func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *internal.Conv) (map[string]interface{}, error) {
	tableList, err := common.GetIncludedSrcTablesFromConv(conv)
	if err != nil {
		// Best effort: surface the problem instead of silently dropping it,
		// then carry on with whatever table list we got.
		conv.Unexpected(fmt.Sprintf("error fetching the tableList to setup datastream migration, defaulting to all tables: %v", err))
	}
	streamingCfg, err := streaming.ReadStreamingConfig(isi.SourceProfile.Conn.Pg.StreamingConfig, isi.TargetProfile.Conn.Sp.Dbname, tableList)
	if err != nil {
		return nil, fmt.Errorf("error reading streaming config: %w", err)
	}
	pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.TargetProfile.Conn.Sp.Dbname)
	if err != nil {
		return nil, fmt.Errorf("error creating pubsub resources: %w", err)
	}
	streamingCfg.PubsubCfg = *pubsubCfg
	streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, tableList)
	if err != nil {
		return nil, fmt.Errorf("error starting datastream: %w", err)
	}
	return map[string]interface{}{"streamingCfg": streamingCfg}, nil
}
// StartStreamingMigration is used for automatic triggering of a Dataflow job
// when performing a streaming migration. streamingInfo must carry a
// streaming.StreamingCfg under the "streamingCfg" key (the map returned by
// StartChangeDataCapture has that shape); if it does not, an error is returned
// instead of starting Dataflow with an empty configuration.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {
	streamingCfg, ok := streamingInfo["streamingCfg"].(streaming.StreamingCfg)
	if !ok {
		return internal.DataflowOutput{}, fmt.Errorf("error starting dataflow: streamingInfo[\"streamingCfg\"] has unexpected type %T", streamingInfo["streamingCfg"])
	}
	dfOutput, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
	if err != nil {
		return internal.DataflowOutput{}, fmt.Errorf("error starting dataflow: %w", err)
	}
	return dfOutput, nil
}
// GetToDdl returns the ToDdl implementation that goes with this PostgreSQL
// InfoSchema; it is part of the common.InfoSchema interface.
func (isi InfoSchemaImpl) GetToDdl() common.ToDdl {
	var toDdl ToDdlImpl
	return toDdl
}
// GetTableName returns the name under which a source table is tracked. The
// bare table name is used when only one schema was detected or when the table
// lives in the "public" schema; otherwise the name is "<schema>.<table>".
func (isi InfoSchemaImpl) GetTableName(schema string, tableName string) string {
	// The schema prefix is dropped whenever it carries no information.
	if *isi.IsSchemaUnique || schema == "public" {
		return tableName
	}
	return schema + "." + tableName
}
// GetRowsFromTable runs "SELECT *" against the source table identified by
// tableId and returns the resulting *sql.Rows (as interface{}). The caller is
// responsible for closing the returned rows.
func (isi InfoSchemaImpl) GetRowsFromTable(conv *internal.Conv, tableId string) (interface{}, error) {
	srcTable := conv.SrcSchema[tableId]
	// The stored name may carry a "<schema>." prefix; the query needs the
	// bare table name. TrimPrefix is a no-op when the prefix is absent.
	tableName := strings.TrimPrefix(srcTable.Name, srcTable.Schema+".")
	// PostgreSQL schema and name can be arbitrary strings.
	// Ideally we would pass schema/name as a query parameter,
	// but PostgreSQL doesn't support this. So we quote them instead,
	// doubling any embedded double quote so the identifier cannot
	// terminate early.
	q := fmt.Sprintf(`SELECT * FROM "%s"."%s";`,
		strings.ReplaceAll(srcTable.Schema, `"`, `""`),
		strings.ReplaceAll(tableName, `"`, `""`))
	rows, err := isi.Db.Query(q)
	if err != nil {
		return nil, err
	}
	return rows, nil
}
// ProcessData performs data conversion for one table of the source
// database. We extract data using a "SELECT *" query, convert the data
// to Spanner data (based on the source and Spanner schemas), and write
// it to Spanner via conv.WriteRow. Rows that cannot be scanned or
// converted are counted as bad rows and skipped. If the table's data
// cannot be read at all (or reading stops part-way), the error is
// reported through conv.Unexpected and returned.
//
// Note that the database/sql library has a somewhat complex model for
// returning data from rows.Scan. Scalar values can be returned using
// the native value used by the underlying driver (by passing
// *interface{} to rows.Scan), or they can be converted to specific go
// types. Array values are always returned as []byte, a string
// encoding of the array values. This string encoding is
// database/driver specific. For example, for PostgreSQL, array values
// are returned in the form "{v1,v2,..,vn}", where each v1,v2,...,vn
// is a PostgreSQL encoding of the respective array value.
//
// We choose to do all type conversions explicitly ourselves so that
// we can generate more targeted error messages: hence we pass
// *interface{} parameters to row.Scan.
func (isi InfoSchemaImpl) ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, colIds []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error {
	srcTableName := conv.SrcSchema[tableId].Name
	rowsInterface, err := isi.GetRowsFromTable(conv, tableId)
	if err != nil {
		conv.Unexpected(fmt.Sprintf("Couldn't get data for table %s : err = %s", srcTableName, err))
		return err
	}
	rows, ok := rowsInterface.(*sql.Rows)
	if !ok {
		err = fmt.Errorf("unexpected result type %T while reading table %s", rowsInterface, srcTableName)
		conv.Unexpected(err.Error())
		return err
	}
	defer rows.Close()
	srcCols, err := rows.Columns()
	if err != nil {
		conv.Unexpected(fmt.Sprintf("Couldn't get columns for table %s : err = %s", srcTableName, err))
		return err
	}
	v, iv := buildVals(len(srcCols))
	colNameIdMap := internal.GetSrcColNameIdMap(conv.SrcSchema[tableId])
	for rows.Next() {
		if err := rows.Scan(iv...); err != nil {
			conv.Unexpected(fmt.Sprintf("Couldn't process sql data row: %s", err))
			// Scan failed, so we don't have any data to add to bad rows.
			conv.StatsAddBadRow(srcTableName, conv.DataMode())
			continue
		}
		newValues, err := common.PrepareValues(conv, tableId, colNameIdMap, colIds, srcCols, v)
		var cvtCols []string
		var cvtVals []interface{}
		if err == nil {
			// Only convert when the values were prepared successfully;
			// otherwise newValues does not line up with colIds.
			cvtCols, cvtVals, err = convertSQLRow(conv, tableId, colIds, srcSchema, spSchema, newValues)
		}
		if err != nil {
			conv.Unexpected(fmt.Sprintf("Couldn't process sql data row: %s", err))
			conv.StatsAddBadRow(srcTableName, conv.DataMode())
			conv.CollectBadRow(srcTableName, srcCols, valsToStrings(v))
			continue
		}
		conv.WriteRow(srcTableName, conv.SpSchema[tableId].Name, cvtCols, cvtVals)
	}
	// rows.Next returns false both at end of data and on error; tell them apart.
	if err := rows.Err(); err != nil {
		conv.Unexpected(fmt.Sprintf("Couldn't read all data for table %s : err = %s", srcTableName, err))
		return err
	}
	return nil
}
// convertSQLRow performs data conversion for a single row of data
// returned from a 'SELECT *' query. srcVals must hold one value per
// entry of colIds, in the same order; a shorter srcVals is rejected
// with an error. Note that convertSQLRow returns cols as well as
// converted values. This is because cols can change when we add a
// column (synthetic primary key) or because we drop columns (handling
// of NULL values). On any error the whole row is rejected and the
// synthetic key sequence is left untouched.
func convertSQLRow(conv *internal.Conv, tableId string, colIds []string, srcSchema schema.Table, spSchema ddl.CreateTable, srcVals []interface{}) ([]string, []interface{}, error) {
	if len(srcVals) < len(colIds) {
		return nil, nil, fmt.Errorf("data conversion: got %d values for %d columns of table %s", len(srcVals), len(colIds), conv.SrcSchema[tableId].Name)
	}
	var vs []interface{}
	var cs []string
	for i, colId := range colIds {
		srcCd, ok1 := srcSchema.ColDefs[colId]
		spCd, ok2 := spSchema.ColDefs[colId]
		if !ok1 || !ok2 {
			return nil, nil, fmt.Errorf("data conversion: can't find schema for column id %s of table %s", colId, conv.SrcSchema[tableId].Name)
		}
		if srcVals[i] == nil {
			continue // Skip NULL values (nil is used by database/sql to represent NULL values).
		}
		var spVal interface{}
		var err error
		if spCd.T.IsArray {
			spVal, err = cvtSQLArray(conv, srcCd, spCd, srcVals[i])
		} else {
			spVal, err = cvtSQLScalar(conv, srcCd, spCd, srcVals[i])
		}
		if err != nil { // Skip entire row if we hit error.
			return nil, nil, fmt.Errorf("can't convert sql data for column id %s of table %s: %w", colId, conv.SrcSchema[tableId].Name, err)
		}
		vs = append(vs, spVal)
		cs = append(cs, spCd.Name)
	}
	if aux, ok := conv.SyntheticPKeys[tableId]; ok {
		cs = append(cs, conv.SpSchema[tableId].ColDefs[aux.ColId].Name)
		// The key value is the bit-reversed sequence number, rendered in decimal.
		vs = append(vs, strconv.FormatInt(int64(bits.Reverse64(uint64(aux.Sequence))), 10))
		aux.Sequence++
		conv.SyntheticPKeys[tableId] = aux
	}
	return cs, vs, nil
}
// GetRowCount returns the number of rows in the given table, obtained with a
// "SELECT COUNT(*)" query. If the query yields no row, 0 is returned together
// with any error reported by the row iterator.
func (isi InfoSchemaImpl) GetRowCount(table common.SchemaAndName) (int64, error) {
	// PostgreSQL schema and name can be arbitrary strings.
	// Ideally we would pass schema/name as a query parameter,
	// but PostgreSQL doesn't support this. So we quote them instead,
	// doubling any embedded double quote so the identifier cannot
	// terminate early.
	q := fmt.Sprintf(`SELECT COUNT(*) FROM "%s"."%s";`,
		strings.ReplaceAll(table.Schema, `"`, `""`),
		strings.ReplaceAll(table.Name, `"`, `""`))
	rows, err := isi.Db.Query(q)
	if err != nil {
		return 0, err
	}
	defer rows.Close()
	var count int64
	if rows.Next() {
		err := rows.Scan(&count)
		return count, err
	}
	// No row: rows.Err distinguishes an empty result from a failed read.
	return 0, rows.Err()
}
// GetTables returns the list of user base tables in the selected database,
// skipping tables that belong to the listed system schemas. As a side effect
// it records (via populateSchemaIsUnique) whether all returned tables share a
// single schema. Any query, scan or iteration failure is returned as an error.
// TODO: All of the queries to get tables and table data should be in
// a single transaction to ensure we obtain a consistent snapshot of
// schema information and table data (pg_dump does something
// similar).
func (isi InfoSchemaImpl) GetTables() ([]common.SchemaAndName, error) {
	ignored := make(map[string]bool)
	// Ignore all system tables: we just want to convert user tables.
	for _, s := range []string{"information_schema", "postgres", "pg_catalog", "pg_temp_1", "pg_toast", "pg_toast_temp_1"} {
		ignored[s] = true
	}
	q := "SELECT table_schema, table_name FROM information_schema.tables where table_type = 'BASE TABLE'"
	rows, err := isi.Db.Query(q)
	if err != nil {
		return nil, fmt.Errorf("couldn't get tables: %w", err)
	}
	defer rows.Close()
	var tables []common.SchemaAndName
	for rows.Next() {
		var tableSchema, tableName string
		// A failed Scan must not fall through: the destinations would
		// not hold this row's values.
		if err := rows.Scan(&tableSchema, &tableName); err != nil {
			return nil, fmt.Errorf("couldn't get tables: %w", err)
		}
		if !ignored[tableSchema] {
			tables = append(tables, common.SchemaAndName{Schema: tableSchema, Name: tableName})
		}
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("couldn't get tables: %w", err)
	}
	isi.populateSchemaIsUnique(tables)
	return tables, nil
}
// GetColumns returns the column definitions of a table, keyed by freshly
// generated column id, together with the column ids in ordinal order.
// constraints maps a column name to its non-primary-key constraints; a CHECK
// constraint or a column default is recorded in the column's Ignored field.
// Rows that cannot be scanned are reported through conv.Unexpected and
// skipped; a query or iteration failure is returned as an error.
func (isi InfoSchemaImpl) GetColumns(conv *internal.Conv, table common.SchemaAndName, constraints map[string][]string, primaryKeys []string) (map[string]schema.Column, []string, error) {
	q := `SELECT c.column_name, c.data_type, e.data_type, c.is_nullable, c.column_default, c.character_maximum_length, c.numeric_precision, c.numeric_scale
FROM information_schema.COLUMNS c LEFT JOIN information_schema.element_types e
ON ((c.table_catalog, c.table_schema, c.table_name, 'TABLE', c.dtd_identifier)
= (e.object_catalog, e.object_schema, e.object_name, e.object_type, e.collection_type_identifier))
where table_schema = $1 and table_name = $2 ORDER BY c.ordinal_position;`
	cols, err := isi.Db.Query(q, table.Schema, table.Name)
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't get schema for table %s.%s: %w", table.Schema, table.Name, err)
	}
	defer cols.Close()
	colDefs := make(map[string]schema.Column)
	var colIds []string
	var colName, dataType, isNullable string
	var colDefault, elementDataType sql.NullString
	var charMaxLen, numericPrecision, numericScale sql.NullInt64
	for cols.Next() {
		err := cols.Scan(&colName, &dataType, &elementDataType, &isNullable, &colDefault, &charMaxLen, &numericPrecision, &numericScale)
		if err != nil {
			conv.Unexpected(fmt.Sprintf("Can't scan: %v", err))
			continue
		}
		ignored := schema.Ignored{}
		for _, c := range constraints[colName] {
			// c can be UNIQUE, PRIMARY KEY, FOREIGN KEY,
			// or CHECK (based on msql, sql server, postgres docs).
			// Only CHECK is recorded here; the others are handled elsewhere.
			if c == "CHECK" {
				ignored.Check = true
			}
		}
		ignored.Default = colDefault.Valid
		colId := internal.GenerateColumnId()
		c := schema.Column{
			Id:      colId,
			Name:    colName,
			Type:    toType(dataType, elementDataType, charMaxLen, numericPrecision, numericScale),
			NotNull: common.ToNotNull(conv, isNullable),
			Ignored: ignored,
		}
		colDefs[colId] = c
		colIds = append(colIds, colId)
	}
	// Don't hand back a silently truncated column list if iteration failed.
	if err := cols.Err(); err != nil {
		return nil, nil, fmt.Errorf("couldn't get schema for table %s.%s: %w", table.Schema, table.Name, err)
	}
	return colDefs, colIds, nil
}
// GetConstraints returns a list of primary key columns and a by-column map
// of the other constraint types. Note: rows are read in ordinal_position
// order so that the order of columns in primary key constraints is preserved.
// Note that foreign key constraints are handled in GetForeignKeys.
// Rows that cannot be scanned, or that have an empty column or constraint,
// are reported through conv.Unexpected and skipped; a query or iteration
// failure is returned as an error.
func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) ([]string, map[string][]string, error) {
	q := `SELECT k.COLUMN_NAME, t.CONSTRAINT_TYPE
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS t
INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS k
ON t.CONSTRAINT_NAME = k.CONSTRAINT_NAME AND t.CONSTRAINT_SCHEMA = k.CONSTRAINT_SCHEMA
WHERE k.TABLE_SCHEMA = $1 AND k.TABLE_NAME = $2 ORDER BY k.ordinal_position;`
	rows, err := isi.Db.Query(q, table.Schema, table.Name)
	if err != nil {
		return nil, nil, err
	}
	defer rows.Close()
	var primaryKeys []string
	var col, constraint string
	m := make(map[string][]string)
	for rows.Next() {
		if err := rows.Scan(&col, &constraint); err != nil {
			conv.Unexpected(fmt.Sprintf("Can't scan: %v", err))
			continue
		}
		if col == "" || constraint == "" {
			conv.Unexpected("Got empty col or constraint")
			continue
		}
		switch constraint {
		case "PRIMARY KEY":
			primaryKeys = append(primaryKeys, col)
		default:
			m[col] = append(m[col], constraint)
		}
	}
	if err := rows.Err(); err != nil {
		return nil, nil, err
	}
	return primaryKeys, m, nil
}
// GetForeignKeys returns all foreign key constraints declared on the given
// table, sorted by constraint name. Rows that belong to the same constraint
// (multi-column keys) are merged into a single schema.ForeignKey. Rows that
// cannot be scanned are reported through conv.Unexpected and skipped; a query
// or iteration failure is returned as an error.
func (isi InfoSchemaImpl) GetForeignKeys(conv *internal.Conv, table common.SchemaAndName) (foreignKeys []schema.ForeignKey, err error) {
	q := `SELECT
schema_name AS "TABLE_SCHEMA",
cl.relname AS "TABLE_NAME",
att2.attname AS "COLUMN_NAME",
att.attname AS "REF_COLUMN_NAME",
conname AS "CONSTRAINT_NAME"
FROM (SELECT
UNNEST(con1.conkey) AS "parent",
UNNEST(con1.confkey) AS "child",
con1.confrelid,
con1.conrelid,
con1.conname,
ns.nspname AS schema_name
FROM PG_CLASS cl
JOIN PG_NAMESPACE ns ON cl.relnamespace = ns.oid
JOIN PG_CONSTRAINT con1 ON con1.conrelid = cl.oid
WHERE ns.nspname = $1 AND cl.relname = $2 AND con1.contype = 'f') con
JOIN PG_ATTRIBUTE att ON
att.attrelid = con.confrelid AND att.attnum = con.child
JOIN PG_CLASS cl ON
cl.oid = con.confrelid
JOIN PG_ATTRIBUTE att2 ON
att2.attrelid = con.conrelid AND att2.attnum = con.parent;`
	rows, err := isi.Db.Query(q, table.Schema, table.Name)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var refTable common.SchemaAndName
	var col, refCol, fKeyName string
	fKeys := make(map[string]common.FkConstraint)
	var keyNames []string
	for rows.Next() {
		if err := rows.Scan(&refTable.Schema, &refTable.Name, &col, &refCol, &fKeyName); err != nil {
			conv.Unexpected(fmt.Sprintf("Can't scan: %v", err))
			continue
		}
		tableName := isi.GetTableName(refTable.Schema, refTable.Name)
		if fk, found := fKeys[fKeyName]; found {
			// Another column of a constraint we have already seen.
			fk.Cols = append(fk.Cols, col)
			fk.Refcols = append(fk.Refcols, refCol)
			fKeys[fKeyName] = fk
			continue
		}
		fKeys[fKeyName] = common.FkConstraint{Name: fKeyName, Table: tableName, Refcols: []string{refCol}, Cols: []string{col}}
		keyNames = append(keyNames, fKeyName)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	// Sort by name so the output order does not depend on row or map order.
	sort.Strings(keyNames)
	for _, k := range keyNames {
		foreignKeys = append(foreignKeys,
			schema.ForeignKey{
				Id:               internal.GenerateForeignkeyId(),
				Name:             fKeys[k].Name,
				ColumnNames:      fKeys[k].Cols,
				ReferTableName:   fKeys[k].Table,
				ReferColumnNames: fKeys[k].Refcols})
	}
	return foreignKeys, nil
}
// GetIndexes returns all non-primary-key indexes of the specified table, in
// the order the query yields them (by index name, then key position).
// colNameIdMap translates source column names into column ids for the index
// keys. Rows that cannot be scanned are reported through conv.Unexpected and
// skipped; a query or iteration failure is returned as an error.
// Note: Extracting index definitions from PostgreSQL information schema tables is complex.
// See https://stackoverflow.com/questions/6777456/list-all-index-names-column-names-and-its-table-name-of-a-postgresql-database/44460269#44460269
// for background.
func (isi InfoSchemaImpl) GetIndexes(conv *internal.Conv, table common.SchemaAndName, colNameIdMap map[string]string) ([]schema.Index, error) {
	q := `SELECT
irel.relname AS index_name,
a.attname AS column_name,
1 + Array_position(i.indkey, a.attnum) AS column_position,
i.indisunique AS is_unique,
CASE o.OPTION & 1 WHEN 1 THEN 'DESC' ELSE 'ASC' END AS order
FROM pg_index AS i
JOIN pg_class AS trel
ON trel.oid = i.indrelid
JOIN pg_namespace AS tnsp
ON trel.relnamespace = tnsp.oid
JOIN pg_class AS irel
ON irel.oid = i.indexrelid
CROSS JOIN LATERAL UNNEST (i.indkey) WITH ordinality AS c (colnum, ordinality)
LEFT JOIN LATERAL UNNEST (i.indoption) WITH ordinality AS o (OPTION, ordinality)
ON c.ordinality = o.ordinality
JOIN pg_attribute AS a
ON trel.oid = a.attrelid
AND a.attnum = c.colnum
WHERE tnsp.nspname= $1
AND trel.relname= $2
AND i.indisprimary = false
GROUP BY tnsp.nspname,
trel.relname,
irel.relname,
a.attname,
array_position(i.indkey, a.attnum),
o.OPTION,i.indisunique
ORDER BY irel.relname, array_position(i.indkey, a.attnum);`
	rows, err := isi.Db.Query(q, table.Schema, table.Name)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	// sequence is scanned only because the query returns it; it is not used.
	var name, column, sequence, isUnique, collation string
	indexMap := make(map[string]schema.Index)
	var indexNames []string
	var indexes []schema.Index
	for rows.Next() {
		if err := rows.Scan(&name, &column, &sequence, &isUnique, &collation); err != nil {
			conv.Unexpected(fmt.Sprintf("Can't scan: %v", err))
			continue
		}
		index, found := indexMap[name]
		if !found {
			// First key of a new index: remember the order of appearance.
			indexNames = append(indexNames, name)
			index = schema.Index{
				Id:     internal.GenerateIndexesId(),
				Name:   name,
				Unique: (isUnique == "true")}
		}
		index.Keys = append(index.Keys, schema.Key{
			ColId: colNameIdMap[column],
			Desc:  (collation == "DESC")})
		indexMap[name] = index
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	for _, k := range indexNames {
		indexes = append(indexes, indexMap[k])
	}
	return indexes, nil
}
// toType builds a schema.Type from the type columns of
// information_schema.columns. Array columns take the element type's name and
// a single unbounded array dimension; a character length becomes the sole
// modifier; otherwise a numeric precision (followed by the scale, when that is
// present and non-zero) becomes the modifier list.
func toType(dataType string, elementDataType sql.NullString, charLen sql.NullInt64, numericPrecision, numericScale sql.NullInt64) schema.Type {
	if dataType == "ARRAY" && elementDataType.Valid {
		// TODO: handle error cases.
		// TODO: handle case of multiple array bounds.
		return schema.Type{Name: elementDataType.String, ArrayBounds: []int64{-1}}
	}
	if charLen.Valid {
		return schema.Type{Name: dataType, Mods: []int64{charLen.Int64}}
	}
	if numericPrecision.Valid {
		mods := []int64{numericPrecision.Int64}
		if numericScale.Valid && numericScale.Int64 != 0 {
			mods = append(mods, numericScale.Int64)
		}
		return schema.Type{Name: dataType, Mods: mods}
	}
	return schema.Type{Name: dataType}
}
// cvtSQLArray converts an array value returned from a SQL query. The value
// must arrive as []byte; it is handed to convArray as a string together with
// the Spanner type, the source type name and the conversion location.
func cvtSQLArray(conv *internal.Conv, srcCd schema.Column, spCd ddl.ColumnDef, val interface{}) (interface{}, error) {
	if raw, isBytes := val.([]byte); isBytes {
		return convArray(spCd.T, srcCd.Type.Name, conv.Location, string(raw))
	}
	return nil, fmt.Errorf("can't convert array values to []byte")
}
// cvtSQLScalar converts a value returned from a SQL query to a
// Spanner value. In principle, we could just hand the values we get
// from the driver over to Spanner and have the Spanner client handle
// conversions and errors. However we handle the conversions
// explicitly ourselves so that we can generate more targeted error
// messages. Note that the caller is responsible for handling nil
// values (used to represent NULL). We handle each of the remaining
// cases of values returned by the database/sql library:
//
//	bool
//	[]byte
//	int64
//	float64
//	string
//	time.Time
//
// A combination of Spanner type and Go value type that is not handled
// below yields an error naming both.
func cvtSQLScalar(conv *internal.Conv, srcCd schema.Column, spCd ddl.ColumnDef, val interface{}) (interface{}, error) {
	switch spCd.T.Name {
	case ddl.Bool:
		switch v := val.(type) {
		case bool:
			return v, nil
		case string:
			return convBool(v)
		}
	case ddl.Bytes:
		switch v := val.(type) {
		case []byte:
			return v, nil
		}
	case ddl.Date:
		// The PostgreSQL driver uses time.Time to represent
		// dates. Note that the database/sql library doesn't
		// document how dates are represented, so maybe this
		// isn't a driver issue, but a generic database/sql
		// issue. We explicitly convert from time.Time to
		// civil.Date (used by the Spanner client library).
		switch v := val.(type) {
		case string:
			return convDate(v)
		case time.Time:
			return civil.DateOf(v), nil
		}
	case ddl.Int64:
		switch v := val.(type) {
		case []byte: // Parse as int64.
			return convInt64(string(v))
		case int64:
			return v, nil
		case float64: // Truncate.
			return int64(v), nil
		case string: // Parse as int64.
			return convInt64(v)
		}
	case ddl.Float64:
		switch v := val.(type) {
		case []byte: // Note: PostgreSQL uses []byte for numeric.
			return convFloat64(string(v))
		case int64:
			return float64(v), nil
		case float64:
			return v, nil
		case string:
			return convFloat64(v)
		}
	case ddl.Numeric:
		switch v := val.(type) {
		case []byte: // Note: PostgreSQL uses []byte for numeric.
			return convNumeric(conv, string(v))
		}
	case ddl.String:
		switch v := val.(type) {
		case bool:
			return strconv.FormatBool(v), nil
		case []byte:
			return string(v), nil
		case int64:
			return strconv.FormatInt(v, 10), nil
		case float64:
			return strconv.FormatFloat(v, 'g', -1, 64), nil
		case string:
			return v, nil
		case time.Time:
			return v.String(), nil
		}
	case ddl.Timestamp:
		switch v := val.(type) {
		case string:
			return convTimestamp(srcCd.Type.Name, conv.Location, v)
		case time.Time:
			return v, nil
		}
	case ddl.JSON:
		switch v := val.(type) {
		case string:
			return v, nil
		case []uint8:
			return string(v), nil
		}
	}
	// Report the Spanner type by name: the Go type of spCd.T is the same
	// for every column and says nothing about the target type.
	return nil, fmt.Errorf("can't convert value of type %v to Spanner type %s", reflect.TypeOf(val), spCd.T.Name)
}
// buildVals constructs interface{} value containers to scan row
// results into. It returns the n containers themselves (as a slice)
// and a parallel slice holding a pointer to each container, suitable
// for passing to rows.Scan.
func buildVals(n int) (v []interface{}, iv []interface{}) {
	containers := make([]interface{}, n)
	var ptrs []interface{}
	for idx := 0; idx < len(containers); idx++ {
		ptrs = append(ptrs, &containers[idx])
	}
	return containers, ptrs
}
// valsToStrings renders each value with fmt's %v verb. A nil value is
// rendered as "NULL". A *interface{} is dereferenced first, so a pointer to a
// nil value is also rendered as "NULL". An empty input yields a nil slice.
func valsToStrings(vals []interface{}) []string {
	toString := func(val interface{}) string {
		// Unwrap scan containers before the NULL check so both forms agree.
		if p, ok := val.(*interface{}); ok && p != nil {
			val = *p
		}
		if val == nil {
			return "NULL"
		}
		return fmt.Sprintf("%v", val)
	}
	var s []string
	for _, v := range vals {
		s = append(s, toString(v))
	}
	return s
}