Skip to content

Commit f0aa0ec

Browse files
committed
Start a repeatedPageIterator
1 parent a9f87cf commit f0aa0ec

File tree

2 files changed

+257
-0
lines changed

2 files changed

+257
-0
lines changed

pkg/firedb/query/repeated.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
package query
2+
3+
import (
4+
"io"
5+
6+
"github.com/segmentio/parquet-go"
7+
8+
"github.com/grafana/fire/pkg/iter"
9+
)
10+
11+
type RepeatedRow[T any] struct {
12+
Row T
13+
Values []parquet.Value
14+
}
15+
16+
type repeatedPageIterator[T any] struct {
17+
rows iter.Iterator[T]
18+
column int
19+
readSize int
20+
21+
rgs []parquet.RowGroup
22+
startRowGroupRowNum int64
23+
24+
currentPage parquet.Page
25+
startPageRowNum int64
26+
27+
currRowNum int64
28+
29+
currentPages parquet.Pages
30+
valueReader parquet.ValueReader
31+
32+
err error
33+
currentValue RepeatedRow[T]
34+
buffer []parquet.Value
35+
originalBuffer []parquet.Value
36+
}
37+
38+
func NewRepeatedPageIterator[T any](
39+
rows iter.Iterator[T],
40+
rgs []parquet.RowGroup,
41+
column int,
42+
readSize int,
43+
) iter.Iterator[RepeatedRow[T]] {
44+
if readSize <= 0 {
45+
panic("readSize must be greater than 0")
46+
}
47+
buffer := make([]parquet.Value, readSize)
48+
return &repeatedPageIterator[T]{
49+
rows: rows,
50+
rgs: rgs,
51+
column: column,
52+
readSize: readSize,
53+
buffer: buffer,
54+
originalBuffer: buffer,
55+
}
56+
}
57+
58+
func (it *repeatedPageIterator[T]) seekRowNum() int64 {
59+
return any(it.rows.At()).(RowGetter).RowNumber()
60+
}
61+
62+
func (it *repeatedPageIterator[T]) Next() bool {
63+
// we should only next the first time and if we have reached a new row in the page.
64+
if !it.rows.Next() { // 1 [1 2] // 2
65+
return false
66+
}
67+
return it.next()
68+
}
69+
70+
func (it *repeatedPageIterator[T]) next() bool {
71+
for {
72+
for len(it.rgs) != 0 && (it.seekRowNum() >= (it.startRowGroupRowNum + it.rgs[0].NumRows())) {
73+
if !it.closeCurrentPages() {
74+
return false
75+
}
76+
it.startRowGroupRowNum += it.rgs[0].NumRows()
77+
it.rgs = it.rgs[1:]
78+
}
79+
if len(it.rgs) == 0 {
80+
return false
81+
}
82+
if it.currentPages == nil {
83+
it.currentPages = it.rgs[0].ColumnChunks()[it.column].Pages()
84+
}
85+
// read a new page.
86+
if it.currentPage == nil {
87+
// SeekToRow seek across and within pages. So the next position in the page will the be the row.
88+
if err := it.currentPages.SeekToRow(it.seekRowNum() - it.startRowGroupRowNum); err != nil {
89+
it.err = err
90+
it.currentPages = nil
91+
return false
92+
}
93+
it.startPageRowNum = it.seekRowNum()
94+
var err error
95+
it.currentPage, err = it.currentPages.ReadPage()
96+
if err != nil {
97+
if err == io.EOF {
98+
continue
99+
}
100+
it.err = err
101+
return false
102+
}
103+
it.valueReader = nil
104+
}
105+
// if there's no more value in that page we can skip it.
106+
if it.seekRowNum() >= it.startPageRowNum+it.currentPage.NumRows() {
107+
it.currentPage = nil
108+
continue
109+
}
110+
111+
if it.valueReader == nil {
112+
it.valueReader = it.currentPage.Values()
113+
}
114+
// reading values....
115+
it.buffer = it.originalBuffer
116+
n, err := it.valueReader.ReadValues(it.buffer)
117+
if err != nil && err != io.EOF {
118+
it.err = err
119+
return false
120+
}
121+
it.buffer = it.buffer[:n]
122+
// no more buffer, move to next page
123+
if len(it.buffer) == 0 {
124+
it.currentPage = nil
125+
continue
126+
}
127+
// slice the current amount of values.
128+
// todo start with 1 it's still the current row.
129+
// only one 0 => we don't increment the next rowNum
130+
// two zero => we increment the next rowNum
131+
132+
//
133+
it.currentValue.Row = it.rows.At()
134+
135+
next := 1
136+
for _, v := range it.buffer[1:] {
137+
if v.RepetitionLevel() == 0 {
138+
break
139+
}
140+
next++
141+
}
142+
it.currentValue.Values = it.buffer[:next]
143+
it.buffer = it.buffer[next:]
144+
if len(it.buffer) == 0 {
145+
it.valueReader = nil
146+
}
147+
return true
148+
}
149+
}
150+
151+
func (it *repeatedPageIterator[T]) closeCurrentPages() bool {
152+
if it.currentPages != nil {
153+
if err := it.currentPages.Close(); err != nil {
154+
it.err = err
155+
it.currentPages = nil
156+
return false
157+
}
158+
it.currentPages = nil
159+
}
160+
return true
161+
}
162+
163+
func (it *repeatedPageIterator[T]) At() RepeatedRow[T] {
164+
return it.currentValue
165+
}
166+
167+
func (it *repeatedPageIterator[T]) Err() error {
168+
return it.err
169+
}
170+
171+
func (it *repeatedPageIterator[T]) Close() error {
172+
if it.currentPages != nil {
173+
if err := it.currentPages.Close(); err != nil {
174+
return err
175+
}
176+
it.currentPages = nil
177+
}
178+
return nil
179+
}
180+
181+
// => 2 4 [ 2 3 4] skip rows that are not selected
182+
// => 2 10 {[ 2 3 4 5 6 7 8 9 ] } // skip through values and skip all values at once if possible.
183+
// 2 5 6 15 {[1 2] [5 6] [10 11]} {[15]} skip through rows.
184+
// 2 10 {[1 2] [5 6] [10 11]} {[15]} // skip through pages

pkg/firedb/query/repeated_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package query
2+
3+
import (
4+
"testing"
5+
6+
"github.com/grafana/fire/pkg/iter"
7+
"github.com/segmentio/parquet-go"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
// RepeatedTestRow is the row type written to the parquet buffers used by the
// tests; List is the repeated column.
type RepeatedTestRow struct {
	Id   int
	List []int64
}
15+
16+
// testRowGetter is a minimal row that only carries its row number.
type testRowGetter struct {
	rowNumber int64
}

// RowNumber returns the row number stored in the receiver.
func (g testRowGetter) RowNumber() int64 {
	n := g.rowNumber
	return n
}
23+
24+
func Test_RepeatedIterator(t *testing.T) {
25+
defaultReadSize := 100
26+
for _, tc := range []struct {
27+
name string
28+
rows []testRowGetter
29+
rgs [][]RepeatedTestRow
30+
expected []RepeatedRow[testRowGetter]
31+
readSize int
32+
}{
33+
{
34+
name: "single row group",
35+
rows: []testRowGetter{
36+
{0},
37+
{1},
38+
{2},
39+
},
40+
rgs: [][]RepeatedTestRow{
41+
{
42+
{1, []int64{1, 2, 3}},
43+
{2, []int64{4, 5, 6}},
44+
{3, []int64{7, 8, 9}},
45+
},
46+
},
47+
expected: []RepeatedRow[testRowGetter]{
48+
{testRowGetter{0}, []parquet.Value{parquet.ValueOf(1), parquet.ValueOf(2), parquet.ValueOf(3)}},
49+
{testRowGetter{1}, []parquet.Value{parquet.ValueOf(4), parquet.ValueOf(5), parquet.ValueOf(6)}},
50+
{testRowGetter{2}, []parquet.Value{parquet.ValueOf(7), parquet.ValueOf(8), parquet.ValueOf(9)}},
51+
},
52+
},
53+
} {
54+
tc := tc
55+
t.Run(tc.name, func(t *testing.T) {
56+
var groups []parquet.RowGroup
57+
for _, rg := range tc.rgs {
58+
buffer := parquet.NewBuffer()
59+
for _, row := range rg {
60+
require.NoError(t, buffer.Write(row))
61+
}
62+
groups = append(groups, buffer)
63+
}
64+
if tc.readSize == 0 {
65+
tc.readSize = defaultReadSize
66+
}
67+
actual, err := iter.Slice(NewRepeatedPageIterator(
68+
iter.NewSliceIterator(tc.rows), groups, 1, tc.readSize))
69+
require.NoError(t, err)
70+
require.Equal(t, tc.expected, actual)
71+
})
72+
}
73+
}

0 commit comments

Comments
 (0)