forked from xitongsys/parquet-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rowgroup.go
113 lines (99 loc) · 2.65 KB
/
rowgroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package layout
import (
"errors"
"github.com/xitongsys/parquet-go/common"
"github.com/xitongsys/parquet-go/source"
"github.com/xitongsys/parquet-go/schema"
"github.com/xitongsys/parquet-go/parquet"
)
// RowGroup stores one row group of a parquet file: the column chunks that
// make it up together with the row group's header.
type RowGroup struct {
// Chunks holds the column chunks of this row group. NewRowGroup leaves it
// nil; ReadRowGroup appends one entry per column listed in the header.
Chunks []*Chunk
// RowGroupHeader is the parquet.RowGroup header associated with this row
// group.
RowGroupHeader *parquet.RowGroup
}
// NewRowGroup returns a RowGroup with no chunks and a header freshly created
// by parquet.NewRowGroup.
func NewRowGroup() *RowGroup {
	return &RowGroup{RowGroupHeader: parquet.NewRowGroup()}
}
// RowGroupToTableMap folds the data tables of all pages in the row group into
// one Table per chunk path and returns a pointer to the resulting map.
//
// The key for a chunk is common.PathToStr applied to the path of the chunk's
// first page whose path string is non-empty; every page of that chunk is then
// merged into the Table stored under that key.
func (rowGroup *RowGroup) RowGroupToTableMap() *map[string]*Table {
	tables := make(map[string]*Table)
	for _, chunk := range rowGroup.Chunks {
		var key string
		for _, page := range chunk.Pages {
			// Resolve the key lazily; once non-empty it is reused for the
			// remaining pages of this chunk.
			if key == "" {
				key = common.PathToStr(page.DataTable.Path)
			}
			table, found := tables[key]
			if !found {
				table = NewTableFromTable(page.DataTable)
				tables[key] = table
			}
			table.Merge(page.DataTable)
		}
	}
	return &tables
}
// ReadRowGroup reads every column chunk listed in rowGroupHeader from PFile
// and returns them, in header order, as a RowGroup.
//
// The columns are split into NP contiguous ranges and each range is read by
// its own goroutine. For every column a separate handle is opened through
// PFile.Open (using the column's FilePath when set, the empty string
// otherwise), the chunk is decoded with ReadChunk, and the handle is closed.
// An NP below 1 is treated as 1.
//
// If opening a file or reading a chunk fails, or a worker panics, the error
// of the lowest-numbered failing worker is returned and the RowGroup result
// is nil.
//
// Deprecated: the original source marks this function as deprecated without
// naming a replacement.
func ReadRowGroup(rowGroupHeader *parquet.RowGroup, PFile source.ParquetFile, schemaHandler *schema.SchemaHandler, NP int64) (*RowGroup, error) {
	// A non-positive worker count would divide by zero (or make a negative
	// length slice) below, so fall back to a single worker.
	if NP < 1 {
		NP = 1
	}

	columnChunks := rowGroupHeader.GetColumns()
	ln := int64(len(columnChunks))

	// readOne opens its own handle for column i, decodes the chunk and always
	// closes the handle again, even when decoding panics.
	readOne := func(i int64) (*Chunk, error) {
		columnChunk := columnChunks[i]
		path := ""
		if columnChunk.FilePath != nil {
			path = *columnChunk.FilePath
		}
		file, err := PFile.Open(path)
		if err != nil {
			return nil, err
		}
		// The handle is only read from here, so a Close failure cannot
		// invalidate a chunk that was already decoded; it is ignored.
		defer file.Close()

		size := columnChunk.MetaData.GetTotalCompressedSize()
		thriftReader := source.ConvertToThriftReader(file, columnChunk.FileOffset, size)
		return ReadChunk(thriftReader, schemaHandler, columnChunk)
	}

	// Each worker owns exactly one slot in chunksList and errs, so no two
	// goroutines ever write the same element.
	chunksList := make([][]*Chunk, NP)
	errs := make([]error, NP)
	done := make(chan struct{}, NP)

	delta := (ln + NP - 1) / NP
	for c := int64(0); c < NP; c++ {
		bgn := c * delta
		end := bgn + delta
		if end > ln {
			end = ln
		}
		if bgn >= ln {
			bgn, end = ln, ln
		}

		go func(index, bgn, end int64) {
			// Signal completion from the deferred function so the caller is
			// released even when this worker panics.
			defer func() {
				if r := recover(); r != nil {
					switch x := r.(type) {
					case string:
						errs[index] = errors.New(x)
					case error:
						errs[index] = x
					default:
						errs[index] = errors.New("unknown error")
					}
				}
				done <- struct{}{}
			}()

			chunks := make([]*Chunk, 0, end-bgn)
			for i := bgn; i < end; i++ {
				chunk, err := readOne(i)
				if err != nil {
					errs[index] = err
					return
				}
				chunks = append(chunks, chunk)
			}
			chunksList[index] = chunks
		}(c, bgn, end)
	}

	// Receiving from done orders each worker's writes before the reads below.
	for c := int64(0); c < NP; c++ {
		<-done
	}

	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}

	rowGroup := new(RowGroup)
	rowGroup.RowGroupHeader = rowGroupHeader
	for _, chunks := range chunksList {
		rowGroup.Chunks = append(rowGroup.Chunks, chunks...)
	}
	return rowGroup, nil
}