forked from NVIDIA/aistore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
intrareq.go
130 lines (120 loc) · 3.21 KB
/
intrareq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Package ec provides erasure coding (EC) based data protection for AIStore.
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
*/
package ec
import (
"github.com/artashesbalabekyan/aistore/cluster/meta"
"github.com/artashesbalabekyan/aistore/cmn/cos"
"github.com/artashesbalabekyan/aistore/memsys"
)
const (
// Kinds of intra-cluster EC requests exchanged between targets.
// The values are assigned sequentially via iota, so the order of the
// constants below is significant.

// a target sends a replica or slice to be stored on another target;
// the destination does not have to respond
reqPut intraReqType = iota
// response carrying a slice/replica that was requested by another target
respPut
// a target requests a slice or replica from another target;
// if the destination has the object/slice it sends it back, otherwise
// it reports exists=false in the response header
reqGet
// a target cleans up the object and notifies all other targets to do
// the same cleanup; destinations do not have to respond
reqDel
)
type (
// Type of an EC request between targets. If the destination has to respond,
// it must set the same request type in the response header.
// NOTE: declared with `=`, i.e. this is an alias of int rather than a
// distinct named type.
intraReqType = int
// An EC request sent via transport using the Opaque field of
// transport.ObjHdr between targets inside a cluster.
// It is serialized by Pack and deserialized by Unpack.
intraReq struct {
// Object metadata, used when a target copies replicas/slices after
// encoding or restoring the object data; may be nil
meta *Metadata
// Used only by the destination to tell the sender whether the
// destination has the requested metafile or replica/slice
exists bool
// Whether the sent data is a slice (true) or a full replica (false)
isSlice bool
// bucket ID
bid uint64
}
)
// interface guard: compile-time assertions that *intraReq implements both
// cos.Unpacker and cos.Packer
var (
_ cos.Unpacker = (*intraReq)(nil)
_ cos.Packer = (*intraReq)(nil)
)
// newIntraReq builds the header of an intra-cluster EC request.
//
// The returned request always starts with `exists` set to true ("the local
// object exists"); the caller may later flip it to false if the local file
// turns out to be unreadable or missing.
//   - act:  request type; only reqGet affects the result - for it, and when
//     meta is non-nil, `isSlice` is set to the negation of meta.IsCopy.
//   - meta: object metadata stored in the request as is (may be nil).
//   - bck:  bucket; when both bck and bck.Props are non-nil, the bucket ID
//     is copied from bck.Props.BID, otherwise `bid` stays zero.
func newIntraReq(act intraReqType, meta *Metadata, bck *meta.Bck) *intraReq {
	req := &intraReq{
		meta:    meta,
		exists:  true,
		isSlice: act == reqGet && meta != nil && !meta.IsCopy,
	}
	if bck == nil || bck.Props == nil {
		return req
	}
	req.bid = bck.Props.BID
	return req
}
// PackedSize returns the number of bytes Pack writes for this request:
// two booleans and the metadata-presence marker (one byte each, 3 in total),
// the bucket ID (cos.SizeofI64), plus the packed size of the metadata when
// it is present.
func (r *intraReq) PackedSize() int {
	// exists + isSlice + metadata marker + bid
	size := 3 + cos.SizeofI64
	if r.meta != nil {
		size += r.meta.PackedSize()
	}
	return size
}
// Pack serializes the request into packer in the following order:
// exists, isSlice, bid, a one-byte metadata marker (0 - no metadata,
// 1 - metadata follows) and, when the marker is 1, the metadata itself.
// Unpack reads the fields back in the same order.
func (r *intraReq) Pack(packer *cos.BytePack) {
	packer.WriteBool(r.exists)
	packer.WriteBool(r.isSlice)
	packer.WriteUint64(r.bid)
	if r.meta == nil {
		// marker only: nothing follows
		packer.WriteByte(0)
		return
	}
	packer.WriteByte(1)
	packer.WriteAny(r.meta)
}
// Unpack restores the request from unpacker, reading the fields in the order
// Pack writes them: exists, isSlice, bid, the metadata marker and - only if
// the marker is non-zero - the metadata. A zero marker resets r.meta to nil.
// The first read error is returned as is; fields read before the failure
// keep their new values.
func (r *intraReq) Unpack(unpacker *cos.ByteUnpack) error {
	var err error
	if r.exists, err = unpacker.ReadBool(); err != nil {
		return err
	}
	if r.isSlice, err = unpacker.ReadBool(); err != nil {
		return err
	}
	if r.bid, err = unpacker.ReadUint64(); err != nil {
		return err
	}
	hasMeta, err := unpacker.ReadByte()
	if err != nil {
		return err
	}
	if hasMeta != 0 {
		// metadata follows the marker
		r.meta = NewMetadata()
		return unpacker.ReadAny(r.meta)
	}
	r.meta = nil
	return nil
}
// NewPack serializes the request into a byte slice and returns it.
//
// When mm is non-nil, the buffer of PackedSize() bytes is requested from it
// via AllocSize (the second value AllocSize returns is discarded); otherwise
// a nil buffer is handed to cos.NewPacker together with the required size.
func (r *intraReq) NewPack(mm *memsys.MMSA) []byte {
	size := r.PackedSize()

	var dst []byte
	if mm != nil {
		dst, _ = mm.AllocSize(int64(size))
	}

	bp := cos.NewPacker(dst, size)
	bp.WriteAny(r)
	return bp.Bytes()
}