-
Notifications
You must be signed in to change notification settings - Fork 40
/
frames.go
214 lines (196 loc) · 6.48 KB
/
frames.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
package frames
import (
"context"
"fmt"
"io"
"sync"
"github.com/Azure/azure-kusto-go/kusto/data/types"
"github.com/Azure/azure-kusto-go/kusto/data/value"
"github.com/Azure/azure-kusto-go/kusto/data/errors"
)
// These untyped string constants are the values of a frame's FrameType field;
// each one identifies a distinct kind of Kusto frame.
const (
// TypeDataTable is the .FrameType that indicates a Kusto DataTable.
TypeDataTable = "DataTable"
// TypeDataSetCompletion is the .FrameType that indicates a Kusto DataSetCompletion.
TypeDataSetCompletion = "DataSetCompletion"
// TypeDataSetHeader is the .FrameType that indicates a Kusto DataSetHeader.
TypeDataSetHeader = "DataSetHeader"
// TypeTableHeader is the .FrameType that indicates a Kusto TableHeader.
TypeTableHeader = "TableHeader"
// TypeTableFragment is the .FrameType that indicates a Kusto TableFragment.
TypeTableFragment = "TableFragment"
// TypeTableProgress is the .FrameType that indicates a Kusto TableProgress.
TypeTableProgress = "TableProgress"
// TypeTableCompletion is the .FrameType that indicates a Kusto TableCompletion.
TypeTableCompletion = "TableCompletion"
)
// These constants represent keys for fields when unmarshalling various JSON dicts representing Kusto frames.
// Each constant's value is the key string itself; which frame kinds carry which key is
// not visible in this file.
const (
// FieldFrameType is the "FrameType" key.
FieldFrameType = "FrameType"
// FieldTableID is the "TableId" key.
FieldTableID = "TableId"
// FieldTableKind is the "TableKind" key.
FieldTableKind = "TableKind"
// FieldTableName is the "TableName" key.
FieldTableName = "TableName"
// FieldColumns is the "Columns" key.
FieldColumns = "Columns"
// FieldRows is the "Rows" key.
FieldRows = "Rows"
// FieldColumnName is the "ColumnName" key.
FieldColumnName = "ColumnName"
// FieldColumnType is the "ColumnType" key.
FieldColumnType = "ColumnType"
// FieldCount is the "FieldCount" key.
// NOTE(review): unlike its siblings, the value keeps the "Field" prefix — confirm this
// matches the key in the wire format.
FieldCount = "FieldCount"
// FieldTableFragmentType is the "TableFragmentType" key.
FieldTableFragmentType = "TableFragmentType"
// FieldTableProgress is the "TableProgress" key.
FieldTableProgress = "TableProgress"
// FieldRowCount is the "RowCount" key.
FieldRowCount = "RowCount"
)
// TableKind names the kind of a table. It is a string type, so its values
// compare directly against string constants.
type TableKind string

// The TableKind values declared by this package.
const (
	// QueryProperties is a dataTable.TableKind that contains properties about the query itself.
	// The dataTable.TableName is usually ExtendedProperties.
	QueryProperties TableKind = "QueryProperties"
	// PrimaryResult is a dataTable.TableKind that contains the query information the user wants.
	// The dataTable.TableName is PrimaryResult.
	PrimaryResult TableKind = "PrimaryResult"
	// QueryCompletionInformation contains information on how long the query took.
	// The dataTable.TableName is QueryCompletionInformation.
	QueryCompletionInformation TableKind = "QueryCompletionInformation"
	// QueryTraceLog is the TableKind with the value "QueryTraceLog".
	QueryTraceLog TableKind = "QueryTraceLog"
	// QueryPerfLog is the TableKind with the value "QueryPerfLog".
	QueryPerfLog TableKind = "QueryPerfLog"
	// TableOfContents is the TableKind with the value "TableOfContents".
	TableOfContents TableKind = "TableOfContents"
	// QueryPlan is the TableKind with the value "QueryPlan".
	QueryPlan TableKind = "QueryPlan"
	// ExtendedProperties is the TableKind with the value "@ExtendedProperties".
	ExtendedProperties TableKind = "@ExtendedProperties"
	// UnknownTableKind is the TableKind with the value "Unknown".
	UnknownTableKind TableKind = "Unknown"
)

// tkDetection is a set of TableKind values: every kind listed below maps to true,
// and looking up any other kind yields false.
// NOTE(review): ExtendedProperties is declared above but is not a member of this
// set — confirm that the omission is intentional.
var tkDetection = func() map[TableKind]bool {
	members := []TableKind{
		QueryProperties,
		PrimaryResult,
		QueryCompletionInformation,
		QueryTraceLog,
		QueryPerfLog,
		TableOfContents,
		QueryPlan,
		UnknownTableKind,
	}

	set := make(map[TableKind]bool, len(members))
	for _, kind := range members {
		set[kind] = true
	}
	return set
}()
// Decoder provides a function that will decode an incoming data stream and return a channel of Frame objects.
type Decoder interface {
// Decode decodes an io.ReadCloser representing a stream of Kusto frames into our Frame representation.
// The type and order of frames is dependent on the REST interface version and the progressive frame settings.
// The decoded frames are delivered on the returned channel. How an implementation uses ctx and op,
// and whether it closes r, is not visible from this interface — TODO confirm against the implementations.
Decode(ctx context.Context, r io.ReadCloser, op errors.Op) chan Frame
}
// Frame is a type of Kusto frame as defined in the reference document.
// Any type that declares an IsFrame method satisfies it.
type Frame interface {
// IsFrame takes no arguments and returns nothing; it serves only to mark a type as a Frame.
IsFrame()
}
// Pool is the package-level sync.Pool of map[string]interface{} values, kept so
// that decoding can reuse maps instead of allocating a new one each time.
// When the pool has nothing to hand out, New supplies an empty map.
var Pool = sync.Pool{
	New: func() interface{} {
		// Size hint for a fresh map.
		const sizeHint = 10

		fresh := make(map[string]interface{}, sizeHint)
		return fresh
	},
}
// PoolCh provides a package level channel that sends an unused map to the package pool,
// allowing all instances of decoder to share the same map pool.
// The channel is buffered with room for 100 maps. The poolIn goroutines receive from it,
// empty each map and put it into Pool.
var PoolCh = make(chan map[string]interface{}, 100)
// poolIn is the body of a background goroutine that recycles maps for reuse.
// It receives maps from PoolCh, removes every entry from each one and puts the
// emptied map into Pool. It returns once PoolCh is closed and drained.
func poolIn() {
	for {
		used, open := <-PoolCh
		if !open {
			return
		}

		// Empty the map so the next user of the pool starts with no stale keys.
		for key := range used {
			delete(used, key)
		}
		Pool.Put(used)
	}
}
// init launches the background poolIn goroutines when the package is loaded.
func init() {
	// TODO(jdoak): At some point I will need to sit down and find the optimal value.
	const workers = 5

	for started := 0; started < workers; started++ {
		go poolIn()
	}
}
// Error is not a real Kusto frame. It is used to signal the end of a stream in
// which an error was encountered. It has an IsFrame method so that it can be
// passed along as a frame, and it also implements the error interface.
type Error struct {
	// Msg is the error text; Error() returns it unchanged.
	Msg string
}

// Compile-time check that Error satisfies the built-in error interface.
var _ error = Error{}

// Error implements error.Error() by returning Msg.
func (fe Error) Error() string {
	return fe.Msg
}

// IsFrame does nothing; it exists so that Error has the IsFrame method.
func (fe Error) IsFrame() {}
// Errorf builds a frames.Error whose Msg is fmt.Sprintf(s, a...) and tries to
// send it on ch. It blocks until either the send succeeds or ctx is done; in
// the latter case it returns without sending.
func Errorf(ctx context.Context, ch chan Frame, s string, a ...interface{}) {
	frame := Error{Msg: fmt.Sprintf(s, a...)}

	select {
	case ch <- frame:
	case <-ctx.Done():
		// The context ended before the frame could be sent; give up.
	}
}
// Conversion maps each Kusto column type (a types.Column) to the function that
// turns a JSON value into the matching concrete value.Kusto type. Every
// function follows the same recipe: create an empty value of the target type,
// call its Unmarshal with the input, and return the value, or nil together
// with the Unmarshal error if that call fails.
var Conversion = map[types.Column]func(i interface{}) (value.Kusto, error){
	types.Bool: func(raw interface{}) (value.Kusto, error) {
		out := value.Bool{}
		err := out.Unmarshal(raw)
		if err != nil {
			return nil, err
		}
		return out, nil
	},
	types.DateTime: func(raw interface{}) (value.Kusto, error) {
		out := value.DateTime{}
		err := out.Unmarshal(raw)
		if err != nil {
			return nil, err
		}
		return out, nil
	},
	types.Dynamic: func(raw interface{}) (value.Kusto, error) {
		out := value.Dynamic{}
		err := out.Unmarshal(raw)
		if err != nil {
			return nil, err
		}
		return out, nil
	},
	types.GUID: func(raw interface{}) (value.Kusto, error) {
		out := value.GUID{}
		err := out.Unmarshal(raw)
		if err != nil {
			return nil, err
		}
		return out, nil
	},
	types.Int: func(raw interface{}) (value.Kusto, error) {
		out := value.Int{}
		err := out.Unmarshal(raw)
		if err != nil {
			return nil, err
		}
		return out, nil
	},
	types.Long: func(raw interface{}) (value.Kusto, error) {
		out := value.Long{}
		err := out.Unmarshal(raw)
		if err != nil {
			return nil, err
		}
		return out, nil
	},
	types.Real: func(raw interface{}) (value.Kusto, error) {
		out := value.Real{}
		err := out.Unmarshal(raw)
		if err != nil {
			return nil, err
		}
		return out, nil
	},
	types.String: func(raw interface{}) (value.Kusto, error) {
		out := value.String{}
		err := out.Unmarshal(raw)
		if err != nil {
			return nil, err
		}
		return out, nil
	},
	types.Timespan: func(raw interface{}) (value.Kusto, error) {
		out := value.Timespan{}
		err := out.Unmarshal(raw)
		if err != nil {
			return nil, err
		}
		return out, nil
	},
	types.Decimal: func(raw interface{}) (value.Kusto, error) {
		out := value.Decimal{}
		err := out.Unmarshal(raw)
		if err != nil {
			return nil, err
		}
		return out, nil
	},
}