forked from polarsignals/frostdb
/
util.go
82 lines (72 loc) · 1.71 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package cmd
import (
"fmt"
"io"
"os"
"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/parquet-go/parquet-go"
)
// openParquetFile opens the file at path file and parses it as a Parquet file.
//
// On success it returns the parsed *parquet.File and an io.Closer for the
// underlying OS file. The *parquet.File was opened on top of that file, so the
// caller must keep the closer open while using the *parquet.File and Close it
// afterwards.
//
// On failure the function closes any file it opened itself and returns a nil
// closer, so callers only need to handle the returned error.
func openParquetFile(file string) (*parquet.File, io.Closer, error) {
	f, err := os.Open(file)
	if err != nil {
		// *os.PathError already names the file.
		return nil, nil, err
	}

	stats, err := f.Stat()
	if err != nil {
		// Best-effort close: the Stat error is the one worth reporting.
		_ = f.Close()
		return nil, nil, err
	}

	pf, err := parquet.OpenFile(f, stats.Size())
	if err != nil {
		// Best-effort close: the parse error is the one worth reporting.
		_ = f.Close()
		return nil, nil, fmt.Errorf("opening parquet file %q: %w", file, err)
	}
	return pf, f, nil
}
// compare orders two Parquet values. It returns a negative number when v1
// sorts before v2, zero when they are equal, and a positive number when v1
// sorts after v2.
//
// Null values compare equal to each other and sort after every non-null
// value. For two non-null values, the physical kind of v1 selects the
// comparison. NOTE(review): v2 is assumed to have the same kind as v1; mixed
// kinds are not checked here.
//
// compare panics when v1's kind has no comparison defined below.
func compare(v1, v2 parquet.Value) int {
	switch n1, n2 := v1.IsNull(), v2.IsNull(); {
	case n1 && n2:
		return 0
	case n1:
		// Nulls sort last.
		return 1
	case n2:
		return -1
	}

	switch v1.Kind() {
	case parquet.Boolean:
		return parquet.BooleanType.Compare(v1, v2)
	case parquet.Int32:
		return parquet.Int32Type.Compare(v1, v2)
	case parquet.Int64:
		return parquet.Int64Type.Compare(v1, v2)
	case parquet.Int96:
		// Legacy timestamp encoding (e.g. written by Spark or Impala);
		// previously this fell through to the panic below.
		return parquet.Int96Type.Compare(v1, v2)
	case parquet.Float:
		return parquet.FloatType.Compare(v1, v2)
	case parquet.Double:
		return parquet.DoubleType.Compare(v1, v2)
	case parquet.ByteArray, parquet.FixedLenByteArray:
		// Both kinds are raw bytes, so a lexicographic byte comparison works.
		return parquet.ByteArrayType.Compare(v1, v2)
	default:
		panic(fmt.Sprintf("unsupported value comparison: %v", v1.Kind()))
	}
}
// inspectRecord prints record to standard output.
//
// With no columns, the whole record is printed. Otherwise a projected record
// is built and printed. It holds, in schema order, every field whose name
// appears in columns; a field is included once for each matching entry in
// columns. The projected record is released when the function returns.
func inspectRecord(record arrow.Record, columns []string) {
	if len(columns) == 0 {
		fmt.Println(record)
		return
	}

	schema := record.Schema()
	projectedFields := make([]arrow.Field, 0, len(columns))
	projectedCols := make([]arrow.Array, 0, len(columns))
	for idx, f := range schema.Fields() {
		for _, want := range columns {
			if want != f.Name {
				continue
			}
			projectedFields = append(projectedFields, f)
			projectedCols = append(projectedCols, record.Column(idx))
		}
	}

	projected := array.NewRecord(arrow.NewSchema(projectedFields, nil), projectedCols, record.NumRows())
	defer projected.Release()
	fmt.Println(projected)
}