Skip to content

Commit

Permalink
beacon/light: request finality update explicitly when necessary (#29567)
Browse files Browse the repository at this point in the history
This PR adds an extra mechanism to sync.HeadSync that tries to retrieve the latest finality update from every server each time it sends an optimistic update in a new epoch (unless we already have a validated finality update attested in the same epoch). 

Note that this is not necessary and does not happen if the new finality update is delivered before the optimistic update. The spec only mandates light_client_finality_update events when a new epoch is finalized. If the chain does not finalize for a while then we might need an explicit request that returns a finality proof that proves the same finality epoch from the latest attested epoch.
  • Loading branch information
zsfelfoldi committed Apr 23, 2024
1 parent b2b0e1d commit 256d4b0
Show file tree
Hide file tree
Showing 14 changed files with 316 additions and 159 deletions.
18 changes: 9 additions & 9 deletions beacon/blsync/block_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type beaconBlockSync struct {

type headTracker interface {
PrefetchHead() types.HeadInfo
ValidatedHead() (types.SignedHeader, bool)
ValidatedOptimistic() (types.OptimisticUpdate, bool)
ValidatedFinality() (types.FinalityUpdate, bool)
}

Expand All @@ -66,6 +66,7 @@ func (s *beaconBlockSync) Process(requester request.Requester, events []request.
case request.EvResponse, request.EvFail, request.EvTimeout:
sid, req, resp := event.RequestInfo()
blockRoot := common.Hash(req.(sync.ReqBeaconBlock))
log.Debug("Beacon block event", "type", event.Type.Name, "hash", blockRoot)
if resp != nil {
s.recentBlocks.Add(blockRoot, resp.(*types.BeaconBlock))
}
Expand All @@ -80,8 +81,8 @@ func (s *beaconBlockSync) Process(requester request.Requester, events []request.
}
s.updateEventFeed()
// request validated head block if unavailable and not yet requested
if vh, ok := s.headTracker.ValidatedHead(); ok {
s.tryRequestBlock(requester, vh.Header.Hash(), false)
if vh, ok := s.headTracker.ValidatedOptimistic(); ok {
s.tryRequestBlock(requester, vh.Attested.Hash(), false)
}
// request prefetch head if the given server has announced it
if prefetchHead := s.headTracker.PrefetchHead().BlockRoot; prefetchHead != (common.Hash{}) {
Expand Down Expand Up @@ -114,31 +115,30 @@ func blockHeadInfo(block *types.BeaconBlock) types.HeadInfo {
}

func (s *beaconBlockSync) updateEventFeed() {
head, ok := s.headTracker.ValidatedHead()
optimistic, ok := s.headTracker.ValidatedOptimistic()
if !ok {
return
}

validatedHead := head.Header.Hash()
validatedHead := optimistic.Attested.Hash()
headBlock, ok := s.recentBlocks.Get(validatedHead)
if !ok {
return
}

var finalizedHash common.Hash
if finality, ok := s.headTracker.ValidatedFinality(); ok {
he := head.Header.Epoch()
he := optimistic.Attested.Epoch()
fe := finality.Attested.Header.Epoch()
switch {
case he == fe:
finalizedHash = finality.Finalized.PayloadHeader.BlockHash()
case he < fe:
return
case he == fe+1:
parent, ok := s.recentBlocks.Get(head.Header.ParentRoot)
parent, ok := s.recentBlocks.Get(optimistic.Attested.ParentRoot)
if !ok || parent.Slot()/params.EpochLength == fe {
return // head is at first slot of next epoch, wait for finality update
//TODO: try to fetch finality update directly if subscription does not deliver
}
}
}
Expand All @@ -156,7 +156,7 @@ func (s *beaconBlockSync) updateEventFeed() {
return
}
s.chainHeadFeed.Send(types.ChainHeadEvent{
BeaconHead: head.Header,
BeaconHead: optimistic.Attested.Header,
Block: execBlock,
Finalized: finalizedHash,
})
Expand Down
8 changes: 6 additions & 2 deletions beacon/blsync/block_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,12 @@ func (h *testHeadTracker) PrefetchHead() types.HeadInfo {
return h.prefetch
}

func (h *testHeadTracker) ValidatedHead() (types.SignedHeader, bool) {
return h.validated, h.validated.Header != (types.Header{})
func (h *testHeadTracker) ValidatedOptimistic() (types.OptimisticUpdate, bool) {
return types.OptimisticUpdate{
Attested: types.HeaderWithExecProof{Header: h.validated.Header},
Signature: h.validated.Signature,
SignatureSlot: h.validated.SignatureSlot,
}, h.validated.Header != (types.Header{})
}

// TODO add test case for finality
Expand Down
3 changes: 3 additions & 0 deletions beacon/blsync/engineclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (ec *engineClient) updateLoop(headCh <-chan types.ChainHeadEvent) {
for {
select {
case <-ec.rootCtx.Done():
log.Debug("Stopping engine API update loop")
return

case event := <-headCh:
Expand All @@ -73,12 +74,14 @@ func (ec *engineClient) updateLoop(headCh <-chan types.ChainHeadEvent) {
fork := ec.config.ForkAtEpoch(event.BeaconHead.Epoch())
forkName := strings.ToLower(fork.Name)

log.Debug("Calling NewPayload", "number", event.Block.NumberU64(), "hash", event.Block.Hash())
if status, err := ec.callNewPayload(forkName, event); err == nil {
log.Info("Successful NewPayload", "number", event.Block.NumberU64(), "hash", event.Block.Hash(), "status", status)
} else {
log.Error("Failed NewPayload", "number", event.Block.NumberU64(), "hash", event.Block.Hash(), "error", err)
}

log.Debug("Calling ForkchoiceUpdated", "head", event.Block.Hash())
if status, err := ec.callForkchoiceUpdated(forkName, event); err == nil {
log.Info("Successful ForkchoiceUpdated", "head", event.Block.Hash(), "status", status)
} else {
Expand Down
16 changes: 10 additions & 6 deletions beacon/light/api/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ func (s *ApiServer) Subscribe(eventCallback func(event request.Event)) {
log.Debug("New head received", "slot", slot, "blockRoot", blockRoot)
eventCallback(request.Event{Type: sync.EvNewHead, Data: types.HeadInfo{Slot: slot, BlockRoot: blockRoot}})
},
OnSignedHead: func(head types.SignedHeader) {
log.Debug("New signed head received", "slot", head.Header.Slot, "blockRoot", head.Header.Hash(), "signerCount", head.Signature.SignerCount())
eventCallback(request.Event{Type: sync.EvNewSignedHead, Data: head})
OnOptimistic: func(update types.OptimisticUpdate) {
log.Debug("New optimistic update received", "slot", update.Attested.Slot, "blockRoot", update.Attested.Hash(), "signerCount", update.Signature.SignerCount())
eventCallback(request.Event{Type: sync.EvNewOptimisticUpdate, Data: update})
},
OnFinality: func(head types.FinalityUpdate) {
log.Debug("New finality update received", "slot", head.Attested.Slot, "blockRoot", head.Attested.Hash(), "signerCount", head.Signature.SignerCount())
eventCallback(request.Event{Type: sync.EvNewFinalityUpdate, Data: head})
OnFinality: func(update types.FinalityUpdate) {
log.Debug("New finality update received", "slot", update.Attested.Slot, "blockRoot", update.Attested.Hash(), "signerCount", update.Signature.SignerCount())
eventCallback(request.Event{Type: sync.EvNewFinalityUpdate, Data: update})
},
OnError: func(err error) {
log.Warn("Head event stream error", "err", err)
Expand Down Expand Up @@ -83,13 +83,17 @@ func (s *ApiServer) SendRequest(id request.ID, req request.Request) {
case sync.ReqBeaconBlock:
log.Debug("Beacon API: requesting block", "reqid", id, "hash", common.Hash(data))
resp, err = s.api.GetBeaconBlock(common.Hash(data))
case sync.ReqFinality:
log.Debug("Beacon API: requesting finality update")
resp, err = s.api.GetFinalityUpdate()
default:
}

if err != nil {
log.Warn("Beacon API request failed", "type", reflect.TypeOf(req), "reqid", id, "err", err)
s.eventCallback(request.Event{Type: request.EvFail, Data: request.RequestResponse{ID: id, Request: req}})
} else {
log.Debug("Beacon API request answered", "type", reflect.TypeOf(req), "reqid", id)
s.eventCallback(request.Event{Type: request.EvResponse, Data: request.RequestResponse{ID: id, Request: req, Response: resp}})
}
}()
Expand Down
77 changes: 53 additions & 24 deletions beacon/light/api/light_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/beacon/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
)

var (
Expand Down Expand Up @@ -184,46 +185,56 @@ func (api *BeaconLightApi) GetBestUpdatesAndCommittees(firstPeriod, count uint64
return updates, committees, nil
}

// GetOptimisticHeadUpdate fetches a signed header based on the latest available
// optimistic update. Note that the signature should be verified by the caller
// as its validity depends on the update chain.
// GetOptimisticUpdate fetches the latest available optimistic update.
// Note that the signature should be verified by the caller as its validity
// depends on the update chain.
//
// See data structure definition here:
// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/sync-protocol.md#lightclientoptimisticupdate
func (api *BeaconLightApi) GetOptimisticHeadUpdate() (types.SignedHeader, error) {
func (api *BeaconLightApi) GetOptimisticUpdate() (types.OptimisticUpdate, error) {
resp, err := api.httpGet("/eth/v1/beacon/light_client/optimistic_update")
if err != nil {
return types.SignedHeader{}, err
return types.OptimisticUpdate{}, err
}
return decodeOptimisticHeadUpdate(resp)
return decodeOptimisticUpdate(resp)
}

func decodeOptimisticHeadUpdate(enc []byte) (types.SignedHeader, error) {
func decodeOptimisticUpdate(enc []byte) (types.OptimisticUpdate, error) {
var data struct {
Data struct {
Header jsonBeaconHeader `json:"attested_header"`
Aggregate types.SyncAggregate `json:"sync_aggregate"`
SignatureSlot common.Decimal `json:"signature_slot"`
Version string
Data struct {
Attested jsonHeaderWithExecProof `json:"attested_header"`
Aggregate types.SyncAggregate `json:"sync_aggregate"`
SignatureSlot common.Decimal `json:"signature_slot"`
} `json:"data"`
}
if err := json.Unmarshal(enc, &data); err != nil {
return types.SignedHeader{}, err
return types.OptimisticUpdate{}, err
}
// Decode the execution payload headers.
attestedExecHeader, err := types.ExecutionHeaderFromJSON(data.Version, data.Data.Attested.Execution)
if err != nil {
return types.OptimisticUpdate{}, fmt.Errorf("invalid attested header: %v", err)
}
if data.Data.Header.Beacon.StateRoot == (common.Hash{}) {
if data.Data.Attested.Beacon.StateRoot == (common.Hash{}) {
// workaround for different event encoding format in Lodestar
if err := json.Unmarshal(enc, &data.Data); err != nil {
return types.SignedHeader{}, err
return types.OptimisticUpdate{}, err
}
}

if len(data.Data.Aggregate.Signers) != params.SyncCommitteeBitmaskSize {
return types.SignedHeader{}, errors.New("invalid sync_committee_bits length")
return types.OptimisticUpdate{}, errors.New("invalid sync_committee_bits length")
}
if len(data.Data.Aggregate.Signature) != params.BLSSignatureSize {
return types.SignedHeader{}, errors.New("invalid sync_committee_signature length")
return types.OptimisticUpdate{}, errors.New("invalid sync_committee_signature length")
}
return types.SignedHeader{
Header: data.Data.Header.Beacon,
return types.OptimisticUpdate{
Attested: types.HeaderWithExecProof{
Header: data.Data.Attested.Beacon,
PayloadHeader: attestedExecHeader,
PayloadBranch: data.Data.Attested.ExecutionBranch,
},
Signature: data.Data.Aggregate,
SignatureSlot: uint64(data.Data.SignatureSlot),
}, nil
Expand Down Expand Up @@ -411,7 +422,7 @@ func decodeHeadEvent(enc []byte) (uint64, common.Hash, error) {

type HeadEventListener struct {
OnNewHead func(slot uint64, blockRoot common.Hash)
OnSignedHead func(head types.SignedHeader)
OnOptimistic func(head types.OptimisticUpdate)
OnFinality func(head types.FinalityUpdate)
OnError func(err error)
}
Expand Down Expand Up @@ -449,21 +460,35 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
defer wg.Done()

// Request initial data.
log.Trace("Requesting initial head header")
if head, _, _, err := api.GetHeader(common.Hash{}); err == nil {
log.Trace("Retrieved initial head header", "slot", head.Slot, "hash", head.Hash())
listener.OnNewHead(head.Slot, head.Hash())
} else {
log.Debug("Failed to retrieve initial head header", "error", err)
}
if signedHead, err := api.GetOptimisticHeadUpdate(); err == nil {
listener.OnSignedHead(signedHead)
log.Trace("Requesting initial optimistic update")
if optimisticUpdate, err := api.GetOptimisticUpdate(); err == nil {
log.Trace("Retrieved initial optimistic update", "slot", optimisticUpdate.Attested.Slot, "hash", optimisticUpdate.Attested.Hash())
listener.OnOptimistic(optimisticUpdate)
} else {
log.Debug("Failed to retrieve initial optimistic update", "error", err)
}
log.Trace("Requesting initial finality update")
if finalityUpdate, err := api.GetFinalityUpdate(); err == nil {
log.Trace("Retrieved initial finality update", "slot", finalityUpdate.Finalized.Slot, "hash", finalityUpdate.Finalized.Hash())
listener.OnFinality(finalityUpdate)
} else {
log.Debug("Failed to retrieve initial finality update", "error", err)
}

log.Trace("Starting event stream processing loop")
// Receive the stream.
var stream *eventsource.Stream
select {
case stream = <-streamCh:
case <-ctx.Done():
log.Trace("Stopping event stream processing loop")
return
}

Expand All @@ -474,8 +499,10 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()

case event, ok := <-stream.Events:
if !ok {
log.Trace("Event stream closed")
return
}
log.Trace("New event received from event stream", "type", event.Event())
switch event.Event() {
case "head":
slot, blockRoot, err := decodeHeadEvent([]byte(event.Data()))
Expand All @@ -485,9 +512,9 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
listener.OnError(fmt.Errorf("error decoding head event: %v", err))
}
case "light_client_optimistic_update":
signedHead, err := decodeOptimisticHeadUpdate([]byte(event.Data()))
optimisticUpdate, err := decodeOptimisticUpdate([]byte(event.Data()))
if err == nil {
listener.OnSignedHead(signedHead)
listener.OnOptimistic(optimisticUpdate)
} else {
listener.OnError(fmt.Errorf("error decoding optimistic update event: %v", err))
}
Expand Down Expand Up @@ -521,7 +548,8 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
// established. It can only return nil when the context is canceled.
func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadEventListener) *eventsource.Stream {
for retry := true; retry; retry = ctxSleep(ctx, 5*time.Second) {
path := "/eth/v1/events?topics=head&topics=light_client_optimistic_update&topics=light_client_finality_update"
path := "/eth/v1/events?topics=head&topics=light_client_finality_update&topics=light_client_optimistic_update"
log.Trace("Sending event subscription request")
req, err := http.NewRequestWithContext(ctx, "GET", api.url+path, nil)
if err != nil {
listener.OnError(fmt.Errorf("error creating event subscription request: %v", err))
Expand All @@ -535,6 +563,7 @@ func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadE
listener.OnError(fmt.Errorf("error creating event subscription: %v", err))
continue
}
log.Trace("Successfully created event stream")
return stream
}
return nil
Expand Down
Loading

0 comments on commit 256d4b0

Please sign in to comment.