Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .linkspector.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ ignorePatterns:
- pattern: '^https://dl.acm.org/doi/pdf/10.1145/984549.984551$'
- pattern: '^https://www.researchgate.net/publication/221325979_Union_Types_for_Semistructured_Data$'
- pattern: '^https://db.cs.cmu.edu/papers/2024/whatgoesaround-sigmodrec2024.pdf$'
- pattern: '^https://openproceedings.org/2017/conf/edbt/paper-62.pdf$'
6 changes: 3 additions & 3 deletions book/src/super-sql/operators/fuse.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Because all values of the input must be read to compute the fused type,
---

_Fuse two records_
```mdtest-spq
```mdtest-spq fusion
# spq
fuse
# input
Expand All @@ -43,7 +43,7 @@ fusion({a?:_::int64,b?:2},<{b:int64}>)
---

_Fuse records with type variation_
```mdtest-spq
```mdtest-spq fusion
# spq
fuse
# input
Expand All @@ -57,7 +57,7 @@ fuse
---

_Fuse records with complex type variation_
```mdtest-spq {data-layout="stacked"}
```mdtest-spq fusion {data-layout="stacked"}
# spq
fuse
# input
Expand Down
1 change: 1 addition & 0 deletions cli/outputflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (f *Flags) SetFormatFlags(fs *flag.FlagSet) {
fs.BoolVar(&f.forceBinary, "B", false, "allow Super Binary to be sent to a terminal output")
fs.BoolVar(&f.jsonPretty, "J", false, "use formatted JSON output independent of -f option")
fs.BoolVar(&f.jsonShortcut, "j", false, "use line-oriented JSON output independent of -f option")
fs.BoolVar(&f.SUPFusion, "fusion", false, "display fusion values (fusion values are otherwise auto-defused)")
fs.BoolVar(&f.supPretty, "S", false, "use formatted Super JSON output independent of -f option")
fs.BoolVar(&f.supShortcut, "s", false, "use line-oriented Super JSON output independent of -f option")
}
Expand Down
2 changes: 1 addition & 1 deletion db/ztests/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ script: |
super db load -q -use poolB b.sup
super db -S -c 'from :pools | drop id | sort name | drop ts'
echo ===
super db -S -c 'from poolA@main:objects | {nameof:nameof(this),...this} | drop id'
super db -fusion -S -c 'from poolA@main:objects | {nameof:nameof(this),...this} | drop id'
super db -S -c 'from poolA:log | cut nameof(this)'

inputs:
Expand Down
4 changes: 4 additions & 0 deletions mdtest/mdtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,14 @@ func parseMarkdown(source []byte) (map[string]string, []*Test, error) {
})
case "mdtest-spq":
var fails bool
var fusion bool
var runtime string
for _, word := range fcbInfoWords(fcb, source)[1:] {
switch {
case word == "fails":
fails = true
case word == "fusion":
fusion = true
case strings.HasPrefix(word, "runtime="):
runtime = strings.TrimPrefix(word, "runtime=")
if runtime != "vam" && runtime != "sam" {
Expand All @@ -263,6 +266,7 @@ func parseMarkdown(source []byte) (map[string]string, []*Test, error) {
Input: sections[2],
SPQ: sections[1],
Runtime: runtime,
Fusion: fusion,
})
}
return ast.WalkContinue, nil
Expand Down
8 changes: 6 additions & 2 deletions mdtest/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ type Test struct {
Runtime string // "sam", "vam", or "" for both

// For SPQ tests
Input string
SPQ string
Input string
SPQ string
Fusion bool // If true do not defuse output
}

// Run runs the test, returning nil on success.
Expand Down Expand Up @@ -52,6 +53,9 @@ func (t *Test) run(runtime string) error {
var c *exec.Cmd
if t.SPQ != "" {
c = exec.Command("super", "-s", "-c", t.SPQ)
if t.Fusion {
c.Args = append(c.Args, "-fusion")
}
if s := t.Input; strings.TrimSpace(s) != "" {
c.Args = append(c.Args, "-")
c.Stdin = strings.NewReader(s)
Expand Down
2 changes: 2 additions & 0 deletions runtime/ztests/expr/array-expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ input: |
{a:0,b:"1"::(int64|string),c:null::(int64|string|null)}
{a:0,b:1,c:fusion([2]::(null|[int64]),<[int64]>)}

output-flags: -fusion

output: |
[error("missing"),error("quiet"),null]
[[1,2],null,[3,4]]
Expand Down
2 changes: 2 additions & 0 deletions runtime/ztests/expr/function/nullif.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ input: |
[error(0), null]
[error(0), error(1)]

output-flags: -fusion

output: |
null
null
Expand Down
2 changes: 2 additions & 0 deletions runtime/ztests/expr/function/upcast.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ input: |
type n101=int64
[1::n101,<fusion(n101|string)>]

output-flags: -fusion

output: |
error({message:"upcast: value not a subtype of [int8|string]",on:[1,"a"]})
[1::int8,"a"]
Expand Down
2 changes: 2 additions & 0 deletions runtime/ztests/expr/fusion-all.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ input: |
{x:1}
<string>

output-flags: -fusion

output: |
fusion(0x02::all,<int64>)
fusion(0x666f6f::all,<string>)
Expand Down
2 changes: 1 addition & 1 deletion runtime/ztests/expr/search-glob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ output: |
{a:"foox",b:"there"}
{a:"hello",b:"foox"}
{a:"",b:"foo"}
fusion({a?:_::string,b?:"fool"},<{b:string}>)
{b:"fool"}
2 changes: 1 addition & 1 deletion runtime/ztests/expr/search-nested-field-regexp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ input: |
output: |
{a:[{bar:"foo"}]}
{a:[{car:"foo"}]}
fusion({car:"foo"}::(null|{car:string}),<{car:string}>)
{car:"foo"}
{a:[]::[{bar:null}]}
4 changes: 2 additions & 2 deletions runtime/ztests/expr/search-nested-field.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ input: |
output: |
{a:[{b:"foo"}]}
{a:[{c:"foo"},{b:1}]}
fusion({a:1,b?:_::string},<{a:int64}>)
fusion({a:2,b?:"foo"},<{a:int64,b:string}>)
{a:1}
{a:2,b:"foo"}
{a:[]::[{b:null}]}
2 changes: 1 addition & 1 deletion runtime/ztests/expr/search-primitives.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ input: |
output: |
"foo"
"foo"
fusion("foo"::(int64|string),<string>)
"foo"
2 changes: 2 additions & 0 deletions runtime/ztests/op/aggregate/collect.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ input: |
1::foo
fusion(1::(int64|string),<int64>)

output-flags: -fusion

output: |
type foo=int64
[{a:1},{a:2},{b:1.5},error("missing"),1::foo,fusion(1::(int64|string),<int64>)]
18 changes: 18 additions & 0 deletions runtime/ztests/op/fuse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ input: |
{a:"goodnight",b:123::int32}
{a:null,b:null,c:null}

output-flags: -fusion

output: |
fusion({a:fusion("hello"::(string|null|none),<string>),b:fusion("world"::(int32|string|null),<string>),c:fusion(_::(string|null|none),<none>)},<{a:string,b:string}>)
fusion({a:fusion(_::(string|null|none),<none>),b:fusion("goodnight"::(int32|string|null),<string>),c:fusion("gracie"::(string|null|none),<string>)},<{b:string,c:string}>)
Expand All @@ -24,6 +26,8 @@ input: |
[{a:3,b:3}]
[{a:null,b:null}]

output-flags: -fusion

output: |
fusion([fusion({a:fusion(1::(int64|null|none),<int64>),b:fusion(_::(int64|null|none),<none>)},<{a:int64}>)],<[{a:int64}]>)
fusion([fusion({a:fusion(_::(int64|null|none),<none>),b:fusion(2::(int64|null|none),<int64>)},<{b:int64}>)],<[{b:int64}]>)
Expand All @@ -44,6 +48,8 @@ input: |
[1]
["s"]

output-flags: -fusion

output: |
fusion({a:fusion(1::(int64|string),<int64>)}::(int64|string|{a:fusion(int64|string)}|[fusion(int64|string)]),<{a:int64}>)
fusion({a:fusion("s"::(int64|string),<string>)}::(int64|string|{a:fusion(int64|string)}|[fusion(int64|string)]),<{a:string}>)
Expand All @@ -62,6 +68,8 @@ input: |
{r:{y:4::int32,z:5::int32},s:"world",r2:{x:6::int32}}
{a:null,r:{x:null,y:null,z:null},s:null,r2:null}

output-flags: -fusion

output: |
fusion({a:fusion("hello"::(string|null|none),<string>),r:fusion({x:fusion(1::int32::(int32|null|none),<int32>),y:fusion(2::int32::(int32|null),<int32>),z:fusion(_::(int32|null|none),<none>)},<{x:int32,y:int32}>),s:fusion(_::(string|null|none),<none>),r2:fusion(_::(null|none|{x:int32}),<none>)},<{a:string,r:{x:int32,y:int32}}>)
fusion({a:fusion(_::(string|null|none),<none>),r:fusion({x:fusion(_::(int32|null|none),<none>),y:fusion(4::int32::(int32|null),<int32>),z:fusion(5::int32::(int32|null|none),<int32>)},<{y:int32,z:int32}>),s:fusion("world"::(string|null|none),<string>),r2:fusion({x:6::int32}::(null|none|{x:int32}),<{x:int32}>)},<{r:{y:int32,z:int32},s:string,r2:{x:int32}}>)
Expand All @@ -75,6 +83,8 @@ input: |
error(1)
error("s")

output-flags: -fusion

output: |
fusion(error(fusion(1::(int64|string),<int64>)),<error(int64)>)
fusion(error(fusion("s"::(int64|string),<string>)),<error(string)>)
Expand All @@ -88,6 +98,8 @@ input: |
["foo"]
[null]

output-flags: -fusion

output: |
fusion([fusion(1::(int64|string|null),<int64>),fusion(2::(int64|string|null),<int64>)],<[int64]>)
fusion([fusion("foo"::(int64|string|null),<string>)],<[string]>)
Expand All @@ -102,6 +114,8 @@ input: |
{a:["foo"]}
{a:[null]}

output-flags: -fusion

output: |
{a:fusion([fusion(1::(int64|string|null),<int64>),fusion(2::(int64|string|null),<int64>)],<[int64]>)}
{a:fusion([fusion("foo"::(int64|string|null),<string>)],<[string]>)}
Expand Down Expand Up @@ -137,6 +151,8 @@ input: |
type er2=error(er1)
error(1)::er2

output-flags: -fusion

output: |
type a1=int64
type a2=[a1]
Expand Down Expand Up @@ -175,6 +191,8 @@ input: |
[fusion({x?:1,y?:_::int64},<{x:int64}>),fusion({x?:_::int64,y?:2},<{y:int64}>)]
[1,fusion(2::(int64|null),<int64>)]

output-flags: -fusion

output: |
{x:1}
fusion({x?:1,y?:_::int64},<{x:int64}>)
Expand Down
2 changes: 2 additions & 0 deletions runtime/ztests/op/unnest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ input: |
[fusion(12::(int64|string),<int64>),"13"]
null

output-flags: -fusion

output: |
1
2
Expand Down
2 changes: 1 addition & 1 deletion service/ztests/csv-error.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ outputs:
hello
- name: stderr
data: |
CSV output requires uniform records but multiple types encountered (consider 'fuse')
CSV output requires uniform records but multiple types encountered (consider 'blend')
2 changes: 1 addition & 1 deletion service/ztests/query-csv-error.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ outputs:
hello
- name: stderr
data: |
CSV output requires uniform records but multiple types encountered (consider 'fuse')
CSV output requires uniform records but multiple types encountered (consider 'blend')
63 changes: 47 additions & 16 deletions sio/anyio/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"fmt"
"io"

"github.com/brimdata/super"
"github.com/brimdata/super/csup"
"github.com/brimdata/super/runtime/vam/expr"
"github.com/brimdata/super/runtime/vam/expr/function"
"github.com/brimdata/super/sio/arrowio"
"github.com/brimdata/super/sio/bsupio"
"github.com/brimdata/super/sio/csvio"
Expand All @@ -20,18 +23,19 @@ import (
)

type WriterOpts struct {
Format string
BSUP *bsupio.WriterOpts // Nil means use defaults via bsupio.NewWriter.
CSV csvio.WriterOpts
DB dbio.WriterOpts
JSON jsonio.WriterOpts
SUP supio.WriterOpts
Format string
SUPFusion bool
BSUP *bsupio.WriterOpts // Nil means use defaults via bsupio.NewWriter.
CSV csvio.WriterOpts
DB dbio.WriterOpts
JSON jsonio.WriterOpts
SUP supio.WriterOpts
}

func NewWriter(w io.WriteCloser, opts WriterOpts) (vio.PushCloser, error) {
switch opts.Format {
case "arrows":
return arrowio.NewWriter(w), nil
return newDefuser(arrowio.NewWriter(w)), nil
case "bsup":
if opts.BSUP == nil {
return bsupio.NewWriter(w), nil
Expand All @@ -40,31 +44,58 @@ func NewWriter(w io.WriteCloser, opts WriterOpts) (vio.PushCloser, error) {
case "csup":
return csup.NewSerializer(w), nil
case "csv":
return csvio.NewWriter(w, opts.CSV), nil
return newDefuser(csvio.NewWriter(w, opts.CSV)), nil
case "db":
return dbio.NewWriter(w, opts.DB), nil
return newDefuser(dbio.NewWriter(w, opts.DB)), nil
case "json":
return jsonio.NewWriter(w, opts.JSON), nil
return newDefuser(jsonio.NewWriter(w, opts.JSON)), nil
case "line":
return lineio.NewWriter(w), nil
return newDefuser(lineio.NewWriter(w)), nil
case "null":
return &nullWriter{}, nil
case "parquet":
return parquetio.NewWriter(w), nil
return newDefuser(parquetio.NewWriter(w)), nil
case "sup", "":
return supio.NewWriter(w, opts.SUP), nil
w := vio.PushCloser(supio.NewWriter(w, opts.SUP))
if !opts.SUPFusion {
w = newDefuser(w)
}
return w, nil
case "table":
return tableio.NewWriter(w), nil
return newDefuser(tableio.NewWriter(w)), nil
case "tsv":
opts.CSV.Delim = '\t'
return csvio.NewWriter(w, opts.CSV), nil
return newDefuser(csvio.NewWriter(w, opts.CSV)), nil
case "zeek":
return zeekio.NewWriter(w), nil
return newDefuser(zeekio.NewWriter(w)), nil
default:
return nil, fmt.Errorf("unknown format: %s", opts.Format)
}
}

type defuser struct {
vio.PushCloser
defuse expr.Function
}

func newDefuser(w vio.PushCloser) vio.PushCloser {
return &defuser{PushCloser: w, defuse: function.NewDefuse(super.NewContext())}
}

func (d *defuser) Push(vec vector.Any) error {
label, ok := vec.(*vector.Labeled)
if ok {
vec = label.Any
}
if vec != nil {
vec = vector.Apply(vector.ApplyNone, d.defuse.Call, vec)
}
if ok {
vec = &vector.Labeled{Any: vec, Label: label.Label}
}
return d.PushCloser.Push(vec)
}

type nullWriter struct{}

func (*nullWriter) Push(vector.Any) error {
Expand Down
2 changes: 1 addition & 1 deletion sio/csvio/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/brimdata/super/vector"
)

var ErrNotDataFrame = errors.New("CSV output requires uniform records but multiple types encountered (consider 'fuse')")
var ErrNotDataFrame = errors.New("CSV output requires uniform records but multiple types encountered (consider 'blend')")

type Writer struct {
writer io.WriteCloser
Expand Down
8 changes: 5 additions & 3 deletions sio/csvio/ztests/fusion.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ input: |
output-flags: -f csv

output: |
x,y
1,
,foo
x
1

error: |
CSV output requires uniform records but multiple types encountered (consider 'blend')
Loading