package writer

import (
    "fmt"

    "github.com/apache/arrow/go/v12/arrow"
    "github.com/xitongsys/parquet-go/common"
    "github.com/xitongsys/parquet-go/layout"
    "github.com/xitongsys/parquet-go/marshal"
    "github.com/xitongsys/parquet-go/parquet"
    "github.com/xitongsys/parquet-go/schema"
    "github.com/xitongsys/parquet-go/source"
)

const (
    pageSize      = 8 * 1024          // 8 KB per page
    rowGroupSize  = 128 * 1024 * 1024 // 128 MB per row group
    footerVersion = 1                 // parquet footer format version
    offset        = 4                 // length of the "PAR1" magic header in bytes
)

// ArrowWriter extends the base ParquetWriter with support for writing
// arrow records.
type ArrowWriter struct {
    ParquetWriter
}

// NewArrowWriter creates a parquet writer for the given arrow schema. It takes
// the native arrow schema, the parquet file writer that wraps the destination
// file, and the number of parallel threads (np) that will write to that file.
func NewArrowWriter(arrowSchema *arrow.Schema, pfile source.ParquetFile,
    np int64) (*ArrowWriter, error) {
    var err error
    res := new(ArrowWriter)
    res.SchemaHandler, err = schema.NewSchemaHandlerFromArrow(arrowSchema)
    if err != nil {
        return res, fmt.Errorf("unable to create schema from arrow definition: %w",
            err)
    }

    res.PFile = pfile
    res.PageSize = pageSize
    res.RowGroupSize = rowGroupSize
    // Override the default compression codec (SNAPPY) with GZIP.
    res.CompressionType = parquet.CompressionCodec_GZIP
    res.PagesMapBuf = make(map[string][]*layout.Page)
    res.DictRecs = make(map[string]*layout.DictRecType)
    res.NP = np
    res.Footer = parquet.NewFileMetaData()
    res.Footer.Version = footerVersion
    res.Footer.Schema = append(res.Footer.Schema,
        res.SchemaHandler.SchemaElements...)
    res.Offset = offset

    // Write the 4-byte "PAR1" magic header that every parquet file starts with.
    _, err = res.PFile.Write([]byte("PAR1"))
    res.MarshalFunc = marshal.MarshalArrow
    return res, err
}
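
// A minimal usage sketch for NewArrowWriter (illustrative only). It assumes a
// file writer from the separate github.com/xitongsys/parquet-go-source/local
// package; the schema fields and file name are made-up examples:
//
//     arrowSchema := arrow.NewSchema([]arrow.Field{
//         {Name: "id", Type: arrow.PrimitiveTypes.Int64},
//         {Name: "name", Type: arrow.BinaryTypes.String},
//     }, nil)
//     fw, err := local.NewLocalFileWriter("example.parquet")
//     if err != nil {
//         // handle error
//     }
//     w, err := NewArrowWriter(arrowSchema, fw, 1)
//     if err != nil {
//         // handle error
//     }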

// WriteArrow wraps the base Write function provided by writer.ParquetWriter.
// It transposes the record data, which the Go arrow library exposes as an
// array of columns, into an array of rows, because the parquet-go library
// does not accept data by columns, only by rows.
func (w *ArrowWriter) WriteArrow(record arrow.Record) error {
    table := make([][]interface{}, 0)
    for i, column := range record.Columns() {
        columnFromRecord, err := common.ArrowColToParquetCol(
            record.Schema().Field(i), column)
        if err != nil {
            return err
        }
        if len(columnFromRecord) > 0 {
            table = append(table, columnFromRecord)
        }
    }

    // Transpose the columnar data into rows and write them one by one.
    transposedTable := common.TransposeTable(table)
    for _, row := range transposedTable {
        err := w.Write(row)
        if err != nil {
            return err
        }
    }
    return nil
}
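
// A minimal sketch of producing a record and writing it with WriteArrow
// (illustrative only, not part of the original file). The builder and allocator
// come from the arrow array and memory sub-packages, and w is an *ArrowWriter
// created as shown in the sketch after NewArrowWriter:
//
//     pool := memory.NewGoAllocator()
//     builder := array.NewRecordBuilder(pool, arrowSchema)
//     defer builder.Release()
//     builder.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
//     builder.Field(1).(*array.StringBuilder).AppendValues([]string{"a", "b", "c"}, nil)
//     record := builder.NewRecord()
//     defer record.Release()
//
//     if err := w.WriteArrow(record); err != nil {
//         // handle error
//     }
//     // WriteStop comes from the embedded ParquetWriter and flushes the footer.
//     if err := w.WriteStop(); err != nil {
//         // handle error
//     }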