This repository has been archived by the owner on Feb 16, 2022. It is now read-only.
/
exec.go
179 lines (164 loc) · 7.26 KB
/
exec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
// Copyright 2019 eBay Inc.
// Primary authors: Simon Fell, Diego Ongaro,
// Raymond Kroeker, and Sathish Kandasamy.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package exec
import (
"context"
"fmt"
"github.com/ebay/akutan/blog"
"github.com/ebay/akutan/facts/cache"
"github.com/ebay/akutan/query/planner/plandef"
"github.com/ebay/akutan/viewclient/lookups"
opentracing "github.com/opentracing/opentracing-go"
)
// Execute runs a query plan as produced by the query planner. Results are
// delivered as one or more ResultChunks on resCh. When the query yields no
// rows, a single ResultChunk is still sent; it carries the column
// information but no rows.
//
// Execute blocks until every result has been sent on resCh or an error
// occurs. resCh is closed by the time Execute returns, whatever the outcome.
// 'events' receives callbacks while the query executes; pass nil if you do
// not need them.
func Execute(ctx context.Context, events Events, index uint64, cache cache.FactCache, views lookups.All, query *plandef.Plan, resCh chan<- ResultChunk) error {
	eventSink := events
	if eventSink == nil {
		eventSink = ignoreEvents{}
	}
	// The plan's operator tree is always wrapped in an emptyResultOp so that
	// a query with no results still produces a row-less ResultChunk holding
	// the columns; the API query service relies on these semantics.
	var root queryOperator = &emptyResultOp{buildOperator(eventSink, index, cache, views, query)}
	return root.run(ctx, new(defaultBinder), resCh)
}
// queryOperator represents an executable stage of the query. There is typically
// one queryOperator instance for each operator in the query plan (although that
// is not a requirement).
type queryOperator interface {
// run() executes the query operator, generating its output to the supplied
// result channel. This blocks until all its values have been produced, or
// there was an error. In either case the result channel is closed before
// run returns.
//
// run can be asked to perform bulk requests by being passed a valueBinder
// with a len() > 1. The queryOperator in this case should perform its
// operator with each binding value. Results include the binding offset in
// them.
run(context.Context, valueBinder, chan<- ResultChunk) error
// All ResultChunk's generated by this queryOperator will have these columns
// in this order. Note this doesn't use plandef.VarSet as those are sorted
// and we need to be able to have other orders, e.g. the order of columns
// from a projection should match the order in the query.
columns() Columns
}
// buildOperator converts a query plan into a parallel tree of queryOperators.
// The plan's inputs are converted first (recursively). The plan's own
// operator is then mapped to its executable implementation, which is wrapped
// in a decoratedOp so that it reports events. buildOperator panics if the
// plan contains an operator type it does not know how to execute.
func buildOperator(events Events, index blog.Index, cache cache.FactCache, views lookups.All, query *plandef.Plan) queryOperator {
	children := make([]queryOperator, 0, len(query.Inputs))
	for _, child := range query.Inputs {
		children = append(children, buildOperator(events, index, cache, views, child))
	}

	var impl operator
	switch op := query.Operator.(type) {
	case *plandef.Ask:
		impl = newAsk(op, children)
	case *plandef.Enumerate:
		impl = &enumerateOp{op}
	case *plandef.ExternalIDs:
		impl = newExternalIDs(index, views, op, children)
	case *plandef.OrderByOp:
		impl = newOrderByOp(op, children)
	case *plandef.LimitAndOffsetOp:
		impl = newLimitAndOffsetOp(op, children)
	case *plandef.DistinctOp:
		impl = newDistinctOp(op, children)
	case *plandef.HashJoin:
		impl = newHashJoin(op, children)
	case *plandef.LoopJoin:
		impl = newLoopJoin(op, children)
	case *plandef.InferPO:
		impl = newInferPO(index, views, op, children)
	case *plandef.InferSP:
		impl = newInferSP(index, views, op, children)
	case *plandef.InferSPO:
		impl = newInferSPO(index, views, cache, op, children)
	case *plandef.LookupPO:
		impl = newLookupPO(index, views, op, children)
	case *plandef.LookupPOCmp:
		impl = newLookupPOCmp(index, views, op, children)
	case *plandef.LookupS:
		impl = newLookupS(index, views, op, children)
	case *plandef.LookupSP:
		impl = newLookupSP(index, views, op, children)
	case *plandef.LookupSPO:
		impl = newLookupSPO(index, views, op, children)
	case *plandef.Projection:
		impl = newProjection(op, children)
	case *plandef.SelectLit:
		impl = newSelectLitOp(op, children)
	case *plandef.SelectVar:
		impl = newSelectVarOp(op, children)
	default:
		panic(fmt.Sprintf("Unexpected operator in query executor: %T (%v)", query.Operator, query.Operator))
	}
	return &decoratedOp{events: events, op: impl}
}
// operator defines an executable operator instance. It sends its output to the
// provided results instance. These will be used with the decoratedOp struct
// to turn them into queryOperators.
type operator interface {
execute(context.Context, valueBinder, results) error
operator() plandef.Operator
// columns returns the list of columns generated by this operator. All
// ResultChunks generated by this operator will have their column results in
// this order.
columns() Columns
}
// results is used by operator implementations to send their output results to.
// Typically resultChunkBuilder is used as an implementation of this.
// Implementations of add should be concurrent safe. The values in rowValues
// should be in the same order the as operators declared columns.
type results interface {
add(ctx context.Context, offset uint32, f FactSet, rowValues []Value)
setFinalStatistics(stats FinalStatistics)
}
// decoratedOp uses the supplied operator implementation to implement
// queryOperator. The resulting queryOperator signals the relevant events and
// handles the housekeeping every operator needs: a tracing span, building and
// flushing result chunks, and closing the result channel.
type decoratedOp struct {
	events Events
	op     operator
}

// decoratedOp implements queryOperator.
var _ queryOperator = (*decoratedOp)(nil)

// columns returns the columns of the wrapped operator.
func (d *decoratedOp) columns() Columns {
	return d.op.columns()
}

// run executes the wrapped operator with the given bindings, streaming its
// output through a chunk builder onto resCh. After execution it flushes the
// builder and reports an OpCompletedEvent (timing, input bulk count, output
// statistics and error) to d.events. It closes resCh and finishes the tracing
// span before returning. The error returned is the one from the operator's
// execute.
func (d *decoratedOp) run(ctx context.Context, binder valueBinder, resCh chan<- ResultChunk) error {
	planOp := d.op.operator()
	bulkCount := binder.len()

	span, ctx := opentracing.StartSpanFromContext(ctx, planOp.String())
	// Deferred calls run LIFO, so resCh is closed first and the span finished
	// second, the same order as before. Deferring guarantees both happen even
	// if execute/flush/OpCompleted panics, so consumers ranging over resCh
	// are never left blocked on a channel that is never closed.
	defer span.Finish()
	defer close(resCh)
	span.SetTag(bulkCountTag, bulkCount)

	clock := d.events.Clock()
	startedAt := clock.Now()
	rb := newChunkBuilder(resCh, d.op.columns())
	err := d.op.execute(ctx, binder, rb)
	outStats := rb.flush(ctx)
	d.events.OpCompleted(OpCompletedEvent{
		Operator:       planOp,
		InputBulkCount: bulkCount,
		StartedAt:      startedAt,
		EndedAt:        clock.Now(),
		Output:         outStats,
		Err:            err,
	})
	return err
}