1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Rename `evm-single` to `evm` and `grpc-single` to `evgrpc` for clarity. [#2839](https://github.com/evstack/ev-node/pull/2839)
- Split the cache interface into `CacheManager` and `PendingManager` and create a `da` client to ease DA handling. [#2878](https://github.com/evstack/ev-node/pull/2878)
- Improve the startup DA retrieval height when the cache is cleared or empty. [#2880](https://github.com/evstack/ev-node/pull/2880)

## v1.0.0-beta.10

23 changes: 10 additions & 13 deletions block/internal/submitting/submitter.go
@@ -333,38 +333,35 @@ func (s *Submitter) sendCriticalError(err error) {
// For blocks with empty transactions, both header and data use the same DA height since
// empty transaction data is not actually published to the DA layer.
func (s *Submitter) setSequencerHeightToDAHeight(ctx context.Context, height uint64, header *types.SignedHeader, data *types.Data, genesisInclusion bool) error {

headerHash, dataHash := header.Hash(), data.DACommitment()

headerHeightBytes := make([]byte, 8)
headerDaHeightBytes := make([]byte, 8)
daHeightForHeader, ok := s.cache.GetHeaderDAIncluded(headerHash.String())
if !ok {
return fmt.Errorf("header hash %s not found in cache", headerHash)
}
binary.LittleEndian.PutUint64(headerHeightBytes, daHeightForHeader)
genesisDAIncludedHeight := daHeightForHeader
binary.LittleEndian.PutUint64(headerDaHeightBytes, daHeightForHeader)

if err := s.store.SetMetadata(ctx, fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, height), headerHeightBytes); err != nil {
if err := s.store.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(height), headerDaHeightBytes); err != nil {
return err
}

dataHeightBytes := make([]byte, 8)
genesisDAIncludedHeight := daHeightForHeader
dataDaHeightBytes := make([]byte, 8)
// For empty transactions, use the same DA height as the header
if bytes.Equal(dataHash, common.DataHashForEmptyTxs) {
binary.LittleEndian.PutUint64(dataHeightBytes, daHeightForHeader)
binary.LittleEndian.PutUint64(dataDaHeightBytes, daHeightForHeader)
} else {
daHeightForData, ok := s.cache.GetDataDAIncluded(dataHash.String())
if !ok {
return fmt.Errorf("data hash %s not found in cache", dataHash.String())
}
binary.LittleEndian.PutUint64(dataHeightBytes, daHeightForData)
binary.LittleEndian.PutUint64(dataDaHeightBytes, daHeightForData)

// if data posted before header, use data da included height
if daHeightForData < genesisDAIncludedHeight {
genesisDAIncludedHeight = daHeightForData
}
// if the data was posted before the header, use the data's DA-included height for the genesis DA height
genesisDAIncludedHeight = min(daHeightForData, genesisDAIncludedHeight)
}
if err := s.store.SetMetadata(ctx, fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, height), dataHeightBytes); err != nil {
if err := s.store.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(height), dataDaHeightBytes); err != nil {
return err
}

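For orientation, here is a minimal sketch (not part of this PR) of how a consumer would read back the two metadata entries written by setSequencerHeightToDAHeight. The key helpers come from pkg/store/keys.go in this diff and the values are 8-byte little-endian uint64s, matching the PutUint64 writes above; the metadataGetter interface and the import path are assumptions made only to keep the sketch self-contained.

package dametadata

import (
	"context"
	"encoding/binary"
	"fmt"

	"github.com/evstack/ev-node/pkg/store"
)

// metadataGetter mirrors the GetMetadata signature used throughout the diff;
// it is declared here only so the sketch stands on its own.
type metadataGetter interface {
	GetMetadata(ctx context.Context, key string) ([]byte, error)
}

// readDAHeights returns the DA heights at which the header and data of the
// given sequencer height were included, as persisted by the submitter.
// For blocks with empty transactions both entries carry the same DA height.
func readDAHeights(ctx context.Context, st metadataGetter, height uint64) (headerDA, dataDA uint64, err error) {
	headerBz, err := st.GetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(height))
	if err != nil {
		return 0, 0, err
	}
	dataBz, err := st.GetMetadata(ctx, store.GetHeightToDAHeightDataKey(height))
	if err != nil {
		return 0, 0, err
	}
	if len(headerBz) != 8 || len(dataBz) != 8 {
		return 0, 0, fmt.Errorf("malformed DA height metadata for height %d", height)
	}
	return binary.LittleEndian.Uint64(headerBz), binary.LittleEndian.Uint64(dataBz), nil
}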
8 changes: 4 additions & 4 deletions block/internal/submitting/submitter_test.go
@@ -178,8 +178,8 @@ func TestSubmitter_setSequencerHeightToDAHeight(t *testing.T) {
cm.SetHeaderDAIncluded(h.Hash().String(), 100, 1)
cm.SetDataDAIncluded(d.DACommitment().String(), 90, 1)

headerKey := fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, 1)
dataKey := fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, 1)
headerKey := store.GetHeightToDAHeightHeaderKey(1)
dataKey := store.GetHeightToDAHeightDataKey(1)

hBz := make([]byte, 8)
binary.LittleEndian.PutUint64(hBz, 100)
@@ -297,11 +297,11 @@ func TestSubmitter_processDAInclusionLoop_advances(t *testing.T) {

// verify metadata mapping persisted in store
for i := range 2 {
hBz, err := st.GetMetadata(ctx, fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, i+1))
hBz, err := st.GetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(uint64(i+1)))
require.NoError(t, err)
assert.Equal(t, uint64(100+i), binary.LittleEndian.Uint64(hBz))

dBz, err := st.GetMetadata(ctx, fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, i+1))
dBz, err := st.GetMetadata(ctx, store.GetHeightToDAHeightDataKey(uint64(i+1)))
require.NoError(t, err)
assert.Equal(t, uint64(100+i), binary.LittleEndian.Uint64(dBz))
}
38 changes: 36 additions & 2 deletions block/internal/syncing/syncer.go
@@ -3,6 +3,7 @@ package syncing
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"sync"
@@ -201,9 +202,9 @@ func (s *Syncer) initializeState() error {
}
s.SetLastState(state)

// Set DA height to the maximum of the genesis start height, the state's DA height, and the cached DA height.
// Set DA height to the maximum of the genesis start height, the state's DA height, the cached DA height, and the highest stored included DA height.
// This ensures we resume from the highest known DA height, even if the cache is cleared on restart. If the DA height is too high because of a user error, reset it with --evnode.clear_cache; the DA retrieval height then falls back to the highest DA height recorded for the last DA-included block height.
s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight))
s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight, s.getHighestStoredDAHeight()))

s.logger.Info().
Uint64("height", state.LastBlockHeight).
@@ -721,6 +722,39 @@ func (s *Syncer) sleepOrDone(duration time.Duration) bool {
}
}

// getHighestStoredDAHeight retrieves the highest DA height from the store by checking
// the DA heights stored for the last DA-included height.
// This relies on the node syncing with DA and setting included heights.
func (s *Syncer) getHighestStoredDAHeight() uint64 {
// Get the DA included height from store
daIncludedHeightBytes, err := s.store.GetMetadata(s.ctx, store.DAIncludedHeightKey)
if err != nil || len(daIncludedHeightBytes) != 8 {
return 0
}
daIncludedHeight := binary.LittleEndian.Uint64(daIncludedHeightBytes)
if daIncludedHeight == 0 {
return 0
}

var highestDAHeight uint64

// Get header DA height for the last included height
headerKey := store.GetHeightToDAHeightHeaderKey(daIncludedHeight)
if headerBytes, err := s.store.GetMetadata(s.ctx, headerKey); err == nil && len(headerBytes) == 8 {
headerDAHeight := binary.LittleEndian.Uint64(headerBytes)
highestDAHeight = max(highestDAHeight, headerDAHeight)
}

// Get data DA height for the last included height
dataKey := store.GetHeightToDAHeightDataKey(daIncludedHeight)
if dataBytes, err := s.store.GetMetadata(s.ctx, dataKey); err == nil && len(dataBytes) == 8 {
dataDAHeight := binary.LittleEndian.Uint64(dataBytes)
highestDAHeight = max(highestDAHeight, dataDAHeight)
}

return highestDAHeight
}
Comment on lines +728 to +756
Contributor (medium):

This function can be improved for robustness and maintainability.

  1. Error Handling: Errors from s.store.GetMetadata are silently ignored. It's better to log these errors (unless it's a datastore.ErrNotFound) to help debug potential issues with the data store. This applies to all three GetMetadata calls in this function.
  2. Code Duplication: The logic to fetch and decode the header and data DA heights is duplicated. Extracting this into a helper function or a local closure would reduce redundancy.

Here is an example of how it could be refactored:

func (s *Syncer) getHighestStoredDAHeight() uint64 {
	daIncludedHeightBytes, err := s.store.GetMetadata(s.ctx, store.DAIncludedHeightKey)
	if err != nil {
		if !errors.Is(err, datastore.ErrNotFound) {
			s.logger.Warn().Err(err).Msg("Failed to get DA included height from store")
		}
		return 0
	}
	if len(daIncludedHeightBytes) != 8 {
		// Potentially log this as it indicates data corruption
		return 0
	}

	daIncludedHeight := binary.LittleEndian.Uint64(daIncludedHeightBytes)
	if daIncludedHeight == 0 {
		return 0
	}

	// Helper to fetch and decode a DA height from a given key.
	getDAHeight := func(key string) uint64 {
		bz, err := s.store.GetMetadata(s.ctx, key)
		if err != nil {
			if !errors.Is(err, datastore.ErrNotFound) {
				s.logger.Warn().Err(err).Str("key", key).Msg("Failed to get DA height from store")
			}
			return 0
		}
		if len(bz) != 8 {
			// Potentially log this as it indicates data corruption
			return 0
		}
		return binary.LittleEndian.Uint64(bz)
	}

	headerDAHeight := getDAHeight(store.GetHeightToDAHeightHeaderKey(daIncludedHeight))
	dataDAHeight := getDAHeight(store.GetHeightToDAHeightDataKey(daIncludedHeight))

	return max(headerDAHeight, dataDAHeight)
}

Member Author:

it is shorter the way it currently is, and we do want to skip errors and not clutter the logs


type p2pWaitState struct {
height uint64
cancel context.CancelFunc
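To make the startup resolution in initializeState concrete, here is a small illustration with hypothetical numbers (none of these values come from the PR): after --evnode.clear_cache the cached DA height is zero, but the per-height DA mapping persisted by the submitter still lets the retriever resume near the tip instead of re-scanning from the genesis DA start height.

// Hypothetical values, chosen only to illustrate the max() resolution above.
func illustrateStartupDAHeight() uint64 {
	genesisDAStart := uint64(10)  // genesis.DAStartHeight
	cachedDAHeight := uint64(0)   // cache.DaHeight() after --evnode.clear_cache
	stateDAHeight := uint64(120)  // DA height carried by the last executed state
	storedDAHeight := uint64(150) // getHighestStoredDAHeight(): DA height recorded for the last DA-included block

	// Resolves to 150 here; before this change the node would have resumed from 120.
	return max(genesisDAStart, cachedDAHeight, stateDAHeight, storedDAHeight)
}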
59 changes: 59 additions & 0 deletions block/internal/syncing/syncer_test.go
@@ -4,6 +4,7 @@ import (
"context"
crand "crypto/rand"
"crypto/sha512"
"encoding/binary"
"errors"
"sync/atomic"
"testing"
@@ -605,6 +606,9 @@ func TestSyncer_InitializeState_CallsReplayer(t *testing.T) {
nil,
)

// Mock GetMetadata calls for DA included height retrieval
mockStore.EXPECT().GetMetadata(mock.Anything, store.DAIncludedHeightKey).Return(nil, datastore.ErrNotFound)

// Setup execution layer to be in sync
mockExec.On("GetLatestHeight", mock.Anything).Return(storeHeight, nil)

@@ -641,3 +645,58 @@ func requireEmptyChan(t *testing.T, errorCh chan error) {
default:
}
}

func TestSyncer_getHighestStoredDAHeight(t *testing.T) {
ds := dssync.MutexWrap(datastore.NewMapDatastore())
st := store.New(ds)
ctx := context.Background()

syncer := &Syncer{
store: st,
ctx: ctx,
logger: zerolog.Nop(),
}

// Test case 1: No DA included height set
highestDA := syncer.getHighestStoredDAHeight()
assert.Equal(t, uint64(0), highestDA)

// Test case 2: DA included height set, but no mappings
bz := make([]byte, 8)
binary.LittleEndian.PutUint64(bz, 1)
require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, bz))

highestDA = syncer.getHighestStoredDAHeight()
assert.Equal(t, uint64(0), highestDA)

// Test case 3: DA included height with header mapping
headerBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(headerBytes, 100)
require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(1), headerBytes))

highestDA = syncer.getHighestStoredDAHeight()
assert.Equal(t, uint64(100), highestDA)

// Test case 4: DA included height with both header and data mappings (data is higher)
dataBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(dataBytes, 105)
require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(1), dataBytes))

highestDA = syncer.getHighestStoredDAHeight()
assert.Equal(t, uint64(105), highestDA)

// Test case 5: Advance to height 2 with higher DA heights
binary.LittleEndian.PutUint64(bz, 2)
require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, bz))

headerBytes2 := make([]byte, 8)
binary.LittleEndian.PutUint64(headerBytes2, 200)
require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(2), headerBytes2))

dataBytes2 := make([]byte, 8)
binary.LittleEndian.PutUint64(dataBytes2, 195)
require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(2), dataBytes2))

highestDA = syncer.getHighestStoredDAHeight()
assert.Equal(t, uint64(200), highestDA, "should return highest DA height from most recent included height")
}
4 changes: 2 additions & 2 deletions pkg/rpc/server/server.go
@@ -111,15 +111,15 @@ func (s *StoreServer) GetBlock(
// Fetch and set DA heights
blockHeight := header.Height()
if blockHeight > 0 { // DA heights are not stored for genesis/height 0 in the current impl
headerDAHeightKey := fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, blockHeight)
headerDAHeightKey := store.GetHeightToDAHeightHeaderKey(blockHeight)
headerDAHeightBytes, err := s.store.GetMetadata(ctx, headerDAHeightKey)
if err == nil && len(headerDAHeightBytes) == 8 {
resp.HeaderDaHeight = binary.LittleEndian.Uint64(headerDAHeightBytes)
} else if err != nil && !errors.Is(err, ds.ErrNotFound) {
s.logger.Error().Uint64("height", blockHeight).Err(err).Msg("Error fetching header DA height for block")
}

dataDAHeightKey := fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, blockHeight)
dataDAHeightKey := store.GetHeightToDAHeightDataKey(blockHeight)
dataDAHeightBytes, err := s.store.GetMetadata(ctx, dataDAHeightKey)
if err == nil && len(dataDAHeightBytes) == 8 {
resp.DataDaHeight = binary.LittleEndian.Uint64(dataDAHeightBytes)
16 changes: 8 additions & 8 deletions pkg/rpc/server/server_test.go
@@ -53,8 +53,8 @@ func TestGetBlock(t *testing.T) {
t.Run("by height with DA heights", func(t *testing.T) {
// Setup mock expectations
mockStore.On("GetBlockData", mock.Anything, height).Return(header, data, nil).Once()
mockStore.On("GetMetadata", mock.Anything, fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, height)).Return(headerDAHeightBytes, nil).Once()
mockStore.On("GetMetadata", mock.Anything, fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, height)).Return(dataDAHeightBytes, nil).Once()
mockStore.On("GetMetadata", mock.Anything, store.GetHeightToDAHeightHeaderKey(height)).Return(headerDAHeightBytes, nil).Once()
mockStore.On("GetMetadata", mock.Anything, store.GetHeightToDAHeightDataKey(height)).Return(dataDAHeightBytes, nil).Once()

req := connect.NewRequest(&pb.GetBlockRequest{
Identifier: &pb.GetBlockRequest_Height{
@@ -74,8 +74,8 @@
// Test GetBlock with height - metadata not found
t.Run("by height DA heights not found", func(t *testing.T) {
mockStore.On("GetBlockData", mock.Anything, height).Return(header, data, nil).Once()
mockStore.On("GetMetadata", mock.Anything, fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, height)).Return(nil, ds.ErrNotFound).Once()
mockStore.On("GetMetadata", mock.Anything, fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, height)).Return(nil, ds.ErrNotFound).Once()
mockStore.On("GetMetadata", mock.Anything, store.GetHeightToDAHeightHeaderKey(height)).Return(nil, ds.ErrNotFound).Once()
mockStore.On("GetMetadata", mock.Anything, store.GetHeightToDAHeightDataKey(height)).Return(nil, ds.ErrNotFound).Once()

req := connect.NewRequest(&pb.GetBlockRequest{
Identifier: &pb.GetBlockRequest_Height{
Expand All @@ -96,8 +96,8 @@ func TestGetBlock(t *testing.T) {
// Important: The header returned by GetBlockByHash must also have its height set for DA height lookup
headerForHash := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: height}}}
mockStore.On("GetBlockByHash", mock.Anything, hashBytes).Return(headerForHash, data, nil).Once()
mockStore.On("GetMetadata", mock.Anything, fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, height)).Return(headerDAHeightBytes, nil).Once()
mockStore.On("GetMetadata", mock.Anything, fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, height)).Return(dataDAHeightBytes, nil).Once()
mockStore.On("GetMetadata", mock.Anything, store.GetHeightToDAHeightHeaderKey(height)).Return(headerDAHeightBytes, nil).Once()
mockStore.On("GetMetadata", mock.Anything, store.GetHeightToDAHeightDataKey(height)).Return(dataDAHeightBytes, nil).Once()

req := connect.NewRequest(&pb.GetBlockRequest{
Identifier: &pb.GetBlockRequest_Hash{
@@ -157,8 +157,8 @@ func TestGetBlock_Latest(t *testing.T) {
// Expectation for GetBlockData with the latest height
mockStore.On("GetBlockData", context.Background(), latestHeight).Return(header, data, nil).Once()
// Expectation for DA height metadata
mockStore.On("GetMetadata", mock.Anything, fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, latestHeight)).Return(headerDAHeightBytes, nil).Once()
mockStore.On("GetMetadata", mock.Anything, fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, latestHeight)).Return(dataDAHeightBytes, nil).Once()
mockStore.On("GetMetadata", mock.Anything, store.GetHeightToDAHeightHeaderKey(latestHeight)).Return(headerDAHeightBytes, nil).Once()
mockStore.On("GetMetadata", mock.Anything, store.GetHeightToDAHeightDataKey(latestHeight)).Return(dataDAHeightBytes, nil).Once()

req := connect.NewRequest(&pb.GetBlockRequest{
Identifier: &pb.GetBlockRequest_Height{
12 changes: 12 additions & 0 deletions pkg/store/keys.go
@@ -61,3 +61,15 @@ func getIndexKey(hash types.Hash) string {
func getHeightKey() string {
return GenerateKey([]string{heightPrefix})
}

// GetHeightToDAHeightHeaderKey returns the metadata key for storing the DA height
// where a block's header was included for a given sequencer height.
func GetHeightToDAHeightHeaderKey(height uint64) string {
return HeightToDAHeightKey + "/" + strconv.FormatUint(height, 10) + "/h"
}

// GetHeightToDAHeightDataKey returns the metadata key for storing the DA height
// where a block's data was included for a given sequencer height.
func GetHeightToDAHeightDataKey(height uint64) string {
return HeightToDAHeightKey + "/" + strconv.FormatUint(height, 10) + "/d"
}
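
As a quick sanity check, a sketch (not part of the PR; the module path is an assumption based on the repository) showing that the new helpers produce exactly the same strings as the fmt.Sprintf calls they replace at the call sites above:

package main

import (
	"fmt"

	"github.com/evstack/ev-node/pkg/store"
)

func main() {
	height := uint64(42)

	// Old hand-formatted keys, as previously used in submitter.go and server.go.
	oldHeaderKey := fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, height)
	oldDataKey := fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, height)

	fmt.Println(oldHeaderKey == store.GetHeightToDAHeightHeaderKey(height)) // true
	fmt.Println(oldDataKey == store.GetHeightToDAHeightDataKey(height))     // true
}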