forked from vanadium-archive/go.v23
/
result_streams.go
103 lines (91 loc) · 2.69 KB
/
result_streams.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package internal
import (
ds "v.io/v23/query/engine/datasource"
"v.io/v23/query/engine/internal/query_parser"
"v.io/v23/query/syncql"
"v.io/v23/vdl"
"v.io/v23/vom"
)
// Select result stream
type selectResultStreamImpl struct {
db ds.Database
selectStatement *query_parser.SelectStatement
resultCount int64 // results served so far (needed for limit clause)
skippedCount int64 // skipped so far (needed for offset clause)
keyValueStream ds.KeyValueStream
k string
v *vom.RawBytes
err error
}
func (rs *selectResultStreamImpl) Advance() bool {
if rs.selectStatement.Limit != nil && rs.resultCount >= rs.selectStatement.Limit.Limit.Value {
return false
}
for rs.keyValueStream.Advance() {
k, v := rs.keyValueStream.KeyValue()
// EvalWhereUsingOnlyKey
// INCLUDE: the row should be included in the results
// EXCLUDE: the row should NOT be included
// FETCH_VALUE: the value and/or type of the value are required to make determination.
rv := EvalWhereUsingOnlyKey(rs.db, rs.selectStatement.Where, k)
var match bool
switch rv {
case INCLUDE:
match = true
case EXCLUDE:
match = false
case FETCH_VALUE:
match = Eval(rs.db, k, vdl.ValueOf(v), rs.selectStatement.Where.Expr)
}
if match {
if rs.selectStatement.ResultsOffset == nil || rs.selectStatement.ResultsOffset.ResultsOffset.Value <= rs.skippedCount {
rs.k = k
rs.v = v
rs.resultCount++
return true
} else {
rs.skippedCount++
}
}
}
if err := rs.keyValueStream.Err(); err != nil {
rs.err = syncql.NewErrKeyValueStreamError(rs.db.GetContext(), rs.selectStatement.Off, err)
}
return false
}
func (rs *selectResultStreamImpl) Result() []*vom.RawBytes {
return ComposeProjection(rs.db, rs.k, vdl.ValueOf(rs.v), rs.selectStatement.Select)
}
func (rs *selectResultStreamImpl) Err() error {
return rs.err
}
func (rs *selectResultStreamImpl) Cancel() {
rs.keyValueStream.Cancel()
}
// Delete result stream
type deleteResultStreamImpl struct {
db ds.Database
deleteStatement *query_parser.DeleteStatement
deleteCursor int64 // zero or one
deleteCount int64
err error
}
func (rs *deleteResultStreamImpl) Advance() bool {
if rs.deleteCursor == 0 {
rs.deleteCursor++
return true
}
return false
}
func (rs *deleteResultStreamImpl) Result() []*vom.RawBytes {
return []*vom.RawBytes{vom.RawBytesOf(rs.deleteCount)}
}
func (rs *deleteResultStreamImpl) Err() error {
return rs.err
}
func (rs *deleteResultStreamImpl) Cancel() {
rs.deleteCursor++
}