/
read_writer.go
73 lines (58 loc) · 1.49 KB
/
read_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package v1
import (
	"errors"
	"io"
	"sort"

	"github.com/parquet-go/parquet-go"
)
type (
	// PersisterName is implemented by types that can report a name.
	PersisterName interface {
		Name() string
	}

	// Persister converts values of type T to and from parquet rows.
	//
	// ReadWriter relies on the methods as follows:
	//   - Schema supplies the schema for the write buffer, the writer and
	//     the reader.
	//   - SortingColumns configures how buffered rows are sorted before
	//     they are written.
	//   - Deconstruct receives a row to fill (WriteParquetFile passes a nil
	//     row), the element's position in the input slice, and the element;
	//     the returned row is what gets written.
	//   - Reconstruct turns a row back into a T. ReadParquetFile discards its
	//     uint64 result (presumably the index given to Deconstruct — TODO
	//     confirm against implementations).
	Persister[T any] interface {
		PersisterName
		Schema() *parquet.Schema
		Deconstruct(parquet.Row, uint64, T) parquet.Row
		Reconstruct(parquet.Row) (uint64, T, error)
		SortingColumns() parquet.SortingOption
	}

	// ReadWriter is a stateless helper that reads and writes slices of T as
	// parquet data, using the zero value of P as its Persister.
	ReadWriter[T any, P Persister[T]] struct{}
)
// WriteParquetFile encodes elements with the zero-value persister P and
// writes them to file as parquet data.
//
// Each element is turned into a row by P.Deconstruct, which receives the
// element's position in elements as its uint64 argument. The rows are
// collected in a parquet.Buffer configured with P.SortingColumns(), sorted,
// and then copied into a parquet.Writer built from P.Schema(). The writer is
// closed (and its error returned) only when every earlier step succeeded;
// the first error encountered is returned unchanged. file itself is not
// closed.
func (*ReadWriter[T, P]) WriteParquetFile(file io.Writer, elements []T) error {
	var persister P
	// Fetch the schema once so the buffer and the writer share the same value.
	schema := persister.Schema()

	rows := make([]parquet.Row, len(elements))
	for pos := range rows {
		rows[pos] = persister.Deconstruct(rows[pos], uint64(pos), elements[pos])
	}

	buffer := parquet.NewBuffer(schema, parquet.SortingRowGroupConfig(persister.SortingColumns()))
	if _, err := buffer.WriteRows(rows); err != nil {
		return err
	}
	sort.Sort(buffer)

	// The row iterator returned by Rows is an io.Closer; release it once the
	// copy is done. Its Close error is not actionable here, so it is ignored.
	sorted := buffer.Rows()
	defer sorted.Close()

	writer := parquet.NewWriter(file, schema)
	if _, err := parquet.CopyRows(writer, sorted); err != nil {
		return err
	}
	return writer.Close()
}
// ReadParquetFile decodes every row of the parquet data in file into a value
// of type T using the zero-value persister P.
//
// The reader is opened with P.Schema() and ReadRows is called repeatedly
// until all NumRows() rows are consumed. A single ReadRows call may return
// fewer rows than requested and may report the end of the data with io.EOF,
// even alongside the final rows, so io.EOF is not treated as a failure.
//
// Errors:
//   - any non-EOF error from ReadRows is returned as is;
//   - io.ErrNoProgress if ReadRows returns no rows and no error;
//   - io.ErrUnexpectedEOF if the data ends before NumRows() rows were read;
//   - the first error returned by P.Reconstruct.
//
// On error the returned slice is nil.
func (*ReadWriter[T, P]) ReadParquetFile(file io.ReaderAt) ([]T, error) {
	var (
		persister P
		reader    = parquet.NewReader(file, persister.Schema())
	)
	// Close error ignored: nothing useful can be done with it on the read path.
	defer reader.Close()

	rows := make([]parquet.Row, reader.NumRows())
	read := 0
	for read < len(rows) {
		n, err := reader.ReadRows(rows[read:])
		read += n
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return nil, err
		}
		if n == 0 {
			// Avoid spinning forever on a reader that makes no progress.
			return nil, io.ErrNoProgress
		}
	}
	if read < len(rows) {
		// Fewer rows than the metadata promised: do not hand nil rows to
		// Reconstruct.
		return nil, io.ErrUnexpectedEOF
	}

	elements := make([]T, len(rows))
	for pos := range rows {
		_, element, err := persister.Reconstruct(rows[pos])
		if err != nil {
			return nil, err
		}
		elements[pos] = element
	}
	return elements, nil
}