-
Notifications
You must be signed in to change notification settings - Fork 57
/
scanrow.go
133 lines (112 loc) · 3.91 KB
/
scanrow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package sqlutil
import (
"database/sql"
"fmt"
"reflect"
)
// A ScanRow is a container for SQL metadata for a single row.
// The row metadata is used to generate dataframe fields and a slice that can be used with sql.Scan.
// Columns and Types are parallel slices: the entry at index i of each describes the same column.
type ScanRow struct {
// Columns holds the column names, one per column.
Columns []string
// Types holds the Go type each column is scanned into, at the same index as its name in Columns.
Types []reflect.Type
}
// NewScanRow allocates a ScanRow whose Columns and Types slices both have `length` elements.
// The elements start out as zero values (empty name, nil type); fill them in with `Set`,
// or pass a length of 0 and grow the row with `Append`.
func NewScanRow(length int) *ScanRow {
	row := new(ScanRow)
	row.Columns = make([]string, length)
	row.Types = make([]reflect.Type, length)
	return row
}
// Append adds one column to the end of the row: its name goes onto Columns and its type onto Types,
// so both slices grow by one element and stay aligned by index.
func (s *ScanRow) Append(name string, colType reflect.Type) {
	s.Columns, s.Types = append(s.Columns, name), append(s.Types, colType)
}
// Set overwrites the name and type of the column at index i.
// The index must already exist in both slices; an out-of-range index panics.
func (s *ScanRow) Set(i int, name string, colType reflect.Type) {
	s.Columns[i], s.Types[i] = name, colType
}
// NewScannableRow creates a slice where each element is usable in a call to `(database/sql.Rows).Scan`,
// i.e. every element is a pointer.
//
// The returned slice has one element per entry in s.Types:
//   - for a pointer type *T, the element is a newly allocated *T (pointing at a zero T);
//   - for any other type T, the element is a newly allocated *T;
//   - for a nil type (a slot that was never set, or a column whose scan type is unknown),
//     the element is a *interface{}, which Scan fills with the driver value as-is.
func (s *ScanRow) NewScannableRow() []interface{} {
	values := make([]interface{}, len(s.Types))
	for i, t := range s.Types {
		switch {
		case t == nil:
			// Calling a method on a nil reflect.Type panics, so fall back to a
			// generic destination instead of crashing on an unknown type.
			values[i] = new(interface{})
		case t.Kind() == reflect.Ptr:
			// Allocate the pointed-to type so the destination is *T rather than **T.
			values[i] = reflect.New(t.Elem()).Interface()
		default:
			values[i] = reflect.New(t).Interface()
		}
	}
	return values
}
// MakeScanRow creates a new scan row given the column types and names.
//
// For each column, the first converter in `converters` that matches the column (see `match`) is used,
// and the SQL scan type is substituted with the one provided by that converter, if it provides one.
// A column that no converter matches gets the converter returned by NewDefaultConverter.
// The returned converters are therefore the same length as the columns and correspond with the column
// at the same index (e.g. the value at slice element 3 corresponds with the converter at slice element 3).
//
// An error is returned when colNames contains a duplicate name, or when it holds fewer names than
// there are column types.
func MakeScanRow(colTypes []*sql.ColumnType, colNames []string, converters ...Converter) (*RowConverter, error) {
	// In the future we can probably remove this restriction. But right now we map names to Arrow Field Names.
	// Arrow Field names must be unique: https://github.com/grafana/grafana-plugin-sdk-go/issues/59
	seen := make(map[string]int, len(colNames))
	for i, name := range colNames {
		if j, ok := seen[name]; ok {
			return nil, fmt.Errorf(`duplicate column names are not allowed, found identical name "%v" at column indices %v and %v`, name, j, i)
		}
		seen[name] = i
	}

	// Every column type is paired with the name at the same index. Report missing
	// names as an error rather than panicking with an index out of range below.
	if len(colNames) < len(colTypes) {
		return nil, fmt.Errorf("got %d column names for %d column types", len(colNames), len(colTypes))
	}

	rc := NewRowConverter()

	// For each column, define a concrete type in the list of values
	for i, colType := range colTypes {
		colName := colNames[i]
		colType = columnType(colType) // nil entries are replaced by an empty column type

		// The first matching converter wins.
		for _, v := range converters {
			if match(v, colType.DatabaseTypeName(), colName) {
				rc.append(colName, scanType(v, colType.ScanType()), v)
				break
			}
		}
		if rc.hasConverter(i) {
			continue
		}

		// No converter matched: fall back to the default converter for this column.
		nullable, ok := colType.Nullable()
		if !ok {
			nullable = true // If we don't know if it is nullable, assume it is
		}
		v := NewDefaultConverter(colName, nullable, colType.ScanType())
		rc.append(colName, scanType(v, colType.ScanType()), v)
	}

	return rc, nil
}
// scanType picks the type a column should be scanned into: the converter's InputScanType
// when it is set, otherwise the fallback type t.
func scanType(v Converter, t reflect.Type) reflect.Type {
	if override := v.InputScanType; override != nil {
		return override
	}
	return t
}
// RowConverter pairs the scan metadata of a row with the converter used for each column.
// Columns added through the append method are added to Row and Converters together,
// so the converter at index i belongs to the column at index i of Row.
type RowConverter struct {
// Row holds the column names and scan types.
Row *ScanRow
// Converters holds one converter per column.
Converters []Converter
}
// NewRowConverter returns a RowConverter with an empty (zero-length) ScanRow and no converters.
func NewRowConverter() *RowConverter {
	rc := new(RowConverter)
	rc.Row = NewScanRow(0)
	return rc
}
// append adds one column to the row converter: the name and scan type are appended to the
// underlying ScanRow, then the converter is appended to Converters at the matching index.
func (r *RowConverter) append(name string, kind reflect.Type, conv Converter) {
	row := r.Row
	row.Append(name, kind)
	r.Converters = append(r.Converters, conv)
}
// hasConverter reports whether a converter has been appended for the column at index i,
// i.e. whether i is below the number of converters stored so far.
func (r *RowConverter) hasConverter(i int) bool {
	return i < len(r.Converters)
}
// NewScannableRow returns a fresh slice of scan destinations for the row,
// as produced by the underlying ScanRow's NewScannableRow.
func (r *RowConverter) NewScannableRow() []any {
	row := r.Row
	return row.NewScannableRow()
}
// match reports whether converter v applies to a column with database type name dbType
// and column name colName. A converter applies when any of the following holds:
//   - its InputColumnName is non-empty and equals colName;
//   - its InputTypeName equals dbType;
//   - its InputTypeRegex is set and matches dbType.
//
// NOTE(review): the type-name comparison has no empty-string guard, so a converter with an
// empty InputTypeName matches any column whose dbType is also empty — confirm this is intended.
func match(v Converter, dbType string, colName string) bool {
	switch {
	case v.InputColumnName != "" && v.InputColumnName == colName:
		return true
	case v.InputTypeName == dbType:
		return true
	case v.InputTypeRegex != nil:
		return v.InputTypeRegex.MatchString(dbType)
	}
	return false
}
// columnType guards against nil column types: a nil input is replaced by a pointer to an
// empty sql.ColumnType, and any other input is returned unchanged.
func columnType(colType *sql.ColumnType) *sql.ColumnType {
	if colType == nil {
		return &sql.ColumnType{}
	}
	return colType
}