forked from cockroachdb/cockroach
/
sorter.go
152 lines (138 loc) · 5.43 KB
/
sorter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package distsqlrun
import (
"sync"
"golang.org/x/net/context"
"github.com/cockroachdb/cockroach/pkg/sql/mon"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)
// sorter sorts the input rows according to the column ordering specified by ordering. Note
// that this is a no-grouping aggregator and therefore it does not produce a global ordering but
// simply guarantees an intra-stream ordering on the physical output stream.
type sorter struct {
	processorBase
	// flowCtx carries per-flow state used here: cluster settings, the
	// EvalCtx memory monitor, and temp storage for disk spilling.
	flowCtx *FlowCtx
	// input is a row source without metadata; the metadata is directed straight
	// to out.output.
	input NoMetadataRowSource
	// rawInput is the true input, not wrapped in a NoMetadataRowSource.
	rawInput RowSource
	// ordering is the column ordering that the output stream must satisfy.
	ordering sqlbase.ColumnOrdering
	// matchLen is the length of the prefix of ordering that the input is
	// already sorted on; when non-zero, Run sorts chunk-by-chunk instead of
	// buffering all rows.
	matchLen uint32
	// count is the maximum number of rows that the sorter will push to the
	// ProcOutputHelper. 0 if the sorter should sort and push all the rows from
	// the input.
	count int64
	// testingKnobMemLimit is used in testing to set a limit on the memory that
	// should be used by the sortAllStrategy. Minimum value to enable is 1.
	testingKnobMemLimit int64
	// tempStorage is used to store rows when the working set is larger than can
	// be stored in memory.
	tempStorage engine.Engine
}
// sorter implements the Processor interface.
var _ Processor = &sorter{}
// newSorter constructs a sorter processor that reads from input, applies the
// ordering described in spec, runs post-processing per post, and writes to
// output. Returns an error if the output helper fails to initialize.
func newSorter(
	flowCtx *FlowCtx, spec *SorterSpec, input RowSource, post *PostProcessSpec, output RowReceiver,
) (*sorter, error) {
	// A non-zero post.Limit bounds the work: the ProcOutputHelper discards
	// the first Offset rows, so the sorter must emit Offset+Limit of them.
	// Zero means "sort and push everything".
	var rowLimit int64
	if post.Limit != 0 {
		rowLimit = int64(post.Limit) + int64(post.Offset)
	}
	s := &sorter{
		flowCtx:     flowCtx,
		input:       MakeNoMetadataRowSource(input, output),
		rawInput:    input,
		ordering:    convertToColumnOrdering(spec.OutputOrdering),
		matchLen:    spec.OrderingMatchLen,
		count:       rowLimit,
		tempStorage: flowCtx.TempStorage,
	}
	err := s.out.Init(post, input.Types(), &flowCtx.EvalCtx, output)
	if err != nil {
		return nil, err
	}
	return s, nil
}
// Run is part of the processor interface. It reads all input rows, sorts
// them according to s.ordering using the strategy best suited to the spec,
// pushes the results through the ProcOutputHelper, and finally drains the
// input and closes the output.
func (s *sorter) Run(ctx context.Context, wg *sync.WaitGroup) {
	if wg != nil {
		defer wg.Done()
	}
	ctx = log.WithLogTag(ctx, "Sorter", nil)
	ctx, span := processorSpan(ctx, "sorter")
	defer tracing.FinishSpan(span)
	if log.V(2) {
		log.Infof(ctx, "starting sorter run")
		defer log.Infof(ctx, "exiting sorter run")
	}
	// Spilling to disk is available when the cluster setting enables it or a
	// testing knob forces a memory limit.
	st := s.flowCtx.Settings
	useTempStorage := settingUseTempStorageSorts.Get(&st.SV) || s.testingKnobMemLimit > 0
	// By default the row container accounts against the flow's monitor. When
	// the sortAllStrategy will be used and may spill, install a child monitor
	// with a hard budget instead; exhausting it makes the strategy overflow
	// to disk rather than fail.
	rowContainerMon := s.flowCtx.EvalCtx.Mon
	if s.matchLen == 0 && s.count == 0 && useTempStorage {
		memBudget := s.testingKnobMemLimit
		if memBudget <= 0 {
			memBudget = settingWorkMemBytes.Get(&st.SV)
		}
		limitedMon := mon.MakeMonitorInheritWithLimit(
			"sortall-limited", memBudget, s.flowCtx.EvalCtx.Mon,
		)
		limitedMon.Start(ctx, s.flowCtx.EvalCtx.Mon, mon.BoundAccount{})
		defer limitedMon.Stop(ctx)
		rowContainerMon = &limitedMon
	}
	var rows memRowContainer
	rows.initWithMon(s.ordering, s.rawInput.Types(), &s.flowCtx.EvalCtx, rowContainerMon)
	// Pick the optimal sorterStrategy for the spec.
	var strategy sorterStrategy
	switch {
	case s.matchLen != 0:
		// A prefix of the ordering already holds on the input. Accumulate
		// only the rows whose values are equal on that prefix, sort each
		// such chunk, and output it — avoiding loading all rows into memory.
		// TODO(irfansharif): Add optimization for case where both ordering
		// match length and limit is specified.
		strategy = newSortChunksStrategy(&rows)
	case s.count != 0:
		// No pre-existing ordering but a known limit k: keep only the k
		// smallest rows seen in a max-heap. Worst case O(n*log(k)) time and
		// O(k) space.
		strategy = newSortTopKStrategy(&rows, s.count)
	default:
		// Nothing to exploit: load every row and sort in place. Worst case
		// O(n*log(n)) time and O(n) space.
		strategy = newSortAllStrategy(&rows, useTempStorage)
	}
	sortErr := strategy.Execute(ctx, s)
	if sortErr != nil {
		log.Errorf(ctx, "error sorting rows: %s", sortErr)
	}
	DrainAndClose(ctx, s.out.output, sortErr, s.rawInput)
}