Skip to content

Commit

Permalink
groot/rsql/rsqldrv: use rtree.Reader in lieu of Scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
sbinet committed Oct 6, 2020
1 parent d6ee5e8 commit dba7717
Show file tree
Hide file tree
Showing 2 changed files with 242 additions and 115 deletions.
111 changes: 79 additions & 32 deletions groot/rsql/rsqldrv/driver.go
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"fmt"
"io"
"reflect"
Expand Down Expand Up @@ -238,7 +239,10 @@ type driverRows struct {
deps []string // names of the columns to be read
vars []interface{} // values of the columns that were read

cursor *rtree.TreeScanner
reader *rtree.Reader
row rowCtx
rows chan rowCtx

eval expression
filter expression
}
Expand Down Expand Up @@ -315,7 +319,7 @@ func newDriverRows(ctx context.Context, conn *driverConn, stmt *sqlparser.Select
rows.deps = append(rows.deps, v.Name)
}

rows.cursor, err = rtree.NewTreeScannerVars(tree, vars...)
rows.reader, err = rtree.NewReader(tree, vars)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -349,6 +353,7 @@ func newDriverRows(ctx context.Context, conn *driverConn, stmt *sqlparser.Select
}
}

rows.start()
return rows, nil
}

Expand Down Expand Up @@ -544,7 +549,65 @@ func (r *driverRows) ColumnTypeNullable(i int) (nullable, ok bool) {

// Close closes the rows iterator.
func (r *driverRows) Close() error {
return r.cursor.Close()
return r.reader.Close()
}

type rowCtx struct {
ctx rtree.RCtx
vs interface{}
done chan int
err error
}

func (r *driverRows) start() {
r.rows = make(chan rowCtx)
r.row.ctx.Entry = -1
go func() {
defer close(r.rows)
err := r.reader.Read(func(ctx rtree.RCtx) error {
ectx := newExecCtx(r.conn, r.args)
vctx := make(map[interface{}]interface{})
for i, v := range r.vars {
vctx[r.deps[i]] = reflect.Indirect(reflect.ValueOf(v)).Interface()
}

switch r.filter {
case nil:
// no filter
default:
ok, err := r.filter.eval(ectx, vctx)
if err != nil {
//log.Printf("filter.eval: ok=%#v err=%v", ok, err)
return err
}
if !ok.(bool) {
return nil
}
}

vs, err := r.eval.eval(ectx, vctx)
// log.Printf("row.eval: v=%#v, err=%v n=%d", vs, err, len(dest))
if err != nil {
return fmt.Errorf("could not evaluate row values: %w", err)
}

evt := rowCtx{
ctx: ctx,
vs: vs,
err: nil,
done: make(chan int),
}

r.rows <- evt
<-evt.done
return nil
})
if err != nil {
r.rows <- rowCtx{err: err}
return
}
r.rows <- rowCtx{err: io.EOF}
}()
}

// Next is called to populate the next row of data into
Expand All @@ -557,41 +620,25 @@ func (r *driverRows) Close() error {
// should be taken when closing Rows not to modify
// a buffer held in dest.
func (r *driverRows) Next(dest []driver.Value) error {
if !r.cursor.Next() {
return io.EOF
}
err := r.cursor.Scan(r.vars...)
if err != nil {
return err
if r.row.ctx.Entry >= 0 {
close(r.row.done)
}

ectx := newExecCtx(r.conn, r.args)
vctx := make(map[interface{}]interface{})
for i, v := range r.vars {
vctx[r.deps[i]] = reflect.Indirect(reflect.ValueOf(v)).Interface()
row, ok := <-r.rows
r.row = row
if !ok {
return io.EOF
}

switch r.filter {
case nil:
// no filter
default:
ok, err := r.filter.eval(ectx, vctx)
if err != nil {
//log.Printf("filter.eval: ok=%#v err=%v", ok, err)
return err
}
if !ok.(bool) {
return r.Next(dest)
if row.err != nil {
switch {
case errors.Is(row.err, io.EOF):
return io.EOF
default:
return row.err
}
}

vs, err := r.eval.eval(ectx, vctx)
// log.Printf("row.eval: v=%#v, err=%v n=%d", vs, err, len(dest))
if err != nil {
return fmt.Errorf("could not evaluate row values: %w", err)
}

switch vs := vs.(type) {
switch vs := row.vs.(type) {
case []interface{}:
for i, v := range vs {
switch v := v.(type) {
Expand Down

0 comments on commit dba7717

Please sign in to comment.