-
Notifications
You must be signed in to change notification settings - Fork 0
/
insert.go
153 lines (137 loc) · 3.29 KB
/
insert.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package sql
import (
"fmt"
"github.com/elliotcourant/melogale/pkg/ast"
"github.com/elliotcourant/melogale/pkg/base"
"sort"
)
// Insert plans the execution of an INSERT statement.
//
// It looks the target table up by the relation name in stmt, resolves the
// destination columns (the explicit column list when one is present,
// otherwise every column of the table ordered by ColumnId) and appends the
// result of ValuesListRenderer for the statement's VALUES lists to the
// returned stage.
//
// An error is returned when the table or one of the named columns does not
// exist, when a column list entry is not a named ast.ResTarget, and for
// statement shapes that are not implemented yet (INSERT ... SELECT, any
// non-SELECT source, and tables that have indexes). These conditions depend
// on user input, so they are reported as errors rather than panics.
func (p *planner) Insert(stmt ast.InsertStmt) (PlanStage, error) {
	tableName := *stmt.Relation.Relname
	table, ok, err := p.GetTable(tableName)
	if err != nil {
		return nil, err
	}
	if !ok {
		return nil, fmt.Errorf("a table with name [%s] does not exist", tableName)
	}

	var columns []base.Column
	if len(stmt.Cols.Items) > 0 {
		// Explicit column list: keep the order the statement used.
		columns = make([]base.Column, len(stmt.Cols.Items))
		for i, col := range stmt.Cols.Items {
			resTarget, isTarget := col.(ast.ResTarget)
			if !isTarget || resTarget.Name == nil {
				return nil, fmt.Errorf("invalid column [%T] in insert into table [%s]", col, tableName)
			}
			columnName := *resTarget.Name
			column, found := table.Columns[columnName]
			if !found {
				return nil, fmt.Errorf("a column with name [%s] does not exist for table [%s]", columnName, tableName)
			}
			columns[i] = column
		}
	} else {
		// No column list: use every column. Map iteration order is random,
		// so sort by ColumnId to get a stable positional order.
		columns = make([]base.Column, 0, len(table.Columns))
		for _, column := range table.Columns {
			columns = append(columns, column)
		}
		sort.Slice(columns, func(i, j int) bool {
			return columns[i].ColumnId < columns[j].ColumnId
		})
	}

	// NOTE(review): insertPlan is only handed to ValuesListRenderer; it is not
	// appended to the stage here. Confirm that the renderer wires it in.
	insertPlan := &InsertTablePlan{
		stmt:    stmt,
		table:   table,
		columns: columns,
	}

	stage := make(PlanStage, 0)
	switch v := stmt.SelectStmt.(type) {
	case ast.SelectStmt:
		if len(v.ValuesLists) == 0 {
			return nil, fmt.Errorf("insert from select query not implemented")
		}
		valueRenderer, err := p.ValuesListRenderer(v, columns, insertPlan)
		if err != nil {
			return nil, err
		}
		stage = append(stage, valueRenderer...)
	default:
		return nil, fmt.Errorf("insert from [%T] not implemented", v)
	}

	if len(table.Indexes) > 0 {
		// Index maintenance has no plan nodes yet.
		return nil, fmt.Errorf("insert into table [%s] with indexes not implemented", tableName)
	}
	return stage, nil
}
// InsertTablePlan buffers row values handed to it through ReceiveRow and, when
// run, writes them to the execution context as rows of the target table.
type InsertTablePlan struct {
// stmt is the INSERT statement this plan was built from.
stmt ast.InsertStmt
// table is the target table; Run reads its TableId and Columns.
table base.Table
// columns are the destination columns resolved by the planner.
columns []base.Column
// values accumulates the rows passed to ReceiveRow until Run consumes them.
values []RowValue
}
// ReceiveRow appends row to the values buffered on the plan so that a later
// Run can write it.
func (i *InsertTablePlan) ReceiveRow(row RowValue) {
	buffered := append(i.values, row)
	i.values = buffered
}
// Run converts every buffered row value into a base.Row of the target table
// and stores it with ctx.Set, using the row's encoded key and value.
//
// All rows are built before the first write, so a row that references an
// unknown column is reported before anything is stored. The first error
// returned by ctx.Set aborts the remaining writes and is returned as is.
func (i InsertTablePlan) Run(ctx ExecutionContext) error {
	// cell pairs a value with its column so the cells of a row can be ordered.
	type cell struct {
		column base.Column
		value  base.Datum
	}

	rows := make([]base.Row, 0, len(i.values))
	for _, rowValue := range i.values {
		cells := make([]cell, 0, len(rowValue))
		for columnName, value := range rowValue {
			col, ok := i.table.Columns[columnName]
			if !ok {
				// A missing column would otherwise yield a zero-value column
				// and silently write the datum under ColumnId 0.
				return fmt.Errorf("a column with name [%v] does not exist for table [%v]", columnName, i.table.TableId)
			}
			cells = append(cells, cell{column: col, value: value})
		}

		// rowValue is a map, so its iteration order is random. Order the cells
		// by ColumnId so that the datums of a multi-column primary key are
		// always appended in the same order.
		sort.Slice(cells, func(a, b int) bool {
			return cells[a].column.ColumnId < cells[b].column.ColumnId
		})

		row := base.Row{
			TableId:    i.table.TableId,
			PrimaryKey: make([]base.Datum, 0),
			Datums:     make(map[uint8]base.Datum, len(cells)),
		}
		for _, c := range cells {
			if c.column.Flags.IsPrimaryKey() {
				row.PrimaryKey = append(row.PrimaryKey, c.value)
			}
			row.Datums[c.column.ColumnId] = c.value
		}
		rows = append(rows, row)
	}

	for _, row := range rows {
		if err := ctx.Set(row.EncodeKey(), row.EncodeValue()); err != nil {
			return err
		}
	}
	return nil
}
// Explain is not implemented for InsertTablePlan yet; calling it panics.
func (InsertTablePlan) Explain() Explanation {
	panic("implement me")
}
// ValuesListPlan turns the VALUES lists of a statement into row values and
// stores them on the execution context when run.
type ValuesListPlan struct {
// stmt is the statement whose ValuesLists are rendered by Run.
stmt ast.SelectStmt
// table is not read by the methods visible in this file.
// NOTE(review): presumably the target table of the insert — confirm.
table base.Table
// columns maps each position in a VALUES row to its destination column.
columns []base.Column
}
// Run renders each VALUES row of the statement into a RowValue keyed by
// column name and passes it to ctx.StoreValue.
//
// The n-th expression of a row is assigned to the n-th entry of v.columns.
// ast.A_Const wrappers are unwrapped; ast.String and ast.Integer constants
// supply the datum value, and any other expression leaves the value nil.
//
// An error is returned, before any row is stored, when a row carries more
// expressions than there are target columns.
func (v ValuesListPlan) Run(ctx ExecutionContext) error {
	valueList := v.stmt.ValuesLists

	// Validate every row first: indexing v.columns past its end would panic,
	// and failing up front avoids storing only part of the statement's rows.
	for _, rowValues := range valueList {
		if len(rowValues) > len(v.columns) {
			return fmt.Errorf("insert has more expressions (%d) than target columns (%d)", len(rowValues), len(v.columns))
		}
	}

	for _, rowValues := range valueList {
		row := RowValue{}
		for c, cell := range rowValues {
			col := v.columns[c]
			datum := base.Datum{
				Type:  col.Type.Family,
				Value: nil,
			}

			// Peel off any A_Const wrappers to reach the literal node.
			for {
				constant, wrapped := cell.(ast.A_Const)
				if !wrapped {
					break
				}
				cell = constant.Val
			}

			switch val := cell.(type) {
			case ast.String:
				datum.Value = val.Str
			case ast.Integer:
				datum.Value = val.Ival
			}
			row[col.Name] = datum
		}
		ctx.StoreValue(row)
	}
	return nil
}
// Explain is not implemented for ValuesListPlan yet; calling it panics.
func (ValuesListPlan) Explain() Explanation {
	panic("implement me")
}