forked from nspcc-dev/neofs-node
/
evacuate.go
107 lines (90 loc) · 2.7 KB
/
evacuate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package control
import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"github.com/TrueCloudLab/frostfs-node/pkg/services/control"
"github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"github.com/TrueCloudLab/frostfs-node/pkg/services/replicator"
"github.com/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "github.com/TrueCloudLab/frostfs-sdk-go/object"
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// EvacuateShard serves the control-service request to evacuate the shards
// listed in the request body.
//
// The request is first checked with isValidRequest; a failure is reported
// with codes.PermissionDenied. The storage engine is then asked to evacuate
// with s.replicate registered as its fault handler. An engine error or a
// failure to sign the response is reported with codes.Internal. On success
// the signed response carries the engine-reported count converted to uint32.
func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequest) (*control.EvacuateShardResponse, error) {
	if err := s.isValidRequest(req); err != nil {
		return nil, status.Error(codes.PermissionDenied, err.Error())
	}

	body := req.GetBody()

	var evacPrm engine.EvacuateShardPrm
	evacPrm.WithShardIDList(s.getShardIDList(body.GetShard_ID()))
	evacPrm.WithIgnoreErrors(body.GetIgnoreErrors())
	evacPrm.WithFaultHandler(s.replicate)

	evacRes, err := s.s.Evacuate(evacPrm)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	respBody := &control.EvacuateShardResponse_Body{
		Count: uint32(evacRes.Count()),
	}
	resp := &control.EvacuateShardResponse{Body: respBody}

	// The response is signed with the server key before it is returned.
	if err := SignMessage(s.key, resp); err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	return resp, nil
}
// replicate tries to put one copy of obj on another node of its container.
// EvacuateShard registers it as the engine's fault handler.
//
// Candidate nodes are computed from the network map returned by
// GetNetMap(0) and the placement policy of the object's container. Every
// candidate whose public key equals the server's own key (s.key) is dropped
// before the replication task is handed to s.replicator.
//
// It returns nil when at least one successful replication was reported, and
// also when obj carries no container ID. It returns an error when the network
// map or the container cannot be fetched, when the container node list cannot
// be built, or when no replication succeeded.
func (s *Server) replicate(addr oid.Address, obj *objectSDK.Object) error {
	cid, ok := obj.ContainerID()
	if !ok {
		// Return nil to prevent situations where a shard can't be evacuated
		// because of a single bad/corrupted object.
		return nil
	}

	nm, err := s.netMapSrc.GetNetMap(0)
	if err != nil {
		return err
	}

	c, err := s.cnrSrc.Get(cid)
	if err != nil {
		return err
	}

	binCnr := make([]byte, sha256.Size)
	cid.Encode(binCnr)

	ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr)
	if err != nil {
		// Keep the underlying cause so the failure can be diagnosed.
		return fmt.Errorf("can't build a list of container nodes: %w", err)
	}

	nodes := placement.FlattenNodes(ns)
	bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes()

	// Filter in place, reusing the backing array. Deleting at index i while
	// still advancing i would skip the element shifted into slot i, letting
	// a second adjacent entry for this server slip through.
	remote := nodes[:0]
	for i := range nodes {
		if !bytes.Equal(nodes[i].PublicKey(), bs) {
			remote = append(remote, nodes[i])
		}
	}

	var res replicatorResult

	var task replicator.Task
	task.SetObject(obj)
	task.SetObjectAddress(addr)
	task.SetCopiesNumber(1)
	task.SetNodes(remote)

	// The fault-handler signature carries no context, so none can be
	// propagated from the RPC here.
	s.replicator.HandleTask(context.TODO(), task, &res)

	if res.count == 0 {
		return errors.New("object was not replicated")
	}

	return nil
}
// replicatorResult tallies the successful replications reported for a task.
type replicatorResult struct {
	// count grows by one on every SubmitSuccessfulReplication call.
	count int
}

// SubmitSuccessfulReplication implements the replicator.TaskResult interface.
// The node argument is ignored; only the tally is advanced.
func (rr *replicatorResult) SubmitSuccessfulReplication(_ netmap.NodeInfo) {
	rr.count++
}