forked from GoogleCloudPlatform/gcsfuse
/
dir_handle.go
291 lines (244 loc) · 7.31 KB
/
dir_handle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fs
import (
"fmt"
"sort"
"github.com/googlecloudplatform/gcsfuse/internal/fs/inode"
"github.com/jacobsa/fuse"
"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/fuse/fuseutil"
"github.com/jacobsa/syncutil"
"golang.org/x/net/context"
)
// dirHandle is the per-open state used to answer readdir requests for a single
// directory inode.
type dirHandle struct {
	// --- Set by newDirHandle and never modified afterwards. ---

	// Directory inode whose listing this handle serves.
	in inode.DirInode

	// Supplied by newDirHandle's caller. NOTE(review): no method in this file
	// reads it; presumably relevant to implicit-directory handling — confirm.
	implicitDirs bool

	// --- Mutable state; everything below is guarded by Mu. ---

	// Mu protects the fields below. newDirHandle installs checkInvariants as
	// its invariant checker.
	Mu syncutil.InvariantMutex

	// The directory listing. Filled in by ensureEntries and cleared by ReadDir
	// when it sees a zero offset.
	//
	// INVARIANT: For each i, entries[i+1].Offset == entries[i].Offset + 1
	//
	// GUARDED_BY(Mu)
	entries []fuseutil.Dirent

	// Reports whether entries currently holds a populated listing.
	//
	// INVARIANT: If !entriesValid, then len(entries) == 0
	//
	// GUARDED_BY(Mu)
	entriesValid bool
}
// newDirHandle returns a dirHandle that serves listings read from in. The
// handle starts with no cached entries, and its mutex is wired to
// checkInvariants for invariant checking.
func newDirHandle(
	in inode.DirInode,
	implicitDirs bool) (dh *dirHandle) {
	handle := &dirHandle{
		implicitDirs: implicitDirs,
		in:           in,
	}

	// The checker is a method value bound to handle, so it must be installed
	// after the struct exists.
	handle.Mu = syncutil.NewInvariantMutex(handle.checkInvariants)

	dh = handle
	return dh
}
////////////////////////////////////////////////////////////////////////
// Helpers
////////////////////////////////////////////////////////////////////////
// sortedDirents adapts a slice of dirents to sort.Interface, ordering entries
// by Name using Go's byte-wise string comparison.
type sortedDirents []fuseutil.Dirent

func (s sortedDirents) Len() int {
	return len(s)
}

func (s sortedDirents) Less(a, b int) bool {
	return s[a].Name < s[b].Name
}

func (s sortedDirents) Swap(a, b int) {
	s[a], s[b] = s[b], s[a]
}
// checkInvariants panics if dh violates one of the invariants documented on
// dirHandle. newDirHandle installs it as the invariant checker for dh.Mu.
func (dh *dirHandle) checkInvariants() {
	// INVARIANT: For each i, entries[i+1].Offset == entries[i].Offset + 1
	for i := 1; i < len(dh.entries); i++ {
		prev, cur := dh.entries[i-1].Offset, dh.entries[i].Offset
		if cur != prev+1 {
			panic(fmt.Sprintf("Unexpected offset sequence: %v, %v", prev, cur))
		}
	}

	// INVARIANT: If !entriesValid, then len(entries) == 0
	if len(dh.entries) != 0 && !dh.entriesValid {
		panic("Unexpected non-empty entries slice")
	}
}
// fixConflictingNames disambiguates a file and a directory that share a name
// (for example the objects "foo/bar" and "foo/bar/"). It appends
// inode.ConflictingFileNameSuffix to the non-directory's name, in place. The
// suffix is documented as U+000A, which is illegal in GCS object names.
//
// entries must already be sorted by name. If it is not, an error is returned
// and entries is not modified. An error is also returned when two adjacent
// entries share a name but are both directories or both non-directories. In
// that case, collisions found earlier in the slice have already been repaired.
func fixConflictingNames(entries []fuseutil.Dirent) (err error) {
	if !sort.IsSorted(sortedDirents(entries)) {
		return fmt.Errorf("Expected sorted input")
	}

	// Sorted input puts equal names next to each other, so comparing each entry
	// with its immediate predecessor finds every collision.
	for i := 1; i < len(entries); i++ {
		prev, cur := &entries[i-1], &entries[i]
		if prev.Name != cur.Name {
			continue
		}

		prevIsDir := prev.Type == fuseutil.DT_Directory
		curIsDir := cur.Type == fuseutil.DT_Directory

		switch {
		case prevIsDir == curIsDir:
			// Exactly one member of a colliding pair is expected to be a directory.
			return fmt.Errorf(
				"Weird dirent type pair for name %q: %v, %v",
				cur.Name,
				cur.Type,
				prev.Type)
		case curIsDir:
			prev.Name += inode.ConflictingFileNameSuffix
		default:
			cur.Name += inode.ConflictingFileNameSuffix
		}
	}

	return nil
}
// readAllEntries lists every entry of the directory in, paging with
// in.ReadEntries until the continuation token comes back empty. It then:
//   - sorts the listing by name,
//   - resolves file/directory name collisions via fixConflictingNames,
//   - numbers the entries with 1-based offsets (entries[i].Offset == i+1),
//   - gives every entry the same placeholder inode ID.
//
// On error, entries is nil; partially read listings are never returned.
//
// LOCKS_REQUIRED(in)
func readAllEntries(
	ctx context.Context,
	in inode.DirInode) (entries []fuseutil.Dirent, err error) {
	// Read one batch at a time; an empty token marks the final batch.
	var all []fuseutil.Dirent
	var tok string
	for {
		var batch []fuseutil.Dirent
		batch, tok, err = in.ReadEntries(ctx, tok)
		if err != nil {
			return nil, fmt.Errorf("ReadEntries: %v", err)
		}

		all = append(all, batch...)
		if tok == "" {
			break
		}
	}

	// fixConflictingNames requires sorted input.
	sort.Sort(sortedDirents(all))

	if err = fixConflictingNames(all); err != nil {
		return nil, fmt.Errorf("fixConflictingNames: %v", err)
	}

	// Fill in offsets, and return a bogus inode ID for each entry, but not the
	// root inode ID.
	//
	// NOTE(jacobsa): As far as I can tell this is harmless. Minting and
	// returning a real inode ID is difficult because fuse does not count
	// readdir as an operation that increases the inode ID's lookup count and
	// we therefore don't get a forget for it later, but we would like to not
	// have to remember every inode ID that we've ever minted for readdir.
	//
	// If it turns out this is not harmless, we'll need to switch to something
	// like inode IDs based on (object name, generation) hashes. But then what
	// about the birthday problem? And more importantly, what about our
	// semantic of not minting a new inode ID when the generation changes due
	// to a local action?
	for i := range all {
		all[i].Offset = fuseops.DirOffset(i) + 1
		all[i].Inode = fuseops.RootInodeID + 1
	}

	return all, nil
}
// ensureEntries reads the full directory listing from dh.in while holding the
// inode's lock. On success it stores the listing in dh.entries and marks it
// valid; on failure dh is left unchanged. It reads unconditionally, so callers
// check dh.entriesValid first when they only want to fill an empty cache.
//
// LOCKS_REQUIRED(dh.Mu)
// LOCKS_EXCLUDED(dh.in)
func (dh *dirHandle) ensureEntries(ctx context.Context) (err error) {
	dh.in.Lock()
	defer dh.in.Unlock()

	listing, readErr := readAllEntries(ctx, dh.in)
	if readErr != nil {
		return fmt.Errorf("readAllEntries: %v", readErr)
	}

	dh.entries, dh.entriesValid = listing, true
	return nil
}
////////////////////////////////////////////////////////////////////////
// Public interface
////////////////////////////////////////////////////////////////////////
// ReadDir handles a request to read from the directory, without responding.
// Starting at the entry index given by op.Offset, it copies as many buffered
// entries as fit into op.Dst and adds the bytes written to op.BytesRead.
//
// Special case: we assume that a zero offset indicates that rewinddir has been
// called (since fuse gives us no way to intercept and know for sure), and
// start the listing process over again.
//
// An offset past the end of the buffered listing is treated as an invalid
// seekdir and yields fuse.EINVAL.
//
// LOCKS_REQUIRED(dh.Mu)
// LOCKS_EXCLUDED(dh.in)
func (dh *dirHandle) ReadDir(
	ctx context.Context,
	op *fuseops.ReadDirOp) (err error) {
	// If the request is for offset zero, we assume that either this is the first
	// call or rewinddir has been called. Reset state.
	if op.Offset == 0 {
		dh.entries = nil
		dh.entriesValid = false
	}

	// Do we need to read entries from GCS?
	if !dh.entriesValid {
		if err = dh.ensureEntries(ctx); err != nil {
			return
		}
	}

	// Bounds-check in the offset's own unsigned type before converting to int.
	// Converting first would wrap a huge offset to a negative index that passes
	// the check and then panics when slicing dh.entries.
	if op.Offset > fuseops.DirOffset(len(dh.entries)) {
		err = fuse.EINVAL
		return
	}

	// Copy out entries until we run out of entries or of space in op.Dst.
	for _, e := range dh.entries[int(op.Offset):] {
		n := fuseutil.WriteDirent(op.Dst[op.BytesRead:], e)
		if n == 0 {
			break
		}
		op.BytesRead += n
	}

	return
}