Skip to content

Commit

Permalink
Use faster library for JSON output.
Browse files Browse the repository at this point in the history
  • Loading branch information
cube2222 committed Apr 22, 2022
1 parent 8813d76 commit 52e1f02
Show file tree
Hide file tree
Showing 15 changed files with 99 additions and 21 deletions.
6 changes: 4 additions & 2 deletions datasources/csv/execution.go
Expand Up @@ -9,6 +9,8 @@ import (
"strconv"
"time"

"github.com/valyala/fastjson/fastfloat"

. "github.com/cube2222/octosql/execution"
"github.com/cube2222/octosql/octosql"
"github.com/cube2222/octosql/physical"
Expand Down Expand Up @@ -68,15 +70,15 @@ func (d *DatasourceExecuting) Run(ctx ExecutionContext, produce ProduceFn, metaS
}

if octosql.Int.Is(d.fields[i].Type) == octosql.TypeRelationIs {
integer, err := strconv.ParseInt(str, 10, 64)
integer, err := fastfloat.ParseInt64(str)
if err == nil {
values[i] = octosql.NewInt(int(integer))
continue
}
}

if octosql.Float.Is(d.fields[i].Type) == octosql.TypeRelationIs {
float, err := strconv.ParseFloat(str, 64)
float, err := fastfloat.Parse(str)
if err == nil {
values[i] = octosql.NewFloat(float)
continue
Expand Down
6 changes: 3 additions & 3 deletions execution/nodes/group_by.go
Expand Up @@ -138,15 +138,15 @@ func (g *GroupBy) Run(ctx ExecutionContext, produce ProduceFn, metaSend MetaSend
}

if err := g.trigger(ProduceFromExecutionContext(ctx), aggregates, previouslySentValues, trigger, record.EventTime, produce); err != nil {
return fmt.Errorf("couldn't trigger keys on record receive")
return fmt.Errorf("couldn't trigger keys on record receive: %w", err)
}

return nil
}, func(ctx ProduceContext, msg MetadataMessage) error {
if msg.Type == MetadataMessageTypeWatermark {
trigger.WatermarkReceived(msg.Watermark)
if err := g.trigger(ctx, aggregates, previouslySentValues, trigger, msg.Watermark, produce); err != nil {
return fmt.Errorf("couldn't trigger keys on watermark")
return fmt.Errorf("couldn't trigger keys on watermark: %w", err)
}
}
return metaSend(ctx, msg)
Expand All @@ -157,7 +157,7 @@ func (g *GroupBy) Run(ctx ExecutionContext, produce ProduceFn, metaSend MetaSend
trigger.EndOfStreamReached()
// TODO: What should be put here as the event time? WatermarkMaxValue kind of makes sense. But on the other hand, if this is then i.e. StreamJoin'ed with something then it would make everything MaxValue. But only if this is Batch. If it's grouping by event time then the event times will be correct.
if err := g.trigger(ProduceFromExecutionContext(ctx), aggregates, previouslySentValues, trigger, WatermarkMaxValue, produce); err != nil {
return fmt.Errorf("couldn't trigger keys on end of stream")
return fmt.Errorf("couldn't trigger keys on end of stream: %w", err)
}

return nil
Expand Down
1 change: 0 additions & 1 deletion go.mod
Expand Up @@ -28,7 +28,6 @@ require (
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.6.0
github.com/pmezard/go-difflib v1.0.0
github.com/segmentio/encoding v0.2.7
github.com/shopspring/decimal v1.2.0 // indirect
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
github.com/spf13/cobra v1.2.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Expand Up @@ -284,8 +284,6 @@ github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTE
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/encoding v0.2.7 h1:TKxEiKbernCFCTFW5wnSlE21kIQpqcY/ABXjhc9YeJU=
github.com/segmentio/encoding v0.2.7/go.mod h1:MJjRE6bMDocliO2FyFC2Dusp+uYdBfHWh5Bw7QyExto=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
Expand Down
10 changes: 7 additions & 3 deletions octosql/values.go
Expand Up @@ -134,8 +134,8 @@ func (value Value) Compare(other Value) int {
}

case TypeIDString:
left := strings.ToLower(value.Str)
right := strings.ToLower(other.Str)
left := value.Str
right := other.Str
if left < right {
return -1
} else if left > right {
Expand Down Expand Up @@ -359,7 +359,8 @@ func (value Value) append(builder *strings.Builder) {

func (value Value) ToRawGoValue(t Type) interface{} {
// TODO: Add complex types.
switch t.TypeID {
// TODO: Fix union handling.
switch value.TypeID {
case TypeIDNull:
return nil
case TypeIDInt:
Expand All @@ -375,6 +376,7 @@ func (value Value) ToRawGoValue(t Type) interface{} {
case TypeIDDuration:
return value.Duration
case TypeIDList:
// TODO: Fix union handling.
if t.List.Element == nil {
return []interface{}{}
}
Expand All @@ -384,12 +386,14 @@ func (value Value) ToRawGoValue(t Type) interface{} {
}
return out
case TypeIDStruct:
// TODO: Fix union handling.
out := make(map[string]interface{}, len(value.Struct))
for i := range value.Struct {
out[t.Struct.Fields[i].Name] = value.Struct[i].ToRawGoValue(t.Struct.Fields[i].Type)
}
return out
case TypeIDTuple:
// TODO: Fix union handling.
out := make([]interface{}, len(value.Tuple))
for i := range value.Tuple {
out[i] = value.Tuple[i].ToRawGoValue(t.Tuple.Elements[i])
Expand Down
81 changes: 72 additions & 9 deletions outputs/formats/json_format.go
@@ -1,24 +1,29 @@
package formats

import (
"fmt"
"io"
"log"
"time"

"github.com/segmentio/encoding/json"
"github.com/valyala/fastjson"

"github.com/cube2222/octosql/octosql"
"github.com/cube2222/octosql/physical"
)

type JSONFormatter struct {
encoder *json.Encoder
fields []physical.SchemaField
buf []byte
arena *fastjson.Arena
w io.Writer
fields []physical.SchemaField
}

func NewJSONFormatter(w io.Writer) *JSONFormatter {
encoder := json.NewEncoder(w)

return &JSONFormatter{
encoder: encoder,
buf: make([]byte, 0, 1024),
arena: new(fastjson.Arena),
w: w,
}
}

Expand All @@ -27,12 +32,70 @@ func (t *JSONFormatter) SetSchema(schema physical.Schema) {
}

func (t *JSONFormatter) Write(values []octosql.Value) error {
out := make(map[string]interface{}, len(values))
obj := t.arena.NewObject()
for i := range t.fields {
out[t.fields[i].Name] = values[i].ToRawGoValue(t.fields[i].Type)
obj.Set(t.fields[i].Name, ValueToJson(t.arena, t.fields[i].Type, values[i]))
}

return t.encoder.Encode(out)
t.buf = obj.MarshalTo(t.buf)
t.buf = append(t.buf, '\n')
t.w.Write(t.buf)
t.buf = t.buf[:0]
t.arena.Reset()
return nil
}

func ValueToJson(arena *fastjson.Arena, t octosql.Type, value octosql.Value) *fastjson.Value {
if t.TypeID == octosql.TypeIDUnion {
for i := range t.Union.Alternatives {
if t.Union.Alternatives[i].TypeID == value.TypeID {
return ValueToJson(arena, t.Union.Alternatives[i], value)
}
}
log.Printf("Invalid value of type '%s' for union type '%s'. Using null.", value.TypeID.String(), t.String())
return arena.NewNull()
}

switch value.TypeID {
case octosql.TypeIDNull:
return arena.NewNull()
case octosql.TypeIDInt:
return arena.NewNumberInt(value.Int)
case octosql.TypeIDFloat:
return arena.NewNumberFloat64(value.Float)
case octosql.TypeIDBoolean:
if value.Boolean {
return arena.NewTrue()
} else {
return arena.NewFalse()
}
case octosql.TypeIDString:
return arena.NewString(value.Str)
case octosql.TypeIDTime:
return arena.NewString(value.Time.Format(time.RFC3339))
case octosql.TypeIDDuration:
return arena.NewString(value.Duration.String())
case octosql.TypeIDList:
arr := arena.NewArray()
for i := range value.List {
arr.SetArrayItem(i, ValueToJson(arena, *t.List.Element, value.List[i]))
}
return arr
case octosql.TypeIDStruct:
arr := arena.NewObject()
for i := range value.Struct {
arr.Set(t.Struct.Fields[i].Name, ValueToJson(arena, t.Struct.Fields[i].Type, value.Struct[i]))
}
return arr
case octosql.TypeIDTuple:
arr := arena.NewArray()
for i := range value.Tuple {
arr.SetArrayItem(i, ValueToJson(arena, t.Tuple.Elements[i], value.Tuple[i]))
}
return arr
default:
panic(fmt.Sprintf("invalid octosql value type to print: %s", value.TypeID.String()))
}
}

func (t *JSONFormatter) Close() error {
Expand Down
2 changes: 2 additions & 0 deletions tests/scenarios/outputs/json/fixtures/simple.json
@@ -0,0 +1,2 @@
{"field1": "value", "field2": 42, "field3": {"field4": "eulav", "field5": 24}, "field7": [42, 43, {"field8": "value8"}]}
{"field1": "value", "field2": 42, "field3": {"field5": "eulav", "field6": "value"}}
3 changes: 3 additions & 0 deletions tests/scenarios/outputs/json/fixtures/union.json
@@ -0,0 +1,3 @@
{"id": 0, "value": 42}
{"id": 0, "value": "42"}
{"id": 0, "value": [42, "42"]}
Empty file.
1 change: 1 addition & 0 deletions tests/scenarios/outputs/json/simple.in
@@ -0,0 +1 @@
octosql "SELECT * FROM fixtures/simple.json" --output json
2 changes: 2 additions & 0 deletions tests/scenarios/outputs/json/simple.out
@@ -0,0 +1,2 @@
{"simple.field1":"value","simple.field2":42,"simple.field3":{"field4":"eulav","field5":24,"field6":null},"simple.field7":[42,43,{"field8":"value8"}]}
{"simple.field1":"value","simple.field2":42,"simple.field3":{"field4":null,"field5":"eulav","field6":"value"},"simple.field7":null}
Empty file.
1 change: 1 addition & 0 deletions tests/scenarios/outputs/json/union.in
@@ -0,0 +1 @@
octosql "SELECT * FROM fixtures/union.json" --output json
3 changes: 3 additions & 0 deletions tests/scenarios/outputs/json/union.out
@@ -0,0 +1,3 @@
{"union.id":0,"union.value":42}
{"union.id":0,"union.value":"42"}
{"union.id":0,"union.value":[42,"42"]}
2 changes: 1 addition & 1 deletion tests/tester/main.go
Expand Up @@ -157,7 +157,7 @@ func diffTest(testCase string, print bool) bool {
})
if stderrDiff != "" {
if print {
fmt.Println(stdoutDiff)
fmt.Println(stderrDiff)
}
diff = true
}
Expand Down

0 comments on commit 52e1f02

Please sign in to comment.