forked from nspcc-dev/neofs-sdk-go
/
object_put.go
267 lines (223 loc) · 7.8 KB
/
object_put.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
package client
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"github.com/TrueCloudLab/frostfs-api-go/v2/acl"
v2object "github.com/TrueCloudLab/frostfs-api-go/v2/object"
rpcapi "github.com/TrueCloudLab/frostfs-api-go/v2/rpc"
"github.com/TrueCloudLab/frostfs-api-go/v2/rpc/client"
v2session "github.com/TrueCloudLab/frostfs-api-go/v2/session"
"github.com/TrueCloudLab/frostfs-api-go/v2/signature"
"github.com/TrueCloudLab/frostfs-sdk-go/bearer"
apistatus "github.com/TrueCloudLab/frostfs-sdk-go/client/status"
"github.com/TrueCloudLab/frostfs-sdk-go/object"
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/TrueCloudLab/frostfs-sdk-go/session"
)
// PrmObjectPutInit groups parameters of ObjectPutInit operation.
type PrmObjectPutInit struct {
copyNum uint32
key *ecdsa.PrivateKey
meta v2session.RequestMetaHeader
}
// SetCopiesNumber sets number of object copies that is enough to consider put successful.
func (x *PrmObjectPutInit) SetCopiesNumber(copiesNumber uint32) {
x.copyNum = copiesNumber
}
// ResObjectPut groups the final result values of ObjectPutInit operation.
type ResObjectPut struct {
statusRes
obj oid.ID
}
// StoredObjectID returns identifier of the saved object.
func (x ResObjectPut) StoredObjectID() oid.ID {
return x.obj
}
// ObjectWriter is designed to write one object to FrostFS system.
//
// Must be initialized using Client.ObjectPutInit, any other
// usage is unsafe.
type ObjectWriter struct {
cancelCtxStream context.CancelFunc
client *Client
stream interface {
Write(*v2object.PutRequest) error
Close() error
}
key *ecdsa.PrivateKey
res ResObjectPut
err error
chunkCalled bool
respV2 v2object.PutResponse
req v2object.PutRequest
partInit v2object.PutObjectPartInit
partChunk v2object.PutObjectPartChunk
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *PrmObjectPutInit) UseKey(key ecdsa.PrivateKey) {
x.key = &key
}
// WithBearerToken attaches bearer token to be used for the operation.
// Should be called once before any writing steps.
func (x *PrmObjectPutInit) WithBearerToken(t bearer.Token) {
var v2token acl.BearerToken
t.WriteToV2(&v2token)
x.meta.SetBearerToken(&v2token)
}
// WithinSession specifies session within which object should be stored.
// Should be called once before any writing steps.
func (x *PrmObjectPutInit) WithinSession(t session.Object) {
var tv2 v2session.Token
t.WriteToV2(&tv2)
x.meta.SetSessionToken(&tv2)
}
// MarkLocal tells the server to execute the operation locally.
func (x *PrmObjectPutInit) MarkLocal() {
x.meta.SetTTL(1)
}
// WithXHeaders specifies list of extended headers (string key-value pairs)
// to be attached to the request. Must have an even length.
//
// Slice must not be mutated until the operation completes.
func (x *PrmObjectPutInit) WithXHeaders(hs ...string) {
writeXHeadersToMeta(hs, &x.meta)
}
// WriteHeader writes header of the object. Result means success.
// Failure reason can be received via Close.
func (x *ObjectWriter) WriteHeader(hdr object.Object) bool {
v2Hdr := hdr.ToV2()
x.partInit.SetObjectID(v2Hdr.GetObjectID())
x.partInit.SetHeader(v2Hdr.GetHeader())
x.partInit.SetSignature(v2Hdr.GetSignature())
x.req.GetBody().SetObjectPart(&x.partInit)
x.req.SetVerificationHeader(nil)
x.err = signature.SignServiceMessage(x.key, &x.req)
if x.err != nil {
x.err = fmt.Errorf("sign message: %w", x.err)
return false
}
x.err = x.stream.Write(&x.req)
return x.err == nil
}
// WritePayloadChunk writes chunk of the object payload. Result means success.
// Failure reason can be received via Close.
func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
if !x.chunkCalled {
x.chunkCalled = true
x.req.GetBody().SetObjectPart(&x.partChunk)
}
for ln := len(chunk); ln > 0; ln = len(chunk) {
// maxChunkLen restricts maximum byte length of the chunk
// transmitted in a single stream message. It depends on
// server settings and other message fields, but for now
// we simply assume that 3MB is large enough to reduce the
// number of messages, and not to exceed the limit
// (4MB by default for gRPC servers).
const maxChunkLen = 3 << 20
if ln > maxChunkLen {
ln = maxChunkLen
}
// we deal with size limit overflow above, but there is another case:
// what if method is called with "small" chunk many times? We write
// a message to the stream on each call. Alternatively, we could use buffering.
// In most cases, the chunk length does not vary between calls. Given this
// assumption, as well as the length of the payload from the header, it is
// possible to buffer the data of intermediate chunks, and send a message when
// the allocated buffer is filled, or when the last chunk is received.
// It is mentally assumed that allocating and filling the buffer is better than
// synchronous sending, but this needs to be tested.
x.partChunk.SetChunk(chunk[:ln])
x.req.SetVerificationHeader(nil)
x.err = signature.SignServiceMessage(x.key, &x.req)
if x.err != nil {
x.err = fmt.Errorf("sign message: %w", x.err)
return false
}
x.err = x.stream.Write(&x.req)
if x.err != nil {
return false
}
chunk = chunk[ln:]
}
return true
}
// Close ends writing the object and returns the result of the operation
// along with the final results. Must be called after using the ObjectWriter.
//
// Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as Go built-in error.
// If Client is tuned to resolve FrostFS API statuses, then FrostFS failures
// codes are returned as error.
//
// Return statuses:
// - global (see Client docs);
// - *apistatus.ContainerNotFound;
// - *apistatus.ObjectAccessDenied;
// - *apistatus.ObjectLocked;
// - *apistatus.LockNonRegularObject;
// - *apistatus.SessionTokenNotFound;
// - *apistatus.SessionTokenExpired.
func (x *ObjectWriter) Close() (*ResObjectPut, error) {
defer x.cancelCtxStream()
// Ignore io.EOF error, because it is expected error for client-side
// stream termination by the server. E.g. when stream contains invalid
// message. Server returns an error in response message (in status).
if x.err != nil && !errors.Is(x.err, io.EOF) {
return nil, x.err
}
if x.err = x.stream.Close(); x.err != nil {
return nil, x.err
}
x.res.st, x.err = x.client.processResponse(&x.respV2)
if x.err != nil {
return nil, x.err
}
if !apistatus.IsSuccessful(x.res.st) {
return &x.res, nil
}
const fieldID = "ID"
idV2 := x.respV2.GetBody().GetObjectID()
if idV2 == nil {
return nil, newErrMissingResponseField(fieldID)
}
x.err = x.res.obj.ReadFromV2(*idV2)
if x.err != nil {
x.err = newErrInvalidResponseField(fieldID, x.err)
}
return &x.res, nil
}
// ObjectPutInit initiates writing an object through a remote server using FrostFS API protocol.
//
// The call only opens the transmission channel, explicit recording is done using the ObjectWriter.
// Exactly one return value is non-nil. Resulting writer must be finally closed.
//
// Returns an error if parameters are set incorrectly.
// Context is required and must not be nil. It is used for network communication.
func (c *Client) ObjectPutInit(ctx context.Context, prm PrmObjectPutInit) (*ObjectWriter, error) {
// check parameters
if ctx == nil {
return nil, errorMissingContext
}
var w ObjectWriter
ctx, cancel := context.WithCancel(ctx)
stream, err := rpcapi.PutObject(&c.c, &w.respV2, client.WithContext(ctx))
if err != nil {
cancel()
return nil, fmt.Errorf("open stream: %w", err)
}
w.key = &c.prm.key
if prm.key != nil {
w.key = prm.key
}
w.cancelCtxStream = cancel
w.client = c
w.stream = stream
w.partInit.SetCopiesNumber(prm.copyNum)
w.req.SetBody(new(v2object.PutRequestBody))
c.prepareRequest(&w.req, &prm.meta)
return &w, nil
}