forked from alpacahq/marketstore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
insertintostatement.go
163 lines (144 loc) · 4.15 KB
/
insertintostatement.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package SQLParser
import (
"encoding/json"
"fmt"
"time"
"github.com/alpacahq/marketstore/executor"
"github.com/alpacahq/marketstore/utils/io"
)
// InsertIntoStatement is the parse-tree node for an
// `INSERT INTO <table> [(aliases...)] SELECT ...` statement.
//
// Field order matters: Explain serializes this struct with encoding/json,
// which emits fields in declaration order.
type InsertIntoStatement struct {
// Embedded base statement type.
// NOTE(review): GetChildCount/GetChild used by GetLeft/GetRight are presumably
// promoted from this embedded type — confirm against its definition.
ExecutableStatement
// SelectRelation is the child query; Materialize calls its Materialize()
// to obtain the rows to insert.
SelectRelation *SelectRelation
// QueryText is the query text handed to NewInsertIntoStatement.
QueryText string
// TableName names the target table; Materialize parses it with
// io.NewTimeBucketKey and reports an error asking for the `one/two/three`
// format when that yields nil.
TableName string
// ColumnAliases, when non-nil, is used by Materialize as the list of target
// column names; when nil, "Epoch" plus every target data shape name is used.
ColumnAliases []string
}
// NewInsertIntoStatement builds an InsertIntoStatement for the given target
// table name, original query text and child SELECT relation. ColumnAliases is
// left nil; the embedded ExecutableStatement is left at its zero value.
func NewInsertIntoStatement(tableName, queryText string, selectRelation *SelectRelation) (is *InsertIntoStatement) {
	return &InsertIntoStatement{
		SelectRelation: selectRelation,
		QueryText:      queryText,
		TableName:      tableName,
	}
}
// Materialize executes the statement: it materializes the child SELECT,
// matches the result columns against the target table's columns, and writes
// the projected rows into the target table.
//
// Returns:
//   - (nil, nil) when the child query yields no result (nil or zero rows);
//   - (nil, err) when the child query fails, the table name cannot be parsed
//     into a time bucket key, the catalog lookup fails, a required target
//     column is missing from the query result, the writer cannot be created,
//     or the data cannot be serialized;
//   - otherwise a one-row ColumnSeries with an "Epoch" column (current UTC
//     Unix time) and a "Rows Written" column (row count of the projected input).
func (is *InsertIntoStatement) Materialize() (outputColumnSeries *io.ColumnSeries, err error) {
	// Call Materialize on any child relations
	inputColumnSeries, err := is.SelectRelation.Materialize()
	if err != nil {
		return nil, err
	}

	// Nothing to insert. A nil result is treated like an empty one; without
	// this guard a nil series would be dereferenced further down.
	if inputColumnSeries == nil || inputColumnSeries.Len() == 0 {
		return nil, nil
	}
	fmt.Printf("Query returned %d rows, inserting into: %s\n",
		inputColumnSeries.Len(), is.TableName)

	/*
		Map the target table's columns to the results
	*/
	targetMK := io.NewTimeBucketKey(is.TableName)
	if targetMK == nil {
		return nil, fmt.Errorf("Table name must be in the format `one/two/three`, have: %s",
			is.TableName)
	}
	catDir := executor.ThisInstance.CatalogDir
	fi, err := catDir.GetLatestTimeBucketInfoFromKey(targetMK)
	if err != nil {
		return nil, err
	}
	targetDSV := fi.GetDataShapesWithEpoch()

	/*
		Use column aliases to select required target columns in mapping
	*/
	var targetColumnNames []string // Final result column names will be here
	if is.ColumnAliases != nil {
		targetColumnNames = is.ColumnAliases
	} else {
		targetColumnNames = make([]string, 0, len(targetDSV)+1)
		// Add the Epoch column name
		// NOTE(review): targetDSV comes from GetDataShapesWithEpoch; if it
		// already carries an Epoch shape, "Epoch" appears twice in this list —
		// confirm that is harmless for Contains/Project.
		targetColumnNames = append(targetColumnNames, "Epoch")
		for _, shape := range targetDSV {
			targetColumnNames = append(targetColumnNames, shape.Name)
		}
	}

	/*
		Target Column names now has the required columns in it
		We now need to find those columns in the results
	*/
	inputColumnNames := io.GetNamesFromDSV(inputColumnSeries.GetDataShapes())
	// NOTE(review): the second return value of NewAnySet is discarded, as in
	// the original; presumably it cannot fail for a []string — confirm.
	as, _ := io.NewAnySet(inputColumnNames)
	if !as.Contains(targetColumnNames) {
		// Calculate the remainder of names not present
		targetSet, _ := io.NewAnySet(targetColumnNames)
		residue := targetSet.Subtract(inputColumnNames)
		return nil, fmt.Errorf(
			"\nUnable to find these columns: %v needed for INSERT into target table %s\nTry %s",
			residue,
			is.TableName,
			"using column aliases to exclude the needed columns from the select list.\n"+
				"Example: if foo is foo(a,b,c,d) and bar is a,b,c:\n"+
				"\tINSERT INTO foo (a, b, c) SELECT * FROM bar;",
		)
	}

	// Get the time with nanoseconds included if available, prior to projection
	indexTime := inputColumnSeries.GetTime()
	// Columns are matched - Now project out all but the target column names
	inputColumnSeries.Project(targetColumnNames)

	/*
		Write the data
	*/
	tgc := executor.ThisInstance.TXNPipe
	wal := executor.ThisInstance.WALFile
	// Reuse the bucket info fetched above instead of repeating the same
	// catalog lookup for the same key.
	writer, err := executor.NewWriter(fi, tgc, catDir)
	if err != nil {
		return nil, err
	}

	/*
		Serialize the Column Series for writing, with the targetDSV controlling projections and coercion
	*/
	// NOTE(review): the second return value is discarded, as in the original;
	// a nil data slice is the only failure signal checked here.
	data, _ := io.SerializeColumnsToRows(inputColumnSeries, targetDSV, true)
	if data == nil {
		return nil, fmt.Errorf("Unable to pre-process data for insertion")
	}
	writer.WriteRecords(indexTime, data)
	wal.RequestFlush()

	// Report when the insert happened and how many rows were written.
	outputColumnSeries = io.NewColumnSeries()
	outputColumnSeries.AddColumn("Epoch",
		[]int64{time.Now().UTC().Unix()})
	outputColumnSeries.AddColumn("Rows Written",
		[]float32{float32(inputColumnSeries.Len())})
	return outputColumnSeries, nil
}
// Explain returns the statement rendered as a JSON object, or "{}" when the
// receiver is nil.
func (is *InsertIntoStatement) Explain() string {
	if is == nil {
		return "{}"
	}
	// The marshal error is intentionally discarded (the signature has no error
	// return); on failure the encoded bytes are nil and "" is returned.
	encoded, _ := json.Marshal(*is)
	return string(encoded)
}
// GetLeft returns the first child of this node (index 0), or nil when the
// node has no children.
func (is *InsertIntoStatement) GetLeft() IMSTree {
	if is.GetChildCount() != 0 {
		return is.GetChild(0)
	}
	return nil
}
// GetRight returns the second child of this node (index 1), or nil when the
// node has fewer than two children.
func (is *InsertIntoStatement) GetRight() IMSTree {
	if is.GetChildCount() >= 2 {
		return is.GetChild(1)
	}
	return nil
}
/*
Utility Structures
*/