forked from polarsignals/frostdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
part.go
165 lines (145 loc) · 4.12 KB
/
part.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package parts
import (
"io"
"sort"
"github.com/apache/arrow/go/v15/arrow"
"github.com/dreamsxin/frostdb/dynparquet"
)
// Part is a unit of table data as consumed by the sorting, overlap detection,
// compaction and serialization code in this package.
type Part interface {
	// Record returns the Arrow record for the part. If the part is not an Arrow
	// record part, nil is returned.
	Record() arrow.Record

	// Retain and Release manage the part's lifetime. Presumably they adjust a
	// reference count, like arrow.Record's methods of the same names; confirm
	// against the concrete implementations.
	Retain()
	Release()

	// SerializeBuffer serializes the part for schema through the given
	// ParquetWriter.
	SerializeBuffer(schema *dynparquet.Schema, w dynparquet.ParquetWriter) error
	// AsSerializedBuffer returns the part's data as a SerializedBuffer for
	// schema.
	AsSerializedBuffer(schema *dynparquet.Schema) (*dynparquet.SerializedBuffer, error)
	// Write writes the part to the given io.Writer.
	Write(io.Writer) error

	// NumRows reports the part's row count.
	NumRows() int64
	// Size reports the part's size (unit not visible here; presumably bytes —
	// confirm against implementations).
	Size() int64
	// CompactionLevel reports the compaction level assigned to the part.
	CompactionLevel() int
	// TX reports the transaction number associated with the part.
	TX() uint64

	// Least and Most return the part's boundary rows. The sorting and overlap
	// logic in this package treats them as the start and end of the part's
	// row range.
	Least() (*dynparquet.DynamicRow, error)
	Most() (*dynparquet.DynamicRow, error)
	// OverlapsWith reports whether this part overlaps otherPart under schema.
	OverlapsWith(schema *dynparquet.Schema, otherPart Part) (bool, error)
}
// basePart holds state that is presumably embedded by the concrete Part
// implementations: the transaction number and compaction level returned by
// TX and CompactionLevel, boundary rows, and an optional release hook.
type basePart struct {
	tx              uint64
	compactionLevel int

	// minRow and maxRow are not read in this file; presumably they cache the
	// results of Least and Most — confirm in the concrete part types.
	minRow, maxRow *dynparquet.DynamicRow

	// release is set by WithRelease; presumably invoked when the part is
	// released — confirm in the concrete part types.
	release func()
}
// CompactionLevel returns the compaction level stored in the part, as set by
// WithCompactionLevel.
func (p *basePart) CompactionLevel() int { return p.compactionLevel }
// TX returns the transaction number stored in the part.
func (p *basePart) TX() uint64 {
	return p.tx
}
// Option configures a basePart. Because its parameter type is unexported,
// Options can only be written inside this package (see WithCompactionLevel
// and WithRelease).
type Option func(*basePart)
// WithCompactionLevel returns an Option that records level as the part's
// compaction level, which CompactionLevel later reports.
func WithCompactionLevel(level int) Option {
	setLevel := func(bp *basePart) { bp.compactionLevel = level }
	return setLevel
}
// WithRelease returns an Option that stores release as the part's release
// hook.
func WithRelease(release func()) Option {
	return Option(func(bp *basePart) {
		bp.release = release
	})
}
// PartSorter implements sort.Interface over a slice of parts, ordering them by
// their Least row using schema. sort.Interface's Less cannot return an error,
// so errors from Least are stored and must be checked with Err after sorting.
type PartSorter struct {
	schema *dynparquet.Schema
	// parts aliases the slice given to NewPartSorter; sorting reorders it in
	// place.
	parts []Part
	// err holds an error recorded by Less, if any.
	err error
}
// NewPartSorter returns a PartSorter that orders parts by their Least row
// under schema. The slice is not copied, so sorting reorders the caller's
// slice.
func NewPartSorter(schema *dynparquet.Schema, parts []Part) *PartSorter {
	s := new(PartSorter)
	s.schema = schema
	s.parts = parts
	return s
}
// Len returns the number of parts being sorted.
func (p *PartSorter) Len() int { return len(p.parts) }
// Less reports whether part i sorts before part j by comparing their Least
// rows with schema.RowLessThan.
//
// sort.Interface gives Less no way to return an error, so the first error
// returned by Least is recorded for Err. Once an error has been recorded the
// resulting order is meaningless, so Less stops calling Least (avoiding
// repeated calls into parts already known to fail) and reports false.
// Keeping the first error rather than the last also makes Err report the
// original failure.
func (p *PartSorter) Less(i, j int) bool {
	if p.err != nil {
		return false
	}
	a, err := p.parts[i].Least()
	if err != nil {
		p.err = err
		return false
	}
	b, err := p.parts[j].Least()
	if err != nil {
		p.err = err
		return false
	}
	return p.schema.RowLessThan(a, b)
}
// Swap exchanges the parts at indexes i and j in the underlying slice.
func (p *PartSorter) Swap(i, j int) {
	ps := p.parts
	ps[i], ps[j] = ps[j], ps[i]
}
// Err returns the error recorded by Less while sorting, or nil if every call
// to Least succeeded. Check it after sort.Sort: when it is non-nil the
// resulting order is unreliable.
func (p *PartSorter) Err() error { return p.err }
// FindMaximumNonOverlappingSet removes parts from the given slice so that the
// remaining parts do not overlap one another. It returns the non-overlapping
// (kept) parts first and the overlapping (removed) parts second. The final-part
// edge case noted inside the function can make the kept set smaller than the
// maximum.
//
// When parts has two or more elements, it is sorted in place by Least row (via
// PartSorter), so the caller's slice is reordered. The sweep then treats each
// part's Least and Most rows as the start and end of its range. A part does not
// overlap the last kept part when schema.Cmp(prevEnd, start) <= 0. Assuming Cmp
// follows the usual negative/zero/positive convention, ranges that merely touch
// therefore do not count as overlapping. When two parts do overlap, the one
// with the higher Most row is removed; on a tie, the earlier part is removed.
//
// Return values:
//   - The non-overlapping parts are returned in ascending Least order.
//   - The overlapping parts are returned in removal order, which is not
//     necessarily sorted.
//   - Inputs with fewer than two parts are returned unchanged as the
//     non-overlapping result, with a nil overlapping slice.
//   - Any error from sorting, Least or Most aborts the call and is returned
//     with nil slices.
func FindMaximumNonOverlappingSet(schema *dynparquet.Schema, parts []Part) ([]Part, []Part, error) {
	if len(parts) < 2 {
		return parts, nil, nil
	}
	sorter := NewPartSorter(schema, parts)
	sort.Sort(sorter)
	if sorter.Err() != nil {
		return nil, nil, sorter.Err()
	}
	// Parts are now sorted according to their Least row.
	// prev indexes the most recently kept part that has not been emitted into
	// either result slice yet; prevEnd is its Most row.
	prev := 0
	prevEnd, err := parts[0].Most()
	if err != nil {
		return nil, nil, err
	}
	nonOverlapping := make([]Part, 0, len(parts))
	overlapping := make([]Part, 0, len(parts))
	// missing is set only when the final part is the one removed. It then holds
	// the surviving parts[prev], which the loop never emitted.
	var missing Part
	for i := 1; i < len(parts); i++ {
		start, err := parts[i].Least()
		if err != nil {
			return nil, nil, err
		}
		curEnd, err := parts[i].Most()
		if err != nil {
			return nil, nil, err
		}
		if schema.Cmp(prevEnd, start) <= 0 {
			// No overlap, append the previous part and update end for the next
			// iteration.
			nonOverlapping = append(nonOverlapping, parts[prev])
			prevEnd = curEnd
			prev = i
			continue
		}
		// This part overlaps with the previous part. Remove the part with
		// the highest end row.
		if schema.Cmp(prevEnd, curEnd) >= 0 {
			overlapping = append(overlapping, parts[prev])
			prevEnd = curEnd
			prev = i
		} else {
			// The current part must be removed. Don't update prevEnd or prev,
			// this will be used in the next iteration and must stay the same.
			overlapping = append(overlapping, parts[i])
			if i == len(parts)-1 { // This is the last iteration mark this one as missing
				missing = parts[prev]
			}
		}
	}
	// NOTE(review): this compares Part interface values. For pointer-backed
	// parts that is an identity check, and it panics at runtime if both operands
	// share a non-comparable dynamic type. Provided parts holds no duplicate
	// entries, the condition is equivalent to `missing == nil`.
	if len(overlapping) == 0 || overlapping[len(overlapping)-1] != parts[len(parts)-1] {
		// The last part either did not overlap with its previous part, or
		// overlapped but had a smaller end row than its previous part (so the
		// previous part is in the overlapping slice). The last part must be
		// appended to nonOverlapping.
		nonOverlapping = append(nonOverlapping, parts[len(parts)-1])
	} else if missing != nil {
		// NOTE(review): when the final part is the one removed, the surviving
		// part (missing == parts[prev]) is placed in overlapping, even though it
		// overlaps nothing left in nonOverlapping. When the same situation
		// occurs earlier in the sweep, the survivor lands in nonOverlapping
		// instead. For example, ranges {[1,5],[2,10]} yield ([], [[2,10],[1,5]]),
		// while {[1,5],[2,10],[20,30]} yield ([[1,5],[20,30]], [[2,10]]).
		// Appending to nonOverlapping would match the "maximum set" intent.
		// Confirm that no caller depends on the current behaviour before
		// changing it.
		overlapping = append(overlapping, missing)
	}
	return nonOverlapping, overlapping, nil
}