-
Notifications
You must be signed in to change notification settings - Fork 53
/
file_reader.go
116 lines (97 loc) · 3.17 KB
/
file_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package goparquet
import (
"io"
"github.com/fraugster/parquet-go/parquet"
"github.com/pkg/errors"
)
// FileReader is used to read data from a parquet file. Always use NewFileReader to create
// such an object.
type FileReader struct {
	// meta is the file's meta data, read once by NewFileReader.
	meta *parquet.FileMetaData

	// SchemaReader is the schema built from meta. It is passed to readRowGroup
	// to load row groups and supplies the decoded rows via getData.
	SchemaReader

	// reader is the underlying input the row groups are loaded from.
	reader io.ReadSeeker

	// rowGroupPosition counts the row groups loaded so far. It is the index of
	// the next row group to load; the currently loaded one (if any) is at
	// rowGroupPosition-1. It is 0 before the first load.
	rowGroupPosition int

	// currentRecord counts the records NextRow has returned from the currently
	// loaded row group; it is reset to 0 whenever a new row group is loaded.
	currentRecord int64

	// skipRowGroup forces the next advance to load a new row group. It is set
	// by SkipRowGroup and also after a failed load.
	skipRowGroup bool
}
// NewFileReader creates a new FileReader that reads from r. You can limit the
// columns that are read by providing the names of the specific columns to read
// using dotted notation. If no columns are provided, then all columns are read.
//
// The file's meta data is read up front and used to build the schema. Then r is
// positioned at byte offset 4, just past the leading magic number of the
// parquet file. Every failure is returned wrapped with a description of the
// step that failed.
func NewFileReader(r io.ReadSeeker, columns ...string) (*FileReader, error) {
	meta, err := readFileMetaData(r)
	if err != nil {
		return nil, errors.Wrap(err, "reading file meta data failed")
	}

	schema, err := makeSchema(meta)
	if err != nil {
		return nil, errors.Wrap(err, "creating schema failed")
	}
	schema.setSelectedColumns(columns...)

	// Reading the meta data moved r; put it back right after the 4-byte
	// header magic (not at offset 0), where the row group data starts.
	if _, err := r.Seek(4, io.SeekStart); err != nil {
		return nil, errors.Wrap(err, "seeking past file header failed")
	}

	return &FileReader{
		meta:         meta,
		SchemaReader: schema,
		reader:       r,
	}, nil
}
// readRowGroup loads the next row group of the file into memory and moves the
// row group cursor forward by one. It returns io.EOF once every row group
// listed in the meta data has been loaded. The cursor is advanced before the
// load, so a failed load still counts as having consumed that row group.
func (f *FileReader) readRowGroup() error {
	next := f.rowGroupPosition
	if next >= len(f.meta.RowGroups) {
		return io.EOF
	}
	f.rowGroupPosition = next + 1
	return readRowGroup(f.reader, f.SchemaReader, f.meta.RowGroups[next])
}
// CurrentRowGroup returns the meta data of the row group that is currently
// loaded. It returns nil if f or its meta data is nil, or if no row group has
// been loaded yet.
func (f *FileReader) CurrentRowGroup() *parquet.RowGroup {
	if f == nil || f.meta == nil {
		return nil
	}
	// rowGroupPosition counts loaded row groups, so the current one lives at
	// index rowGroupPosition-1. Before the first load that index is -1 and must
	// not be used; indexing with it would panic.
	idx := f.rowGroupPosition - 1
	if idx < 0 || idx >= len(f.meta.RowGroups) {
		return nil
	}
	return f.meta.RowGroups[idx]
}
// RowGroupCount reports how many row groups the parquet file contains, as
// listed in the file's meta data.
func (f *FileReader) RowGroupCount() int {
	groups := f.meta.RowGroups
	return len(groups)
}
// NumRows reports the total number of rows in the parquet file. The value is
// taken straight from the file's meta data; no row data is read.
func (f *FileReader) NumRows() int64 {
	meta := f.meta
	return meta.NumRows
}
// advanceIfNeeded loads the next row group when there is no usable current
// one. That is the case before the first row group has been loaded, once every
// record of the current row group has been consumed, or after SkipRowGroup was
// called. Otherwise it returns nil without touching the reader.
//
// If loading fails (including io.EOF at the end of the file), skipRowGroup is
// left set so that the next call attempts another load.
//
// NOTE(review): a row group that reports zero records is loaded like any other
// and nil is returned, so NextRow will call getData on it. Confirm that empty
// row groups are handled downstream.
func (f *FileReader) advanceIfNeeded() error {
	needLoad := f.rowGroupPosition == 0 ||
		f.currentRecord >= f.SchemaReader.rowGroupNumRecords() ||
		f.skipRowGroup
	if !needLoad {
		return nil
	}

	if err := f.readRowGroup(); err != nil {
		// Keep the flag raised so the following call tries to advance again.
		f.skipRowGroup = true
		return err
	}
	f.currentRecord = 0
	f.skipRowGroup = false
	return nil
}
// RowGroupNumRows returns the number of rows in the current row group. It
// first loads the next row group if none is loaded yet, if the current one is
// exhausted, or if it was skipped. Any load error (including io.EOF) is
// returned together with a zero count.
func (f *FileReader) RowGroupNumRows() (int64, error) {
	err := f.advanceIfNeeded()
	if err != nil {
		return 0, err
	}
	count := f.SchemaReader.rowGroupNumRecords()
	return count, nil
}
// NextRow reads the next row from the parquet file and returns it as a map.
// When the current row group is exhausted or skipped, the next row group is
// loaded first. Once all row groups have been read it returns io.EOF.
func (f *FileReader) NextRow() (map[string]interface{}, error) {
	err := f.advanceIfNeeded()
	if err != nil {
		return nil, err
	}
	// The record is counted before it is decoded, so a failing getData call
	// still counts toward the row group's record total.
	f.currentRecord++
	return f.SchemaReader.getData()
}
// SkipRowGroup marks the currently loaded row group as finished. The next call
// to NextRow, RowGroupNumRows or PreLoad then loads the following row group
// instead of continuing with the current one.
func (f *FileReader) SkipRowGroup() {
	f.skipRowGroup = true
}
// PreLoad makes sure a row group is loaded. It reads the next one if none has
// been loaded yet, the current one is exhausted, or it was skipped. If the
// loaded row group still has unread rows, PreLoad does nothing.
func (f *FileReader) PreLoad() error {
	if err := f.advanceIfNeeded(); err != nil {
		return err
	}
	return nil
}