-
Notifications
You must be signed in to change notification settings - Fork 3
/
manager.go
103 lines (87 loc) · 2.99 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package ipcs
import (
"context"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/hinshun/ipcs/digestconv"
"github.com/ipfs/interface-go-ipfs-core/options"
"github.com/ipfs/interface-go-ipfs-core/path"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
// Info will return metadata about content available in the content store.
//
// If the content is not present, ErrNotFound will be returned.
func (s *store) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
c, err := digestconv.DigestToCid(dgst)
if err != nil {
return content.Info{}, errors.Wrapf(err, "failed to convert digest %q to cid", dgst)
}
n, err := s.cln.Unixfs().Get(ctx, path.IpfsPath(c))
if err != nil {
return content.Info{}, errors.Wrapf(err, "failed to get unixfs node %q", c)
}
size, err := n.Size()
if err != nil {
return content.Info{}, errors.Wrapf(err, "failed to get size of %q", c)
}
now := time.Now()
return content.Info{
Digest: dgst,
Size: size,
CreatedAt: now,
UpdatedAt: now,
}, nil
}
// Update updates mutable information related to content.
// If one or more fieldpaths are provided, only those
// fields will be updated.
// Mutable fields:
// labels.*
func (s *store) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
return content.Info{}, errors.Wrapf(errdefs.ErrFailedPrecondition, "update not supported on immutable content store")
}
// Walk will call fn for each item in the content store which
// match the provided filters. If no filters are given all
// items will be walked.
func (s *store) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
// TODO: Filters are also not supported in containerd's local store.
// Since we replace the local store, and filters are implemented in the boltdb
// metadata that wraps the local store, we can wait until upstream supports
// it too.
pins, err := s.cln.Pin().Ls(ctx, options.Pin.Type.All())
if err != nil {
return errors.Wrap(err, "failed to list ipfs pins")
}
for _, pin := range pins {
c := pin.Path().Cid()
dgst, err := digestconv.CidToDigest(c)
if err != nil {
return errors.Wrap(err, "failed to convert digest")
}
info, err := s.Info(ctx, dgst)
if err != nil {
return errors.Wrap(err, "failed to get info")
}
err = fn(info)
if err != nil {
return errors.Wrap(err, "failed to walk info")
}
}
return nil
}
// Delete removes the content from the store.
func (s *store) Delete(ctx context.Context, dgst digest.Digest) error {
c, err := digestconv.DigestToCid(dgst)
if err != nil {
return errors.Wrap(err, "failed to convert digest")
}
// Recursively removing a pin will not remove shared chunks because IPFS has
// its internal refcounting. This will expose the unpinned blobs to IPFS GC.
err = s.cln.Pin().Rm(ctx, path.IpfsPath(c), options.Pin.RmRecursive(true))
if err != nil {
return errors.Wrap(err, "failed to remove pin")
}
return nil
}