Conversation
Force-pushed c5c29aa to d08b30b (Compare)
replication.go (Outdated)
```go
	}

	// we want to collect
	for seq := start; seq <= end; seq++ {
```
I think it's best to allocate the blocks to retrieve in the following manner: for n blocks and k nodes to pick from, we ask for n/k+1 successive blocks from each node, so that it is efficient for a node to retrieve blocks that sit close together in its storage.
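A minimal sketch of that allocation; the helper name and signature are mine, while NodeID is the node identifier type used elsewhere in this PR:

```go
// assignChunks splits the sequences [start, end] into contiguous chunks of
// size n/k+1, one chunk per node, so each node serves blocks that sit close
// together in its storage. Hypothetical helper, for illustration only.
func assignChunks(start, end uint64, nodes []NodeID) map[int][]uint64 {
	n := end - start + 1
	k := uint64(len(nodes))
	chunkSize := n/k + 1

	assignments := make(map[int][]uint64, k) // node index -> assigned seqs
	seq := start
	for i := range nodes {
		for j := uint64(0); j < chunkSize && seq <= end; j++ {
			assignments[i] = append(assignments[i], seq)
			seq++
		}
	}
	return assignments
}
```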
replication.go (Outdated)
```go
	return response
}

func (e *Epoch) handleFinalizationCertificateRequest(req *FinalizationCertificateRequest) *FinalizationCertificateResponse {
```
I think we should first look for this in memory before going to storage. We have a method locateBlock(seq uint64, digest []byte) which does just that, except it expects the block to have the specified hash. We could replace the digest []byte with a predicate function and re-use it here.
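A sketch of how that refactor could look, using simplified stand-in types rather than the repo's actual ones:

```go
import "bytes"

// Block is a stand-in for the repo's block type.
type Block interface {
	Seq() uint64
	Digest() []byte
}

// locateBlock returns the first in-memory block at seq that satisfies the
// predicate, so callers that don't know the digest can reuse the lookup.
func locateBlock(inMemory []Block, seq uint64, matches func(Block) bool) (Block, bool) {
	for _, b := range inMemory {
		if b.Seq() == seq && matches(b) {
			return b, true // found in memory; no need to hit storage
		}
	}
	return nil, false // caller falls back to storage
}

// The existing digest-based lookup then becomes a special case.
func locateBlockByDigest(inMemory []Block, seq uint64, digest []byte) (Block, bool) {
	return locateBlock(inMemory, seq, func(b Block) bool {
		return bytes.Equal(b.Digest(), digest)
	})
}
```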
epoch.go (Outdated)
```go
func (e *Epoch) collectFutureFinalizationCertificates(fCert *FinalizationCertificate) {
	fCertRound := fCert.Finalization.Round
	// Don't exceed the max round window
	endSeq := math.Min(float64(fCertRound), float64(e.maxRoundWindow+e.round))
```
We are interested in replicating blocks and finalizations, right? Then why are we looking at e.round and not at the next seq to commit?
Reply: I chose e.round because maxRoundWindow relates to the window of stored future messages (from e.round to e.round + maxRoundWindow). Therefore it should be fine to request sequences up to that round, since we already accept other messages from those rounds. For example, with e.round = 10 and maxRoundWindow = 20, we would request sequences up to 30 even if the fCert's round is 100.
Force-pushed 2a7b8d1 to ff9c1a4 (Compare)
yacovm left a comment:
Good job on decoupling the replication code path from the standard consensus path.
I am wondering if it's possible to decouple it even further, though: for example, not using the rounds map at all?
message_handler.go (Outdated)
```go
// HandleRequest processes a request and returns a response. It also sends a response to the sender.
func (e *Epoch) HandleRequest(req *Request, from NodeID) *Response {
	// TODO: should I update requests to be async? and have the epoch respond with e.Comm.Send(msg, node)
```
> // TODO: should I update requests to be async? and have the epoch respond with e.Comm.Send(msg, node)
That's a good question. I think for now we can keep it synchronous, but eventually, when the code becomes more mature, we could have a dedicated thread to service all such requests; it wouldn't even be tied to the Epoch object, since we hand out blocks without caring which epoch we're at.
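A hypothetical sketch of that future design: requests are queued on a channel and a dedicated goroutine services them. Request, Response, and NodeID are the types from this PR; everything else is made up for illustration:

```go
// incoming pairs a request with a way to answer it, e.g. a closure that
// wraps Comm.Send(msg, from).
type incoming struct {
	req   *Request
	from  NodeID
	reply func(*Response)
}

// requestServer services replication requests independently of any Epoch.
type requestServer struct {
	requests chan incoming
}

func (s *requestServer) run() {
	for in := range s.requests {
		// Hand out blocks and fCerts from storage; no epoch state needed.
		in.reply(s.service(in.req))
	}
}

func (s *requestServer) service(req *Request) *Response {
	// ... look up the requested blocks and fCerts in storage ...
	return &Response{}
}
```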
message_handler.go (Outdated)
```go
func (e *Epoch) handleFinalizationCertificateRequest(req *FinalizationCertificateRequest) *FinalizationCertificateResponse {
	e.Logger.Debug("Received finalization certificate request", zap.Int("num seqs", len(req.Sequences)))
	seqs := req.Sequences
	slices.Sort(seqs)
```
Alternatively, we can just obtain the height of the storage and skip all indices that are greater than or equal to the height. I'm also OK with how this is done here, though.
Long term, I think we ought to have a method that can retrieve several blocks from storage in a single API call, but this is good enough for now.
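The height-based alternative could look roughly like this (a sketch; the helper is hypothetical, and it would be fed e.Storage.Height() as mentioned above):

```go
// filterRetrievable drops sequences at or above the storage height, since
// those blocks cannot be in storage yet and any lookup would fail.
func filterRetrievable(requested []uint64, storageHeight uint64) []uint64 {
	retrievable := make([]uint64, 0, len(requested))
	for _, seq := range requested {
		if seq < storageHeight {
			retrievable = append(retrievable, seq)
		}
	}
	return retrievable
}
```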
epoch.go (Outdated)
```go
	// TODO: timeout
}

// add the fCert to the round
```
```go
// add the fCert to the round
round, exists := e.rounds[md.Round]
if !exists {
	// This shouldn't happen, but in case it does, return an error
	e.Logger.Error("programming error: round not found", zap.Uint64("round", md.Round))
	return md.Digest
}
round.fCert = &finalizedBlock.FCert
```
Not sure I understand why we bother to update the round object (or the rounds map in general) when we know we will jump to the next round inside indexFinalizationCertificates and do progressRoundsDueToCommit.
Is it possible to implement replication without touching the rounds map at all? I am not sure why we need to store the proposal there. Can't we just verify the block and, if we succeed, index it to storage and move to the next round?
Reply: Good point! Removed the use of the rounds map in the verification task. However, I still think we will need to touch it in this specific case: https://github.com/ava-labs/Simplex/pull/70/files#r1953899419
Signed-off-by: Sam Liokumovich <65994425+samliok@users.noreply.github.com>
```go
		return
	}

	e.replicationState.maybeCollectFutureFinalizationCertificates(e.round, e.Storage.Height())
```
I might be missing something, but why are we sending requests for blocks here?
Reply: In case we have progressed enough rounds to request more certificates. It might make more sense to call it when we progress the round due to a commit, but I think it's also fine to call it here.
I see, so as we process the blocks asynchronously, the round will increase and this will eventually satisfy the condition round+r.maxRoundWindow/2 > r.lastSequenceRequested inside maybeCollectFutureFinalizationCertificates, is that it?
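For reference, a sketch of the gating logic being discussed; the struct fields and the request step are assumptions based on the names quoted above:

```go
type replicationState struct {
	maxRoundWindow        uint64
	lastSequenceRequested uint64
}

func (r *replicationState) maybeCollectFutureFinalizationCertificates(round, storageHeight uint64) {
	// As commits advance the round, this condition eventually holds again
	// and a fresh window of sequences gets requested.
	if round+r.maxRoundWindow/2 > r.lastSequenceRequested {
		// ... send requests for the next window of sequences
		// and advance r.lastSequenceRequested ...
	}
}
```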
Signed-off-by: Sam Liokumovich <65994425+samliok@users.noreply.github.com>
yacovm left a comment:
LGTM and thanks for bearing through several review iterations!
```go
	}
	e.indexFinalizationCertificate(block, finalizedBlock.FCert)
	e.processReplicationState()
	err := e.maybeLoadFutureMessages()
```
This is in case we have a "gap" in the rounds map that we need to fill with replication, right?
Reply: Yep. Also, we may have received more messages during replication (such as blocks and notarizations) that we may need to start processing.
```go
	}
	msg := &Message{ReplicationRequest: roundRequest}

	requestFrom := r.requestFrom()
```
Would it make sense (as a follow-up PR) to request the sequences from the nodes that helped construct the fCert? We can get the list from fCert.QC.Signers().
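A sketch of what that follow-up could look like; QC.Signers() comes from the comment above, while the helper and the attempt counter are hypothetical:

```go
// pickSigner prefers nodes that signed the fCert, since they provably held
// the block, and rotates across retries to spread load among the signers.
func pickSigner(signers []NodeID, attempt int) NodeID {
	return signers[attempt%len(signers)]
}
```

Usage would be something like requestFrom := pickSigner(fCert.QC.Signers(), attempt).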
Add replication support for lagging nodes
This PR adds support for nodes to catch up when they fall behind in consensus. The changes introduce a request/response system that allows nodes to request missing finalization certificates and latest round information from other nodes in the network.
The replication process works as follows (see the message sketch below):
- The lagging node requests fCerts and blocks (up to maxRoundWindow ahead). It also sends a LatestRoundRequest in order to fully catch up to the latest round.
- It handles fCerts from previous rounds as they arrive.
- It handles the block, notarization, and potentially the fCert from the latest round requested earlier. (TODO)
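A rough sketch of the messages involved; the type names appear in this review, but the field sets are guesses, not the actual definitions:

```go
// FinalizationCertificateRequest asks a peer for the fCerts (and blocks)
// of specific sequences the lagging node is missing.
type FinalizationCertificateRequest struct {
	Sequences []uint64
}

// FinalizationCertificateResponse carries block + fCert pairs back.
type FinalizationCertificateResponse struct {
	Data []FinalizedBlock
}

// LatestRoundRequest asks a peer for its latest round state (block,
// notarization, and potentially fCert).
type LatestRoundRequest struct{}
```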
TODO:
- Handle fCerts from more than maxRoundWindow away. This essentially requires adding a callback to restart requesting nodes for new fCerts.
- More tests (beyond the MultiNodeTest). The other tests I'd like to write are outlined in replication_test.go.
- rounds map