-
Notifications
You must be signed in to change notification settings - Fork 462
/
protocol.go
205 lines (179 loc) · 6 KB
/
protocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
package exchange
import (
"github.com/ipfs/go-cid"
"time"
"github.com/filecoin-project/venus/pkg/specactors/policy"
"github.com/filecoin-project/venus/pkg/types"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
)
// log is the package-level logger, created with the name "exchange".
var log = logging.Logger("exchange")
// Protocol IDs declared by this package.
const (
// BlockSyncProtocolID is the protocol ID of the former blocksync protocol.
//
// Deprecated: this ID belongs to the former blocksync protocol.
// NOTE(review): presumably ChainExchangeProtocolID is its replacement --
// confirm before pointing users at it.
BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
// ChainExchangeProtocolID is the protocol ID of the chain exchange
// protocol.
ChainExchangeProtocolID = "/fil/chain/xchg/0.0.1"
)
// MaxRequestLength is the upper bound of the `Length` field of a
// `Request` (documented there as the range `[1, MaxRequestLength]`).
// It is initialised from `policy.ChainFinality` and, being a `var`,
// can be reassigned at run time.
//
// FIXME: Bumped from original 800 to this to accommodate `syncFork()`
// use of `GetBlocks()`. It seems the expectation of that API is to
// fetch any amount of blocks leaving it to the internal logic here
// to partition and reassemble the requests if they go above the maximum.
// (Also as a consequence of this temporarily removing the `const`
// qualifier to avoid "const initializer [...] is not a constant" error.)
var MaxRequestLength = uint64(policy.ChainFinality)
// Tuning values for the exchange protocol.
//
// NOTE(review): none of these are used in the part of the file visible
// here, so the per-constant notes below are inferred from names and
// values only -- confirm against the call sites.
const (
// Extracted constants from the code.
// FIXME: Should be reviewed and confirmed.

// Presumably the tag value given to a peer after a successful exchange.
SuccessPeerTagValue = 25
// Presumably the deadline for writing a request: 5 seconds.
WriteReqDeadline = 5 * time.Second
// Same value as WriteReqDeadline (5 seconds); presumably the deadline
// for starting to read a response.
ReadResDeadline = WriteReqDeadline
// 50 << 10 == 51200; presumably a minimum read speed in bytes per
// second (i.e. 50 KiB/s) -- unit not established here.
ReadResMinSpeed = 50 << 10
// Presumably how many of the leading peers get shuffled.
ShufflePeersPrefix = 16
// Presumably the deadline for writing a response: 60 seconds.
WriteResDeadline = 60 * time.Second
)
// Request describes a chain segment to fetch: the tipset to start from
// (`Head`), how many tipsets to walk backwards (`Length`) and which
// parts of each tipset to include (`Options`).
//
// FIXME: Rename. Make private.
type Request struct {
// List of ordered CIDs comprising a `TipSetKey` from where to start
// fetching backwards.
// FIXME: Consider using `TipSetKey` now (introduced after the creation
// of this protocol) instead of converting back and forth.
Head []cid.Cid
// Number of block sets to fetch from `Head` (inclusive, should always
// be in the range `[1, MaxRequestLength]`).
Length uint64
// Request options, see `Options` type for more details. Compressed
// in a single `uint64` to save space.
// The flag bits declared in this file are `Headers` and `Messages`;
// `parseOptions()` decodes this field into a `parsedOptions`.
Options uint64
}
// validatedRequest is a `Request` processed and validated to query the
// tipsets needed.
//
// NOTE(review): the fields presumably mirror `Request.Head`,
// `Request.Length` and `Request.Options` after conversion -- the code
// that builds this struct is not visible here.
type validatedRequest struct {
// Starting tipset, as a `types.TipSetKey` rather than a raw CID list.
head types.TipSetKey
// Number of tipsets to fetch.
length uint64
// Options decoded into individual booleans (see `parseOptions()`).
options *parsedOptions
}
// Request options. When fetching the chain segment we can fetch
// either block headers, messages, or both.
//
// Each option occupies a single bit so that several of them can be
// OR-ed together into the `uint64` option field that `parseOptions`
// decodes.
const (
	// Headers asks for the block headers (bit 0, value 1).
	Headers = 1 << iota
	// Messages asks for the messages (bit 1, value 2).
	Messages
)

// parsedOptions holds the request options decompressed into separate
// struct members for easy access during internal processing.
type parsedOptions struct {
	// IncludeHeaders is true when the `Headers` bit was set.
	IncludeHeaders bool
	// IncludeMessages is true when the `Messages` bit was set.
	IncludeMessages bool
}

// noOptionsSet reports whether neither headers nor messages were
// requested.
func (options *parsedOptions) noOptionsSet() bool {
	return !options.IncludeHeaders && !options.IncludeMessages
}

// parseOptions decodes the bit field `optfield` into a freshly allocated
// `parsedOptions`. Only the `Headers` and `Messages` bits are inspected;
// any other bit is ignored.
func parseOptions(optfield uint64) *parsedOptions {
	return &parsedOptions{
		IncludeHeaders:  optfield&Headers != 0,
		IncludeMessages: optfield&Messages != 0,
	}
}
// Response carries a status code, an optional error message and the
// returned chain segment.
//
// FIXME: Rename. Make private.
type Response struct {
// Result code; `statusToError()` maps it to an error (nil for `Ok`
// and `Partial`).
Status status
// String that complements the error status when converting to an
// internal error (see `statusToError()`).
ErrorMessage string
// Returned tipsets, one `BSTipSet` per entry.
// NOTE(review): presumably ordered from the requested head backwards --
// confirm against the code that fills it.
Chain []*BSTipSet
}
// status is the numeric result code stored in the `Status` field of a
// `Response`.
type status uint64
// Known status codes.
//
// NOTE(review): only `Ok` is declared with the `status` type; the other
// names are untyped integer constants that convert to `status` where one
// is expected (as in `statusToError()`). Giving them the explicit type
// would be stricter but could break callers that use them as plain
// integers -- confirm before changing.
const (
// The request was served completely.
Ok status = 0
// We could not fetch all blocks requested (but at least we returned
// the `Head` requested). Not considered an error.
Partial = 101
// Errors
// (each of the codes below is turned into a non-nil error by
// `statusToError()`).
NotFound = 201
GoAway = 202
InternalError = 203
BadRequest = 204
)
// statusToError translates the response status into an internal error.
//
// `Ok` and `Partial` both count as success and yield nil. Every other
// status yields a non-nil error; for `InternalError` and `BadRequest`
// the text includes `ErrorMessage`, and an unknown status is reported
// with its numeric value.
func (res *Response) statusToError() error {
	code := res.Status

	// FIXME: Consider if we want to not process `Partial` responses
	//  and return an error instead.
	if code == Ok || code == Partial {
		return nil
	}

	switch code {
	case NotFound:
		return xerrors.Errorf("not found")
	case GoAway:
		return xerrors.Errorf("not handling 'go away' chainxchg responses yet")
	case InternalError:
		return xerrors.Errorf("block sync peer errored: %s", res.ErrorMessage)
	case BadRequest:
		return xerrors.Errorf("block sync request invalid: %s", res.ErrorMessage)
	default:
		return xerrors.Errorf("unrecognized response code: %d", code)
	}
}
// BSTipSet is one entry of `Response.Chain`: the block headers of a
// single tipset together with that tipset's compacted messages.
//
// FIXME: Rename.
type BSTipSet struct {
// List of blocks belonging to a single tipset to which the
// `CompactedMessages` are linked.
Blocks []*types.BlockHeader
// Messages of the tipset in compacted form.
// NOTE(review): presumably nil when messages were not requested --
// confirm against the code that fills it.
Messages *CompactedMessages
}
// CompactedMessages holds all messages of a single tipset compacted
// together instead of grouped by block to save space, since there are
// normally many repeated messages per tipset in different blocks.
//
// `BlsIncludes`/`SecpkIncludes` matches `Bls`/`Secpk` messages
// to blocks in the tipsets with the format:
// `BlsIncludes[BI][MI]`
//  * BI: block index in the tipset.
//  * MI: message index in `Bls` list
//
// In other words `BlsIncludes[BI]` is the list of indexes into `Bls`
// of the messages included in block `BI` (and likewise for `Secpk`);
// this is how `toFullTipSets()` reads it.
//
// FIXME: The logic to decompress this structure should belong
// to itself, not to the consumer.
type CompactedMessages struct {
// Deduplicated BLS messages of the tipset.
Bls []*types.UnsignedMessage
// Per block, the indexes into `Bls` of the messages it includes.
BlsIncludes [][]uint64
// Deduplicated secp messages of the tipset.
Secpk []*types.SignedMessage
// Per block, the indexes into `Secpk` of the messages it includes.
SecpkIncludes [][]uint64
}
// validatedResponse is a response that has been validated according to
// the protocol and can be safely accessed.
//
// NOTE(review): `toFullTipSets()` indexes into `messages` and its
// `Includes` lists without bounds checks, relying on that validation;
// the validating code is not visible here.
type validatedResponse struct {
// Tipsets of the response.
tipsets []*types.TipSet
// List of all messages per tipset (grouped by tipset,
// not by block, hence a single index like `tipsets`).
messages []*CompactedMessages
}
// toFullTipSets decompresses the messages and pairs them with the block
// headers to form full tipsets. The headers need to have been requested
// as well.
//
// It returns nil when there are no tipsets or when the number of
// tipsets differs from the number of message sets. Otherwise it returns
// one `FullTipSet` per tipset, in the same order as `res.tipsets`.
func (res *validatedResponse) toFullTipSets() []*types.FullTipSet {
	// This decompression can only be done if both headers and
	// messages are returned in the response.
	count := len(res.tipsets)
	if count == 0 {
		return nil
	}
	// Already implied by the guarantees of `validatedResponse`,
	// checked here just for completeness.
	if len(res.messages) != count {
		return nil
	}

	result := make([]*types.FullTipSet, count)
	for i, ts := range res.tipsets {
		full := &types.FullTipSet{} // FIXME: We should use the `NewFullTipSet` API.
		compacted := res.messages[i]

		for j, header := range ts.Blocks() {
			block := &types.FullBlock{Header: header}

			// Each entry of the `Includes` list for block `j` is an
			// index into the tipset-wide message list.
			for _, idx := range compacted.BlsIncludes[j] {
				block.BLSMessages = append(block.BLSMessages, compacted.Bls[idx])
			}
			for _, idx := range compacted.SecpkIncludes[j] {
				block.SECPMessages = append(block.SECPMessages, compacted.Secpk[idx])
			}

			full.Blocks = append(full.Blocks, block)
		}

		result[i] = full
	}

	return result
}