/
uploader_parallel.go
121 lines (115 loc) · 3.72 KB
/
uploader_parallel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package transfer
import (
"github.com/0glabs/0g-storage-client/common/parallel"
"github.com/0glabs/0g-storage-client/core"
"github.com/0glabs/0g-storage-client/core/merkle"
"github.com/0glabs/0g-storage-client/node"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// SegmentUploader uploads the segments of a single file to 0g storage
// nodes. It is driven by the parallel framework: work is divided into
// tasks, and each task uploads a strided subset of the file's segments.
type SegmentUploader struct {
	data     core.IterableData         // source data being uploaded
	tree     *merkle.Tree              // Merkle tree over data; supplies root and per-segment proofs
	clients  []*node.ZeroGStorageClient // storage nodes to upload to
	offset   int64                     // byte offset in data where this uploader starts
	disperse bool                      // if true, spread segments across clients (with failover); else use clients[0] only
	taskSize uint                      // max number of segments handled per ParallelDo call
	numTasks int                       // total task count; also the stride (in segments) between a task's consecutive segments
}
// Compile-time check that SegmentUploader satisfies parallel.Interface.
var _ parallel.Interface = (*SegmentUploader)(nil)

// ParallelCollect implements parallel.Interface. There is nothing to
// aggregate: upload errors are returned directly from ParallelDo, so
// this is intentionally a no-op.
func (uploader *SegmentUploader) ParallelCollect(result *parallel.Result) error {
	return nil
}
// ParallelDo implements parallel.Interface. It reads, proves and uploads
// the segments assigned to the given task. Assignment is strided: the
// task starting at segment index s handles s, s+numTasks, s+2*numTasks,
// ... for at most taskSize segments, stopping early once the file's real
// (non-padding) data is exhausted.
//
// When disperse is false every batch goes to clients[0]; when disperse is
// true the initial client is chosen by task index and each client is tried
// in turn on failure. A "duplicate" error from a node means the segment is
// already stored and is treated as success on every upload path.
//
// routine is unused; it is required by the parallel.Interface signature.
func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{}, error) {
	offset := uploader.offset + int64(task)*core.DefaultSegmentSize
	numChunks := uploader.data.NumChunks()
	numSegments := uploader.data.NumSegments()
	segIndex := uint64(offset / core.DefaultSegmentSize)
	startSegIndex := segIndex
	segments := make([]node.SegmentWithProof, 0, uploader.taskSize)
	for i := 0; i < int(uploader.taskSize); i++ {
		startIndex := segIndex * core.DefaultSegmentMaxChunks
		if startIndex >= numChunks {
			// All real data for this task is already covered.
			break
		}
		// Read one segment; the read may extend into padding.
		segment, err := core.ReadAt(uploader.data, core.DefaultSegmentSize, offset, uploader.data.PaddedSize())
		if err != nil {
			return nil, err
		}
		allDataUploaded := false
		if startIndex+uint64(len(segment))/core.DefaultChunkSize >= numChunks {
			// Last segment containing real data: trim trailing padding chunks.
			expectedLen := core.DefaultChunkSize * int(numChunks-startIndex)
			segment = segment[:expectedLen]
			allDataUploaded = true
		}
		segments = append(segments, node.SegmentWithProof{
			Root:     uploader.tree.Root(),
			Data:     segment,
			Index:    segIndex,
			Proof:    uploader.tree.ProofAt(int(segIndex)),
			FileSize: uint64(uploader.data.Size()),
		})
		if allDataUploaded {
			break
		}
		segIndex += uint64(uploader.numTasks)
		offset += core.DefaultSegmentSize * int64(uploader.numTasks)
	}
	// No segments assigned to this task: skip the upload entirely.
	// This also prevents an out-of-range access on segments[0] in the
	// debug log below.
	if len(segments) == 0 {
		return nil, nil
	}
	// upload
	if !uploader.disperse {
		if _, err := uploader.clients[0].UploadSegments(segments); err != nil && !isDuplicateError(err.Error()) {
			return nil, errors.WithMessage(err, "Failed to upload segment")
		}
	} else {
		clientIndex := task % (len(uploader.clients))
		ok := false
		// Try each client once, rotating on failure.
		for i := 0; i < len(uploader.clients); i++ {
			logrus.WithFields(logrus.Fields{
				"total":          numSegments,
				"from_seg_index": startSegIndex,
				"to_seg_index":   segIndex,
				"step":           uploader.numTasks,
				"clientIndex":    clientIndex,
			}).Debug("Uploading segment to node..")
			if _, err := uploader.clients[clientIndex].UploadSegments(segments); err != nil && !isDuplicateError(err.Error()) {
				logrus.WithFields(logrus.Fields{
					"total":          numSegments,
					"from_seg_index": startSegIndex,
					"to_seg_index":   segIndex,
					"step":           uploader.numTasks,
					"clientIndex":    clientIndex,
					"error":          err,
				}).Warn("Failed to upload segment to node, try next node..")
				clientIndex = (clientIndex + 1) % (len(uploader.clients))
			} else {
				ok = true
				break
			}
		}
		if !ok {
			// Every client failed; make one final attempt. Duplicate
			// errors still count as success, consistent with the
			// paths above.
			if _, err := uploader.clients[clientIndex].UploadSegments(segments); err != nil && !isDuplicateError(err.Error()) {
				return nil, errors.WithMessage(err, "Failed to upload segment")
			}
		}
	}
	if logrus.IsLevelEnabled(logrus.DebugLevel) {
		logrus.WithFields(logrus.Fields{
			"total":          numSegments,
			"from_seg_index": startSegIndex,
			"to_seg_index":   segIndex,
			"step":           uploader.numTasks,
			"root":           core.SegmentRoot(segments[0].Data),
		}).Debug("Segments uploaded")
	}
	return nil, nil
}