forked from celestiaorg/dagstore
/
repo_fs.go
175 lines (146 loc) · 3.94 KB
/
repo_fs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package index
import (
"fmt"
"os"
"path/filepath"
"strings"
"github.com/filecoin-project/dagstore/shard"
carindex "github.com/ipld/go-car/v2/index"
"golang.org/x/xerrors"
)
const (
repoVersion = "1"
indexSuffix = ".full.idx"
)
// FSIndexRepo implements FullIndexRepo using the local file system to store
// the indices
type FSIndexRepo struct {
baseDir string
}
var _ FullIndexRepo = (*FSIndexRepo)(nil)
// NewFSRepo creates a new index repo that stores indices on the local
// filesystem with the given base directory as the root
func NewFSRepo(baseDir string) (*FSIndexRepo, error) {
err := os.MkdirAll(baseDir, os.ModePerm)
if err != nil {
return nil, fmt.Errorf("failed to create index repo dir: %w", err)
}
l := &FSIndexRepo{baseDir: baseDir}
// Get the repo version
bs, err := os.ReadFile(l.versionPath())
if err != nil {
// If the repo has not been initialized, write out the repo version file
if os.IsNotExist(err) {
err = os.WriteFile(l.versionPath(), []byte(repoVersion), 0666)
if err != nil {
return nil, err
}
return l, nil
}
// There was some other error
return nil, err
}
// Check that this library can read this repo
if string(bs) != repoVersion {
return nil, xerrors.Errorf("cannot read existing repo with version %s", bs)
}
return l, nil
}
func (l *FSIndexRepo) GetFullIndex(key shard.Key) (carindex.Index, error) {
path := l.indexPath(key)
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
return carindex.ReadFrom(f)
}
func (l *FSIndexRepo) AddFullIndex(key shard.Key, index carindex.Index) (err error) {
// Create a file at the key path
f, err := os.Create(l.indexPath(key))
if err != nil {
return err
}
defer f.Close()
// Write the index to the file
_, err = carindex.WriteTo(index, f)
return err
}
func (l *FSIndexRepo) DropFullIndex(key shard.Key) (dropped bool, err error) {
// Remove the file at the key path
return true, os.Remove(l.indexPath(key))
}
func (l *FSIndexRepo) StatFullIndex(key shard.Key) (Stat, error) {
// Stat the file at the key path
info, err := os.Stat(l.indexPath(key))
if err != nil {
// Check if the file exists
if os.IsNotExist(err) {
// Should we return ErrNotFound instead of Stat{Exists:false} ?
return Stat{Exists: false}, nil
}
return Stat{}, err
}
return Stat{
Exists: true,
Size: uint64(info.Size()),
}, nil
}
var stopWalk = xerrors.New("stop walk")
// ForEach iterates over each index file to extract the key
func (l *FSIndexRepo) ForEach(f func(shard.Key) (bool, error)) error {
// Iterate over each index file
err := l.eachIndexFile(func(info os.FileInfo) error {
// The file name is derived by base 58 encoding the key
// so decode the file name to get the key
name := info.Name()
name = name[:len(name)-len(indexSuffix)]
k := shard.KeyFromString(name)
// Call the callback with the key
ok, err := f(k)
if err != nil {
return err
}
if !ok {
return stopWalk
}
return nil
})
if err == stopWalk {
return nil
}
return err
}
// Len counts all index files in the base path
func (l *FSIndexRepo) Len() (int, error) {
ret := 0
err := l.eachIndexFile(func(info os.FileInfo) error {
ret++
return nil
})
return ret, err
}
// Size sums the size of all index files in the base path
func (l *FSIndexRepo) Size() (uint64, error) {
var size uint64
err := l.eachIndexFile(func(info os.FileInfo) error {
size += uint64(info.Size())
return nil
})
return size, err
}
// eachIndexFile calls the callback for each index file
func (l *FSIndexRepo) eachIndexFile(f func(info os.FileInfo) error) error {
return filepath.Walk(l.baseDir, func(path string, info os.FileInfo, err error) error {
if strings.HasSuffix(info.Name(), indexSuffix) {
return f(info)
}
return nil
})
}
func (l *FSIndexRepo) indexPath(key shard.Key) string {
return filepath.Join(l.baseDir, key.String()+indexSuffix)
}
func (l *FSIndexRepo) versionPath() string {
return filepath.Join(l.baseDir, ".version")
}