-
Notifications
You must be signed in to change notification settings - Fork 14
/
io.go
248 lines (216 loc) · 6.99 KB
/
io.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package io
import (
"errors"
"fmt"
"io"
"io/fs"
"net/url"
"strings"
)
// IO is an interface to a hierarchical file system.
//
// The IO interface is the minimum implementation required for a file
// system to utilize an iceberg table. A file system may implement
// additional interfaces, such as ReadFileIO, to provide additional or
// optimized functionality.
type IO interface {
	// Open opens the named file.
	//
	// When Open returns an error, it should be of type *fs.PathError
	// with the Op field set to "open", the Path field set to name,
	// and the Err field describing the problem.
	//
	// Open should reject attempts to open names that do not satisfy
	// fs.ValidPath(name), returning a *fs.PathError with Err set to
	// fs.ErrInvalid or fs.ErrNotExist.
	Open(name string) (File, error)

	// Remove removes the named file or (empty) directory.
	//
	// If there is an error, it should be of type *fs.PathError.
	Remove(name string) error
}
// ReadFileIO is an IO that additionally offers an optimized way to
// fetch a whole file's contents in one call via ReadFile.
type ReadFileIO interface {
	IO

	// ReadFile returns the complete contents of the named file.
	//
	// On success the returned error is nil, never io.EOF: reading the
	// entire file is expected to hit the end, so that final EOF is not
	// reported as an error.
	//
	// Implementations should hand back a copy of the underlying data;
	// the caller owns the returned slice and is free to modify it.
	ReadFile(name string) ([]byte, error)
}
// A File provides access to a single file. The File interface is the
// minimum implementation required for Iceberg to interact with a file:
// on top of the fs.File methods (Stat, Read, Close) it must support
// random access through Seek and ReadAt.
//
// Directory files should also implement ReadDirFile.
type File interface {
	fs.File
	io.ReadSeekCloser
	io.ReaderAt
}
// ReadDirFile is a File representing a directory whose entries can be
// listed through ReadDir. Every directory file should implement it.
// Non-directory files may implement it as well, in which case ReadDir
// should return an error for them.
type ReadDirFile interface {
	File

	// ReadDir reads the directory's contents and returns up to n
	// DirEntry values in directory order. Each later call on the same
	// file continues with the entries that follow.
	//
	// When n > 0, at most n entries are returned, and an empty result
	// always comes with a non-nil error explaining why. At the end of
	// the directory that error is io.EOF itself (ReadDir must not wrap
	// io.EOF).
	//
	// When n <= 0, all the DirEntry values from the directory are
	// returned in a single slice. If ReadDir reaches the end of the
	// directory it returns that slice with a nil error; if it fails
	// earlier it returns the entries read so far together with the
	// non-nil error.
	ReadDir(n int) ([]fs.DirEntry, error)
}
// FS wraps an io/fs.FS as an IO interface, passing file names through
// unchanged. It is exactly FSPreProcName with a nil preprocessor, so when
// fsys also implements fs.ReadFileFS the result additionally satisfies
// ReadFileIO.
func FS(fsys fs.FS) IO {
	return FSPreProcName(fsys, nil)
}
// FSPreProcName wraps an io/fs.FS as an IO, like FS. If fn is non-nil,
// it is applied to file names before they are passed to the underlying
// fsys.
//
// When fsys implements fs.ReadFileFS, the returned value also implements
// ReadFileIO, so callers can type-assert for the ReadFile path.
func FSPreProcName(fsys fs.FS, fn func(string) string) IO {
	wrapped := ioFS{fsys: fsys, preProcessName: fn}
	if _, canReadFile := fsys.(fs.ReadFileFS); !canReadFile {
		return wrapped
	}
	return readFileFS{ioFS: wrapped}
}
// readFileFS is an ioFS whose wrapped fs.FS also implements
// fs.ReadFileFS, which lets it satisfy ReadFileIO in addition to IO.
type readFileFS struct {
	ioFS
}

// ReadFile reads the whole named file via the wrapped fs.ReadFileFS.
//
// The name is first passed through preProcessName (when set). It is then
// normalized the same way ioFS.Open normalizes names: "/" becomes "." and
// a single leading slash is dropped. This keeps ReadFile and Open in
// agreement, because fs.ValidPath rejects rooted names and a name Open
// accepts (e.g. "/dir/file") would otherwise fail here.
//
// errMissingReadFile is returned if the wrapped fs.FS does not implement
// fs.ReadFileFS.
func (r readFileFS) ReadFile(name string) ([]byte, error) {
	if r.preProcessName != nil {
		name = r.preProcessName(name)
	}
	if name == "/" {
		name = "."
	} else {
		name = strings.TrimPrefix(name, "/")
	}
	rfs, ok := r.fsys.(fs.ReadFileFS)
	if !ok {
		return nil, errMissingReadFile
	}
	return rfs.ReadFile(name)
}
// ioFS adapts an io/fs.FS to the IO interface.
type ioFS struct {
	// fsys is the wrapped file system that operations delegate to.
	fsys fs.FS
	// preProcessName, when non-nil, rewrites file names before they are
	// handed to fsys (see FSPreProcName). Nil leaves names unchanged.
	preProcessName func(string) string
}
// Open opens name on the wrapped file system and returns it as a File.
//
// The name is first rewritten by preProcessName (when set). It is then
// made unrooted, because fs.ValidPath rejects names beginning with "/":
// the root "/" maps to ".", and any other name loses one leading slash.
// Errors from the underlying Open are returned unchanged.
func (f ioFS) Open(name string) (File, error) {
	if pre := f.preProcessName; pre != nil {
		name = pre(name)
	}

	switch name {
	case "/":
		name = "."
	default:
		name = strings.TrimPrefix(name, "/")
	}

	opened, err := f.fsys.Open(name)
	if err != nil {
		return nil, err
	}
	return ioFile{file: opened}, nil
}
// Remove removes the named file via the wrapped file system's own
// Remove(name string) error method.
//
// Like Open and ReadFile, Remove first passes name through preProcessName
// when one was configured, as FSPreProcName documents for filenames in
// general. This keeps Remove pointed at the same underlying key as the
// other operations. No leading-slash normalization is applied here.
//
// errMissingRemove is returned if the wrapped fs.FS has no Remove method.
func (f ioFS) Remove(name string) error {
	r, ok := f.fsys.(interface{ Remove(name string) error })
	if !ok {
		return errMissingRemove
	}
	if f.preProcessName != nil {
		name = f.preProcessName(name)
	}
	return r.Remove(name)
}
// Sentinel errors reported when a wrapped io/fs value lacks an optional
// capability that this package's adapters need.
var (
	// errMissingReadDir: ioFile.ReadDir on a file that is not an fs.ReadDirFile.
	errMissingReadDir = errors.New("fs.File directory missing ReadDir method")
	// errMissingSeek: ioFile.Seek on a file that is not an io.Seeker.
	errMissingSeek = errors.New("fs.File missing Seek method")
	// errMissingReadAt: ioFile.ReadAt on a file that is not an io.ReaderAt.
	errMissingReadAt = errors.New("fs.File missing ReadAt")
	// errMissingRemove: ioFS.Remove on an fs.FS without a Remove method.
	errMissingRemove = errors.New("fs.FS missing Remove method")
	// errMissingReadFile: readFileFS.ReadFile on an fs.FS that is not an fs.ReadFileFS.
	errMissingReadFile = errors.New("fs.FS missing ReadFile method")
)

// ioFile adapts a plain fs.File so it provides Seek, ReadAt and ReadDir.
// Close, Read and Stat always delegate to the wrapped file. The optional
// methods delegate only when the wrapped file supports them; otherwise
// they return the matching sentinel error.
type ioFile struct {
	file fs.File
}

// Close closes the wrapped file.
func (f ioFile) Close() error {
	return f.file.Close()
}

// Read reads from the wrapped file.
func (f ioFile) Read(b []byte) (int, error) {
	return f.file.Read(b)
}

// Stat reports the wrapped file's FileInfo.
func (f ioFile) Stat() (fs.FileInfo, error) {
	return f.file.Stat()
}

// Seek forwards to the wrapped file if it is an io.Seeker.
func (f ioFile) Seek(offset int64, whence int) (int64, error) {
	if seeker, ok := f.file.(io.Seeker); ok {
		return seeker.Seek(offset, whence)
	}
	return 0, errMissingSeek
}

// ReadAt forwards to the wrapped file if it is an io.ReaderAt.
func (f ioFile) ReadAt(p []byte, off int64) (n int, err error) {
	if at, ok := f.file.(io.ReaderAt); ok {
		return at.ReadAt(p, off)
	}
	return 0, errMissingReadAt
}

// ReadDir forwards to the wrapped file if it is an fs.ReadDirFile.
func (f ioFile) ReadDir(count int) ([]fs.DirEntry, error) {
	if dir, ok := f.file.(fs.ReadDirFile); ok {
		return dir.ReadDir(count)
	}
	return nil, errMissingReadDir
}
// inferFileIOFromSchema selects an IO implementation from the URL scheme
// of path. Despite the name, it is the scheme that is inspected.
//
//   - "s3", "s3a", "s3n": delegated to createS3FileIO with props.
//   - "file" or no scheme at all: LocalFS.
//
// Any other scheme yields a "not implemented" error. An error from
// url.Parse is returned as-is.
func inferFileIOFromSchema(path string, props map[string]string) (IO, error) {
	u, err := url.Parse(path)
	if err != nil {
		return nil, err
	}

	scheme := u.Scheme
	if scheme == "s3" || scheme == "s3a" || scheme == "s3n" {
		return createS3FileIO(u, props)
	}
	if scheme == "file" || scheme == "" {
		return LocalFS{}, nil
	}
	return nil, fmt.Errorf("IO for file '%s' not implemented", path)
}
// LoadFS infers an IO implementation from an optional URI location and a
// map of properties.
//
// If location is empty, the "warehouse" property is used instead. A URL
// scheme of "file" or no scheme at all produces a LocalFS. The "s3",
// "s3a" and "s3n" schemes produce an S3-backed IO. Any other scheme
// results in an error, since only LocalFS and S3 are implemented so far.
func LoadFS(props map[string]string, location string) (IO, error) {
	loc := location
	if loc == "" {
		loc = props["warehouse"]
	}

	fsys, err := inferFileIOFromSchema(loc, props)
	switch {
	case err != nil:
		return nil, err
	case fsys == nil:
		// Defensive fallback: a nil IO without an error means local storage.
		return LocalFS{}, nil
	default:
		return fsys, nil
	}
}