forked from influxdata/flux
-
Notifications
You must be signed in to change notification settings - Fork 1
/
preview.go
143 lines (122 loc) · 3.51 KB
/
preview.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package experimental
import (
"fmt"
"github.com/InfluxCommunity/flux"
"github.com/InfluxCommunity/flux/array"
"github.com/InfluxCommunity/flux/arrow"
"github.com/InfluxCommunity/flux/execute"
"github.com/InfluxCommunity/flux/execute/table"
"github.com/InfluxCommunity/flux/plan"
"github.com/InfluxCommunity/flux/runtime"
"github.com/apache/arrow/go/v7/arrow/memory"
)
// PreviewKind is the kind identifier shared by the preview operation spec,
// the preview procedure spec, and the preview transformation registration.
const PreviewKind = "experimental.preview"
// PreviewOpSpec is the operation spec produced for a call to
// experimental.preview.
type PreviewOpSpec struct {
	// NRows is taken from the "nrows" argument and NTables from the
	// "ntables" argument; createPreviewOpSpec sets each to 5 when the
	// argument is absent.
	NRows, NTables int64
}
// init wires the preview function into the runtime, the planner, and the
// executor: it registers the builtin value for experimental.preview, the
// procedure-spec constructor, and the transformation constructor, all
// under PreviewKind.
func init() {
	sig := runtime.MustLookupBuiltinType("experimental", "preview")
	previewFn := flux.MustValue(flux.FunctionValue(PreviewKind, createPreviewOpSpec, sig))

	runtime.RegisterPackageValue("experimental", "preview", previewFn)
	plan.RegisterProcedureSpec(PreviewKind, newPreviewProcedure, PreviewKind)
	execute.RegisterTransformation(PreviewKind, createPreviewTransformation)
}
// createPreviewOpSpec builds a PreviewOpSpec from the call arguments.
//
// It first attaches the parent operation found in args, then reads the
// optional integer arguments "nrows" and "ntables". Either one defaults to 5
// when it is not supplied. The values are not range-checked here. Any error
// from attaching the parent or from reading an argument is returned as-is.
func createPreviewOpSpec(args flux.Arguments, a *flux.Administration) (flux.OperationSpec, error) {
	if err := a.AddParentFromArgs(args); err != nil {
		return nil, err
	}

	// Start from the defaults and overwrite only what the caller provided.
	spec := &PreviewOpSpec{NRows: 5, NTables: 5}

	rows, found, err := args.GetInt("nrows")
	if err != nil {
		return nil, err
	}
	if found {
		spec.NRows = rows
	}

	tables, found, err := args.GetInt("ntables")
	if err != nil {
		return nil, err
	}
	if found {
		spec.NTables = tables
	}

	return spec, nil
}
// Kind returns PreviewKind as the operation kind of this spec.
func (s *PreviewOpSpec) Kind() flux.OperationKind {
return PreviewKind
}
// PreviewProcedureSpec is the planner-level spec for preview. It embeds
// plan.DefaultCost and carries the same limits as PreviewOpSpec.
type PreviewProcedureSpec struct {
	plan.DefaultCost

	// NRows and NTables are copied from the PreviewOpSpec fields of the
	// same names by newPreviewProcedure.
	NRows, NTables int64
}
// newPreviewProcedure converts a *PreviewOpSpec into a *PreviewProcedureSpec
// by copying NRows and NTables. It returns an error naming the dynamic type
// of qs when qs is not a *PreviewOpSpec. The pa argument is not used.
func newPreviewProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
	switch op := qs.(type) {
	case *PreviewOpSpec:
		return &PreviewProcedureSpec{NRows: op.NRows, NTables: op.NTables}, nil
	default:
		return nil, fmt.Errorf("invalid spec type %T", qs)
	}
}
// Kind returns PreviewKind as the procedure kind of this spec.
func (s *PreviewProcedureSpec) Kind() plan.ProcedureKind {
return PreviewKind
}
// Copy returns a pointer to a freshly allocated PreviewProcedureSpec whose
// fields are a value copy of the receiver's.
func (s *PreviewProcedureSpec) Copy() plan.ProcedureSpec {
	dup := new(PreviewProcedureSpec)
	*dup = *s
	return dup
}
// createPreviewTransformation is the transformation constructor registered
// for PreviewKind. It requires spec to be a *PreviewProcedureSpec and hands
// it, together with the administration's allocator, to
// NewPreviewTransformation. Any other spec type yields an error naming the
// dynamic type. The mode argument is not used.
func createPreviewTransformation(id execute.DatasetID, mode execute.AccumulationMode, spec plan.ProcedureSpec, a execute.Administration) (execute.Transformation, execute.Dataset, error) {
	switch ps := spec.(type) {
	case *PreviewProcedureSpec:
		return NewPreviewTransformation(id, ps, a.Allocator())
	default:
		return nil, nil, fmt.Errorf("invalid spec type %T", spec)
	}
}
// previewTransformation holds the limits applied by Process.
type previewTransformation struct {
	// nrows is the row budget given to each newly seen group key.
	// ntables is the number of new group keys that may still be admitted;
	// Process decrements it each time it admits one.
	nrows, ntables int64
}
// NewPreviewTransformation creates a preview transformation configured with
// the NRows and NTables limits from spec and wraps it with
// execute.NewNarrowStateTransformation, returning whatever that constructor
// returns.
func NewPreviewTransformation(id execute.DatasetID, spec *PreviewProcedureSpec, mem memory.Allocator) (execute.Transformation, execute.Dataset, error) {
	return execute.NewNarrowStateTransformation[any](
		id,
		&previewTransformation{nrows: spec.NRows, ntables: spec.NTables},
		mem,
	)
}
// Process forwards at most nrows rows for each admitted group key and admits
// at most ntables group keys; everything else is dropped.
//
// state carries the remaining row budget for chunk's group key as an int64.
// Any other dynamic type (including nil) is treated as "key not seen yet".
// NOTE(review): this presumes the caller hands back, per group key, the
// state value returned by the previous call — confirm against
// execute.NewNarrowStateTransformation.
//
// Returns the updated remaining budget (always an int64 when non-nil), a
// boolean that is false only when the key was rejected because the table
// budget is exhausted, and any error reported by d.Process.
func (t *previewTransformation) Process(chunk table.Chunk, state interface{}, d *execute.TransportDataset, mem memory.Allocator) (interface{}, bool, error) {
	n, ok := state.(int64)
	if !ok {
		// First chunk for this key: admit it only if a table slot is left.
		if t.ntables == 0 {
			return nil, false, nil
		}
		t.ntables--
		n = t.nrows
	}

	// The whole chunk fits in the remaining budget: pass it through as is.
	if int64(chunk.Len()) <= n {
		// Take an extra reference before forwarding; presumably the caller
		// releases its own reference after Process returns — TODO confirm.
		chunk.Retain()
		if err := d.Process(chunk); err != nil {
			return nil, false, err
		}
		n -= int64(chunk.Len())
		return n, true, nil
	}

	// The chunk exceeds the budget: forward only its first n rows.
	buffer := arrow.TableBuffer{
		GroupKey: chunk.Key(),
		Columns:  chunk.Cols(),
		Values:   make([]array.Array, chunk.NCols()),
	}
	for i := range chunk.Cols() {
		buffer.Values[i] = arrow.Slice(chunk.Values(i), 0, n)
	}
	out := table.ChunkFromBuffer(buffer)
	if err := d.Process(out); err != nil {
		return nil, false, err
	}

	// The budget is now spent. The state must be an int64: a bare 0 would be
	// boxed as int, fail the type assertion above on the next chunk of this
	// key, and cause the key to be re-admitted with a fresh row budget.
	return int64(0), true, nil
}
// Close does nothing and always returns nil.
func (t *previewTransformation) Close() error {
return nil
}