forked from ipfs-cluster/ipfs-cluster
/
adder.go
164 lines (140 loc) · 4.33 KB
/
adder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// Package adder implements functionality to add content to IPFS daemons
// managed by the Cluster.
package adder
import (
"context"
"fmt"
"mime/multipart"
"strings"
"github.com/elastos/Elastos.NET.Hive.Cluster/adder/ipfsadd"
"github.com/elastos/Elastos.NET.Hive.Cluster/api"
cid "github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
merkledag "github.com/ipfs/go-merkledag"
multihash "github.com/multiformats/go-multihash"
)
// logger is the package-level logger, obtained from logging.Logger under
// the name "adder".
var logger = logging.Logger("adder")
// ClusterDAGService is an interface that extends ipld.DAGService with a
// Finalize method. ClusterDAGServices can be used to provide Adders with a
// different add implementation.
type ClusterDAGService interface {
ipld.DAGService
// Finalize receives the IPFS content root CID as
// returned by the ipfs adder. It returns a CID and an error; the Adder
// in this package returns that CID to its caller as the result of the
// add operation.
Finalize(ctx context.Context, ipfsRoot cid.Cid) (cid.Cid, error)
}
// Adder is used to add content to IPFS Cluster using an implementation of
// ClusterDAGService.
type Adder struct {
// ctx and cancel are set once by setContext, which wraps the first
// context it is given with context.WithCancel. FromFiles calls cancel
// when it returns.
ctx context.Context
cancel context.CancelFunc
// dgs is handed to the underlying ipfs adder and its Finalize method
// is called at the end of FromFiles.
dgs ClusterDAGService
// params holds the add options copied onto the underlying ipfs adder.
params *api.AddParams
// AddedOutput updates are placed on this channel
// whenever a block is processed. They contain information
// about the block, the CID, the Name etc. and are mostly
// meant to be streamed back to the user.
// FromFiles closes this channel when it returns.
output chan *api.AddedOutput
}
// New builds an Adder around the given ClusterDAGService and add options.
// Progress updates produced while adding are sent on out.
//
// When out is nil, a buffered channel is created internally and a goroutine
// drains it until it is closed, so that updates are silently discarded.
//
// An Adder may only be used once.
func New(ds ClusterDAGService, p *api.AddParams, out chan *api.AddedOutput) *Adder {
	adder := &Adder{
		dgs:    ds,
		params: p,
		output: out,
	}
	if out != nil {
		return adder
	}

	// The caller is not listening for progress updates: swallow them so
	// that senders never block on a full channel.
	sink := make(chan *api.AddedOutput, 100)
	go func() {
		for {
			if _, open := <-sink; !open {
				return
			}
		}
	}()
	adder.output = sink
	return adder
}
// setContext derives a cancellable context from ctx and stores it, together
// with its cancel function, on the Adder. Only the first call has any
// effect: once a context is stored, later calls leave it untouched.
func (a *Adder) setContext(ctx context.Context) {
	if a.ctx != nil {
		// A context was already installed; keep the first one.
		return
	}
	a.ctx, a.cancel = context.WithCancel(ctx)
}
// FromMultipart reads the content to add from a multipart.Reader, turns it
// into a files.Directory and delegates to FromFiles. The directory is closed
// when this method returns. The adder will no longer be usable after calling
// this method.
func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (cid.Cid, error) {
	logger.Debugf("adding from multipart with params: %+v", a.params)

	const mediaType = "multipart/form-data"
	dir, openErr := files.NewFileFromPartReader(r, mediaType)
	if openErr != nil {
		return cid.Undef, openErr
	}
	defer dir.Close()

	return a.FromFiles(ctx, dir)
}
// FromFiles adds content from a files.Directory. The adder will no longer
// be usable after calling this method.
//
// The first context given to the Adder is wrapped in a cancellable context
// which is cancelled when FromFiles returns, so a later call fails right
// away with that context's error. Once that check has passed, the output
// channel is closed when the method returns.
//
// On success it returns the CID produced by the ClusterDAGService's
// Finalize. On failure it returns cid.Undef and the error: a cancelled
// context, invalid CID version or hash function parameters, a failure
// adding any entry, an iterator error, or a failure in either Finalize step.
func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, error) {
	logger.Debugf("adding from files")
	a.setContext(ctx)

	if a.ctx.Err() != nil { // don't allow running twice
		return cid.Undef, a.ctx.Err()
	}

	defer a.cancel()
	defer close(a.output)

	ipfsAdder, err := ipfsadd.NewAdder(a.ctx, a.dgs)
	if err != nil {
		logger.Error(err)
		return cid.Undef, err
	}

	// Copy the user-provided add options onto the underlying adder.
	ipfsAdder.Hidden = a.params.Hidden
	ipfsAdder.Trickle = a.params.Layout == "trickle"
	ipfsAdder.RawLeaves = a.params.RawLeaves
	ipfsAdder.Wrap = a.params.Wrap
	ipfsAdder.Chunker = a.params.Chunker
	ipfsAdder.Out = a.output
	ipfsAdder.Progress = a.params.Progress

	// Set up prefix
	prefix, err := merkledag.PrefixForCidVersion(a.params.CidVersion)
	if err != nil {
		return cid.Undef, fmt.Errorf("bad CID Version: %s", err)
	}

	// Hash function names are looked up in lower case.
	hashFunCode, ok := multihash.Names[strings.ToLower(a.params.HashFun)]
	if !ok {
		return cid.Undef, fmt.Errorf("unrecognized hash function: %s", a.params.HashFun)
	}
	prefix.MhType = hashFunCode
	prefix.MhLength = -1
	ipfsAdder.CidBuilder = &prefix

	it := f.Entries()
	for it.Next() {
		select {
		case <-a.ctx.Done():
			return cid.Undef, a.ctx.Err()
		default:
			logger.Debugf("ipfsAdder AddFile(%s)", it.Name())

			// Bind the error returned by AddFile itself. Testing the
			// outer err here would always see nil and silently ignore
			// failures to add an entry.
			if err := ipfsAdder.AddFile(it.Name(), it.Node()); err != nil {
				logger.Error("error adding to cluster: ", err)
				return cid.Undef, err
			}
		}
	}
	if it.Err() != nil {
		return cid.Undef, it.Err()
	}

	adderRoot, err := ipfsAdder.Finalize()
	if err != nil {
		return cid.Undef, err
	}

	// Let the DAG service wrap up using the root computed by the ipfs adder.
	clusterRoot, err := a.dgs.Finalize(a.ctx, adderRoot.Cid())
	if err != nil {
		logger.Error("error finalizing adder:", err)
		return cid.Undef, err
	}
	logger.Infof("%s successfully added to cluster", clusterRoot)
	return clusterRoot, nil
}