forked from xitongsys/parquet-go-source
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gocloud.go
144 lines (114 loc) · 2.97 KB
/
gocloud.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package gocloud
import (
"context"
"io"
"github.com/pkg/errors"
"github.com/xitongsys/parquet-go/source"
"gocloud.dev/blob"
)
// blobFile adapts an object in a gocloud.dev *blob.Bucket to the
// source.ParquetFile interface.
type blobFile struct {
	// Where the object lives. ctx is passed to every bucket call made on
	// behalf of this file; key is the object's name within bucket.
	ctx    context.Context
	bucket *blob.Bucket
	key    string

	// writer is nil for files obtained through Open; it is set by Create, or
	// lazily by Write when a key is known.
	writer *blob.Writer

	// offset is the position the next Read starts from (moved by Read and
	// Seek). size is the object size from its attributes when opened for
	// reading, and is incremented by the bytes accepted by Write; Seek with
	// io.SeekEnd is relative to it.
	offset int64
	size   int64
}
// NewBlobWriter creates the blob called name in bucket b and returns a
// ParquetFile that writes to it. ctx is used for every bucket operation made
// by the returned file. Nothing written is guaranteed to be persisted until
// Close on the returned file succeeds.
func NewBlobWriter(ctx context.Context, b *blob.Bucket, name string) (source.ParquetFile, error) {
	seed := blobFile{ctx: ctx, bucket: b}
	return seed.Create(name)
}
// NewBlobReader opens the existing blob called name in bucket b and returns a
// ParquetFile that reads from it. ctx is used for every bucket operation made
// by the returned file. An error is returned if the blob cannot be found or
// its attributes cannot be fetched.
func NewBlobReader(ctx context.Context, b *blob.Bucket, name string) (source.ParquetFile, error) {
	seed := blobFile{ctx: ctx, bucket: b}
	return seed.Open(name)
}
// Seek sets the position used by the next Read and returns the new absolute
// offset. whence selects the origin:
//   - io.SeekStart: the start of the blob,
//   - io.SeekCurrent: the current offset,
//   - io.SeekEnd: the recorded blob size.
//
// An unknown whence, or a target before the start of the blob, is rejected
// and leaves the current offset unchanged. Seeking past the end is allowed.
func (b *blobFile) Seek(offset int64, whence int) (int64, error) {
	var base int64
	switch whence {
	case io.SeekStart:
		base = 0
	case io.SeekCurrent:
		base = b.offset
	case io.SeekEnd:
		base = b.size
	default:
		return 0, errors.Errorf("Invalid whence. whence=%d", whence)
	}

	target := base + offset
	if target < 0 {
		return 0, errors.Errorf("Invalid offset. offset=%d", target)
	}
	b.offset = target
	return target, nil
}
// Read reads up to len(p) bytes starting at the current offset and advances
// the offset by the number of bytes read.
//
// Each call opens a range reader over [offset, offset+len(p)) and fills p
// from it with io.ReadFull. A single Read on that range reader may legally
// return fewer bytes than requested. It may also return io.EOF alongside the
// final bytes of the *range*, even though the blob continues past it. Filling
// p completely avoids reporting a spurious io.EOF mid-blob and avoids extra
// round-trips for short reads. io.EOF is returned only when the blob ends
// before p is filled, possibly together with n > 0, as io.Reader permits.
func (b *blobFile) Read(p []byte) (n int, err error) {
	if len(p) == 0 {
		// Nothing to read: don't issue a request to the bucket.
		return 0, nil
	}
	r, err := b.bucket.NewRangeReader(b.ctx, b.key, b.offset, int64(len(p)), nil)
	if err != nil {
		return 0, errors.Wrapf(err, "Failed to open reader. key=%s, offset=%d, len=%d", b.key, b.offset, len(p))
	}
	// Best-effort close of a read-only stream; the data already read is what matters.
	defer r.Close()

	n, err = io.ReadFull(r, p)
	b.offset += int64(n)
	if err == io.ErrUnexpectedEOF {
		// The range ended early only because the blob itself ended.
		err = io.EOF
	}
	return n, err
}
// Write sends p to the blob writer and adds the number of bytes accepted to
// the file's size.
//
// Note that for blob storage, writing to an existing blob overwrites that blob
// as opposed to appending to it. Additionally, Write is not guaranteed to have
// succeeded unless Close() also succeeds.
//
// If no writer is open yet (for example on a file obtained through Open), one
// is created for b.key on first use. NOTE(review): on a file from Open this
// therefore starts overwriting the blob that was opened for reading.
func (b *blobFile) Write(p []byte) (n int, err error) {
	w := b.writer
	if w == nil {
		if b.key == "" {
			return 0, errors.New("Invalid call to write, you must create or open a ParquetFile for writing")
		}
		created, createErr := b.bucket.NewWriter(b.ctx, b.key, nil)
		if createErr != nil {
			return 0, errors.Wrapf(createErr, "Could not create blob writer. key=%s", b.key)
		}
		b.writer = created
		w = created
	}

	n, err = w.Write(p)
	b.size += int64(n)
	return n, err
}
// Close closes the blob writer, if one is open, and returns its error. Per the
// note on Write, this is where a failed upload becomes visible. Files without
// a writer (opened for reading) have nothing to release, and Close returns nil.
// NOTE(review): b.writer is not cleared, so a second Close calls Close on the
// same writer again.
func (b *blobFile) Close() error {
	if w := b.writer; w != nil {
		return w.Close()
	}
	return nil
}
// Create returns a new ParquetFile that writes the blob called name into the
// same bucket, using the same context, as b. The blob writer is opened
// immediately. b itself is left unchanged. An empty name is rejected.
func (b *blobFile) Create(name string) (source.ParquetFile, error) {
	if name == "" {
		return nil, errors.New("Parquet File name cannot be empty")
	}

	w, err := b.bucket.NewWriter(b.ctx, name, nil)
	if err != nil {
		return nil, errors.Wrapf(err, "Could not create blob writer. blob=%s", name)
	}
	return &blobFile{
		ctx:    b.ctx,
		bucket: b.bucket,
		key:    name,
		writer: w,
	}, nil
}
// Open returns a new ParquetFile that reads the blob called name from the
// same bucket, using the same context, as b. An empty name reopens b's own
// key. The blob's size is fetched once, here, and is what Seek with
// io.SeekEnd is measured from.
//
// Errors:
//   - the resolved name is empty;
//   - the existence check fails (the underlying error is wrapped rather than
//     being misreported as "does not exist");
//   - the blob does not exist;
//   - its attributes cannot be read.
func (b *blobFile) Open(name string) (source.ParquetFile, error) {
	if name == "" {
		name = b.key
	}
	if name == "" {
		return nil, errors.New("Parquet File name cannot be empty")
	}

	bf := &blobFile{
		ctx:    b.ctx,
		bucket: b.bucket,
		key:    name,
	}

	exists, err := bf.bucket.Exists(bf.ctx, name)
	if err != nil {
		return nil, errors.Wrapf(err, "Could not check whether blob exists. blob=%s", name)
	}
	if !exists {
		return nil, errors.Errorf("Requested blob does not exist. blob=%s", name)
	}

	attrs, err := bf.bucket.Attributes(bf.ctx, name)
	if err != nil {
		return nil, errors.Wrapf(err, "Could not get attributes for blob. blob=%s", name)
	}
	bf.size = attrs.Size
	return bf, nil
}