-
Notifications
You must be signed in to change notification settings - Fork 0
/
reader.go
75 lines (59 loc) · 1.58 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package reader
import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/xitongsys/parquet-go-source/s3v2"
	"github.com/xitongsys/parquet-go/reader"
	"github.com/xitongsys/parquet-go/source"
)
// S3ParquetReader reads rows of a Parquet object stored in S3 and decodes
// them into values of type T, handing them out in batches via GetNextChunk.
// Construct it with New and release it with Close.
type S3ParquetReader[T any] struct {
// s3Client is the client the S3 source was opened with in New.
s3Client *s3.Client
// s3Reader points at the S3-backed source.ParquetFile that diskReader
// reads from; Close closes it.
s3Reader *source.ParquetFile
// diskReader decodes rows from s3Reader. Despite the name, New builds it
// directly over the S3 source rather than over a local file.
diskReader *reader.ParquetReader
// chunkSize is the number of row slots allocated per GetNextChunk call
// (the batchSize passed to New).
chunkSize int
}
// GetBatchSize reports how many rows each GetNextChunk call requests from
// the underlying parquet reader, i.e. the batchSize that was given to New.
func (pr *S3ParquetReader[T]) GetBatchSize() int {
	size := pr.chunkSize
	return size
}
// GetNumRows reports the total number of rows in the Parquet object, as
// returned by the underlying parquet reader, converted to int.
func (pr *S3ParquetReader[T]) GetNumRows() int {
	total := pr.diskReader.GetNumRows()
	return int(total)
}
// GetNumChunks returns how many GetNextChunk calls are needed to cover every
// row of the object: ceil(GetNumRows() / GetBatchSize()).
//
// An object with zero rows needs zero chunks. A non-positive batch size
// cannot cover any rows, so 0 is returned instead of dividing by zero.
func (pr *S3ParquetReader[T]) GetNumChunks() int {
	if pr.chunkSize <= 0 {
		return 0
	}
	// Ceiling division, so a row count that is an exact multiple of the batch
	// size does not produce an extra, empty trailing chunk.
	return (pr.GetNumRows() + pr.chunkSize - 1) / pr.chunkSize
}
// GetNextChunk asks the underlying parquet reader for the next batch of rows
// and returns them through a pointer to a freshly allocated slice that starts
// with GetBatchSize elements. If the read fails, it returns nil and the error.
//
// NOTE(review): whether reader.Read shortens the slice when fewer rows remain
// is up to parquet-go; confirm before relying on len(*chunk).
func (pr *S3ParquetReader[T]) GetNextChunk() (*[]T, error) {
	batch := make([]T, pr.chunkSize)
	err := pr.diskReader.Read(&batch)
	if err != nil {
		return nil, err
	}
	return &batch, nil
}
// Close releases the reader's resources: it stops the parquet reader and then
// closes the S3 source that reader was reading from. Only the error from
// closing the S3 source is returned.
func (pr *S3ParquetReader[T]) Close() error {
	// Tear down in reverse order of construction: the parquet reader is built
	// on top of the S3 source, so stop it before closing that source. Calling
	// through the stored pointers avoids copying the reader.ParquetReader
	// struct into a throwaway local.
	pr.diskReader.ReadStop()
	return (*pr.s3Reader).Close()
}
// New opens the object at bucket/key through s3Client and returns a reader
// that decodes its rows into values of type T, allocating batchSize rows per
// GetNextChunk call. numProcs is passed through unchanged to
// reader.NewParquetReader (presumably its degree of parallelism; confirm
// against parquet-go).
//
// If the parquet reader cannot be created, the already-opened S3 source is
// closed before returning so it is not leaked. The returned error still wraps
// the construction failure. If closing the source also fails, that failure is
// appended to the message.
func New[T any](
	s3Client *s3.Client, bucket string, key string, batchSize int, numProcs int64,
) (*S3ParquetReader[T], error) {
	s3Reader, err := s3v2.NewS3FileReaderWithClient(
		context.Background(), s3Client, bucket, key,
	)
	if err != nil {
		return nil, err
	}
	diskReader, err := reader.NewParquetReader(s3Reader, new(T), numProcs)
	if err != nil {
		// The construction error is the primary cause, so keep it wrapped with
		// %w so errors.Is/As keep working. A secondary close failure is only
		// reported as text.
		if closeErr := s3Reader.Close(); closeErr != nil {
			return nil, fmt.Errorf("%w (additionally, closing s3 reader: %v)", err, closeErr)
		}
		return nil, err
	}
	return &S3ParquetReader[T]{
		s3Client:   s3Client,
		s3Reader:   &s3Reader,
		diskReader: diskReader,
		chunkSize:  batchSize,
	}, nil
}