/
parquet_file_reader.go
80 lines (72 loc) · 2.13 KB
/
parquet_file_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package parquet
import (
	"errors"
	"io"

	"github.com/chrislusf/gleam/filesystem"
	"github.com/chrislusf/gleam/util"
	. "github.com/xitongsys/parquet-go/reader"
	. "github.com/xitongsys/parquet-go/source"
	. "github.com/xitongsys/parquet-go/types"
)
// PqFile wraps a gleam filesystem.VirtualFile so it can be handed to the
// parquet reader as a ParquetFile. Seek and Read delegate to VF.
type PqFile struct {
	// FileName is the name passed to filesystem.Open. Open falls back to it
	// when it is called with an empty name.
	FileName string

	// VF is the underlying file handle that Seek and Read forward to.
	VF filesystem.VirtualFile
}
// Create always fails: PqFile only supports reading and cannot create files.
//
// The name argument is ignored. The method is presumably present only because
// the ParquetFile interface requires it (confirm against parquet-go). It
// returns an explicit error instead of (nil, nil) so a caller never receives
// a nil ParquetFile together with a nil error.
func (f *PqFile) Create(name string) (ParquetFile, error) {
	return nil, errors.New("parquet: create is not supported on read-only PqFile")
}
// Open opens name through filesystem.Open and returns a new PqFile wrapping
// the resulting handle. An empty name is replaced by the receiver's FileName.
//
// The receiver itself is not modified; every successful call returns a
// freshly allocated *PqFile. An error from filesystem.Open is returned
// unchanged together with a nil ParquetFile.
func (f *PqFile) Open(name string) (ParquetFile, error) {
	target := name
	if target == "" {
		target = f.FileName
	}

	handle, err := filesystem.Open(target)
	if err != nil {
		return nil, err
	}
	return &PqFile{FileName: target, VF: handle}, nil
}
// Seek forwards offset and pos unchanged to VF.Seek and returns its results.
// NOTE(review): pos is presumably an io.Seek* whence value; confirm against
// the VirtualFile implementation.
func (f *PqFile) Seek(offset int64, pos int) (int64, error) {
	position, err := f.VF.Seek(offset, pos)
	return position, err
}
// Read forwards b to VF.Read and returns the byte count and error it reports,
// unchanged.
func (f *PqFile) Read(b []byte) (n int, err error) {
	n, err = f.VF.Read(b)
	return n, err
}
// Write rejects any attempt to store data: PqFile only supports reading.
//
// A zero-length b is a no-op and yields (0, nil). For any other input it
// returns 0 and a non-nil error. The io.Writer contract requires a non-nil
// error whenever fewer than len(b) bytes are written; reporting success here
// would make callers believe the data had been stored.
func (f *PqFile) Write(b []byte) (n int, err error) {
	if len(b) == 0 {
		return 0, nil
	}
	return 0, errors.New("parquet: write is not supported on read-only PqFile")
}
// Close is a no-op that always reports success.
// NOTE(review): the handle held in VF is not released here; confirm whether
// VirtualFile needs an explicit close to avoid leaking it.
func (f *PqFile) Close() error {
	return nil
}
// ParquetFileReader reads a parquet file one row per Read call and exposes
// each row as a util.Row.
type ParquetFileReader struct {
	pqReader *ParquetReader

	// err holds a failure that occurred while New was opening the file or
	// building the column reader. New cannot return an error itself, so
	// ReadHeader and Read report it instead.
	err error

	// NumRows is the row count reported by the parquet reader when New ran.
	NumRows int
	// Cursor counts the rows already returned by Read.
	Cursor int
}

// New builds a ParquetFileReader for fileName.
//
// The reader argument is not used: the file is opened again by name through
// PqFile.Open. The literal 1 is passed through to NewParquetColumnReader
// (presumably its parallelism setting; confirm against parquet-go).
//
// New never returns nil. If the file cannot be opened or the column reader
// cannot be created, the error is stored on the returned value and surfaced
// by the first call to ReadHeader or Read, instead of dereferencing a nil
// reader here.
func New(reader filesystem.VirtualFile, fileName string) *ParquetFileReader {
	r := new(ParquetFileReader)

	pqFile, err := (&PqFile{}).Open(fileName)
	if err != nil {
		r.err = err
		return r
	}

	pqReader, err := NewParquetColumnReader(pqFile, 1)
	if err != nil {
		r.err = err
		return r
	}

	r.pqReader = pqReader
	r.NumRows = int(pqReader.GetNumRows())
	return r
}

// ReadHeader returns the value-column names from the parquet schema handler.
// If New failed to set up the reader, that error is returned instead.
func (r *ParquetFileReader) ReadHeader() (fieldNames []string, err error) {
	if r.err != nil {
		return nil, r.err
	}
	return r.pqReader.SchemaHandler.ValueColumns, nil
}

// Read returns the next row, holding one converted value per value column in
// schema order, stamped with util.Now().
//
// It returns io.EOF once Cursor has reached NumRows. It returns the stored
// setup error if New failed, the error from ReadColumnByPath if a column
// cannot be read, and io.ErrUnexpectedEOF if a column yields no value even
// though rows are still expected. Cursor advances only when a row is
// returned.
func (r *ParquetFileReader) Read() (row *util.Row, err error) {
	if r.err != nil {
		return nil, r.err
	}
	if r.Cursor >= r.NumRows {
		return nil, io.EOF
	}

	schema := r.pqReader.SchemaHandler
	// One object per column, so the final length is known up front.
	objects := make([]interface{}, 0, len(schema.ValueColumns))
	for _, fieldName := range schema.ValueColumns {
		values, _, _, err := r.pqReader.ReadColumnByPath(fieldName, 1)
		if err != nil {
			return nil, err
		}
		if len(values) == 0 {
			// Guard the values[0] access below: the column ran dry before
			// NumRows rows were produced.
			return nil, io.ErrUnexpectedEOF
		}

		element := schema.SchemaElements[schema.MapIndex[fieldName]]
		objects = append(objects, ParquetTypeToGoType(values[0], element.Type, element.ConvertedType))
	}

	r.Cursor++
	return util.NewRow(util.Now(), objects...), nil
}