Skip to content

Commit

Permalink
Allow multiple getPayload calls for a delivered payload (#400)
Browse files Browse the repository at this point in the history
* Allow multiple getPayload calls for a delivered payload

* added comment

---------

Co-authored-by: Chris Hager <chris@linuxuser.at>
  • Loading branch information
avalonche and metachris committed May 12, 2023
1 parent b5bab91 commit e6387e5
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 29 deletions.
33 changes: 27 additions & 6 deletions datastore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ var (
RedisStatsFieldLatestSlot = "latest-slot"
RedisStatsFieldValidatorsTotal = "validators-total"

ErrFailedUpdatingTopBidNoBids = errors.New("failed to update top bid because no bids were found")
ErrSlotAlreadyDelivered = errors.New("payload for slot was already delivered")
ErrFailedUpdatingTopBidNoBids = errors.New("failed to update top bid because no bids were found")
ErrAnotherPayloadAlreadyDeliveredForSlot = errors.New("another payload block hash for slot was already delivered")
ErrPastSlotAlreadyDelivered = errors.New("payload for past slot was already delivered")
)

func PubkeyHexToLowerStr(pk boostTypes.PubkeyHex) string {
Expand Down Expand Up @@ -76,6 +77,7 @@ type RedisCache struct {
keyProposerDuties string
keyBlockBuilderStatus string
keyLastSlotDelivered string
keyLastHashDelivered string
}

func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
Expand Down Expand Up @@ -116,6 +118,7 @@ func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
keyProposerDuties: fmt.Sprintf("%s/%s:proposer-duties", redisPrefix, prefix),
keyBlockBuilderStatus: fmt.Sprintf("%s/%s:block-builder-status", redisPrefix, prefix),
keyLastSlotDelivered: fmt.Sprintf("%s/%s:last-slot-delivered", redisPrefix, prefix),
keyLastHashDelivered: fmt.Sprintf("%s/%s:last-hash-delivered", redisPrefix, prefix),
}, nil
}

Expand Down Expand Up @@ -264,7 +267,7 @@ func (r *RedisCache) GetActiveValidators() (map[boostTypes.PubkeyHex]bool, error
return validators, nil
}

func (r *RedisCache) CheckAndSetLastSlotDelivered(slot uint64) (err error) {
func (r *RedisCache) CheckAndSetLastSlotAndHashDelivered(slot uint64, hash string) (err error) {
// More details about Redis optimistic locking:
// - https://redis.uptrace.dev/guide/go-redis-pipelines.html#transactions
// - https://github.com/redis/go-redis/blob/6ecbcf6c90919350c42181ce34c1cbdfbd5d1463/race_test.go#L183
Expand All @@ -274,25 +277,43 @@ func (r *RedisCache) CheckAndSetLastSlotDelivered(slot uint64) (err error) {
return err
}

if slot <= lastSlotDelivered {
return ErrSlotAlreadyDelivered
// slot in the past, reject request
if slot < lastSlotDelivered {
return ErrPastSlotAlreadyDelivered
}

// current slot, reject request if hash is different
if slot == lastSlotDelivered {
lastHashDelivered, err := tx.Get(context.Background(), r.keyLastHashDelivered).Result()
if err != nil && !errors.Is(err, redis.Nil) {
return err
}
if hash != lastHashDelivered {
return ErrAnotherPayloadAlreadyDeliveredForSlot
}
return nil
}

_, err = tx.TxPipelined(context.Background(), func(pipe redis.Pipeliner) error {
pipe.Set(context.Background(), r.keyLastSlotDelivered, slot, 0)
pipe.Set(context.Background(), r.keyLastHashDelivered, hash, 0)
return nil
})

return err
}

return r.client.Watch(context.Background(), txf, r.keyLastSlotDelivered)
return r.client.Watch(context.Background(), txf, r.keyLastSlotDelivered, r.keyLastHashDelivered)
}

func (r *RedisCache) GetLastSlotDelivered() (slot uint64, err error) {
return r.client.Get(context.Background(), r.keyLastSlotDelivered).Uint64()
}

func (r *RedisCache) GetLastHashDelivered() (hash string, err error) {
return r.client.Get(context.Background(), r.keyLastHashDelivered).Result()
}

func (r *RedisCache) SetStats(field string, value any) (err error) {
return r.client.HSet(context.Background(), r.keyStats, field, value).Err()
}
Expand Down
59 changes: 42 additions & 17 deletions datastore/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,38 +315,50 @@ func TestRedisURIs(t *testing.T) {
require.Error(t, err)
}

func TestCheckAndSetLastSlotDelivered(t *testing.T) {
func TestCheckAndSetLastSlotAndHashDelivered(t *testing.T) {
cache := setupTestRedis(t)
newSlot := uint64(123)
newHash := "0x0000000000000000000000000000000000000000000000000000000000000000"

// should return redis.Nil if wasn't set
slot, err := cache.GetLastSlotDelivered()
require.ErrorIs(t, err, redis.Nil)
require.Equal(t, uint64(0), slot)

// should be able to set once
err = cache.CheckAndSetLastSlotDelivered(newSlot)
err = cache.CheckAndSetLastSlotAndHashDelivered(newSlot, newHash)
require.NoError(t, err)

// should get slot
slot, err = cache.GetLastSlotDelivered()
require.NoError(t, err)
require.Equal(t, newSlot, slot)

// should fail on second time
err = cache.CheckAndSetLastSlotDelivered(newSlot)
require.ErrorIs(t, err, ErrSlotAlreadyDelivered)
// should get hash
hash, err := cache.GetLastHashDelivered()
require.NoError(t, err)
require.Equal(t, newHash, hash)

// should fail on a different payload (mismatch block hash)
differentHash := "0x0000000000000000000000000000000000000000000000000000000000000001"
err = cache.CheckAndSetLastSlotAndHashDelivered(newSlot, differentHash)
require.ErrorIs(t, err, ErrAnotherPayloadAlreadyDeliveredForSlot)

// should not return error for same hash
err = cache.CheckAndSetLastSlotAndHashDelivered(newSlot, newHash)
require.NoError(t, err)

// should also fail on earlier slots
err = cache.CheckAndSetLastSlotDelivered(newSlot - 1)
require.ErrorIs(t, err, ErrSlotAlreadyDelivered)
err = cache.CheckAndSetLastSlotAndHashDelivered(newSlot-1, newHash)
require.ErrorIs(t, err, ErrPastSlotAlreadyDelivered)
}

// Test_CheckAndSetLastSlotDeliveredForTesting ensures the optimistic locking works
// i.e. running CheckAndSetLastSlotDelivered leading to err == redis.TxFailedErr
func Test_CheckAndSetLastSlotDeliveredForTesting(t *testing.T) {
// Test_CheckAndSetLastSlotAndHashDeliveredForTesting ensures the optimistic locking works
// i.e. running CheckAndSetLastSlotAndHashDelivered leading to err == redis.TxFailedErr
func Test_CheckAndSetLastSlotAndHashDeliveredForTesting(t *testing.T) {
cache := setupTestRedis(t)
newSlot := uint64(123)
hash := "0x0000000000000000000000000000000000000000000000000000000000000000"
n := 3

errC := make(chan error, n)
Expand All @@ -357,7 +369,7 @@ func Test_CheckAndSetLastSlotDeliveredForTesting(t *testing.T) {
for i := 0; i < n; i++ {
syncWG.Add(1)
go func() {
errC <- _CheckAndSetLastSlotDeliveredForTesting(cache, waitC, &syncWG, newSlot)
errC <- _CheckAndSetLastSlotAndHashDeliveredForTesting(cache, waitC, &syncWG, newSlot, hash)
}()
}

Expand All @@ -375,29 +387,42 @@ func Test_CheckAndSetLastSlotDeliveredForTesting(t *testing.T) {
require.ErrorIs(t, err, redis.TxFailedErr)
}

// Any later call should return ErrSlotAlreadyDelivered
err = _CheckAndSetLastSlotDeliveredForTesting(cache, waitC, &syncWG, newSlot)
// Any later call with a different hash should return ErrPayloadAlreadyDeliveredForSlot
differentHash := "0x0000000000000000000000000000000000000000000000000000000000000001"
err = _CheckAndSetLastSlotAndHashDeliveredForTesting(cache, waitC, &syncWG, newSlot, differentHash)
waitC <- true
require.ErrorIs(t, err, ErrSlotAlreadyDelivered)
require.ErrorIs(t, err, ErrAnotherPayloadAlreadyDeliveredForSlot)
}

func _CheckAndSetLastSlotDeliveredForTesting(r *RedisCache, waitC chan bool, wg *sync.WaitGroup, slot uint64) (err error) {
func _CheckAndSetLastSlotAndHashDeliveredForTesting(r *RedisCache, waitC chan bool, wg *sync.WaitGroup, slot uint64, hash string) (err error) {
// copied from redis.go, with added channel and waitgroup to test the race condition in a controlled way
txf := func(tx *redis.Tx) error {
lastSlotDelivered, err := tx.Get(context.Background(), r.keyLastSlotDelivered).Uint64()
if err != nil && !errors.Is(err, redis.Nil) {
return err
}

if slot <= lastSlotDelivered {
return ErrSlotAlreadyDelivered
if slot < lastSlotDelivered {
return ErrPastSlotAlreadyDelivered
}

if slot == lastSlotDelivered {
lastHashDelivered, err := tx.Get(context.Background(), r.keyLastHashDelivered).Result()
if err != nil && !errors.Is(err, redis.Nil) {
return err
}
if hash != lastHashDelivered {
return ErrAnotherPayloadAlreadyDeliveredForSlot
}
return nil
}

wg.Done()
<-waitC

_, err = tx.TxPipelined(context.Background(), func(pipe redis.Pipeliner) error {
pipe.Set(context.Background(), r.keyLastSlotDelivered, slot, 0)
pipe.Set(context.Background(), r.keyLastHashDelivered, hash, 0)
return nil
})

Expand Down
16 changes: 10 additions & 6 deletions services/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1305,21 +1305,25 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
log = log.WithField("timestampAfterLoadResponse", time.Now().UTC().UnixMilli())

// Check whether getPayload has already been called -- TODO: do we need to allow multiple submissions of one blinded block?
err = api.redis.CheckAndSetLastSlotDelivered(payload.Slot())
err = api.redis.CheckAndSetLastSlotAndHashDelivered(payload.Slot(), payload.BlockHash())
log = log.WithField("timestampAfterAlreadyDeliveredCheck", time.Now().UTC().UnixMilli())
if err != nil {
if errors.Is(err, datastore.ErrSlotAlreadyDelivered) {
// BAD VALIDATOR, 2x GETPAYLOAD
log.Warn("validator called getPayload twice")
api.RespondError(w, http.StatusBadRequest, "payload for this slot was already delivered")
if errors.Is(err, datastore.ErrAnotherPayloadAlreadyDeliveredForSlot) {
// BAD VALIDATOR, 2x GETPAYLOAD FOR DIFFERENT PAYLOADS
log.Warn("validator called getPayload twice for different payload hashes")
api.RespondError(w, http.StatusBadRequest, "another payload for this slot was already delivered")
return
} else if errors.Is(err, datastore.ErrPastSlotAlreadyDelivered) {
// BAD VALIDATOR, 2x GETPAYLOAD FOR PAST SLOT
log.Warn("validator called getPayload for past slot")
api.RespondError(w, http.StatusBadRequest, "payload for this slot was already delivered")
} else if errors.Is(err, redis.TxFailedErr) {
// BAD VALIDATOR, 2x GETPAYLOAD + RACE
log.Warn("validator called getPayload twice (race)")
api.RespondError(w, http.StatusBadRequest, "payload for this slot was already delivered (race)")
return
}
log.WithError(err).Error("redis.CheckAndSetLastSlotDelivered failed")
log.WithError(err).Error("redis.CheckAndSetLastSlotAndHashDelivered failed")
}

// Handle early/late requests
Expand Down

0 comments on commit e6387e5

Please sign in to comment.