-
Notifications
You must be signed in to change notification settings - Fork 5
/
data_recovery.go
167 lines (155 loc) · 4.43 KB
/
data_recovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package dagnode
import (
"bytes"
"context"
"encoding/binary"
"errors"
"github.com/filedag-project/filedag-storage/dag/proto"
"github.com/filedag-project/filedag-storage/dag/utils/paralleltask"
"github.com/ipfs/go-cid"
"google.golang.org/protobuf/types/known/emptypb"
"io"
)
// RepairDataNode restores, on the node at repairNodeIndex, the entries that the
// node at fromNodeIndex knows about but the repair node is missing.
//
// It streams every key from the source node (AllKeysChan) and, for each key:
//   - skips it when GetMeta on the repair node succeeds (entry already there);
//   - fetches the shards of the key from all other nodes in parallel;
//   - reconstructs the shard set with the erasure coder;
//   - writes the reconstructed shard together with its encoded Meta to the
//     repair node via Put.
//
// Per-key failures while decoding the cid, looking up the size, collecting
// shards or encoding the meta are logged and the key is skipped. Failures of
// the stream itself, of the erasure coder, or of the Put on the repair node
// abort the whole run and are returned.
//
// It returns nil once the key stream is exhausted (io.EOF), and an error when
// either index is outside [0, len(d.Nodes)).
func (d *DagNode) RepairDataNode(ctx context.Context, fromNodeIndex int, repairNodeIndex int) error {
	// Reject negative indexes explicitly: they would otherwise pass the
	// upper-bound checks below and panic when indexing d.Nodes.
	if fromNodeIndex < 0 {
		return errors.New("index must not be negative")
	}
	if fromNodeIndex >= len(d.Nodes) {
		return errors.New("index greater than max index of nodes")
	}
	if repairNodeIndex < 0 {
		return errors.New("repair index must not be negative")
	}
	if repairNodeIndex >= len(d.Nodes) {
		return errors.New("repair index greater than max index of nodes")
	}

	// The derived context is cancelled on return so the key stream and any
	// outstanding requests are released on every exit path.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	stream, err := d.Nodes[fromNodeIndex].Client.DataClient.AllKeysChan(ctx, &emptypb.Empty{})
	if err != nil {
		return err
	}
	repairNode := d.Nodes[repairNodeIndex]

	for {
		resp, err := stream.Recv()
		if err != nil {
			// io.EOF marks the regular end of the key stream.
			if errors.Is(err, io.EOF) {
				return nil
			}
			return err
		}
		key := resp.Key

		// Any successful GetMeta is taken as "entry already present".
		if _, err := repairNode.Client.DataClient.GetMeta(ctx, &proto.GetMetaRequest{Key: key}); err == nil {
			continue
		}

		dataCid, err := cid.Decode(key)
		if err != nil {
			log.Errorw("decode cid error", "key", key, "error", err)
			continue
		}
		size, err := d.GetSize(ctx, dataCid)
		if err != nil {
			log.Errorw("get block size error", "key", key, "error", err)
			continue
		}

		// One slot per node; each goroutine below writes only its own slot.
		shards := make([][]byte, len(d.Nodes))
		entryReadQuorum, _ := d.entryQuorum()
		// NOTE(review): the quorum arguments are handed straight to
		// paralleltask; presumably Wait succeeds once entryReadQuorum fetches
		// succeeded — confirm against the paralleltask package.
		task := paralleltask.NewParallelTask(ctx, entryReadQuorum, len(d.Nodes)-entryReadQuorum+1, true)
		for i, snode := range d.Nodes {
			// Per-iteration copies so the closure does not share loop variables.
			index := i
			tnode := snode
			task.Goroutine(func(ctx context.Context) error {
				// The node under repair is never used as a shard source.
				if index == repairNodeIndex {
					return errors.New("there is no data in this node")
				}
				res, err := tnode.Client.DataClient.Get(ctx, &proto.GetRequest{Key: key})
				if err != nil {
					log.Errorf("this node[%s] get key err: %v", tnode.RpcAddress, err)
					return err
				}
				if len(res.Data) == 0 {
					err = errors.New("there is no data in this node")
					return err
				}
				shards[index] = res.Data
				return nil
			})
		}
		if err = task.Wait(); err != nil {
			log.Errorw("task error, missing shards", "key", key, "error", err)
			continue
		}

		enc, err := NewErasure(d.config.DataBlocks, d.config.ParityBlocks, int64(size))
		if err != nil {
			log.Errorf("new erasure fail :%v", err)
			return err
		}
		// Presumably fills the missing slots of shards in place, including the
		// slot of the repair node that is written back below.
		err = enc.DecodeDataAndParityBlocks(shards)
		if err != nil {
			log.Errorf("decode data blocks failed: %v", err)
			return err
		}

		meta := Meta{
			BlockSize: int32(size),
		}
		var metaBuf bytes.Buffer
		if err = binary.Write(&metaBuf, binary.LittleEndian, meta); err != nil {
			log.Errorf("binary.Write failed: %v", err)
			continue
		}
		if _, err = repairNode.Client.DataClient.Put(ctx, &proto.AddRequest{
			Key:  key,
			Meta: metaBuf.Bytes(),
			Data: shards[repairNodeIndex],
		}); err != nil {
			log.Errorf("data node put failed: %v", err)
			return err
		}
		log.Infow("repair entry success", "key", key)
	}
}
// repairBlock repairs shards of one erasure set.
//
// key identifies the entry, blockSize is the size recorded in the entry's
// Meta, shards holds one slot per node with nil for shards that are not
// available, and repairIndexes lists the node/shard positions to rewrite.
//
// The shard set is passed through the erasure coder and, for every index in
// repairIndexes, the shard at that index is written together with the encoded
// Meta to the corresponding node via Put.
//
// It returns an error when a repair index is out of range for d.Nodes or
// shards, when fewer non-nil shards than the entry read quorum are supplied,
// when the erasure coder fails, when the Meta cannot be encoded, or when a
// Put fails (remaining indexes are then not attempted).
func (d *DagNode) repairBlock(ctx context.Context, key string, blockSize int32, shards [][]byte, repairIndexes []int) error {
	for _, repairNodeIndex := range repairIndexes {
		// Negative indexes would panic when indexing d.Nodes / shards below.
		if repairNodeIndex < 0 {
			return errors.New("repair index must not be negative")
		}
		if repairNodeIndex >= len(d.Nodes) {
			return errors.New("repair index greater than max index of nodes")
		}
		// The shard written back is taken from shards[repairNodeIndex].
		if repairNodeIndex >= len(shards) {
			return errors.New("repair index greater than max index of shards")
		}
	}

	entryReadQuorum, _ := d.entryQuorum()
	availableShards := 0
	for _, shard := range shards {
		if shard != nil {
			availableShards++
		}
	}
	if availableShards < entryReadQuorum {
		return errors.New("available shards less than entry read quorum")
	}

	// Cancelled on return so outstanding Put calls are released on every
	// exit path.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	enc, err := NewErasure(d.config.DataBlocks, d.config.ParityBlocks, int64(blockSize))
	if err != nil {
		log.Errorf("new erasure fail :%v", err)
		return err
	}
	// Presumably fills the nil slots of shards in place; the repaired slots
	// are read back from shards below.
	err = enc.DecodeDataAndParityBlocks(shards)
	if err != nil {
		log.Errorf("decode data blocks failed: %v", err)
		return err
	}

	// The meta is identical for every shard of the entry, so encode it once.
	// An encoding failure is returned instead of skipped: skipping would let
	// the function report success without having repaired anything.
	meta := Meta{
		BlockSize: blockSize,
	}
	var metaBuf bytes.Buffer
	if err = binary.Write(&metaBuf, binary.LittleEndian, meta); err != nil {
		log.Errorf("binary.Write failed: %v", err)
		return err
	}
	metaBytes := metaBuf.Bytes()

	for _, index := range repairIndexes {
		if _, err = d.Nodes[index].Client.DataClient.Put(ctx, &proto.AddRequest{
			Key:  key,
			Meta: metaBytes,
			Data: shards[index],
		}); err != nil {
			log.Errorf("data node put failed: %v", err)
			return err
		}
		log.Infow("repair block shard success", "key", key, "shardIndex", index)
	}
	return nil
}