This repository has been archived by the owner on Feb 21, 2024. It is now read-only.
/
parquet-info.go
118 lines (109 loc) · 2.85 KB
/
parquet-info.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Copyright 2022 Molecula Corp. (DBA FeatureBase).
// SPDX-License-Identifier: Apache-2.0
package ctl
import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"os"
"github.com/apache/arrow/go/v10/arrow/memory"
"github.com/apache/arrow/go/v10/parquet/file"
"github.com/apache/arrow/go/v10/parquet/pqarrow"
pilosa "github.com/featurebasedb/featurebase/v3"
"github.com/featurebasedb/featurebase/v3/logger"
)
// ParquetInfoCommand represents a command for displaying info about a parquet file.
//
// Path is the only exported knob; the unexported writer and logger are wired
// up by NewParquetInfoCommand.
type ParquetInfoCommand struct {
	// Path is either a filesystem path or a URL naming the parquet file.
	Path string

	// stdout is the output writer (NewParquetInfoCommand sets it to os.Stdout).
	stdout io.Writer
	// logDest is the logger handed to NewParquetInfoCommand.
	logDest logger.Logger
}
// NewParquetInfoCommand returns a new instance of ParquetInfoCommand whose
// output writer is os.Stdout and whose logger is logdest. Path is left empty
// for the caller to populate before calling Run.
func NewParquetInfoCommand(logdest logger.Logger) *ParquetInfoCommand {
	cmd := new(ParquetInfoCommand)
	cmd.stdout = os.Stdout
	cmd.logDest = logdest
	return cmd
}
// Run displays the schema of the parquet file named by cmd.Path, followed by
// its row count and a tab-separated sample of at most 10 rows.
//
// cmd.Path may be a local file path or an http(s) URL. URLs are downloaded to
// a temporary file first; that file is closed and removed before Run returns.
// Output is written to cmd.stdout, or os.Stdout when cmd.stdout is nil.
func (cmd *ParquetInfoCommand) Run(ctx context.Context) error {
	f, cleanup, err := cmd.openSource(ctx)
	if err != nil {
		return err
	}
	defer cleanup()

	pf, err := file.NewParquetReader(f)
	if err != nil {
		return err
	}
	mem := memory.NewGoAllocator()
	reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, mem)
	if err != nil {
		return err
	}
	table, err := reader.ReadTable(ctx)
	if err != nil {
		return err
	}

	out := cmd.stdout
	if out == nil {
		out = os.Stdout
	}
	// Write errors on the console output are deliberately ignored, matching the
	// best-effort semantics of the original fmt.Printf calls.

	// print file name
	fmt.Fprintf(out, "\n\nName:%v\n", cmd.Path)

	// print schema
	fields := table.Schema().Fields()
	for i, field := range fields {
		fmt.Fprintf(out, "%v. Name: %v\n", i, field.Name)
		fmt.Fprintf(out, "%v. Type: %v\n", i, field.Type)
		fmt.Fprintf(out, "%v. Nullable: %v\n\n", i, field.Nullable)
	}

	bt := pilosa.BasicTableFromArrow(table, mem)

	// print num rows
	numRows := int(bt.NumRows())
	fmt.Fprintf(out, "Number of rows:%v\n", numRows)

	const maxSampleRows = 10
	sampleRows := numRows
	if sampleRows > maxSampleRows {
		sampleRows = maxSampleRows
	}

	// print at most maxSampleRows sample rows in table format
	fmt.Fprintln(out, "Sample:")
	for _, field := range fields {
		fmt.Fprintf(out, "%v\t", field.Name)
	}
	fmt.Fprintln(out, "")
	for i := 0; i < sampleRows; i++ {
		for j := 0; j < len(fields); j++ {
			fmt.Fprintf(out, "%v\t", bt.Get(j, i))
		}
		fmt.Fprintln(out, "")
	}
	return nil
}

// openSource opens cmd.Path for reading and returns the file plus a cleanup
// func that releases it. Only http and https URLs are downloaded; everything
// else (including absolute paths such as "/tmp/x.parquet", which
// url.ParseRequestURI would accept) is opened as a local file.
func (cmd *ParquetInfoCommand) openSource(ctx context.Context) (*os.File, func(), error) {
	// url.Parse lower-cases the scheme, so a plain comparison suffices.
	if u, err := url.Parse(cmd.Path); err == nil && (u.Scheme == "http" || u.Scheme == "https") {
		return cmd.download(ctx)
	}
	f, err := os.Open(cmd.Path)
	if err != nil {
		return nil, nil, err
	}
	return f, func() { f.Close() }, nil
}

// download fetches cmd.Path into a temporary file rewound to offset 0. The
// returned cleanup func closes and removes the temporary file. On error, any
// partially written temporary file is removed before returning.
func (cmd *ParquetInfoCommand) download(ctx context.Context) (*os.File, func(), error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, cmd.Path, nil)
	if err != nil {
		return nil, nil, err
	}
	response, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, nil, err
	}
	// Close the body on every path, including non-200 responses.
	defer response.Body.Close()
	if response.StatusCode != http.StatusOK {
		return nil, nil, fmt.Errorf("unexpected response %d", response.StatusCode)
	}

	// download to temp file first
	f, err := os.CreateTemp("", "BulkParquetFile.parquet")
	if err != nil {
		return nil, nil, fmt.Errorf("error creating tempfile %w", err)
	}
	cleanup := func() {
		// Close before Remove so removal also works on platforms that refuse
		// to delete open files.
		f.Close()
		os.Remove(f.Name())
	}
	if _, err := io.Copy(f, response.Body); err != nil {
		cleanup()
		return nil, nil, fmt.Errorf("error downloading url %v %w", cmd.Path, err)
	}
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		cleanup()
		return nil, nil, fmt.Errorf("error reseting file for reading %w ", err)
	}
	return f, cleanup, nil
}