forked from minio/minio
-
Notifications
You must be signed in to change notification settings - Fork 14
/
ipfs.go
183 lines (170 loc) · 4.48 KB
/
ipfs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package s3x
import (
"context"
"fmt"
"io"
pb "github.com/RTradeLtd/TxPB/v3/go"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-merkledag"
"github.com/pkg/errors"
)
// unmarshaller is implemented by types that can decode themselves from a
// byte slice (e.g. protobuf-generated messages such as Object and Bucket).
type unmarshaller interface {
	Unmarshal(data []byte) error
}
// marshaller is implemented by types that can encode themselves to a byte
// slice (e.g. protobuf-generated messages such as Object and Bucket).
type marshaller interface {
	Marshal() ([]byte, error)
}
// ipfsBytes returns the raw block data stored in IPFS under hash h,
// fetched via a DAG_GET request on the given node API client.
//
// On error the returned slice is nil; the original returned
// resp.GetRawData() alongside a non-nil error, which only worked because
// protobuf getters are nil-receiver safe — check the error explicitly.
func ipfsBytes(ctx context.Context, dag pb.NodeAPIClient, h string) ([]byte, error) {
	resp, err := dag.Dag(ctx, &pb.DagRequest{
		RequestType: pb.DAGREQTYPE_DAG_GET,
		Hash:        h,
	})
	if err != nil {
		return nil, err
	}
	return resp.GetRawData(), nil
}
// ipfsUnmarshal fetches the block stored in IPFS under hash h and decodes
// it into u.
func ipfsUnmarshal(ctx context.Context, dag pb.NodeAPIClient, h string, u unmarshaller) error {
	raw, err := ipfsBytes(ctx, dag, h)
	if err == nil {
		err = u.Unmarshal(raw)
	}
	return err
}
// ipfsObject loads and decodes the Object stored in IPFS under hash h.
func ipfsObject(ctx context.Context, dag pb.NodeAPIClient, h string) (*Object, error) {
	var obj Object
	if err := ipfsUnmarshal(ctx, dag, h, &obj); err != nil {
		return nil, err
	}
	return &obj, nil
}
// ipfsBucket loads and decodes the Bucket stored in IPFS under hash h.
func ipfsBucket(ctx context.Context, dag pb.NodeAPIClient, h string) (*Bucket, error) {
	var bucket Bucket
	if err := ipfsUnmarshal(ctx, dag, h, &bucket); err != nil {
		return nil, err
	}
	return &bucket, nil
}
// ipfsSave marshals m, stores the encoded bytes in IPFS, and returns the
// resulting hash.
func ipfsSave(ctx context.Context, dag pb.NodeAPIClient, m marshaller) (string, error) {
	encoded, err := m.Marshal()
	if err != nil {
		return "", err
	}
	return ipfsSaveBytes(ctx, dag, encoded)
}
// ipfsSaveBytes stores data as a DAG node in IPFS (DAG_PUT) and returns
// its hash.
//
// The response's hash list is validated before indexing: the original
// indexed resp.GetHashes()[0] unconditionally, which panics if the server
// returns no hashes. The sibling ipfsSaveProtoNode already guards this;
// this makes the two consistent.
func ipfsSaveBytes(ctx context.Context, dag pb.NodeAPIClient, data []byte) (string, error) {
	resp, err := dag.Dag(ctx, &pb.DagRequest{
		RequestType: pb.DAGREQTYPE_DAG_PUT,
		Data:        data,
	})
	if err != nil {
		return "", errors.Wrap(err, "dag client error in ipfsSaveBytes")
	}
	hashes := resp.GetHashes()
	if len(hashes) != 1 {
		return "", errors.New("unexpected number of hashes returned")
	}
	return hashes[0], nil
}
// ipfsSaveProtoNode stores a merkledag ProtoNode in IPFS using protobuf
// object encoding and serialization, and returns its hash. Exactly one
// hash is expected in the response.
func ipfsSaveProtoNode(ctx context.Context, dag pb.NodeAPIClient, node *merkledag.ProtoNode) (string, error) {
	raw, err := node.Marshal()
	if err != nil {
		return "", err
	}
	resp, err := dag.Dag(ctx, &pb.DagRequest{
		RequestType:         pb.DAGREQTYPE_DAG_PUT,
		Data:                raw,
		ObjectEncoding:      "protobuf",
		SerializationFormat: "protobuf",
	})
	if err != nil {
		return "", errors.Wrap(err, "dag client error in ipfsSaveProtoNode")
	}
	hashes := resp.GetHashes()
	if len(hashes) != 1 {
		return "", errors.New("unexpected number of hashes returned")
	}
	return hashes[0], nil
}
// chunkSize is the per-message payload size used when streaming file
// content to and from IPFS (see ipfsFileUpload / ipfsFileDownload).
// NOTE(review): presumably sized to stay under a 4MB transport message
// limit — confirm against the gRPC/TxPB configuration.
const chunkSize = 4*1024*1024 - 1024 //1KB less than 4MB for a good safety buffer
// ipfsFileUpload streams the contents of r to IPFS in chunkSize pieces and
// returns the resulting content hash along with the total number of bytes
// read from r. The size is returned even on error so callers know how much
// was consumed.
//
// Fixes a typo in the user-facing error message ("mush" -> "must").
func ipfsFileUpload(ctx context.Context, fileClient pb.FileAPIClient, r io.Reader) (string, int, error) {
	stream, err := fileClient.UploadFile(ctx)
	if err != nil {
		return "", 0, err
	}
	var (
		buf  = make([]byte, chunkSize)
		size int
	)
	for {
		n, err := r.Read(buf)
		if err == io.EOF {
			// io.Reader may return a final chunk together with EOF;
			// fall through and send it, then break on the next (0, EOF).
			if n == 0 {
				break
			}
		} else if err != nil {
			_ = stream.CloseSend()
			return "", size, err
		}
		size += n
		if err := stream.Send(&pb.UploadRequest{
			Blob: &pb.Blob{Content: buf[:n]},
		}); err != nil {
			return "", size, err
		}
	}
	resp, err := stream.CloseAndRecv()
	if err != nil {
		return "", size, err
	}
	// Sanity-check that the server returned a parseable CID.
	if _, err := cid.Decode(resp.Hash); err != nil {
		return "", size, fmt.Errorf("resp.Hash must be a valid cid, but got error: %v", err)
	}
	return resp.Hash, size, nil
}
// ipfsFileDownload streams the file stored in IPFS under hash into w,
// optionally restricted to the byte range [startOffset, startOffset+length).
// Passing startOffset == 0 and length == 0 copies the whole file. It
// returns the number of bytes written to w.
func ipfsFileDownload(ctx context.Context, fileClient pb.FileAPIClient, w io.Writer, hash string, startOffset, length int64) (int64, error) {
	// Any non-zero offset or length means only a sub-range is wanted.
	isSubSet := startOffset != 0 || length != 0
	//TODO: put startOffset and length in DownloadRequest to improve performance
	stream, err := fileClient.DownloadFile(ctx, &pb.DownloadRequest{
		Hash:      hash,
		ChunkSize: chunkSize, //TODO: determine an optimal size
	})
	var n int64
	if err != nil {
		return n, err
	}
	for {
		recv, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				// Server finished sending; n is the total written.
				return n, nil
			}
			return n, err
		}
		data := recv.GetBlob().GetContent()
		if isSubSet {
			// Skip whole chunks that end before the requested offset.
			if int64(len(data)) < startOffset {
				startOffset -= int64(len(data))
				continue
			}
			// Drop the leading part of the first overlapping chunk.
			if startOffset > 0 {
				data = data[startOffset:]
				startOffset = 0
			}
			// Trim to the remaining requested length.
			// NOTE(review): with startOffset != 0 and length == 0 this
			// writes nothing — confirm whether length == 0 should mean
			// "to end of file" for offset reads.
			if int64(len(data)) > length {
				data = data[:length]
				length = 0
			} else {
				length -= int64(len(data))
			}
		}
		m, err := w.Write(data)
		n += int64(m)
		if err != nil {
			_ = stream.CloseSend()
			return n, err
		}
		// Requested range fully delivered; stop the stream early.
		if isSubSet && length == 0 {
			_ = stream.CloseSend()
			return n, nil
		}
	}
}