forked from polarsignals/frostdb
/
ordered_synchronizer.go
250 lines (225 loc) 路 7.48 KB
/
ordered_synchronizer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
package physicalplan
import (
"context"
"errors"
"fmt"
"sync"
"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/dreamsxin/frostdb/dynparquet"
"github.com/dreamsxin/frostdb/pqarrow/arrowutils"
"github.com/dreamsxin/frostdb/query/logicalplan"
)
// OrderedSynchronizer implements synchronizing ordered input from multiple
// goroutines. The strategy used is that any input that calls Callback must wait
// for all the other inputs to call Callback, since an ordered result cannot
// be produced until all inputs have pushed data. Another strategy would be to
// store the pushed records, but that requires fully copying all the data for
// safety.
type OrderedSynchronizer struct {
	// pool is the allocator handed to the record merge.
	pool memory.Allocator
	// orderByExprs define the sort order of the merged output.
	orderByExprs []logicalplan.Expr
	// orderByCols []int
	// orderByCols are the sorting-column indices into the synchronized
	// schema; recomputed by ensureSameSchema whenever the schema changes.
	orderByCols []arrowutils.SortingColumn
	// sync groups all mutable state; every field below is protected by mtx.
	sync struct {
		mtx sync.Mutex
		// lastSchema is the schema all records were normalized to on the
		// previous merge, used to skip recalculation when nothing changed.
		lastSchema *arrow.Schema
		// data buffers the records pushed by inputs since the last merge.
		data []arrow.Record
		// inputsWaiting is an integer that keeps track of the number of inputs
		// waiting on the wait channel. It cannot be an atomic because it
		// sometimes needs to be compared to inputsRunning.
		inputsWaiting int
		// inputsRunning is an integer that keeps track of the number of inputs
		// that have not called Finish yet.
		inputsRunning int
	}
	// wait parks inputs in Callback until the last input arrives and merges;
	// the merger sends one signal per waiter (see mergeRecordsLocked).
	wait chan struct{}
	// next is the downstream operator receiving merged records.
	next PhysicalPlan
}
// NewOrderedSynchronizer returns an OrderedSynchronizer that merges ordered
// records pushed by `inputs` concurrent producers, sorted by orderByExprs.
func NewOrderedSynchronizer(pool memory.Allocator, inputs int, orderByExprs []logicalplan.Expr) *OrderedSynchronizer {
	var s OrderedSynchronizer
	s.pool = pool
	s.orderByExprs = orderByExprs
	// Unbuffered on purpose: each waiter must receive exactly one signal.
	s.wait = make(chan struct{})
	s.sync.inputsRunning = inputs
	return &s
}
// Close propagates Close to the downstream operator.
func (o *OrderedSynchronizer) Close() {
	o.next.Close()
}
// Callback buffers r and synchronizes with the other inputs. Every input but
// the last to arrive blocks on o.wait; the last arrival merges all buffered
// records and pushes the merged result downstream while still holding the
// mutex, which keeps downstream Callback calls ordered.
func (o *OrderedSynchronizer) Callback(ctx context.Context, r arrow.Record) error {
	o.sync.mtx.Lock()
	o.sync.data = append(o.sync.data, r)
	o.sync.inputsWaiting++
	inputsWaiting := o.sync.inputsWaiting
	inputsRunning := o.sync.inputsRunning
	o.sync.mtx.Unlock()
	if inputsWaiting != inputsRunning {
		// Not the last input: park until the merger signals us.
		select {
		case <-o.wait:
			return nil
		case <-ctx.Done():
			// NOTE(review): on cancellation this input's record stays in
			// o.sync.data and inputsWaiting stays incremented, so a later
			// merge will send a wake-up no one receives. This appears to
			// rely on the whole query tearing down on cancellation — confirm.
			return ctx.Err()
		}
	}
	o.sync.mtx.Lock()
	defer o.sync.mtx.Unlock()
	// Undo our own increment: the merger does not wait on the channel.
	o.sync.inputsWaiting--
	// This is the last input to call Callback, merge the records.
	mergedRecord, err := o.mergeRecordsLocked()
	if err != nil {
		return err
	}
	// Note that we hold the mutex while calling Callback because we want to
	// ensure that Callback is called in an ordered fashion since we could race
	// with a call to Callback in Finish.
	return o.next.Callback(ctx, mergedRecord)
}
// Finish registers that one input will push no more data. If every remaining
// running input is already parked in Callback, this goroutine performs the
// merge on their behalf. When the last input finishes, Finish is propagated
// downstream. Returns an error if Finish is called more times than there are
// inputs.
func (o *OrderedSynchronizer) Finish(ctx context.Context) error {
	o.sync.mtx.Lock()
	defer o.sync.mtx.Unlock()
	o.sync.inputsRunning--
	running := o.sync.inputsRunning
	if running > 0 && running == o.sync.inputsWaiting {
		// All other goroutines are currently waiting to be woken up. We need to
		// merge the records.
		mergedRecord, err := o.mergeRecordsLocked()
		if err != nil {
			return err
		}
		return o.next.Callback(ctx, mergedRecord)
	}
	if running < 0 {
		return errors.New("too many OrderedSynchronizer Finish calls")
	}
	if running > 0 {
		// Other inputs are still running; they will handle the next merge.
		return nil
	}
	// This was the last running input.
	return o.next.Finish(ctx)
}
// mergeRecordsLocked must be called while holding o.sync.mtx. It merges the
// records found in o.sync.data and unblocks all the inputs waiting on o.wait.
// Returns the merged record, or an error from schema normalization or the
// merge itself (in which case waiters are NOT woken).
func (o *OrderedSynchronizer) mergeRecordsLocked() (arrow.Record, error) {
	// Normalize all buffered records to one schema (inserting virtual null
	// columns for missing fields) so they can be merged.
	if err := o.ensureSameSchema(o.sync.data); err != nil {
		return nil, err
	}
	mergedRecord, err := arrowutils.MergeRecords(o.pool, o.sync.data, o.orderByCols, 0)
	if err != nil {
		return nil, err
	}
	// Now that the records have been merged, we can wake up the other input
	// goroutines. Since we have exactly o.sync.inputsWaiting waiting on the
	// channel, send the corresponding number of messages. Since we are also
	// holding the records mutex during this broadcast, fast inputs won't be
	// able to re-enter Callback until the mutex is released, so won't
	// mistakenly read another input's signal.
	for i := 0; i < o.sync.inputsWaiting; i++ {
		o.wait <- struct{}{}
	}
	// Reset inputsWaiting.
	o.sync.inputsWaiting = 0
	// Truncate in place to keep the backing array for the next round.
	o.sync.data = o.sync.data[:0]
	return mergedRecord, nil
}
// ensureSameSchema ensures that all the records have the same schema. In cases
// where the schema is not equal, virtual null columns are inserted in the
// records with the missing column. When we have static schemas in the execution
// engine, steps like these should be unnecessary.
//
// It also recomputes o.orderByCols to point at the leading (order-by) fields
// of the synchronized schema and caches the schema in o.sync.lastSchema.
func (o *OrderedSynchronizer) ensureSameSchema(records []arrow.Record) error {
	// Fast path: every record still matches the schema computed last time.
	var needSchemaRecalculation bool
	for i := range records {
		if !records[i].Schema().Equal(o.sync.lastSchema) {
			needSchemaRecalculation = true
			break
		}
	}
	if !needSchemaRecalculation {
		return nil
	}
	// For each order-by expression, collect the concrete fields matching it
	// across all records. Fields matching no expression are "leftover"
	// columns appended after the sorting columns.
	orderCols := make([]map[string]arrow.Field, len(o.orderByExprs))
	leftoverCols := make(map[string]arrow.Field)
	for i, orderCol := range o.orderByExprs {
		orderCols[i] = make(map[string]arrow.Field)
		for _, r := range records {
			for j := 0; j < r.Schema().NumFields(); j++ {
				field := r.Schema().Field(j)
				if ok := orderCol.MatchColumn(field.Name); ok {
					orderCols[i][field.Name] = field
				} else {
					leftoverCols[field.Name] = field
				}
			}
		}
	}
	// A field that matches expression i is still added to leftoverCols while
	// scanning for expression j != i, which would duplicate it in the final
	// schema. Purge any order-by field from the leftovers (no-op when there
	// is a single order-by expression).
	for _, colsFound := range orderCols {
		for name := range colsFound {
			delete(leftoverCols, name)
		}
	}
	newFields := make([]arrow.Field, 0, len(orderCols)+len(leftoverCols))
	for _, colsFound := range orderCols {
		if len(colsFound) == 0 {
			// An expected order by field is missing from the records, this
			// field will just be considered to be null.
			continue
		}
		if len(colsFound) == 1 {
			for _, field := range colsFound {
				newFields = append(newFields, field)
			}
			continue
		}
		// These columns are dynamic columns and should be merged to follow
		// the physical sort order.
		colNames := make([]string, 0, len(colsFound))
		for name := range colsFound {
			colNames = append(colNames, name)
		}
		// MergeDeduplicatedDynCols will return the dynamic column names in
		// the order that they sort physically.
		for _, name := range dynparquet.MergeDeduplicatedDynCols(colNames) {
			newFields = append(newFields, colsFound[name])
		}
	}
	// The sorting columns are exactly the leading fields accumulated so far;
	// leftover columns (appended below) are not sorted on.
	o.orderByCols = o.orderByCols[:0]
	for i := range newFields {
		o.orderByCols = append(o.orderByCols, arrowutils.SortingColumn{Index: i})
	}
	for _, field := range leftoverCols {
		newFields = append(newFields, field)
	}
	// This is the schema that all records must respect in order to be merged.
	schema := arrow.NewSchema(newFields, nil)
	for i := range records {
		otherSchema := records[i].Schema()
		if schema.Equal(otherSchema) {
			continue
		}
		// Rebuild the record against the synchronized schema, borrowing
		// existing columns by name and substituting virtual nulls for any
		// field this record does not have.
		columns := make([]arrow.Array, 0, schema.NumFields())
		for j := 0; j < schema.NumFields(); j++ {
			field := schema.Field(j)
			if otherFields := otherSchema.FieldIndices(field.Name); otherFields != nil {
				if len(otherFields) > 1 {
					fieldsFound, _ := otherSchema.FieldsByName(field.Name)
					return fmt.Errorf(
						"found multiple fields %v for name %s",
						fieldsFound,
						field.Name,
					)
				}
				columns = append(columns, records[i].Column(otherFields[0]))
			} else {
				// Note that this VirtualNullArray will be read from, but the
				// merged output will be a physical null array, so there is no
				// virtual->physical conversion necessary before we return data.
				columns = append(columns, arrowutils.MakeVirtualNullArray(field.Type, int(records[i].NumRows())))
			}
		}
		records[i] = array.NewRecord(schema, columns, records[i].NumRows())
	}
	o.sync.lastSchema = schema
	return nil
}
// SetNext sets the downstream operator that receives merged records.
func (o *OrderedSynchronizer) SetNext(next PhysicalPlan) {
	o.next = next
}
// Draw returns this operator's node in the query-plan diagram, with the
// downstream operator as its child.
func (o *OrderedSynchronizer) Draw() *Diagram {
	return &Diagram{Details: "OrderedSynchronizer", Child: o.next.Draw()}
}