Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner/spannertest): support LEFT OUTER JOIN with ON clause #2945

Merged
merged 1 commit into from
Oct 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions spanner/spannertest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ by ascending esotericism:

- expression functions
- more aggregation functions
- more joins types (INNER, CROSS, FULL, RIGHT)
- INSERT/UPDATE DML statements
- SELECT HAVING
- case insensitivity
- alternate literal types (esp. strings)
- STRUCT types
- SELECT HAVING
- joins
- transaction simulation
- expression type casting, coercion
- subselects
Expand Down
185 changes: 172 additions & 13 deletions spanner/spannertest/db_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pulling from a table (FROM tbl), filtering (WHERE expr), re-ordering (ORDER BY e
or other transformations.

The order of operations among those supported by Cloud Spanner is
FROM + JOIN + set ops [TODO: JOIN and set ops]
FROM + JOIN + set ops [TODO: set ops]
WHERE
GROUP BY
aggregation
Expand Down Expand Up @@ -129,6 +129,11 @@ func (raw *rawIter) add(src row, colIndexes []int) {
raw.rows = append(raw.rows, src.copyData(colIndexes))
}

// clone makes a shallow copy.
func (raw *rawIter) clone() *rawIter {
return &rawIter{cols: raw.cols, rows: raw.rows}
}

func toRawIter(ri rowIter) (*rawIter, error) {
if raw, ok := ri.(*rawIter); ok {
return raw, nil
Expand Down Expand Up @@ -350,23 +355,16 @@ func (d *database) evalSelect(sel spansql.Select, params queryParams) (ri rowIte
return nil, fmt.Errorf("selecting with more than one FROM clause not yet supported")
}
if len(sel.From) == 1 {
sft, ok := sel.From[0].(spansql.SelectFromTable)
if !ok {
return nil, fmt.Errorf("selecting with FROM clause of type %T not yet supported", sel.From[0])
}
tableName := sft.Table
t, err := d.table(tableName)
var unlock func()
var err error
ec, ri, unlock, err = d.evalSelectFrom(ec, sel.From[0])
if err != nil {
return nil, err
}
t.mu.Lock()
defer t.mu.Unlock()
ti := &tableIter{t: t, alias: sft.Alias}
ri = ti
ec.cols = ti.Cols()
defer unlock()

// On the way out, convert the result to a rawIter
// so that the table may be safely unlocked.
// so that any locked tables may be safely unlocked.
defer func() {
if evalErr == nil {
ri, evalErr = toRawIter(ri)
Expand Down Expand Up @@ -587,6 +585,167 @@ func (d *database) evalSelect(sel spansql.Select, params queryParams) (ri rowIte
return ri, nil
}

func (d *database) evalSelectFrom(ec evalContext, sf spansql.SelectFrom) (evalContext, rowIter, func(), error) {
switch sf := sf.(type) {
default:
return ec, nil, nil, fmt.Errorf("selecting with FROM clause of type %T not yet supported", sf)
case spansql.SelectFromTable:
t, err := d.table(sf.Table)
if err != nil {
return ec, nil, nil, err
}
t.mu.Lock()
ti := &tableIter{t: t}
if sf.Alias != "" {
ti.alias = sf.Alias
} else {
// There is an implicit alias using the table name.
// https://cloud.google.com/spanner/docs/query-syntax#implicit_aliases
ti.alias = sf.Table
}
ec.cols = ti.Cols()
return ec, ti, t.mu.Unlock, nil
case spansql.SelectFromJoin:
// TODO: Avoid the toRawIter calls here by rethinking how locking works throughout evalSelect,
// then doing the RHS recursive evalSelectFrom in joinIter.Next on demand.

lhsEC, lhs, unlock, err := d.evalSelectFrom(ec, sf.LHS)
if err != nil {
return ec, nil, nil, err
}
lhsRaw, err := toRawIter(lhs)
unlock()
if err != nil {
return ec, nil, nil, err
}

rhsEC, rhs, unlock, err := d.evalSelectFrom(ec, sf.RHS)
if err != nil {
return ec, nil, nil, err
}
rhsRaw, err := toRawIter(rhs)
unlock()
if err != nil {
return ec, nil, nil, err
}

// Construct a merged evalContext.
// TODO: Remove ambiguous names here? Or catch them when evaluated?
ec.cols = append(append([]colInfo(nil), lhsEC.cols...), rhsEC.cols...) // force a copy
ec.row = make(row, len(ec.cols)) // row is used as scratch space in joinIter.Next.
// TODO: aliases might need work?

return ec, &joinIter{
sfj: sf,
ec: ec,

lhs: lhsRaw,
rhsOrig: rhsRaw,
}, func() {}, nil
}
}

type joinIter struct {
sfj spansql.SelectFromJoin // TODO: or less of this?
ec evalContext // combined context
scratch row // used for synthetic lhs+rhs row

// lhs is scanned (consumed), but rhs is cloned for each lhs row.
lhs, rhsOrig *rawIter

lhsRow row // current row from lhs, or nil if it is time to advance
rhs *rawIter // current clone of rhs
any bool // true if any rhs rows have matched lhsRow
}

func (ji *joinIter) Cols() []colInfo { return ji.ec.cols }

func (ji *joinIter) nextLeft() error {
var err error
ji.lhsRow, err = ji.lhs.Next()
if err != nil {
return err
}
ji.rhs = ji.rhsOrig.clone()
ji.any = false
return nil
}

func (ji *joinIter) Next() (row, error) {
// TODO: More join types.
if ji.sfj.Type != spansql.LeftJoin {
return nil, fmt.Errorf("TODO: can't yet evaluate join of type %v in %s", ji.sfj.Type, ji.sfj.SQL())
}

/*
The result of a LEFT OUTER JOIN (or simply LEFT JOIN) for two
from_items always retains all rows of the left from_item in the
JOIN clause, even if no rows in the right from_item satisfy the
join predicate.

LEFT indicates that all rows from the left from_item are
returned; if a given row from the left from_item does not join
to any row in the right from_item, the row will return with
NULLs for all columns from the right from_item. Rows from the
right from_item that do not join to any row in the left
from_item are discarded.
*/
if ji.lhsRow == nil {
dsymonds marked this conversation as resolved.
Show resolved Hide resolved
if err := ji.nextLeft(); err != nil {
return nil, err
}
}

for {
rhsRow, err := ji.rhs.Next()
if err == io.EOF {
if !ji.any {
copy(ji.ec.row, ji.lhsRow)
for i := len(ji.lhsRow); i < len(ji.ec.row); i++ {
ji.ec.row[i] = nil
}
ji.lhsRow = nil
return ji.ec.row, nil
}

// Finished the current LHS row;
// advance to next one.
if err := ji.nextLeft(); err != nil {
return nil, err
}
continue
}
if err != nil {
return nil, err
}
copy(ji.ec.row, ji.lhsRow)
copy(ji.ec.row[len(ji.lhsRow):], rhsRow)
match, err := ji.evalCond()
if err != nil {
return nil, err
}
if !match {
continue
}
ji.any = true
return ji.ec.row, nil
}
}

func (ji *joinIter) evalCond() (bool, error) {
if ji.sfj.On != nil {
return ji.ec.evalBoolExpr(ji.sfj.On)
}

if len(ji.sfj.Using) > 0 {
// TODO: USING in JOIN condition
return false, fmt.Errorf("USING clause in JOIN condition not yet supported")
}

// If there's no ON or USING condition, it matches.
return true, nil
}

// externalRowSorter implements sort.Interface for a slice of rows
// with an external sort key.
type externalRowSorter struct {
Expand Down
48 changes: 48 additions & 0 deletions spanner/spannertest/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,23 @@ func TestTableData(t *testing.T) {
},
PrimaryKey: []spansql.KeyPart{{Column: "LastName"}, {Column: "OpponentID"}}, // TODO: is this right?
},
// JoinA and JoinB are "A" and "B" from https://cloud.google.com/spanner/docs/query-syntax#join_types.
{
Name: "JoinA",
Columns: []spansql.ColumnDef{
{Name: "w", Type: spansql.Type{Base: spansql.Int64}},
{Name: "x", Type: spansql.Type{Base: spansql.String}},
},
PrimaryKey: []spansql.KeyPart{{Column: "w"}, {Column: "x"}},
},
{
Name: "JoinB",
Columns: []spansql.ColumnDef{
{Name: "y", Type: spansql.Type{Base: spansql.Int64}},
{Name: "z", Type: spansql.Type{Base: spansql.String}},
},
PrimaryKey: []spansql.KeyPart{{Column: "y"}, {Column: "z"}},
},
} {
st := db.ApplyDDL(ct)
if st.Code() != codes.OK {
Expand All @@ -334,6 +351,24 @@ func TestTableData(t *testing.T) {
if err != nil {
t.Fatalf("Inserting data: %v", err)
}
err = db.Insert(tx, "JoinA", []spansql.ID{"w", "x"}, []*structpb.ListValue{
listV(stringV("1"), stringV("a")),
listV(stringV("2"), stringV("b")),
listV(stringV("3"), stringV("c")),
listV(stringV("3"), stringV("d")),
})
if err != nil {
t.Fatalf("Inserting data: %v", err)
}
err = db.Insert(tx, "JoinB", []spansql.ID{"y", "z"}, []*structpb.ListValue{
listV(stringV("2"), stringV("k")),
listV(stringV("3"), stringV("m")),
listV(stringV("3"), stringV("n")),
listV(stringV("4"), stringV("p")),
})
if err != nil {
t.Fatalf("Inserting data: %v", err)
}
if _, err := tx.Commit(); err != nil {
t.Fatalf("Commiting changes: %v", err)
}
Expand Down Expand Up @@ -556,6 +591,19 @@ func TestTableData(t *testing.T) {
{"Sam"},
},
},
// Joins.
{
`SELECT * FROM JoinA LEFT OUTER JOIN JoinB AS B ON JoinA.w = B.y`,
nil,
[][]interface{}{
{int64(1), "a", nil, nil},
{int64(2), "b", int64(2), "k"},
{int64(3), "c", int64(3), "m"},
{int64(3), "c", int64(3), "n"},
{int64(3), "d", int64(3), "m"},
{int64(3), "d", int64(3), "n"},
},
},
// Regression test for aggregating no rows; it used to return an empty row.
// https://github.com/googleapis/google-cloud-go/issues/2793
{
Expand Down