forked from ipfs/kubo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
system.go
283 lines (235 loc) · 5.61 KB
/
system.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
// Package mfs implements an in-memory model of a mutable IPFS filesystem.
//
// It consists of four main structs:
// 1) The Filesystem
// The filesystem serves as a container and entry point for various mfs filesystems
// 2) Root
// Root represents an individual filesystem mounted within the mfs system as a whole
// 3) Directories
// 4) Files
package mfs
import (
"context"
"errors"
"fmt"
"sync"
"time"
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
node "gx/ipfs/QmYDscK7dmdo2GZ9aumS8s5auUUAH5mR1jvj5pYhWusfK7/go-ipld-node"
)
// Package-level error values and the package logger.
var (
	// ErrNotExist is the error value carrying the message "no such rootfs".
	ErrNotExist = errors.New("no such rootfs")

	// log is the logger registered under the "mfs" subsystem name.
	log = logging.Logger("mfs")

	// ErrIsDirectory is the error value carrying the message
	// "error: is a directory".
	ErrIsDirectory = errors.New("error: is a directory")
)
// childCloser is the interface through which an updated node is handed to a
// parent. Root implements it with closeChild, where the arguments are named
// name, nd and sync.
type childCloser interface {
closeChild(string, node.Node, bool) error
}
// NodeType is the kind of an FSNode, as reported by FSNode.Type.
type NodeType int
// The known node kinds.
const (
// TFile has the value 0 (iota).
TFile NodeType = iota
// TDir has the value 1.
TDir
)
// FSNode represents any node (directory, root, or file) in the mfs filesystem
type FSNode interface {
// GetNode returns the node.Node for this entry; Root reads its Cid when
// publishing.
GetNode() (node.Node, error)
// Flush presumably persists pending changes of the entry; the
// implementations live outside this file — TODO confirm.
Flush() error
// Type reports the entry's NodeType (TFile or TDir).
Type() NodeType
}
// Root represents the root of a filesystem tree
type Root struct {
// node is the merkledag root
node *dag.ProtoNode
// val represents the node. It can either be a File or a Directory
val FSNode
// repub publishes changes of the root. It is nil when NewRoot was called
// with a nil PubFunc, so every use is guarded by a nil check.
repub *Republisher
// dserv is the DAG service that closeChild adds updated nodes to.
dserv dag.DAGService
// Type is not assigned by NewRoot.
// NOTE(review): confirm what callers are expected to store here.
Type string
}
// PubFunc is the callback a Republisher invokes to publish a cid. When it
// returns a non-nil error the Republisher does not record the cid as
// published.
type PubFunc func(context.Context, *cid.Cid) error
// NewRoot creates a new Root for the given merkledag node and, when pf is
// non-nil, starts a republisher goroutine that reports root changes via pf.
//
// The node's data must decode as unixfs. Directory and HAMT-shard nodes are
// handed to NewDirectory; file, metadata and raw nodes are handed to NewFile.
// Any other unixfs type is rejected with an error.
//
// On every error path no background work is left behind: the node is decoded
// before the republisher is started, and a republisher that was already
// started is cancelled so that its goroutine and context are released.
func NewRoot(parent context.Context, ds dag.DAGService, node *dag.ProtoNode, pf PubFunc) (*Root, error) {
	// Decode first: a node that is not unixfs must not start a republisher.
	pbn, err := ft.FromBytes(node.Data())
	if err != nil {
		log.Error("IPNS pointer was not unixfs node")
		return nil, err
	}

	var repub *Republisher
	if pf != nil {
		repub = NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3)
		repub.setVal(node.Cid())
		go repub.Run()
	}

	// fail stops the republisher (if one was started) before reporting err;
	// cancelling its context makes Run return.
	fail := func(err error) (*Root, error) {
		if repub != nil {
			repub.cancel()
		}
		return nil, err
	}

	root := &Root{
		node:  node,
		repub: repub,
		dserv: ds,
	}

	switch pbn.GetType() {
	case ft.TDirectory, ft.THAMTShard:
		rval, err := NewDirectory(parent, node.String(), node, root, ds)
		if err != nil {
			return fail(err)
		}
		root.val = rval
	case ft.TFile, ft.TMetadata, ft.TRaw:
		fi, err := NewFile(node.String(), node, root, ds)
		if err != nil {
			return fail(err)
		}
		root.val = fi
	default:
		return fail(fmt.Errorf("unrecognized unixfs type: %s", pbn.GetType()))
	}
	return root, nil
}
// GetValue returns the FSNode stored in the root (set by NewRoot to the
// result of NewDirectory or NewFile).
func (kr *Root) GetValue() FSNode {
return kr.val
}
// Flush fetches the current node of the root value and, if a republisher is
// configured, hands that node's cid to it for publication. An error from
// fetching the node is returned unchanged.
func (kr *Root) Flush() error {
	current, err := kr.val.GetNode()
	if err != nil {
		return err
	}

	// Without a republisher there is nobody to notify.
	if kr.repub == nil {
		return nil
	}

	kr.repub.Update(current.Cid())
	return nil
}
// closeChild implements the childCloser interface. It adds nd to the DAG
// service and, if a republisher is configured, signals it that the resulting
// cid is ready to be published. The name and sync arguments are not used by
// this implementation.
func (kr *Root) closeChild(name string, nd node.Node, sync bool) error {
	added, err := kr.dserv.Add(nd)
	if err != nil {
		return err
	}

	// Without a republisher there is nobody to notify.
	if kr.repub == nil {
		return nil
	}

	kr.repub.Update(added)
	return nil
}
// Close fetches the current node of the root value and, if a republisher is
// configured, records that node's cid with it and then closes the
// republisher, returning the republisher's result. Without a republisher it
// only reports an error from fetching the node.
func (kr *Root) Close() error {
	current, err := kr.val.GetNode()
	if err != nil {
		return err
	}

	if kr.repub == nil {
		return nil
	}

	kr.repub.Update(current.Cid())
	return kr.repub.Close()
}
// Republisher manages when to publish a given entry
type Republisher struct {
// TimeoutLong bounds how long Run keeps deferring a publish while updates
// keep arriving.
TimeoutLong time.Duration
// TimeoutShort is the quiet period Run waits after the latest update before
// publishing.
TimeoutShort time.Duration
// Publish is signalled by Update to tell Run that the value changed.
// NewRepublisher creates it with a buffer of one.
Publish chan struct{}
// pubfunc is the callback that performs the actual publish.
pubfunc PubFunc
// pubnowch requests an immediate publish from Run. The value sent is an
// optional reply channel: WaitPub sends one and waits on it, pubNow sends nil.
pubnowch chan chan struct{}
// ctx and cancel control the lifetime of Run; both are created by
// NewRepublisher from the caller's context.
ctx context.Context
cancel func()
// lk guards val and lastpub.
lk sync.Mutex
// val is the most recent cid handed to setVal/Update.
val *cid.Cid
// lastpub is the cid of the last publish for which pubfunc returned nil.
lastpub *cid.Cid
}
// getVal returns the cid most recently stored with setVal, read under the
// republisher's lock.
func (p *Republisher) getVal() *cid.Cid {
	p.lk.Lock()
	current := p.val
	p.lk.Unlock()
	return current
}
// NewRepublisher builds a Republisher that publishes through pf, using tshort
// and tlong as its short and long time intervals. The republisher gets its
// own cancellable context derived from ctx. The returned value is idle until
// its Run method is started.
func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher {
	childCtx, stop := context.WithCancel(ctx)

	rp := new(Republisher)
	rp.TimeoutShort = tshort
	rp.TimeoutLong = tlong
	// One buffered slot lets Update record a pending change without blocking.
	rp.Publish = make(chan struct{}, 1)
	rp.pubfunc = pf
	rp.pubnowch = make(chan chan struct{})
	rp.ctx = childCtx
	rp.cancel = stop
	return rp
}
// setVal stores c as the value to publish next, under the republisher's lock.
// It does not signal Run; Update does that.
func (p *Republisher) setVal(c *cid.Cid) {
	p.lk.Lock()
	p.val = c
	p.lk.Unlock()
}
// pubNow asks Run to publish immediately without waiting for the result. The
// request is dropped if Run is not ready to receive it at this moment.
func (p *Republisher) pubNow() {
	// A nil reply channel tells Run that nobody waits for a confirmation.
	var noReply chan struct{}

	select {
	case p.pubnowch <- noReply:
	default:
		// Run is busy or idle; do not block the caller.
	}
}
// WaitPub asks Run for an immediate publish and blocks until Run has
// answered. It returns right away when the last published cid is already the
// current one (compared by pointer).
//
// It also returns, without a publish having been confirmed, once the
// republisher's context is done. After Close or cancellation Run no longer
// services pubnowch, so waiting on it unconditionally would block forever.
func (p *Republisher) WaitPub() {
	p.lk.Lock()
	consistent := p.lastpub == p.val
	p.lk.Unlock()
	if consistent {
		return
	}

	// Buffered so that Run's reply never blocks if this caller has already
	// given up because the context ended.
	wait := make(chan struct{}, 1)

	select {
	case p.pubnowch <- wait:
	case <-p.ctx.Done():
		return
	}

	select {
	case <-wait:
	case <-p.ctx.Done():
	}
}
// Close publishes the current value one last time and then cancels the
// republisher's context, which makes Run return. The result of that final
// publish is returned.
func (p *Republisher) Close() error {
	// Cancel only after the final publish has used the still-live context.
	defer p.cancel()
	return p.publish(p.ctx)
}
// Update records c as the value to publish and signals Run that an update has
// occurred since the last publish. Multiple consecutive updates may extend
// the time period before the next publish occurs in order to batch updates
// more efficiently. Update never blocks: if a signal is already pending, no
// second one is queued.
func (p *Republisher) Update(c *cid.Cid) {
	p.lk.Lock()
	p.val = c
	p.lk.Unlock()

	select {
	case p.Publish <- struct{}{}:
	default:
		// A signal is already pending; Run will pick up the new value.
	}
}
// Run is the main republisher loop. It is meant to run in its own goroutine
// (NewRoot starts it with `go`) and returns once the republisher's context is
// done.
//
// Each signal on Publish opens a batching window: further signals restart the
// short timer, while the long timer, armed once per window, bounds how long
// publication can be deferred. The window ends when either timer fires or a
// request arrives on pubnowch; the current value is then published.
//
// Requests on pubnowch are only received inside such a window. While Run is
// idle in the outer select, a blocking sender waits until the next Publish
// signal.
func (np *Republisher) Run() {
for {
select {
case <-np.Publish:
// An update arrived: arm both timers for this batching window.
quick := time.After(np.TimeoutShort)
longer := time.After(np.TimeoutLong)
wait:
// Redeclared on every pass through the label, so it is nil unless this
// pass ends via the pubnowch case below.
var pubnowresp chan struct{}
select {
case <-np.ctx.Done():
return
case <-np.Publish:
// Another update within the window: restart only the short timer.
quick = time.After(np.TimeoutShort)
goto wait
case <-quick:
case <-longer:
case pubnowresp = <-np.pubnowch:
// Immediate publish requested; the received channel (nil when sent by
// pubNow) is answered after publishing.
}
err := np.publish(np.ctx)
// Reply to the requester whether or not the publish succeeded.
if pubnowresp != nil {
pubnowresp <- struct{}{}
}
if err != nil {
log.Errorf("republishRoot error: %s", err)
}
case <-np.ctx.Done():
return
}
}
}
// publish hands the current value to pubfunc and, if that succeeds, records
// it as the last published value. The lock is not held while pubfunc runs. If
// pubfunc fails, its error is returned and lastpub is left unchanged.
func (p *Republisher) publish(ctx context.Context) error {
	// Snapshot the value so a concurrent Update cannot change what gets
	// recorded as published.
	p.lk.Lock()
	target := p.val
	p.lk.Unlock()

	if err := p.pubfunc(ctx, target); err != nil {
		return err
	}

	p.lk.Lock()
	defer p.lk.Unlock()
	p.lastpub = target
	return nil
}