/
upload_worker.go
171 lines (155 loc) · 5.71 KB
/
upload_worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package sdk
import (
"bytes"
"context"
"errors"
"io"
"strings"
thrown "github.com/0chain/errors"
"github.com/0chain/gosdk/constants"
"github.com/0chain/gosdk/core/sys"
"github.com/0chain/gosdk/zboxcore/allocationchange"
"github.com/0chain/gosdk/zboxcore/fileref"
l "github.com/0chain/gosdk/zboxcore/logger"
"github.com/0chain/gosdk/zboxcore/zboxutil"
"github.com/google/uuid"
"go.uber.org/zap"
)
// UploadOperation bundles a ChunkedUpload with the bookkeeping needed to run
// it as one operation and to turn its result into allocation changes.
type UploadOperation struct {
// refs is filled by Process: one slot per blobber, set only for blobber
// positions present in the upload mask. Slots left nil are turned into
// EmptyFileChange entries by buildChange.
refs []*fileref.FileRef
// opCode is copied from the chunked upload and forwarded to the status
// callback in Completed and Error.
opCode int
// chunkedUpload is the underlying upload that Process drives.
chunkedUpload *ChunkedUpload
// isUpdate selects update semantics (UpdateFileChange, CanUpdate check)
// instead of insert semantics (NewFileChange, CanUpload check).
isUpdate bool
// isDownload makes Process first download the remote file into the
// reader when that reader is a *sys.MemChanFile.
isDownload bool
}
// ErrPauseUpload is a sentinel error; when UploadOperation.Error receives it,
// the stored upload progress is kept rather than removed.
var ErrPauseUpload = errors.New("retry_operation")
// Process runs the chunked upload and records the resulting per-blobber file
// refs on the operation.
//
// When the operation was created as a memory download and the reader is a
// *sys.MemChanFile, the remote file is first downloaded into that reader; the
// reader is closed from the download's file callback.
//
// The connectionID argument is not used by this method. The returned slice of
// ref entities is always nil; the returned mask is the chunked upload's
// uploadMask, on both success and failure.
func (uo *UploadOperation) Process(allocObj *Allocation, connectionID string) ([]fileref.RefEntity, zboxutil.Uint128, error) {
	cu := uo.chunkedUpload

	if uo.isDownload {
		memFile, isMemFile := cu.fileReader.(*sys.MemChanFile)
		if isMemFile {
			closeOnDone := WithFileCallback(func() {
				memFile.Close() //nolint:errcheck
			})
			if dlErr := allocObj.DownloadFileToFileHandler(memFile, cu.fileMeta.RemotePath, false, nil, true, closeOnDone); dlErr != nil {
				l.Logger.Error("DownloadFileToFileHandler Failed", zap.String("path", cu.fileMeta.RemotePath), zap.Error(dlErr))
				return nil, cu.uploadMask, dlErr
			}
		}
	}

	if upErr := cu.process(); upErr != nil {
		l.Logger.Error("UploadOperation Failed", zap.String("name", cu.fileMeta.RemoteName), zap.Error(upErr))
		return nil, cu.uploadMask, upErr
	}

	uo.refs = make([]*fileref.FileRef, len(cu.blobbers))

	// Walk the set bits of the upload mask: each iteration handles the
	// position reported by TrailingZeros and then clears that bit.
	var pos uint64
	for mask := cu.uploadMask; !mask.Equals64(0); mask = mask.And(zboxutil.NewUint128(1).Lsh(pos).Not()) {
		pos = uint64(mask.TrailingZeros())

		target := cu.blobbers[pos]
		uo.refs[pos] = target.fileRef
		uo.refs[pos].ChunkSize = cu.chunkSize

		path := cu.fileMeta.RemotePath
		allocID := allocObj.ID
		if singleClientMode {
			// Drop the cached ref for this path on this blobber.
			key := fileref.GetCacheKey(fileref.GetReferenceLookup(allocID, path), target.blobber.ID)
			fileref.DeleteFileRef(key)
		}
	}

	l.Logger.Info("UploadOperation Success", zap.String("name", cu.fileMeta.RemoteName))
	return nil, cu.uploadMask, nil
}
// buildChange converts the refs collected by Process into allocation changes,
// one per entry of uo.refs and in the same order:
//   - a nil ref yields an EmptyFileChange;
//   - for an update operation, an UpdateFileChange carrying the ref;
//   - otherwise a NewFileChange carrying the ref and uid.
//
// The first argument is ignored.
func (uo *UploadOperation) buildChange(_ []fileref.RefEntity, uid uuid.UUID) []allocationchange.AllocationChange {
	out := make([]allocationchange.AllocationChange, len(uo.refs))

	for i, fr := range uo.refs {
		switch {
		case fr == nil:
			out[i] = &allocationchange.EmptyFileChange{}

		case uo.isUpdate:
			upd := &allocationchange.UpdateFileChange{}
			upd.NewFile = fr
			upd.NumBlocks = fr.NumBlocks
			upd.Operation = constants.FileOperationUpdate
			upd.Size = fr.Size
			out[i] = upd

		default:
			ins := &allocationchange.NewFileChange{}
			ins.File = fr
			ins.NumBlocks = fr.NumBlocks
			ins.Operation = constants.FileOperationInsert
			ins.Size = fr.Size
			ins.Uuid = uid
			out[i] = ins
		}
	}

	return out
}
// Verify checks that the operation may run against allocationObj.
//
// It returns an error when allocationObj is nil, when the allocation does not
// permit the operation (CanUpdate for updates, CanUpload otherwise), when the
// remote file name fails ValidateRemoteFileName, when the existing file's
// metadata cannot be fetched for an update, or when the file's ActualSize
// exceeds the remaining space (ErrNoEnoughSpaceLeftInAllocation).
//
// Remaining space is the allocation Size minus Stats.UsedSize (when Stats is
// set); for an update, the size of the file being replaced is added back.
func (uo *UploadOperation) Verify(allocationObj *Allocation) error {
	if allocationObj == nil {
		return thrown.Throw(constants.ErrInvalidParameter, "allocationObj")
	}

	// Only the permission relevant to this kind of operation is consulted.
	var permitted bool
	if uo.isUpdate {
		permitted = allocationObj.CanUpdate()
	} else {
		permitted = allocationObj.CanUpload()
	}
	if !permitted {
		return thrown.Throw(constants.ErrFileOptionNotPermitted, "file_option_not_permitted ")
	}

	cu := uo.chunkedUpload
	if nameErr := ValidateRemoteFileName(cu.fileMeta.RemoteName); nameErr != nil {
		return nameErr
	}

	available := allocationObj.Size
	if stats := allocationObj.Stats; stats != nil {
		available -= stats.UsedSize
	}

	if uo.isUpdate {
		existing, metaErr := allocationObj.GetFileMeta(cu.fileMeta.RemotePath)
		if metaErr != nil {
			return metaErr
		}
		// The file being replaced frees its current size.
		available += existing.ActualFileSize
	}

	if cu.fileMeta.ActualSize > available {
		return ErrNoEnoughSpaceLeftInAllocation
	}
	return nil
}
// Completed finalizes a successful upload: it removes stored progress when a
// progress storer is configured, deletes the CancelOpCtx entry for the remote
// path while holding cancelLock, and then notifies the status callback, if
// one is set.
func (uo *UploadOperation) Completed(allocObj *Allocation) {
	cu := uo.chunkedUpload

	if cu.progressStorer != nil {
		cu.removeProgress()
	}

	cancelLock.Lock()
	delete(CancelOpCtx, cu.fileMeta.RemotePath)
	cancelLock.Unlock()

	cb := cu.statusCallback
	if cb == nil {
		return
	}
	cb.Completed(
		allocObj.ID,
		cu.fileMeta.RemotePath,
		cu.fileMeta.RemoteName,
		cu.fileMeta.MimeType,
		int(cu.fileMeta.ActualSize),
		uo.opCode,
	)
}
// Error handles a failed upload. Stored progress is removed unless the error
// text contains "context" or the error is ErrPauseUpload; in those two cases
// the progress is kept. The CancelOpCtx entry for the remote path is then
// deleted while holding cancelLock, and the status callback, if set, is
// notified with err.
//
// The consensus argument is not used by this method.
func (uo *UploadOperation) Error(allocObj *Allocation, consensus int, err error) {
	cu := uo.chunkedUpload

	if cu.progressStorer != nil {
		keepProgress := strings.Contains(err.Error(), "context") || errors.Is(err, ErrPauseUpload)
		if !keepProgress {
			cu.removeProgress()
		}
	}

	cancelLock.Lock()
	delete(CancelOpCtx, cu.fileMeta.RemotePath)
	cancelLock.Unlock()

	cb := cu.statusCallback
	if cb == nil {
		return
	}
	cb.Error(allocObj.ID, cu.fileMeta.RemotePath, uo.opCode, err)
}
// NewUploadOperation creates the ChunkedUpload for the given file and wraps
// it in an UploadOperation.
//
// When fileMeta.ActualSize is zero and the upload is not a stream upload, the
// supplied reader is replaced by a reader over the bytes of emptyFileDataHash,
// WithActualHash(emptyFileDataHash) is appended to opts, and ActualSize is set
// to the length of emptyFileDataHash.
//
// It returns the operation, the connection ID taken from the chunked upload's
// progress, and any error from CreateChunkedUpload (in which case the
// operation is nil and the connection ID is empty).
func NewUploadOperation(ctx context.Context, workdir string, allocObj *Allocation, connectionID string, fileMeta FileMeta, fileReader io.Reader, isUpdate, isWebstreaming, isRepair, isMemoryDownload, isStreamUpload bool, opts ...ChunkedUploadOption) (*UploadOperation, string, error) {
	if fileMeta.ActualSize == 0 && !isStreamUpload {
		fileReader = bytes.NewReader([]byte(emptyFileDataHash))
		opts = append(opts, WithActualHash(emptyFileDataHash))
		fileMeta.ActualSize = int64(len(emptyFileDataHash))
	}

	// NOTE(review): isRepair is passed before isWebstreaming here, the reverse
	// of this function's own parameter order — confirm against the
	// CreateChunkedUpload signature before reordering.
	upload, err := CreateChunkedUpload(ctx, workdir, allocObj, fileMeta, fileReader, isUpdate, isRepair, isWebstreaming, connectionID, opts...)
	if err != nil {
		return nil, "", err
	}

	op := &UploadOperation{
		chunkedUpload: upload,
		opCode:        upload.opCode,
		isUpdate:      isUpdate,
		isDownload:    isMemoryDownload,
	}
	return op, upload.progress.ConnectionID, nil
}